This post follows up on my previous one about creating a dead letter queue. The problem now is how to read messages from that queue and re-publish the Celery tasks to a queue.

How to read messages from RabbitMQ

I found three ways to read messages from RabbitMQ:

  • pika
  • kombu
  • RabbitMQ management plugin’s HTTP API

Pika

pika is a pure-Python RabbitMQ client library.

The following code, adapted from the RabbitMQ tutorial on work queues, reads messages from a queue:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='default.deadletter', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % ch)
    print(" [x] Received %r" % method)
    print(" [x] Received %r" % body)
    print(" [x] Received %r" % properties)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

    # the following code will delete the message from the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Fair dispatch instead of the default round-robin dispatch
channel.basic_qos(prefetch_count=1)
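# pika >= 1.0 signature; pika 0.x used basic_consume(callback, queue='default.deadletter')
channel.basic_consume(queue='default.deadletter', on_message_callback=callback)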

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

properties carries all the task information needed to retry the dead task in Celery. This consumer works by defining a callback function that processes incoming messages, which is a good pattern if you want to write a worker. However, I wanted a more active way to fetch a fixed number of messages.
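For pulling a fixed number of messages instead of waiting in a callback loop, pika also provides basic_get, which fetches one message per call. The following is a minimal sketch, assuming pika 1.x; fetch_batch and demo are hypothetical helper names, not part of the tutorial:

```python
def fetch_batch(channel, queue_name, limit):
    """Pull at most `limit` messages from `queue_name` without acking them."""
    batch = []
    for _ in range(limit):
        # basic_get returns (None, None, None) when the queue is empty
        method, properties, body = channel.basic_get(
            queue=queue_name, auto_ack=False)
        if method is None:
            break
        batch.append((method, properties, body))
    return batch


def demo():
    """Example usage against a local broker (assumed to run on localhost)."""
    import pika  # imported here so fetch_batch itself stays broker-agnostic

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    for method, properties, body in fetch_batch(
            channel, 'default.deadletter', 10):
        print(properties.headers, body)
        # hand the message back to the queue instead of deleting it
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    connection.close()
```

Because the messages are fetched with auto_ack=False, nothing is removed from the queue until the caller explicitly acks it.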

Kombu

kombu is a messaging library for Python. With kombu, you can use different transports, such as Redis, MongoDB, SQS, etc. Celery uses kombu as its underlying messaging library.

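from kombu import Connection

# the RABBITMQ_* settings are assumed to be defined elsewhere
queue = 'default.deadletter'  # queue to read from, as in the pika example
broker_url = f'amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOSTNAME}:{RABBITMQ_PORT}/{RABBITMQ_VHOST_PACMAN}'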

with Connection(broker_url) as conn:
    simple_queue = conn.SimpleQueue(queue)
    queue_size = simple_queue.qsize()
    messages = []

    for i in range(queue_size):
        message = simple_queue.get(block=False)
        messages.append(message)

    print(f"Delete all ({len(messages)}) tasks?")
    input("Press Enter to continue...")

    for i, message in enumerate(messages):
        print(f'Deleting {i+1}/{len(messages)} task: {message.headers["task"]}, task_id: {message.headers["id"]}')
        # delete the message from the queue
        message.ack()

    simple_queue.close()

This is already simple and clear, but the HTTP API offers an even better way.

RabbitMQ management plugin’s HTTP API

This HTTP API is very easy to use, but two points deserve attention:

  • It should only be used for analysis purposes, not for reading large volumes of messages from RabbitMQ.
  • Be careful with the requeue parameter. When requeue is set to false, messages are deleted from the queue as soon as they are read, so if your program exits unexpectedly, the data is lost. To avoid this, my script reads twice: the first time with requeue enabled, and the second time to clean up the messages.

The code is as follows:

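import urllib.parse

import requests
from requests.auth import HTTPBasicAuth

# url is the base URL of the management API; vhost, queue, size and requeue are set by the caller
endpoint = f'/api/queues/{urllib.parse.quote_plus(vhost)}/{queue}/get'
payload = {
    "count": size,
    # RabbitMQ 3.7+ expects "ackmode" (e.g. "ack_requeue_true") instead of "requeue"
    "requeue": requeue,
    "encoding": "auto",
    "truncate": 50000
}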

resp = requests.post(
    url.strip('/') + endpoint,
    auth=HTTPBasicAuth(RABBITMQ_USERNAME, RABBITMQ_PASSWORD),
    json=payload)
print(resp.json())
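The two-pass read described above can be sketched as follows. The helper names and the injected post_json callable are assumptions made for illustration, and the payload mirrors the "requeue" form used in this post. Note that the second pass only removes the same messages if nothing else consumes from the queue in between:

```python
import urllib.parse


def get_endpoint(base_url, vhost, queue):
    """Build the management API URL for reading messages from a queue."""
    return '{}/api/queues/{}/{}/get'.format(
        base_url.rstrip('/'),
        urllib.parse.quote(vhost, safe=''),   # "/" becomes "%2F"
        urllib.parse.quote(queue, safe=''))


def read_messages(post_json, base_url, vhost, queue, count, requeue):
    """Read up to `count` messages; post_json(url, payload) returns parsed JSON."""
    payload = {
        "count": count,
        "requeue": requeue,
        "encoding": "auto",
        "truncate": 50000,
    }
    return post_json(get_endpoint(base_url, vhost, queue), payload)


def process_then_remove(post_json, base_url, vhost, queue, count, handle):
    """Peek with requeue=True, handle each message, then delete in a second pass."""
    peeked = read_messages(
        post_json, base_url, vhost, queue, count, requeue=True)
    for message in peeked:
        # an exception here aborts before anything is deleted
        handle(message)
    if peeked:
        read_messages(
            post_json, base_url, vhost, queue, len(peeked), requeue=False)
    return peeked
```

With requests, post_json could be something like lambda url, payload: requests.post(url, auth=HTTPBasicAuth(RABBITMQ_USERNAME, RABBITMQ_PASSWORD), json=payload).json().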

Re-publish Celery task messages

Of course, you can use Celery code to start a new task. However, workers usually run in a private network, so you have to hop to one of the machines where the workers run and the relevant code is available. If the workers run in containers, you have to run the command inside a container. Basically, you need to

  • ssh to a machine in the public network that can reach the machines in the private network
  • ssh to the private machine
  • go into the container: docker exec -it [container_id] bash
  • open a python or ipython interactive interpreter
  • run the command

Ideally, the script should be runnable from a local dev machine. Of course, safety is a concern: you don’t want to purge a production queue by mistake.

The following code will re-publish the message to an exchange with a given routing key:

# use rabbitmq management plugin's http api
# old_payload is one message returned by the /get endpoint;
# pop() avoids a KeyError when a header is absent
for header in ('eta', 'expires', 'timelimit'):
    old_payload['properties']['headers'].pop(header, None)

endpoint = f'/api/exchanges/{urllib.parse.quote_plus(vhost)}/{exchange}/publish'
payload = {
    "properties": old_payload['properties'],
    "routing_key": routing_key,
    "payload": old_payload['payload'],
    "payload_encoding": "string"
}

resp = requests.post(
    url.strip('/') + endpoint,
    auth=HTTPBasicAuth(RABBITMQ_USERNAME, RABBITMQ_PASSWORD),
    json=payload)
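The same transformation can be wrapped in a small helper that leaves the fetched message untouched, plus a check on the publish response. This is a sketch rather than the script's actual code: build_publish_payload and was_routed are hypothetical names, and reusing the message's own payload_encoding is an assumption that goes beyond the snippet above.

```python
import copy

# Celery headers that the snippet above strips before re-publishing
STALE_HEADERS = ('eta', 'expires', 'timelimit')


def build_publish_payload(old_payload, routing_key):
    """Turn a message returned by the /get endpoint into a /publish body."""
    # work on a copy so the fetched message is not modified
    properties = copy.deepcopy(old_payload['properties'])
    headers = properties.get('headers', {})
    for name in STALE_HEADERS:
        headers.pop(name, None)  # tolerate messages that lack the header
    return {
        "properties": properties,
        "routing_key": routing_key,
        "payload": old_payload['payload'],
        # fall back to "string", as in the snippet above
        "payload_encoding": old_payload.get('payload_encoding', 'string'),
    }


def was_routed(response_json):
    """The publish endpoint answers {"routed": true} if a queue got the message."""
    return bool(response_json.get('routed'))
```

Checking was_routed(resp.json()) after the POST helps catch a wrong exchange or routing key, since an unroutable message is otherwise dropped silently.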