Migrating RabbitMQ Tasks with Python & Kombu

  • Spin up a new plan/instance and start routing new tasks to it as they’re produced
  • Migrate tasks that didn’t get consumed over to the new plan for consumption
  • Remove the old plan

Setup — Getting CLI Input

This was written as a Python script. If you haven’t created a Python script with command line arguments before, see the docs for argparse.

import argparse

def _get_cli_input():
    parser = argparse.ArgumentParser()
    # Two positional broker URLs, plus a required comma-separated list of queue names.
    parser.add_argument('source_broker')
    parser.add_argument('destination_broker')
    parser.add_argument('--queues', required=True, help='queues comma separated')

    args = parser.parse_args()
    brokers = [args.source_broker, args.destination_broker]
    queues = args.queues.split(',')

    return brokers, queues
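
To try the parsing logic without a real command line, the same arguments can be handed to argparse as a list. This is only a minimal sketch: the name parse_cli, the example broker URLs, and the whitespace stripping are assumptions added for illustration and are not part of the original script.

```python
import argparse


def parse_cli(argv):
    """Same arguments as _get_cli_input, but parsed from an explicit list."""
    parser = argparse.ArgumentParser()
    parser.add_argument('source_broker')
    parser.add_argument('destination_broker')
    parser.add_argument('--queues', required=True, help='queues comma separated')
    args = parser.parse_args(argv)

    # Assumption: tolerate spaces after commas and a trailing comma.
    queues = [name.strip() for name in args.queues.split(',') if name.strip()]
    return [args.source_broker, args.destination_broker], queues


# Hypothetical broker URLs, used only to exercise the parser.
brokers, queues = parse_cli([
    'amqp://guest:guest@old-host:5672//',
    'amqp://guest:guest@new-host:5672//',
    '--queues', 'celery, emails,',
])
print(brokers)
print(queues)
```

Stripping the names is a small safeguard: with a plain split(','), an argument like 'celery, emails' would produce a queue named ' emails' with a leading space.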

Connection

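from kombu import Connection

BROKERS, QUEUES = _get_cli_input()
SRC_BROKER = BROKERS[0]
DEST_BROKER = BROKERS[1]

# One connection per broker: consume from the source, publish to the destination.
src_conn = Connection(SRC_BROKER)
dest_conn = Connection(DEST_BROKER)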

Consuming from the source queues

First the code:

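from kombu import Consumer, Exchange, Producer, Queue

for q in QUEUES:
    # Source side: the queue to drain, bound to a direct exchange of the same name.
    src_channel = src_conn.channel()
    src_exchange = Exchange(q, 'direct')
    src_queue = Queue(q, exchange=src_exchange, routing_key=q)

    # Destination side: a producer that publishes to the matching exchange.
    dest_channel = dest_conn.channel()
    dest_exchange = Exchange(q, 'direct')
    producer = Producer(dest_channel, exchange=dest_exchange, serializer='json', routing_key=q)

    # Register the callback; messages are delivered once events are drained.
    consumer = Consumer(src_channel, src_queue, callbacks=[_process_message(producer)])
    consumer.consume()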

The callback

A callback is always called with a body and a message. Because we also need access to the producer for the destination broker, we’ve defined the callback as a class that’s initialized with a producer. Its __call__ method takes those two arguments and runs once for each message received.

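from kombu import Queue

class _process_message():
    def __init__(self, producer):
        self.producer = producer

    def __call__(self, body, message):
        # The producer was created with routing_key set to the queue name, so read
        # the name from it instead of relying on the loop variable q.
        queue_name = self.producer.routing_key
        dest_queue = Queue(queue_name, exchange=self.producer.exchange, routing_key=queue_name)
        # declare=[dest_queue] makes sure the destination queue exists before publishing.
        self.producer.publish(body, declare=[dest_queue])
        # Acknowledge only after publishing, so the source keeps the message on failure.
        message.ack()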
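
If the callable-class pattern is new to you, it can be exercised without a broker at all. The sketch below is an illustration only: FakeProducer, FakeMessage, and Forwarder are hypothetical stand-ins written for this example, not kombu classes. They just mimic the publish and ack calls used above.

```python
class FakeProducer:
    """Hypothetical stand-in for a producer; records what gets published."""

    def __init__(self):
        self.published = []

    def publish(self, body, **kwargs):
        self.published.append(body)


class FakeMessage:
    """Hypothetical stand-in for a received message; tracks acknowledgement."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


class Forwarder:
    """Callable object: holds the producer, and is invoked as callback(body, message)."""

    def __init__(self, producer):
        self.producer = producer

    def __call__(self, body, message):
        self.producer.publish(body)
        message.ack()


producer = FakeProducer()
callback = Forwarder(producer)  # __init__ runs once, when the callback is created
message = FakeMessage()
callback({'task': 'send_email', 'kwargs': {'user_id': 1}}, message)  # __call__ runs per message

print(producer.published)
print(message.acked)
```

The point of the class is that each instance carries its own producer, so every queue gets a callback bound to the right destination while still matching the two-argument signature the consumer expects.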

The loop

Finally, we create a loop that waits up to 10 seconds for the next message and exits once that wait times out, meaning there aren’t any more messages to move over. This of course assumes new messages aren’t being published to your source broker anymore. The script won’t ever exit if they are!

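import socket
import sys

timeout = 10  # seconds to wait for a message before assuming the queues are empty
while True:
    try:
        src_conn.drain_events(timeout=timeout)
    except socket.timeout:
        # Nothing arrived within the timeout: release both connections and exit.
        src_conn.release()
        dest_conn.release()
        sys.exit(0)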
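
The drain-until-idle pattern can also be tried in isolation. This is a rough sketch under stated assumptions: FakeConnection and drain_until_idle are hypothetical names made up for the example, and the fake only imitates the one behavior the loop relies on, which is raising socket.timeout when nothing is left to deliver.

```python
import socket


class FakeConnection:
    """Hypothetical stand-in: hands out pending items, then times out."""

    def __init__(self, pending):
        self.pending = list(pending)
        self.released = False

    def drain_events(self, timeout=None):
        if not self.pending:
            # Imitates a real wait that elapsed with no message arriving.
            raise socket.timeout()
        return self.pending.pop(0)

    def release(self):
        self.released = True


def drain_until_idle(conn, timeout=10):
    """Handle events until one wait times out; return how many were handled."""
    handled = 0
    while True:
        try:
            conn.drain_events(timeout=timeout)
            handled += 1
        except socket.timeout:
            conn.release()
            return handled


conn = FakeConnection(['msg-1', 'msg-2', 'msg-3'])
handled = drain_until_idle(conn, timeout=0.1)
print(handled, conn.released)
```

Returning a count instead of calling exit makes the loop easier to test and lets the caller decide what to do once the source is empty.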


Senior Software Engineer | www.adriennedomingus.com
