Tuesday, October 19, 2010

Message queues with Python

A while back, I wanted to build a web front end for a long-running Python script. I started with a basic front end using Django. Django is a pleasantly straightforward web framework, quite similar to Rails, easy to learn (with the help of the excellent and free Django book), and generally trouble-free. Pylons is an alternate choice.

Because the computation was fairly resource intensive, I thought I'd introduce a queue. The web app could then concentrate on collecting the necessary input from the user and dumping a job on the queue, leaving the heavy lifting to a worker process (or several). We'd redirect the user to a status page where s/he could monitor progress and get results upon completion. Sounds simple enough, right? I figured my worker processes would look something like this:

big_chunka_data = load_big_chunka_data()
mo_data = load_mo_data()
queue = init_queue("http://myserver.com:12345", "user", "pw", "etc")

done = False
while not done:
    try:
        # block until the queue hands us a message
        message = queue.block_until_we_can_take_a_message()
        if message.get('command') == 'shutdown':
            done = True
        else:
            big_computation(message['param'],
                            message['foo'],
                            big_chunka_data,
                            mo_data)
    except Exception as e:
        log_errors(e)

...and the whole pile-o-junk would look like this: the web app collects input and drops jobs on the queue, the queue hands them out, and one or more worker processes grind through the heavy computation and post results for the status page.

Start up a handful of workers and a nice load-balancing effect comes for free. Slow, heavily loaded workers will take fewer jobs, while faster workers take more. I was also hoping for a good answer to the question, "What happens when one of our workers dies?"
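
On the web side of that picture, the enqueue-and-redirect step might look roughly like this in Django (a sketch only; the Job model, the form field, and the enqueue_job helper are all made up for illustration):

from django.shortcuts import redirect

def submit(request):
    # hypothetical Job model records status so the status page has something to poll
    job = Job.objects.create(status="queued", params=request.POST["params"])
    # hypothetical helper that drops a message on whatever queue we end up using
    enqueue_job({"job_id": job.id, "param": job.params})
    return redirect("job_status", job_id=job.id)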

Options

There are a ridiculous number of message queues to choose from. I looked at beanstalk, which is nice and simple, but its Python binding, pybeanstalk, seems to be out of date. There's gearman, from Danga, the source of memcached. That looked fairly straightforward as well, although be careful to get the newer Python binding. Python itself now offers the multiprocessing module, which has a queue.
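
For work that stays on a single machine, the standard-library route might look roughly like this (a minimal sketch; the worker body and the job format are made up):

from multiprocessing import Process, Queue

def worker(q):
    while True:
        job = q.get()              # blocks until a job is available
        if job is None:            # a None sentinel means "shut down"
            break
        print "processing", job    # stand-in for the real computation

if __name__ == '__main__':
    q = Queue()
    workers = [Process(target=worker, args=(q,)) for _ in range(3)]
    for w in workers:
        w.start()
    for i in range(30):
        q.put({"param": i})
    for w in workers:
        q.put(None)                # one sentinel per worker
    for w in workers:
        w.join()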

One intriguing option is ZeroMQ (aka 0MQ). It's message queueing without a queue. It's brokerless, meaning there's no external queue server process. Messages are routed in common MQ patterns right down at the network level. Of course, if you want store and forward, you're on your own for the persistence part. Still, very cool... Python bindings for ZeroMQ are found in pyzmq.
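
To get a feel for the brokerless style, here's a rough PUSH/PULL sketch using pyzmq. The addresses, payloads, and file names are made up, and the two halves would normally run as separate processes:

# producer.py -- a PUSH socket fans work out to any connected workers
import zmq

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
for i in range(30):
    sender.send_json({"name": "foo", "i": i})

# worker.py -- a PULL socket receives its share of the messages
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
while True:
    message = receiver.recv_json()
    # ... do the big computation with message ...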

Several on the Seattle Python mailing list recommended Celery. After a (superficial) look, Celery seemed too RPC-ish for my taste. I'm probably being uptight, but when using a queue, I'd rather think in terms of sending a message than calling a function. That seems more decoupled and avoids making assumptions about the structure of the conversation and what's on the other side. I should probably lighten up. Celery is built on top of RabbitMQ, although it supports other options.
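
For contrast, here's roughly what a Celery task looks like (a sketch using a current Celery release; the broker URL, module name, and task body are made up). You decorate a function and then call it, which is exactly the RPC flavor I was grumbling about:

from celery import Celery

app = Celery("tasks", broker="amqp://guest@localhost//")

@app.task
def big_computation(param, foo):
    # the real work would happen here
    return some_expensive_thing(param, foo)   # hypothetical helper

# caller side: looks like a function call, not a message send
# result = big_computation.delay(42, "bar")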

RabbitMQ and Carrot

RabbitMQ, now part of the SpringSource empire (in turn owned by VMware), aims to compete with Apache ActiveMQ as a full-on enterprise messaging system based on the AMQP spec. I installed RabbitMQ using MacPorts, where you'll notice that it pulls in an absurd number of dependencies.

sudo port selfupdate
sudo port install rabbitmq-server

For getting Python to talk to RabbitMQ, Carrot is a nice option. It was a bit confusing at first, but some nice folks on the carrot-users mailing list set me straight. Apparently, Carrot's author is working on a rewrite called Kombu.

Here's what worked for me. A producer sends Python dictionary objects, which get turned into JSON. My example code is only slightly modified from Creating a Connection in the Carrot documentation. You'll need a little RabbitMQ terminology to understand the connection methods.

  • queues are addresses of receivers
  • exchanges route messages to queues based on routing keys
  • virtual hosts are the unit of security

Producer

from carrot.connection import BrokerConnection
from carrot.messaging import Publisher

# connect to the RabbitMQ broker with the default guest account
conn = BrokerConnection(hostname="localhost", port=5672,
                        userid="guest", password="guest",
                        virtual_host="/")

# publish to the "feed" exchange with routing key "importer"
publisher = Publisher(connection=conn,
                      exchange="feed", routing_key="importer")

# send 30 little dictionaries; carrot serializes them to JSON
for i in range(30):
    publisher.send({"name": "foo", "i": i})
publisher.close()

The consumers print out the messages as they arrive, then sleep for a bit to simulate long-running tasks. I tested by starting two consumers, one with a longer sleep time. Then I started a producer and saw that the slower consumer got fewer messages, which is what I expected. Note that setting prefetch_count to 1 is necessary to achieve this low-budget load balancing effect.

Consumer

import time
import sys
from carrot.connection import BrokerConnection
from carrot.messaging import Consumer

# supply an integer argument for sleep time to simulate long-running tasks
if len(sys.argv) > 1:
    sleep_time = int(sys.argv[1])
else:
    sleep_time = 1

connection = BrokerConnection(hostname="localhost", port=5672,
                              userid="guest", password="guest",
                              virtual_host="/")

consumer = Consumer(connection=connection, queue="feed",
                    exchange="feed", routing_key="importer")

def import_feed_callback(message_data, message):
    print "-" * 80
    print message_data
    print message
    message.ack()
    print "-" * 80
    time.sleep(sleep_time)

consumer.register_callback(import_feed_callback)
# take only one unacknowledged message at a time, so slow consumers
# don't hog the queue -- this is what makes the load balancing work
consumer.qos(prefetch_count=1)

consumer.consume()
while True:
    connection.drain_events()
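
To tie this back to the worker sketch at the top, the callback could dispatch on a field in the message, ack, and bail out on a shutdown command. Something like this (the 'command' field and the globals are carried over from that earlier sketch, not part of carrot):

def work_callback(message_data, message):
    if message_data.get("command") == "shutdown":
        message.ack()
        raise SystemExit
    big_computation(message_data["param"], message_data["foo"],
                    big_chunka_data, mo_data)
    message.ack()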

The project remains incomplete and I'm not at all ready to say this is the best way to go about it. It's just the first thing I got working. RabbitMQ seems maybe a little heavy for a simple task queue, but it's also well supported and documented.

It seems like this sort of thing is less mature in the Python world than in Java. It's moving fast though.

Links, links, links

Obsolete note: The version of RabbitMQ in MacPorts (1.7.2) at the time was a version behind and broken. I had to dig through the compiler error log and add a closing paren in line 100 of rabbit_exchange.erl.

Tuesday, October 05, 2010

Economics meets computer science

I saw an interesting lecture by Vijay Vazirani on the cross-over between computer science and economics: The "Invisible Hand of the Market": Algorithmic Ratification and the Digital Economy. Starting with Adam Smith, he covered the development of algorithmic theories of markets and market equilibrium.

The first step was a proof that simple market models have equilibria. A very computational next question, then, is, "OK, these equilibria exist. How hard is it to find them?" Enter complexity theory.

Apparently, algorithmic game theory has been used to derive Equilibrium Pricing of Digital Goods via a New Market Model and was applied to cook up the pricing algorithm for Google's TV ads.

Attempts to compute equilibrium prices led Irving Fisher to develop his "Price Machine" in the early 1890s, which was essentially a hydraulic computer.

Vazirani wrote a book on approximation algorithms. I kept expecting him to introduce the idea that, while finding equilibria is very hard, markets approximate a solution. Maybe, given a set of conditions we could show that the approximation was within certain bounds of the true optimum. But, he didn't.

A few unanswered questions come to mind:

  • Can irrationality be modeled?
  • Do big differences in wealth or other asymmetries illustrate any divergence between the model and the real world?
  • Even if markets reach equilibrium in some theoretical long run, in the real world they're constantly buffeted by exogenous shocks, and therefore disequilibrium must be pretty common. He answered a question like this by saying that no attempt to incorporate dynamics has been very successful.
  • Digital marketplaces like eBay or Google's Ad auction must provide a ridiculous treasure trove of data to be mined for empirical economics.

The Q&A session degenerated into politics pretty quickly. People asked about how the financial crisis might have challenged his models. One questioner asked about "animal spirits". Vazirani defended the neoclassical line and stated that their algorithms had "ratified the free market". The questioner responded that "The markets have ratified P.T. Barnum". Another audience member added "...because of government interference". It's surprising that people aren't able to differentiate between a theoretical result based on a carefully constructed model and political opinions in the real world, but that stuff belongs over here.

Monday, October 04, 2010

Worse than linear performance in Rails ActiveRecords with associations

The other day I was doing a task that required iterating through 142 thousand ActiveRecord objects and computing simple aggregate statistics. Taking convenience over efficiency, I thought I'd just whip up a script to run in the Ruby on Rails script runner. I usually put little print-line debugging statements in programs like that as a poor man's progress bar. I noticed that as the script ran, it seemed to get slower and slower.

My ActiveRecord objects are linked up as shown below. The names are changed to protect the guilty, so I might be neglecting something important.

class Basket < ActiveRecord::Base
  has_many :basket_items
  has_many :items, :through => :basket_items
end

class Item < ActiveRecord::Base
  belongs_to :category
  has_many :basket_items
  has_many :baskets, :through => :basket_items
end

start_time = Time.now
Basket.find(:all).each do |basket|
  puts "basket.id = #{basket.id-20000}\t#{Time.now - start_time}" if basket.id % 1000 == 0
end

This part is super fast, 'cause nothing much is done inside the loop other than printing. I don't know exactly how Rails does the conversion from DB fields to objects. That bit seems to be taking place outside the loop. Anyway, what happens if we access the associated items?

counter = 0
Basket.find(:all).each do |basket|
  puts "basket.id = #{basket.id-20000}\t#{Time.now - start_time}" if basket.id % 1000 == 0
  counter += basket.items.length
end

In the second case, we trigger the lazy load of the list of items. Baskets average 18.7 items. The distribution of item count within the data is more or less random and, on average, flat. Now the per-batch timings get progressively worse as the loop runs, and the overall growth is clearly worse than linear.

In other words, this is an n×m operation, but m (the number of items per basket) is more or less constant. I can't guess why this wouldn't be linear. Garbage collection? Should that level off? Maybe we're just seeing the cost of maintaining the heap? Items are also associated with baskets. Maybe Rails is spending time fixing up that association?

The real script, only a little more complex than the one above, ran in about 30 hours. I realize this is all a little half-baked and I don't intend to chase it down further, but I'm hoping to nerd snipe someone into figuring it out. I'm probably missing a lot and leaving out too much.

Saturday, October 02, 2010

CouchDB and R

Here are some quick crib notes on getting R talking to CouchDB using Couch's ReSTful HTTP API. We'll do it in two different ways. First, we'll construct HTTP calls with RCurl, then move on to the R4CouchDB package for a higher-level interface. I'll assume you've already gotten started with CouchDB and are familiar with the basic ReST actions: GET, PUT, POST, and DELETE.

First install RCurl and RJSONIO. You'll have to download the tar.gz's if you're on a Mac. For the second part, we'll need to install R4CouchDB, which depends on the previous two. I checked it out from GitHub and used R CMD INSTALL.

ReST with RCurl

Ping server

getURL("http://localhost:5984/")
[1] "{\"couchdb\":\"Welcome\",\"version\":\"1.0.1\"}\n"

That's nice, but we want to get the result back as a real R data structure. Try this:

welcome <- fromJSON(getURL("http://localhost:5984/"))
welcome$version
[1] "1.0.1"

Sweet!

PUT

One way to add a new record is with http PUT.

bozo = list(name="Bozo", occupation="clown", shoe.size=100)
getURL("http://localhost:5984/testing123/bozo",
       customrequest="PUT",
       httpheader=c('Content-Type'='application/json'),
       postfields=toJSON(bozo))
[1] "{\"ok\":true,\"id\":\"bozo\",\"rev\":\"1-70f5f59bf227d2d715c214b82330c9e5\"}\n"

Notice that RCurl has no high-level PUT function, so you have to fake it using the customrequest parameter. I'd never have figured that out without an example from R4CouchDB's source. The API of libcurl is odd, I have to say, and RCurl mostly just reflects it right into R.

If you don't like the idea of sending a put request with a get function, you could use RCurl's curlPerform. Trouble is, curlPerform returns an integer status code rather than the response body. You're supposed to provide an R function to collect the response body text. Not really worth the bother, unless you're getting into some of the advanced tricks described in the paper, R as a Web Client - the RCurl package.

bim <-  list(
  name="Bim", 
  occupation="clown",
  tricks=c("juggling", "pratfalls", "mocking Bolsheviks"))
reader = basicTextGatherer()
curlPerform(
  url = "http://localhost:5984/testing123/bim",
  httpheader = c('Content-Type'='application/json'),
  customrequest = "PUT",
  postfields = toJSON(bim),
  writefunction = reader$update
)
reader$value()

GET

Now that there's something in there, how do we get it back? That's super easy.

bozo2 <- fromJSON(getURL("http://localhost:5984/testing123/bozo"))
bozo2
$`_id`
[1] "bozo"

$`_rev`
[1] "1-646331b58ee010e8df39b5874b196c02"

$name
[1] "Bozo"

$occupation
[1] "clown"

$shoe.size
[1] 100

PUT again for updating

Updating is done by using PUT on an existing document. For example, let's give Bozo some mad skillz:

bozo2$skills <- c("juggling", "balloon animals")  # modify the doc, then PUT it back
getURL(
  "http://localhost:5984/testing123/bozo",
  customrequest="PUT",
  httpheader=c('Content-Type'='application/json'),
  postfields=toJSON(bozo2))

POST

If you POST to the database, you're adding a document and letting CouchDB assign its _id field.

bender = list(
  name='Bender',
  occupation='bending',
  species='robot')
response <- fromJSON(getURL(
  'http://localhost:5984/testing123/',
  customrequest='POST',
  httpheader=c('Content-Type'='application/json'),
  postfields=toJSON(bender)))
response
$ok
[1] TRUE

$id
[1] "2700b1428455d2d822f855e5fc0013fb"

$rev
[1] "1-d6ab7a690acd3204e0839e1aac01ec7a"

DELETE

For DELETE, you pass the doc's revision number in the query string. Sorry, Bender.

response <- fromJSON(getURL("http://localhost:5984/testing123/2700b1428455d2d822f855e5fc0013fb?rev=1-d6ab7a690acd3204e0839e1aac01ec7a",
  customrequest="DELETE"))

CRUD with R4CouchDB

R4CouchDB provides a layer on top of the techniques we've just described.

R4CouchDB uses a slightly strange idiom. You pass a cdb object, really just a list of parameters, into every R4CouchDB call, and every call returns that object again, possibly modified. Results are returned in cdb$res. Maybe they did this because R uses pass-by-value. Here's how you would initialize the object.

cdb <- cdbIni()
cdb$serverName <- "localhost"
cdb$port <- 5984
cdb$DBName <- "testing123"

Create

fake.data <- list(
  state='WA',
  population=6664195,
  state.bird='Lady GaGa')
cdb$dataList <- fake.data
cdb$id <- 'fake.data'  ## optional, otherwise an ID is generated
cdb <- cdbAddDoc(cdb)

cdb$res
$ok
[1] TRUE

$id
[1] "fake.data"

$rev
[1] "1-14bc025a194e310e79ac20127507185f"

Read

cdb$id <- 'bozo'
cdb <- cdbGetDoc(cdb)

bozo <- cdb$res
bozo
$`_id`
[1] "bozo"
... etc.

Update

First, we take the document id and rev from the existing document. Then we save our revised document back to the DB.

cdb$id <- bozo$`_id`
cdb$rev <- bozo$`_rev`
bozo = list(
  name="Bozo",
  occupation="assassin",
  shoe.size=100,
  skills=c(
    'pranks',
    'honking nose',
    'kung fu',
    'high explosives',
    'sniper',
    'lock picking',
    'safe cracking'))
cdb$dataList <- bozo
cdb <- cdbUpdateDoc(cdb)

Delete

Shortly thereafter, Bozo mysteriously disappeared.

cdb$id <- bozo$`_id`
cdb <- cdbDeleteDoc(cdb)

More on ReST and CouchDB

  • One issue you'll probably run into is that, unfortunately, JSON left out NaN and Infinity. And, of course, only R knows about NAs.
  • One-off ReST calls are easy using curl from the command line, as described in REST-esting with cURL.
  • I flailed about quite a bit trying to figure out the best way to do HTTP with R.
  • I originally thought R4CouchDB was part of a Google Summer of Code project to support NoSQL DBs in R. Dirk Eddelbuettel clarified that R4CouchDB was developed independently. In any case, the schema-less approach fits nicely with R's philosophy of exploratory data analysis.