Making Celery work nicely with Django transactions
I’ve been using Celery in my Django projects for about 5 years now. Along the way, I made some mistakes, learned from them and picked up a few best practices. I will try to share a few of them here.
Avoid serialising complex objects
This is a pretty simple best practice, but was easy to miss as the default serialization was pickle in before v4 of Celery, the default is now JSON and enforces this. Failing that may cause hard to catch issues like when the pickle protocol changes. This happened when we migrated from Python 2 to 3 for instance. It’s pretty much impossible to catch by your test suite, but will happen for sure in production.
Worker throwing ModelInstanceDoesNotExist
In a Django project, a complex object is very likely to be a model instance, that’s where your data lives after all. The usual way to work with the previous best practice is to pass the id
of a model instance when submitting the task and fetch the instance again from DB when handling the task on the worker.
# Send task as
your_task.delay(user_id=user.id)
# and run as
@app.task()
def your_task(user_id):
user = User.objects.get(
id=user_id
)
...
However, when doing this in a naive way, this can lead to exceptions due to missing instances. It took me a while to get this one the first time it happened to me, especially given that the instance with the offending id
was always there when checking manually, and there was no issues in my unit tests.
Experienced readers probably know the answer: transactions. Basically, the task is picked up by the worker before the main transaction is committed in the database. We need to delay task submission until the transaction is committed. Luckily, Django provides a hook just for that type of things: transaction.on_commit()
. This is well documented in the Celery and the since 1.10 even Django uses Celery as an example in their documentation.
Problems with this solution
This hook is great, but causes a bit of boilerplate. Each time you want to use that, you need to define a lambda
when you call the task. It can get quite repetitive when you start to have many tasks. Not only its longer to write and hurts readability, it may also cause some weird bugs depending on how it’s used. Here is a very simplified example:
# tasks.py
@app.task()
def print_value(val):
print(val)
# views.py
def my_view(request):
for index in range(5):
transaction.on_commit(
lambda: print_value.delay(index)
)
What would this print on the worker process? The answer is 5 times the number ‘4’. If you found it, well done! It’s a documented behaviour of lambdas, but in a more realistic code example with longer functions and more complex parameters, it would easily be missed.
A better API
As of Celery v5.4, released 17th April 2024, this functionality has now been released upstream. See the Celery docs for more information.
One idea that may come to mind is to wrap this in a decorator. This was suggested as an improvement to Django but was rejected after some discussion. I agree with the counter points: a decorator would hurt the readability of the code calling the decorated function.
What if we could wrap this boilerplate into a helper function to handle this nicely? Or better, attach this helper to our tasks? Ideally, we want to be able to call our tasks like this:
print_value.delay_on_commit(5)
That’s what I’m talking about. A simple API that is easy to use, and clear to read. This also addresses the drawbacks of the previous decorator idea. The good news is that Celery provides a proper way to extend the default behaviour making it not very hard to implement.
Under the hood, the @task
decorator run your task body in the run()
method of the Celery base task class. This class is where the delay()
callable comes from. It’s possible to override the base class either when defining the task function or more globally, when defining the Celery app:
from celery import Celery
app = Celery(
'tasks',
task_cls='your_task:BetterTask',
)
And your base task would look like this:
from celery import Task
from django.db import transaction
class BetterTask(Task):
def delay_on_commit(self, *args, **kwargs):
return transaction.on_commit(
lambda: self.delay(*args, **kwargs)
)
And that’s all you need! Now you have a nicer API, less error prone, you just need to remember to use it. How would you use it? If we modify our previous view, we would get the following:
def my_view(request):
for index in range(5):
print_value.delay_on_commit(index)
If we run it, we can confirm that it works now as intended, printing the numbers 1 to 5. Nice!
Closing words
As Django developers, we are generally less used to the asynchronous way of thinking than in other programming languages, as Django does not requires us to think about that (yet). On the other hand, Celery is asynchronous by its very nature, and it’s easy to cause some unusual bugs in your application, if not careful. By adding a wrapper around a common pattern, to expose a better API, we can make our code easier to write, read and avoid bugs.