Reading queues asynchronously in Python’s multiprocessing module
Python 2.6 will add a multiprocessing module that supports fork/join semantics and management of process pools. However, even in this multi-process environment it is still necessary to do some work asynchronously. The main way that processes communicate in this environment is through shared queues (implemented as socket pairs or Unix pipes under the hood), but we need a way to be notified about updates to any queues we are listening on without blocking (for example in a UI). Luckily it is possible to get at the underlying file descriptor, which can then be managed with select() or some abstraction thereof. Unfortunately there's no official support for this, but we can still poke around inside the implementation class to get at the details. The code below shows how to do this with GTK. It just spawns off a sub-process that counts up and sends the current number to the UI via a queue, and the UI then displays the number:

from __future__ import print_function

import gtk, gobject, time, multiprocessing, Queue


class UI:

    def destroy(self, widget, data=None):
        self.outq.put("quit")
        gtk.main_quit()

    def __init__(self):
        self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.window.connect("destroy", self.destroy)
        self.window.set_border_width(10)

        self.label = gtk.Label("Not Set!")
        self.window.add(self.label)

        self.label.show()
        self.window.show()

    def update(self, fd, cond):
        val = self.inq.get()
        self.label.set_text("Value is %s" % val)
        return True

    def main(self, outq, inq):
        self.outq = outq
        self.inq = inq

        # This is the tricky bit, we get the socket FD of the
        # underlying pipe from the queue object. We hope that the
        # internal reader variable doesn't change.
        fd = inq._reader.fileno()

        # This tells GTK to watch this FD and notify us when data is
        # available. We use newer API, gdk_input_read is deprecated
        gobject.io_add_watch(fd, gobject.IO_IN, self.update)
        gtk.main()


def counter(inq, outq, interval):
    count = 1
    again = True
    while again:
        print("Putting %s on queue" % count)
        outq.put(count)
        count += 1

        try:
            print("Reading from queue...")
            ret = inq.get(True, interval)
            if ret == "quit":
                print("Got quit message")
                again = False
        except Queue.Empty:
            # Timed-out, carry on
            pass


if __name__ == "__main__":
    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()

    proc = multiprocessing.Process(target=counter, args=(q1, q2, 1,))
    proc.start()

    ui = UI()
    ui.main(q1, q2)

    proc.join()

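The same trick works without GTK by handing the descriptor straight to select(). What follows is only a minimal sketch, written for Python 3 on a Unix-like system rather than the Python 2.6 used in the listing; it assumes the private `_reader` attribute is still present on the queue, and the names `producer` and `drain_with_select` are made up for illustration.

```python
import multiprocessing
import select


def producer(q, n):
    # Child process: put the numbers 0..n-1 on the queue.
    for i in range(n):
        q.put(i)


def drain_with_select(q, expected, timeout=5.0):
    # Assumption: q._reader is a private attribute exposing fileno();
    # it is not part of the documented API and may change.
    fd = q._reader.fileno()
    received = []
    while len(received) < expected:
        # Wait until the pipe has data, or give up after `timeout` seconds.
        readable, _, _ = select.select([fd], [], [], timeout)
        if not readable:
            break  # timed out, return what we have so far
        received.append(q.get())
    return received


if __name__ == "__main__":
    q = multiprocessing.Queue()
    proc = multiprocessing.Process(target=producer, args=(q, 3))
    proc.start()
    print(drain_with_select(q, 3))
    proc.join()
```

In a real event loop the select() call would watch several descriptors at once; a single descriptor is used here only to keep the sketch short.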
Obviously if the underlying implementation variables change this will break; hopefully a more standard way of getting at this information will be added to the API.
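Until something official exists, one way to soften the breakage is to keep the private access in a single helper that fails with a clear message. This is just a sketch for Python 3 on a Unix-like system; `queue_fileno` is a hypothetical helper name, not part of multiprocessing, and it still relies on the undocumented `_reader` attribute.

```python
import multiprocessing


def queue_fileno(q):
    """Return the file descriptor of the queue's read end.

    Assumption: the queue keeps its read end in a private `_reader`
    attribute with a fileno() method. If that ever changes, raise a
    clear error instead of an obscure AttributeError.
    """
    reader = getattr(q, "_reader", None)
    if reader is None or not hasattr(reader, "fileno"):
        raise RuntimeError(
            "multiprocessing.Queue internals changed: no usable _reader"
        )
    return reader.fileno()


if __name__ == "__main__":
    q = multiprocessing.Queue()
    print("queue read descriptor:", queue_fileno(q))
```

With a helper like this, the UI code would call `queue_fileno(inq)` in place of `inq._reader.fileno()`, so only one place needs updating if the internals move.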
This entry was posted on Sunday, September 7th, 2008 at 6:16 pm and is filed under Python.
on September 9, 2008 at 11:03 pm Jesse Noller wrote:
If you want an enhancement to the API, you need to file a bug report at http://bugs.python.org – it won’t make it into 2.6 or 3.0, but if the enhancement has merit, it might make it into 2.7/3.1
on September 11, 2008 at 11:16 am tarka wrote:
Good point, I’ve raised 3831.