Reading queues asynchronously in Python’s multiprocessing module

Python 2.6 will add a multiprocessing module that supports fork/join semantics and management of process pools. However even in this multiprocessor environment it is still necesary to do some asynchronous. The main way that processes communicate in this environment are through shared queues (implemented as socket-pairs or unix pipes under the hood), but we need a method for the UI to be notified about updates to any queues it is listening on without blocking (for example in a UI). Luckily it is possible to get at underlying file-descriptor, which can then be managed with select() or some abstraction thereof. Unfortunately there's no official support for this but we can still poke around inside the implementation class to get at the details. The code below shows how to do this with GTK. This just spawns off a sub-process that counts up and sends the current number to the UI via a queue, which then displays the number:

PYTHON:
  1. from __future__ import print_function
  2. import gtk, gobject, time, multiprocessing, Queue
  3.  
  4.  
  5. class UI:
  6.  
  7.     def destroy(self, widget, data=None):
  8.         self.outq.put("quit")
  9.         gtk.main_quit()
  10.  
  11.     def __init__(self):
  12.         self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
  13.         self.window.connect("destroy", self.destroy)
  14.         self.window.set_border_width(10)
  15.    
  16.         self.label = gtk.Label("Not Set!")
  17.         self.window.add(self.label)
  18.    
  19.         self.label.show()
  20.         self.window.show()
  21.  
  22.     def update(self, fd, cond):
  23.         val = self.inq.get()
  24.         self.label.set_text("Value is %s" % val)
  25.         return True
  26.  
  27.     def main(self, outq, inq):
  28.         self.outq = outq
  29.         self.inq = inq
  30.  
  31.         # This is the tricky bit, we get the socket FD of the
  32.         # underlying pipe from the queue object.  We hope that the
  33.         # internal reader variable doesn't change.
  34.         fd = inq._reader.fileno()
  35.  
  36.         # This tells GTK to watch this FD and notify us when data is
  37.         # available.  We use newer API, gdk_input_read is deprecated
  38.         gobject.io_add_watch(fd, gobject.IO_IN, self.update)
  39.         gtk.main()
  40.  
  41.  
  42. def counter(inq, outq, interval):
  43.     count = 1
  44.     again = True
  45.     while again:
  46.         print("Putting %s on queue" % count)
  47.         outq.put(count)
  48.         count += 1
  49.  
  50.         try:
  51.             print("Reading from queue...")
  52.             ret = inq.get(True, interval)
  53.             if ret == "quit":
  54.                 print("Got quit message")
  55.                 again = False
  56.         except Queue.Empty:
  57.             # Timed-out, carry on
  58.             pass
  59.  
  60.  
  61. if __name__ == "__main__":
  62.     q1 = multiprocessing.Queue()
  63.     q2 = multiprocessing.Queue()
  64.  
  65.     proc = multiprocessing.Process(target=counter, args=(q1, q2, 1,))
  66.     proc.start()
  67.  
  68.     ui = UI()
  69.     ui.main(q1, q2)
  70.  
  71.     proc.join()

Obviously if the underlying implementation variables change this will break; hopefully a more standard way of getting at this information will be added to the API.

  1. Archives

  2. Categories

  3. Twitter

    • Just got pictures of the earthquake damage from my sister in Kaiapoi. Real "the ground opened up" stuff. 31 minutes ago
    • Ooo, the beta of Angry Birds is out on Android. 17 hours ago
    • Released a new version of my Android Internode widget; fixes the problem with Internode's new SSL cert. 22 hours ago
    • @wangjammer5: Cool, here's something to get you started: http://is.gd/eT33b :) 1 day ago
    • So, if iTune Ping is Apple's social network will the perpetual Apple tweet circle-jerk move there now? 1 day ago
  4. RSS Google Reader Shared Items