[HL-25] How to read and retry Celery task messages from RabbitMQ
This post builds on my previous one about creating a dead letter queue. The problem now is how to read messages from that queue and re-publish the Celery tasks to a working queue.
How to read messages from RabbitMQ
I found three ways to read messages from RabbitMQ:
- pika
- kombu
- RabbitMQ management plugin’s HTTP API
Pika
pika is a pure-Python RabbitMQ client library.
The following code, based on RabbitMQ Tutorial - Work Queues, reads messages from a queue:
#!/usr/bin/env python
import time

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='default.deadletter', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % ch)
    print(" [x] Received %r" % method)
    print(" [x] Received %r" % body)
    print(" [x] Received %r" % properties)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # the following line deletes the message from the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Fair dispatch instead of the default round-robin dispatch
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='default.deadletter', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
`properties` carries all the task metadata needed to retry the dead task in Celery. This consumer works by registering a callback to process incoming messages, which is a good pattern for a long-running worker, but I wanted a more active way to fetch a fixed number of messages.
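One such active option in pika is `basic_get`, which pulls a single message per call instead of waiting for a callback. A minimal sketch, assuming the same `default.deadletter` queue and an arbitrary batch size of 10:
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# fetch up to 10 messages; basic_get returns (None, None, None)
# once the queue is empty
for _ in range(10):
    method, properties, body = channel.basic_get(
        queue='default.deadletter', auto_ack=False)
    if method is None:
        break
    print(" [x] Fetched %r with properties %r" % (body, properties))
    # put the message back so nothing is lost while inspecting
    channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

connection.close()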
Kombu
kombu is a messaging library for Python. With kombu you can use different transports, such as Redis, MongoDB, and SQS. Celery uses kombu as its underlying messaging library.
from kombu import Connection

# RABBITMQ_* and queue come from your own configuration
broker_url = f'amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOSTNAME}:{RABBITMQ_PORT}/{RABBITMQ_VHOST_PACMAN}'

with Connection(broker_url) as conn:
    simple_queue = conn.SimpleQueue(queue)
    queue_size = simple_queue.qsize()
    messages = []
    for i in range(queue_size):
        message = simple_queue.get(block=False)
        messages.append(message)
    print(f"Delete all ({len(messages)}) tasks?")
    input("Press Enter to continue...")
    for i, message in enumerate(messages):
        print(f'Deleting {i+1}/{len(messages)} task: {message.headers["task"]}, task_id: {message.headers["id"]}')
        # ack() removes the message from the queue
        message.ack()
    simple_queue.close()
This is already simple and clear, but the management plugin’s HTTP API turns out to be even more convenient.
RabbitMQ management plugin’s HTTP API
This HTTP API is very easy to use, but two points deserve attention:
- It should only be used for inspection and analysis, not for consuming messages from RabbitMQ in bulk.
- Be careful with the `requeue` parameter. When `requeue` is set to `false`, the fetched messages are removed from the queue, so if your program exits accidentally the data is lost. To avoid this, my script reads twice: the first pass with `requeue` set to `true` just to inspect the messages, and the second pass with `requeue` set to `false` to actually remove them.
The code is as follows:
import urllib.parse

import requests
from requests.auth import HTTPBasicAuth

# vhost, queue, size, requeue, url and the RABBITMQ_* credentials
# come from your own configuration
endpoint = f'/api/queues/{urllib.parse.quote_plus(vhost)}/{queue}/get'
payload = {
    "count": size,
    "requeue": requeue,
    "encoding": "auto",
    "truncate": 50000
}
resp = requests.post(
    url.strip('/') + endpoint,
    auth=HTTPBasicAuth(RABBITMQ_USERNAME, RABBITMQ_PASSWORD),
    json=payload)
print(resp.json())
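The response is a JSON list of message objects, each carrying the payload, properties, routing key and original exchange of a message. The two-pass trick described above could then look like this (a sketch, assuming the request is wrapped in a hypothetical helper `read_queue(vhost, queue, size, requeue)`):
# first pass: requeue=True only peeks, the messages stay in the queue
messages = read_queue(vhost, queue, size, requeue=True)
print(f"Retry all ({len(messages)}) tasks?")
input("Press Enter to continue...")
# second pass: requeue=False actually removes the messages
messages = read_queue(vhost, queue, size, requeue=False)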
Re-publish Celery task messages
Of course, you can use Celery itself to start a new task. However, workers usually sit in a private network, so you have to hop to one of the machines where the workers and the related code run. If the workers run in containers, you additionally have to execute the command inside a container. Basically, you need to:
- ssh to a machine in the public network that can reach the private network
- ssh from there to the private machine
- go into the container: `docker exec -it [container_id] bash`
- open a Python or IPython interactive interpreter
- run the command
Ideally, the script should be able to run on a local dev machine. Of course, safety is a concern: you don’t want to purge a production queue by mistake.
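A cheap guard, for example, is to make the operator confirm the vhost name before anything destructive runs; this check is my own addition, not part of the original script:
# hypothetical safety check before purging or re-publishing
answer = input(f"Type the vhost name '{vhost}' to confirm: ")
if answer != vhost:
    raise SystemExit('Confirmation failed, aborting.')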
The following code will re-publish the message to an exchange with a given routing key:
# use the rabbitmq management plugin's http api;
# urllib.parse, requests and HTTPBasicAuth are imported as in the
# previous snippet, and old_payload is one message from the /get response

# drop the stale time-related headers from the dead task
del old_payload['properties']['headers']['eta']
del old_payload['properties']['headers']['expires']
del old_payload['properties']['headers']['timelimit']

endpoint = f'/api/exchanges/{urllib.parse.quote_plus(vhost)}/{exchange}/publish'
payload = {
    "properties": old_payload['properties'],
    "routing_key": routing_key,
    "payload": old_payload['payload'],
    "payload_encoding": "string"
}
resp = requests.post(
    url.strip('/') + endpoint,
    auth=HTTPBasicAuth(RABBITMQ_USERNAME, RABBITMQ_PASSWORD),
    json=payload)
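Tying it together: after the confirmation pass shown earlier, a second read with `requeue` set to `false` consumes the dead-lettered messages for real, and each one can be sent back through this publish endpoint. A sketch, again assuming hypothetical `read_queue` and `publish_message` helpers wrapping the two requests above:
# second pass: requeue=False consumes the dead-lettered messages
for old_payload in read_queue(vhost, 'default.deadletter', size, requeue=False):
    # the /get response includes each message's original exchange
    # and routing key, so we can send the task right back
    publish_message(vhost, old_payload['exchange'],
                    old_payload['routing_key'], old_payload)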