Multiple Concurrent Connections with py-amqplib and Eventlet

AMQP is a nice and efficient standard for message queueing applications.  This is a pretty cool little sector of applications that has been growing lately; there’s a huge demand for the sort of fire-and-forget application model that MQ-style software supports.  Last year we evaluated a bunch of these MQ systems.

Anyhow, our favorite Python client is py-amqplib.  It’s small, it’s fast-starting, and its interface is as clean as any AMQP client can be.  One use case I ran into recently was that I needed to consume queues from two separate hosts at the same time.  Both were meant to handle high volumes of traffic, so I wanted it to be as efficient as possible.

Enter Eventlet!  I think this illustrates how convenient it is to use Eventlet for this sort of use case.  There are about three lines of Eventlet-specific code in this, and they instantly transform this “non-asynchronous” library into an event-driven, asynchronous, yet still easy-to-use one.  Don’t bother switching to a different library; just use Eventlet with the library you like.  It amazes me every time.

My code, let me show you it.

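from eventlet import patcher
# Import py-amqplib with Eventlet's cooperative ("green") modules swapped in,
# so its blocking socket calls yield to other green threads.
amqp = patcher.import_patched('amqplib.client_0_8')
import eventlet

EXCHANGE = 'example'

def main_callback(msg):
    # Acknowledge the delivery, then report it.
    msg.channel.basic_ack(msg.delivery_tag)
    print("Got message %s %s" % (msg.routing_key, msg.body))

def connect_to_host(hostname, keys):
    print("Connecting to %s" % hostname)
    conn = amqp.Connection(hostname, userid='guest', password='guest', ssl=False, insist=True)
    ch = conn.channel()
    ch.access_request('/data', active=True, read=True)
    ch.exchange_declare(EXCHANGE, 'topic', auto_delete=True, durable=False)
    # Declare a per-host queue and bind it to each routing key of interest.
    qname, _, _ = ch.queue_declare(hostname + '_test_queue', auto_delete=True, durable=False)
    for key in keys:
        ch.queue_bind(qname, EXCHANGE, key)
    ch.basic_consume(qname, callback=main_callback)
    # Keep waiting while a consumer callback is registered; each wait() blocks
    # on the socket, which under Eventlet lets the other connection run.
    while ch.callbacks:
        ch.wait()

if __name__ == '__main__':
    # Consume from host_a in a green thread while the main thread consumes from host_b.
    eventlet.spawn(connect_to_host, 'host_a.example.com', ['key_a'])
    connect_to_host('host_b.lindenlab.com', ['key_b'])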

This code was written using the development version of Eventlet, which you can get yourself using the instructions in the sidebar on the right.  If you’re curious about that patcher module, check out the patcher documentation.


2 responses to this post.

  1. Posted by Ale on February 16, 2010 at 2:57 pm

    Nice!

    But does this mean a patched version of py-amqplib with eventlet is similar to txamqp? Could I just replace txamqp with this? My code has several consumers of different queues using one channel, and producers too, all sending through the same channel.

    Thanks,

  2. Posted by Which Linden on February 16, 2010 at 7:54 pm

    Sure; I haven’t tested this with multiple producers myself, but it seems like it ought to work.
