Reading queues asynchronously in Python’s multiprocessing module

Python 2.6 will add a multiprocessing module that supports fork/join semantics and management of process pools. However, even in this multiprocess environment it is still necessary to handle some communication asynchronously. Processes mainly communicate through shared queues, which are built on OS pipes or socket pairs under the hood. The UI, however, needs to be notified about updates on any queue it is listening to without blocking, for example from inside its main event loop.

Luckily, it is possible to get at the underlying file descriptor, which can then be watched with select() or some abstraction of it. Unfortunately there is no official support for this, but we can still poke around inside the implementation class to get at the details. The code below shows how to do this with GTK. It spawns a subprocess that counts upwards and sends each number to the UI via a queue. The UI then displays the number:

from __future__ import print_function
import multiprocessing
import Queue  # Python 2 module; provides the Queue.Empty exception
import gobject
import gtk


class UI:

    def destroy(self, widget, data=None):
        self.outq.put("quit")
        gtk.main_quit()

    def __init__(self):
        self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.window.connect("destroy", self.destroy)
        self.window.set_border_width(10)
    
        self.label = gtk.Label("Not Set!")
        self.window.add(self.label)
    
        self.label.show()
        self.window.show()

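    def update(self, fd, cond):
        # Called by GTK whenever inq's pipe has data to read. The data is
        # already waiting, so this get() should not block the UI.
        val = self.inq.get()
        self.label.set_text("Value is %s" % val)
        # Returning True keeps the watch installed for the next update.
        return True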

    def main(self, outq, inq):
        self.outq = outq
        self.inq = inq

        # This is the tricky bit: we get the file descriptor of the read
        # end of the queue's underlying pipe.  _reader is a private
        # attribute, so we have to hope it doesn't change.
        fd = inq._reader.fileno()

        # This tells GTK to watch this FD and call self.update when data
        # is available.  We use the newer API; gtk.input_add (gdk_input_add
        # in C) is deprecated.
        gobject.io_add_watch(fd, gobject.IO_IN, self.update)
        gtk.main()


def counter(inq, outq, interval):
    count = 1
    again = True
    while again:
        print("Putting %s on queue" % count)
        outq.put(count)
        count += 1

        try:
            print("Reading from queue...")
            ret = inq.get(True, interval)
            if ret == "quit":
                print("Got quit message")
                again = False
        except Queue.Empty:
            # Timed-out, carry on
            pass


if __name__ == "__main__":
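    # q1 carries the "quit" message from the UI to the counter;
    # q2 carries the numbers from the counter back to the UI.
    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()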

    proc = multiprocessing.Process(target=counter, args=(q1, q2, 1,))
    proc.start()

    ui = UI()
    ui.main(q1, q2)

    proc.join()
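Stripped of GTK, the mechanism is just waiting on the queue's file descriptor. Here is a minimal sketch of the same idea with plain select.select(), assuming Python 3 on a Unix-like system (on Windows, select() only accepts sockets). The helper name poll_queue is hypothetical, and like the GTK listing it relies on the private _reader attribute.

```python
import multiprocessing
import select


def poll_queue(q, timeout):
    """Hypothetical helper: wait up to `timeout` seconds for data on a
    multiprocessing.Queue, then return every item already waiting.

    Relies on q._reader, a private implementation detail of
    multiprocessing.Queue that may change between Python versions.
    """
    fd = q._reader.fileno()
    items = []
    # Block (for at most `timeout` seconds) until the queue's pipe is readable.
    readable, _, _ = select.select([fd], [], [], timeout)
    while readable:
        items.append(q.get())
        # Zero timeout: keep reading only while more data is already
        # sitting in the pipe, never wait for new data here.
        readable, _, _ = select.select([fd], [], [], 0)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("hello")
    print(poll_queue(q, 5.0))  # expect ['hello']
```

In a real program the producer would be a separate multiprocessing.Process, as counter() is in the listing; an event loop would call something like poll_queue() with a zero or short timeout on each iteration.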

Obviously, if the queue's private implementation attributes (such as _reader) change, this will break; hopefully a more standard way of getting at this information will be added to the API.
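Until something like that exists, one way to avoid the private attribute is to swap the queue for a multiprocessing.Pipe(), at least when a single process feeds a single reader. Its Connection objects expose fileno() as public API, so reader.fileno() could be passed to gobject.io_add_watch() just as in the listing. The sketch below assumes Python 3.3 or later, which added multiprocessing.connection.wait() for waiting on several connections at once. The helper name drain is hypothetical.

```python
import multiprocessing
from multiprocessing.connection import wait


def drain(connections, timeout):
    """Hypothetical helper: wait up to `timeout` seconds for any of the
    given Connection objects to become readable, then receive one
    message from each ready connection.

    Uses only public API: Connection.recv() and
    multiprocessing.connection.wait() (Python 3.3+).
    """
    messages = []
    for conn in wait(connections, timeout):
        try:
            messages.append(conn.recv())
        except EOFError:
            # The writing end was closed; no more data will arrive.
            pass
    return messages


if __name__ == "__main__":
    # duplex=False: the first connection can only receive, the second only send.
    reader, writer = multiprocessing.Pipe(duplex=False)
    writer.send(42)
    print(drain([reader], 5.0))  # expect [42]
```

The trade-off is that a Pipe has no internal locks, so several processes must not write to (or read from) the same end at once. Its send() can also block once the OS pipe buffer fills, whereas Queue.put() hands the data to a background feeder thread.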

  1. If you want an enhancement to the API, you need to file a bug report at http://bugs.python.org – it won’t make it into 2.6 or 3.0, but if the enhancement has merit, it might make it into 2.7/3.1.

  2. Good point, I’ve raised issue 3831.
