Migrating RabbitMQ Tasks with Python & Kombu

  • Spin up a new plan/instance and start routing new tasks to it as they’re produced
  • Migrate any tasks that were not consumed on the old plan over to the new plan, so they can be consumed there
  • Remove the old plan

Setup — Getting CLI Input

import argparse


def _get_cli_input():
    # Two positional broker URLs plus a required, comma-separated list of queues
    parser = argparse.ArgumentParser()
    parser.add_argument('source_broker')
    parser.add_argument('destination_broker')
    parser.add_argument('--queues', required=True, help='queues comma separated')

    args = parser.parse_args()
    brokers = [args.source_broker, args.destination_broker]
    queues = args.queues.split(',')

    return brokers, queues
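
One way to sanity-check the parsing step without a live broker is to pass an explicit argument list. This is a minimal sketch, not the script's actual code: `parse_migration_args` and the broker URLs are hypothetical, and trimming whitespace and dropping empty queue names is an added assumption about how messy input should be handled.

```python
import argparse


def parse_migration_args(argv=None):
    """Hypothetical variant of _get_cli_input that accepts an explicit argv."""
    parser = argparse.ArgumentParser()
    parser.add_argument('source_broker')
    parser.add_argument('destination_broker')
    parser.add_argument('--queues', required=True, help='queues comma separated')
    args = parser.parse_args(argv)

    # Assumption: tolerate input like "a, b," by trimming whitespace
    # and dropping empty names.
    queues = [name.strip() for name in args.queues.split(',') if name.strip()]
    if not queues:
        parser.error('--queues must name at least one queue')
    return [args.source_broker, args.destination_broker], queues


# Example invocation with placeholder broker URLs.
brokers, queues = parse_migration_args([
    'amqp://guest:guest@old-host:5672//',
    'amqp://guest:guest@new-host:5672//',
    '--queues', 'default, emails,',
])
print(brokers)
print(queues)
```

Without the trimming step, a value such as `default, emails` would yield a queue literally named ` emails` (with a leading space), which would not match the queue on the broker.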

Connection

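from kombu import Connection, Consumer, Exchange, Producer, Queue

BROKERS, QUEUES = _get_cli_input()
SRC_BROKER = BROKERS[0]
DEST_BROKER = BROKERS[1]

# One connection per broker: consume from the source, publish to the destination
src_conn = Connection(SRC_BROKER)
dest_conn = Connection(DEST_BROKER)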

Consuming from the source queues

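for q in QUEUES:
    # Source side: a direct exchange and a queue that both share the queue's name
    src_channel = src_conn.channel()
    src_exchange = Exchange(q, 'direct')
    src_queue = Queue(q, exchange=src_exchange, routing_key=q)

    # Destination side: publish to an exchange of the same name on the new broker
    dest_channel = dest_conn.channel()
    dest_exchange = Exchange(q, 'direct')
    producer = Producer(dest_channel, exchange=dest_exchange, serializer='json', routing_key=q)

    # Each consumer gets its own callback instance, bound to that queue's producer
    consumer = Consumer(src_channel, src_queue, callbacks=[_process_message(producer)])
    consumer.consume()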

The callback

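class _process_message():
    def __init__(self, producer):
        self.producer = producer

    def __call__(self, body, message):
        # Task fields, available for logging or filtering; the republish does not need them
        task = body.get('task')
        eta = body.get('eta')
        kwargs = body.get('kwargs')
        # Take the name from the producer: the loop variable q would name the last queue by the time this runs
        q = self.producer.routing_key
        dest_queue = Queue(q, exchange=self.producer.exchange, routing_key=q)
        # Publish (declaring the destination queue) before acking the source message
        self.producer.publish(body, declare=[dest_queue])
        message.ack()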
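
The callback's contract (republish first, acknowledge second, with one callback instance per queue) can be exercised without a broker. The sketch below is illustrative only: `FakeProducer`, `FakeMessage`, `ForwardingCallback`, and the `send_report` task name are hypothetical stand-ins, not Kombu classes, and they only imitate the `publish` and `ack` calls used above.

```python
class FakeProducer:
    """Hypothetical stand-in that records what would be published."""

    def __init__(self, routing_key):
        self.routing_key = routing_key
        self.published = []

    def publish(self, body, **kwargs):
        self.published.append((self.routing_key, body))


class FakeMessage:
    """Hypothetical stand-in for a delivered message."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


class ForwardingCallback:
    """Republish the body, then acknowledge the source message."""

    def __init__(self, producer):
        self.producer = producer

    def __call__(self, body, message):
        # Publish before acking: if publish raises, ack() is never reached,
        # so the message remains unacknowledged on the source side.
        self.producer.publish(body)
        message.ack()


# One producer and one callback per queue, mirroring the loop over QUEUES.
producers = {name: FakeProducer(name) for name in ('default', 'emails')}
callbacks = {name: ForwardingCallback(p) for name, p in producers.items()}

msg = FakeMessage()
callbacks['default']({'task': 'send_report', 'kwargs': {}}, msg)

print(producers['default'].published)
print(producers['emails'].published)
print(msg.acked)
```

Binding the producer at construction time is what keeps each queue's messages on their own route; a callback that looked up a shared loop variable at call time would see only its final value.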

The loop

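import socket
import sys

timeout = 10
while True:
    try:
        # Blocks until an event arrives; raises socket.timeout after `timeout` idle seconds
        src_conn.drain_events(timeout=timeout)
    except socket.timeout:
        # Nothing arrived in time, so treat the queues as drained and shut down
        src_conn.release()
        dest_conn.release()
        sys.exit(0)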
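
The loop's exit condition is "no message arrived within the timeout", and that logic can be checked in isolation. This is a sketch under stated assumptions: `drain_until_idle` and `FakeConnection` are hypothetical names, and the fake imitates only the one behavior relied on above, namely `drain_events` raising `socket.timeout` when nothing arrives in time.

```python
import socket


class FakeConnection:
    """Hypothetical stand-in: delivers a fixed number of events, then times out."""

    def __init__(self, pending):
        self.pending = pending

    def drain_events(self, timeout=None):
        if self.pending == 0:
            raise socket.timeout('no events within %s seconds' % timeout)
        self.pending -= 1


def drain_until_idle(conn, timeout=10):
    """Drain events until one full timeout passes with nothing to process."""
    handled = 0
    while True:
        try:
            conn.drain_events(timeout=timeout)
        except socket.timeout:
            return handled
        handled += 1


print(drain_until_idle(FakeConnection(pending=3), timeout=1))  # prints 3
```

A quiet period is only a heuristic for "the queues are empty": a timeout that is too short for a slow link could end the run early, so it may be worth checking the source queue depths before removing the old plan.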

Senior Software Engineer | www.adriennedomingus.com

Adrienne Domingus
