Custom Celery Tasks: including the enqueuing request_id to a task_id in Django

Adrienne Domingus
5 min readAug 22, 2019
https://unsplash.com/photos/ZnMTiwDYXao

Debugging background tasks can be challenging, given they’re not necessarily executed in the order they’re enqueued (assuming you have multiple workers processing them), may be retried if they fail the first time through, and have no direct link to the web request that enqueued them.

Including the id of the web request that enqueued them in the id of the task itself can help alleviate a lot of these debugging issues, and deobfuscate some of the “what happened, and why?” questions that may arise.

I’m using the celery library with Django — this implementation is specific to that combination.

The primary implementation outlined below was first written up in this Rover blog — I found this article super helpful, but still had a couple hurdles to figure out, so thought I’d go into more detail here! Specifically, things that weren’t covered in that blog post that I’ll cover here include:

  • What if you’re enqueuing your tasks via .delay instead of .apply_async?
  • Options for the id of background tasks that are themselves enqueued by another background task.
  • How to use the custom task class you’ve defined

High Level Strategy

The apply_async function of a celery Task takes a keyword argument called task_id, which it then passes on to the send_task method. If a task_id is not provided, within send_task, we see:

task_id = task_id or uuid()

In order to generalize our tasks to ensure that they always define their own task_id that also includes the id of the request that enqueued it, we can define a custom task class that handles this for us. Let’s look at the parts and prerequisites for this.

Generating the custom task_id

Of course, this strategy assumes you already have a request_id attribute available to you on all of your request objects. I covered this in some detail in a previous blog post — assuming you’ve done something similar, you can create a helper function such as:

import threading
local = threading.local()