Tuesday, October 19, 2010

Message queues with Python

A while back, I wanted to build a web front end for a long-running Python script. I started with a basic front end using Django. Django is a pleasantly straightforward web framework, quite similar to Rails, easy to learn (with the help of the excellent and free Django Book), and generally trouble-free. Pylons is an alternative.

Because the computation was fairly resource-intensive, I decided to introduce a queue. The web app could then concentrate on collecting the necessary input from the user and dumping a job on the queue, leaving the heavy lifting to a worker process (or several). We'd redirect the user to a status page where s/he could monitor progress and get results upon completion. Sounds simple enough, right? I figured my worker processes would look something like this:

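# Placeholders, not a real API: the loaders, init_queue, the queue method,
# big_computation and log_errors stand for whatever the real system provides.
big_chunka_data = load_big_chunka_data()
mo_data = load_mo_data()
queue = init_queue("http://myserver.com:12345", "user", "pw", "etc")

while True:
    try:
        message = queue.block_until_we_can_take_a_message()
        if message.get('command') == 'shutdown':  # hypothetical shutdown message
            break
        big_computation(message['param'],
                        message['foo'],
                        big_chunka_data,
                        mo_data)
    except Exception as e:
        log_errors(e)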

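...and the whole pile-o-junk would look like this: a web front end dropping jobs onto a queue, with a handful of worker processes pulling them off the other end.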

Start up a handful of workers and a nice load-balancing effect comes for free: slow or heavily loaded workers take fewer jobs, while faster workers take more. I was also hoping for a good answer to the question, "What happens when one of our workers dies?"
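To see that free load balancing without any server, here is a small Python 3 sketch under simplifying assumptions: everything runs in one process, a standard-library queue.Queue stands in for the networked queue, and threads stand in for worker processes (multiprocessing.Queue offers the same put/get interface across processes). The names worker, run and SHUTDOWN are made up for illustration, and time.sleep stands in for the real computation. Because each worker only asks for a new job after finishing the last one, the slow worker should end up with far fewer jobs.

```python
import queue
import threading
import time

SHUTDOWN = None  # sentinel: tells one worker to exit its loop


def worker(name, jobs, seconds_per_job, counts, lock):
    """Take jobs off the shared queue until the shutdown sentinel arrives."""
    while True:
        message = jobs.get()  # blocks until a message is available
        if message is SHUTDOWN:
            break
        try:
            # Stand-in for big_computation(message["param"], ...).
            time.sleep(seconds_per_job)
            with lock:
                counts[name] += 1
        except Exception as e:  # log the failure and keep serving the queue
            print("worker %s failed on %r: %s" % (name, message, e))


def run(n_jobs, seconds_per_job):
    """Start one thread per worker, enqueue n_jobs, return jobs done per worker."""
    jobs = queue.Queue()
    counts = {name: 0 for name in seconds_per_job}
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(name, jobs, secs, counts, lock))
        for name, secs in seconds_per_job.items()
    ]
    for t in threads:
        t.start()
    for i in range(n_jobs):
        jobs.put({"param": i})
    for _ in threads:
        jobs.put(SHUTDOWN)  # one sentinel per worker, queued after the real jobs
    for t in threads:
        t.join()
    return counts


if __name__ == "__main__":
    print(run(40, {"fast": 0.005, "slow": 0.05}))
```

One sentinel per worker is queued after the real jobs; since the queue is first-in, first-out, no worker sees a shutdown message until every job has been handed out.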

Options

There are a ridiculous number of message queues to choose from. I looked at beanstalk, which is nice and simple, but its Python binding, pybeanstalk, seems to be out of date. There's Gearman, from Danga, the source of memcached. That looked fairly straightforward as well, although be careful to get the newer Python binding. Python itself now offers the multiprocessing module, which includes a queue.

One intriguing option is ZeroMQ (aka 0MQ). It's message queueing without a queue. It's brokerless, meaning there's no external queue server process. Messages are routed in common MQ patterns right down at the network level. Of course, if you want store and forward, you're on your own for the persistence part. Still, very cool... Python bindings for ZeroMQ are found in pyzmq.

Several people on the Seattle Python mailing list recommended Celery. After a (superficial) look, Celery seemed too RPC-ish for my taste. I'm probably being uptight, but when using a queue, I'd rather think in terms of sending a message than calling a function. That seems more decoupled and avoids making assumptions about the structure of the conversation and what's on the other side. I should probably lighten up. Celery is built on top of RabbitMQ, although it supports other brokers as well.

RabbitMQ and Carrot

RabbitMQ, now part of the SpringSource empire (in turn owned by VMware), aims to compete with Apache ActiveMQ as a full-on enterprise messaging system based on the AMQP spec. I installed RabbitMQ using MacPorts, where you'll notice that RabbitMQ pulls in an absurd number of dependencies.

sudo port selfupdate
sudo port install rabbitmq-server

For getting Python to talk to RabbitMQ, Carrot is a nice option. It was a bit confusing at first, but some nice folks on the carrot-users mailing list set me straight. Apparently, Carrot's author is working on a rewrite called Kombu.

Here's what worked for me. A producer sends Python dictionary objects, which get turned into JSON. My example code is only slightly modified from the "Creating a Connection" section of the Carrot documentation. You'll need a little RabbitMQ terminology to understand the connection methods.

  • queues are addresses of receivers
  • exchanges are routers: producers publish to an exchange, which uses each message's routing key to decide which bound queues receive it
  • virtual hosts are the unit of security
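To make the terminology concrete, here is a toy, in-memory model of a direct exchange in Python 3. It is only an illustration under simplifying assumptions, not how RabbitMQ or Carrot work internally; the class ToyDirectExchange and its bind/publish methods are hypothetical. The names "feed" and "importer" mirror the exchange, queue and routing key used in the producer and consumer that follow, and dictionaries travel as JSON text, as they do with Carrot.

```python
import json
from collections import defaultdict, deque


class ToyDirectExchange:
    """Hypothetical in-memory stand-in for an AMQP direct exchange."""

    def __init__(self, name):
        self.name = name
        self.bindings = defaultdict(list)  # routing key -> bound queues

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key):
        body = json.dumps(message)  # dicts travel as JSON text
        queues = self.bindings.get(routing_key, [])
        for q in queues:
            q.append(body)
        return len(queues)  # how many queues the message was routed to


feed_queue = deque()  # plays the role of the "feed" queue
exchange = ToyDirectExchange("feed")
exchange.bind(feed_queue, routing_key="importer")

for i in range(3):
    exchange.publish({"name": "foo", "i": i}, routing_key="importer")
exchange.publish({"name": "lost"}, routing_key="nobody")  # no binding: dropped

received = [json.loads(body) for body in feed_queue]
print(received)
```

A message whose routing key matches no binding simply goes nowhere; by default, RabbitMQ also discards messages it cannot route.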

Producer

from carrot.connection import BrokerConnection
from carrot.messaging import Publisher

conn = BrokerConnection(hostname="localhost", port=5672,
                        userid="guest", password="guest",
                        virtual_host="/")

publisher = Publisher(connection=conn,
                      exchange="feed", routing_key="importer")

for i in range(30):
    publisher.send({"name": "foo", "i": i})
publisher.close()

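The consumers print out the messages as they arrive, then sleep for a bit to simulate long-running tasks. I tested by starting two consumers, one with a longer sleep time. Then I started a producer and saw that the slower consumer got fewer messages, which is what I expected. Note that setting prefetch_count to 1 is necessary to achieve this low-budget load-balancing effect. Without a prefetch limit, RabbitMQ hands messages out round-robin as soon as they arrive, so every consumer gets an equal share no matter how slow it is; with prefetch_count=1, a consumer is sent a new message only after it has acknowledged the previous one.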

Consumer

import time
import sys
from carrot.connection import BrokerConnection
from carrot.messaging import Consumer

# supply an integer argument for sleep time to simulate long-running tasks
if len(sys.argv) > 1:
    sleep_time = int(sys.argv[1])
else:
    sleep_time = 1

connection = BrokerConnection(hostname="localhost", port=5672,
                              userid="guest", password="guest",
                              virtual_host="/")

consumer = Consumer(connection=connection, queue="feed",
                    exchange="feed", routing_key="importer")

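def import_feed_callback(message_data, message):
    print("-" * 80)
    print(message_data)
    print(message)
    time.sleep(sleep_time)  # simulate a long-running task
    # Ack only after the work is done: if this worker dies mid-task,
    # RabbitMQ redelivers the unacknowledged message to another consumer.
    message.ack()
    print("-" * 80)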

consumer.register_callback(import_feed_callback)
consumer.qos(prefetch_count=1)

consumer.consume() 
while True: 
    connection.drain_events()
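Why does prefetch_count=1 matter? The following Python 3 sketch is a toy discrete-event simulation, not RabbitMQ's actual dispatcher: it assumes the broker hands messages round-robin to any consumer with room in its prefetch window, that delivery is instant, and that each consumer acks a message when it finishes working on it. The function simulate and its parameters are made up for illustration. With 30 messages, one consumer taking 1 time unit per message and another taking 3, an unlimited window splits the work evenly, while a window of 1 gives the slow consumer roughly a quarter of the jobs, in line with its speed.

```python
import heapq
import itertools


def simulate(n_messages, service_times, prefetch=None):
    """Count how many messages each consumer handles.

    service_times[c] is how long consumer c spends on one message.
    prefetch is the most unacknowledged messages a consumer may hold
    (None means no limit). Each message is acked when its work finishes.
    """
    n = len(service_times)
    pending = n_messages        # messages still waiting in the queue
    unacked = [0] * n           # delivered to consumer c, not yet acked
    handled = [0] * n
    busy_until = [0.0] * n      # when consumer c finishes its backlog
    acks = []                   # heap of (ack_time, tie_breaker, consumer)
    tie = itertools.count()
    next_c = 0                  # round-robin pointer

    def dispatch(now):
        nonlocal pending, next_c
        while pending > 0:
            # Next consumer, in round-robin order, with room in its window.
            for offset in range(n):
                c = (next_c + offset) % n
                if prefetch is None or unacked[c] < prefetch:
                    break
            else:
                return          # every window is full; wait for an ack
            next_c = (c + 1) % n
            pending -= 1
            unacked[c] += 1
            # Consumers work through their deliveries one at a time.
            busy_until[c] = max(now, busy_until[c]) + service_times[c]
            heapq.heappush(acks, (busy_until[c], next(tie), c))

    dispatch(0.0)
    while acks:
        now, _, c = heapq.heappop(acks)
        unacked[c] -= 1
        handled[c] += 1
        dispatch(now)           # an ack frees room, so deliver more
    return handled


print(simulate(30, [1, 3]))              # no prefetch limit  → [15, 15]
print(simulate(30, [1, 3], prefetch=1))  # prefetch_count=1   → [22, 8]
```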

The project remains incomplete and I'm not at all ready to say this is the best way to go about it. It's just the first thing I got working. RabbitMQ may be a little heavy for a simple task queue, but it's also well supported and documented.

It seems like this sort of thing is less mature in the Python world than in the Java world. It's moving fast, though.


Obsolete note: The version of RabbitMQ in MacPorts (1.7.2) at the time was a version behind and broken. I had to dig through the compiler error log and add a closing paren in line 100 of rabbit_exchange.erl.