Custom Celery Tasks: including the enqueuing request_id in the task_id in Django
Debugging background tasks can be challenging, given they’re not necessarily executed in the order they’re enqueued (assuming you have multiple workers processing them), may be retried if they fail the first time through, and have no direct link to the web request that enqueued them.
Including the id of the web request that enqueued them in the id of the task itself can help alleviate a lot of these debugging issues, and deobfuscate some of the “what happened, and why?” questions that may arise.
I’m using the celery library with Django — this implementation is specific to that combination.
The primary implementation outlined below was first written up in this Rover blog — I found this article super helpful, but still had a couple hurdles to figure out, so thought I’d go into more detail here! Specifically, things that weren’t covered in that blog post that I’ll cover here include:
- What if you’re enqueuing your tasks via
- Options for the id of background tasks that are themselves enqueued by another background task.
- How to use the custom task class you’ve defined
High Level Strategy
The apply_async function of a celery Task takes a keyword argument called task_id, which it then passes on to the send_task method. If a task_id is not provided, within send_task, we see:
task_id = task_id or uuid()
To make sure every task defines its own task_id, one that also includes the id of the request that enqueued it, we can define a custom task class that handles this for us. Let’s look at the parts and prerequisites for this.
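As a rough illustration of that fallback, here is a plain-Python stand-in (not celery's actual code; send_task_sketch is a hypothetical name) showing that a caller-supplied task_id wins and a random uuid is used otherwise:

```python
from uuid import uuid4


def send_task_sketch(task_id=None):
    # Hypothetical stand-in mirroring the `task_id or uuid()` fallback
    task_id = task_id or str(uuid4())
    return task_id


custom_id = send_task_sketch(task_id="req-123_abc")  # caller-supplied id is kept
default_id = send_task_sketch()                      # falls back to a fresh uuid
```

This is the hook the rest of the post relies on: anything we pass as task_id is used as-is.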
Generating the custom task_id
Of course, this strategy assumes you already have a request_id attribute available to you on all of your request objects. I covered this in some detail in a previous blog post. Assuming you’ve done something similar, you can create a helper function such as:
import threading

local = threading.local()

def get_local_request_id():
    # Returns None when no request_id has been set on this thread
    return getattr(local, 'request_id', None)
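For context, here is a minimal sketch of how a middleware might populate that thread-local. It assumes Django's callable-middleware style; the class name RequestIdMiddleware and the choice of a fresh uuid per request are assumptions for illustration, not necessarily what the earlier post does:

```python
import threading
from uuid import uuid4

local = threading.local()


def get_local_request_id():
    # None when no request is being handled on this thread
    return getattr(local, "request_id", None)


class RequestIdMiddleware:
    """Hypothetical middleware: tags each request and the current thread."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        request.request_id = str(uuid4())
        local.request_id = request.request_id
        try:
            return self.get_response(request)
        finally:
            # Clear it so a reused worker thread never leaks a stale id
            del local.request_id
```

Clearing the attribute in the finally block is a design choice of this sketch: it keeps an id from one request from showing up in tasks enqueued by a later request on the same thread.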
Given this, within our custom task class, we can have a helper function that does something like this:
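from uuid import uuid4
from celery import task

def _generate_task_id(self):
    request_id = get_local_request_id()
    task_id = str(uuid4())
    # celery.current_task is an equivalent proxy if celery.task is unavailable in your version
    if not request_id and task.current and hasattr(task.current, 'request'):
        request_id = task.current.request.id
    if request_id:
        # Keep only the leading segment: the id of the original web request
        request_id = request_id.split("_")[0]
        task_id = '_'.join((request_id, task_id))
    return task_id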
Let’s talk through each of those pieces, one by one:
request_id = get_local_request_id(): we talked about this above. This will retrieve the request_id off your local thread, assuming it was set there by a piece of middleware.
task_id = str(uuid4()): this just generates our unique task_id in the same way the celery library would if we didn’t do it ourselves.
if not request_id and task.current and hasattr(task.current, 'request'): this bit handles the scenario where a background task was enqueued by another background task. In this case, the request won’t have gone through our middleware to set the request_id on the local thread. But the current task’s own id already starts with the id of the request that enqueued it, so we can get it that way:
request_id = task.current.request.id
request_id = request_id.split("_")[0]: this is necessary for the same reason the step above may be necessary: tasks can be enqueued from other background tasks. Note the [0]: split alone returns a list, and we only want its first segment. Doing this means that each task_id will only be prepended by the id of the actual web request that led to its enqueuing, but will not include the id of the task that directly enqueued it, if there was one. If you want to be able to trace that entire flow back to the beginning, you may consider skipping this step. We chose this approach because, especially once retries are taken into account, the task_id can get really long if you skip it. That makes it less useful and, depending on how deep the enqueuing of tasks from other tasks goes in your application, you could overrun the maximum length of 255 characters.
task_id = '_'.join((request_id, task_id)): this is the final step, prepending the request_id to our uniquely generated task_id! Because we’re joining them with an _, which is not a character that will be found in either of the ids themselves, we know we will always be able to separate one id from the other.
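To see why the single _ separator and the split matter, here is a small standalone sketch. The helper names build_task_id and split_task_id are hypothetical, and the request id is assumed to be a uuid string containing no underscore:

```python
from uuid import uuid4


def build_task_id(parent_id):
    # parent_id is either a web request id or the id of the enqueuing task
    request_id = parent_id.split("_")[0]
    return "_".join((request_id, str(uuid4())))


def split_task_id(task_id):
    # Recover (request_id, unique task part) from a combined id
    request_id, _, unique_part = task_id.partition("_")
    return request_id, unique_part


request_id = str(uuid4())
first = build_task_id(request_id)   # enqueued by the web request
nested = build_task_id(first)       # enqueued by the first task
```

Both first and nested start with the same request_id and have the same length (73 characters: two uuid strings plus the separator), however deep the chain of tasks goes.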
.delay versus .apply_async
Now that we have our custom task_id, let’s use it! We’ll rely on the fact that celery’s apply_async takes in a task_id as a keyword argument. However, this can present a problem if you’re enqueuing your tasks using .delay instead. You could of course change all of your usages of .delay to .apply_async, but that would require a larger code change than necessary, and any future calls to .delay added by engineers who aren’t aware of the custom task implementation would lose the custom task_id functionality. Better if we could handle both. Good news: we can!
In celery’s base Task class, the docstring on the delay method says this:
Star argument version of :meth:`apply_async`
And the method itself is a handy one-liner that does this:
return self.apply_async(args, kwargs)
So we can apply a similar strategy, but move it up the stack a bit. The apply_async method on our custom task class will call our helper method to generate a custom task_id (unless the caller already supplied one), and pass that into a call to super’s apply_async, which takes us into celery’s implementation, as we need.
def apply_async(self, args=None, kwargs=None, task_id=None, **options):
    task_id = task_id or self._generate_task_id()
    return super(CustomTask, self).apply_async(args=args, kwargs=kwargs, task_id=task_id, **options)
And then, our custom task class’s implementation of .delay will... do the same thing! Instead of calling super on the delay function, we’ll call straight into celery’s apply_async from our delay function. That’ll look like this:
def delay(self, *args, **kwargs):
    task_id = self._generate_task_id()
    return super(CustomTask, self).apply_async(args, kwargs, task_id=task_id)
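Putting the two overrides together, here is a self-contained sketch of the delegation. To keep it runnable without a broker, FakeBaseTask is a hypothetical stand-in for celery's Task that just echoes the task_id it receives, and _generate_task_id is simplified to a prefixed uuid. In real code the class would subclass celery's Task and use the id generation described earlier:

```python
from uuid import uuid4


class FakeBaseTask:
    """Hypothetical stand-in for celery's Task; not the real class."""

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        return task_id  # the real method returns an AsyncResult

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)


class CustomTask(FakeBaseTask):
    def _generate_task_id(self):
        # Simplified: the real helper prepends the request_id
        return "custom_" + str(uuid4())

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        task_id = task_id or self._generate_task_id()
        return super().apply_async(args=args, kwargs=kwargs, task_id=task_id, **options)

    def delay(self, *args, **kwargs):
        task_id = self._generate_task_id()
        # Skip our own apply_async and go straight to the parent's
        return super().apply_async(args, kwargs, task_id=task_id)


task = CustomTask()
via_delay = task.delay(1, 2)
via_apply = task.apply_async(args=(1, 2))
explicit = task.apply_async(args=(1, 2), task_id="my-id")
```

Both via_delay and via_apply carry a generated id, while an explicitly passed task_id is left alone. Since the base delay calls self.apply_async, overriding apply_async alone would likely cover .delay as well; the explicit delay override mainly makes that intent visible.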
Using the custom task class
Once your custom task is defined, you can use it by adding it as the base to any of your already defined tasks. So something that used to be defined as:
And we’re off!