[HL-24] How to route tasks in Celery + RabbitMQ
Continue to my last post. How should we route tasks to different queues in Celery?
Here is the example:
We can specify queue, exchange, routing_key when calling apply_async
. However, it is very confusing that when calling apply_async(routing_key='xxx')
, the message goes to the default queue. If you don’t set app.conf.task_default_queue
, it will create a queue with name celery
for you.
Actually, the trick is to specify either queue
or exchange + routing_key
. I tried the following examples:
add.apply_async((1,2))
=> default_queue
add.apply_async((1,2), routing_key='moon')
=> default_queue
add.apply_async((1,2), routing_key='sunshine')
=> default_queue
add.apply_async((1,2), exchange='default', routing_key='moon')
=> moon_queue
add.apply_async((1,2), exchange='default', routing_key='sunshine')
=> sunshine_queue
add.apply_async((1,2), queue='sunshine')
=> sunshine_queue
add.apply_async((1,2), queue='moon')
=> moon_queue
So there are two options to route tasks to specific queues in celery:
- queue
- exchange + routing_key
Routing tasks in celery chaining
It is a bit special how to route route tasks in celery chaining. Originally, I thought the following code would work:
task = chain(add.s(1, 2), div.s(3))
task.apply_async(queue='moon')
Unexpectedly, add
task will go to moon
queue but div
task will be forwarded to the default
queue. What I expect is to route both tasks to moon
queue.
I find an answer:
chain(
add.s(1, 2).set(queue='moon'),
div.s(3).set(queue='moon')
).apply_async()
Now, all tasks will be forwarded to moon
queue. It is very inconvenient but this is what it is.