Delivery tags in publisher confirm mode are not handled properly #101

Open
WGH- opened this issue Apr 2, 2019 · 0 comments

WGH- commented Apr 2, 2019

Consider the following scenario: publisher confirm mode is enabled, which means the broker responds with either an ack or a nack for every published message.

Now, the client sends two messages which get delivery tags 1 and 2.

The broker then ACKs "2", and then NACKs "1" with the multiple bit set (which is a bit counter-intuitive, but it actually happens with RabbitMQ).

def _recv_ack(self, method_frame):
    '''Receive an ack from the broker.'''
    if self._ack_listener:
        delivery_tag = method_frame.args.read_longlong()
        multiple = method_frame.args.read_bit()
        if multiple:
            while self._last_ack_id < delivery_tag:
                self._last_ack_id += 1
                self._ack_listener(self._last_ack_id)
        else:
            self._last_ack_id = delivery_tag
            self._ack_listener(self._last_ack_id)

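When the ACK for tag 2 arrives, the code sets _last_ack_id to 2. When the NACK for tag 1 then arrives with the multiple bit set, this loop - while self._last_ack_id < delivery_tag - doesn't execute, because 2 < 1 is false. The NACK listener is never called, and the NACK is lost.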
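The failure can be seen without a broker in a minimal sketch. The `ConfirmCounter` class below is hypothetical and is not haigha's code; it assumes the NACK handler walks the same shared `_last_ack_id` counter in the same way as the `_recv_ack` method quoted above.

```python
class ConfirmCounter(object):
    """Hypothetical stand-in for the channel's confirm bookkeeping."""

    def __init__(self):
        self._last_ack_id = 0
        self.events = []  # records every listener call as (kind, tag)

    def _dispatch(self, kind, delivery_tag, multiple):
        # Same shape as the _recv_ack logic quoted above, applied to both
        # ACKs and NACKs (an assumption about the NACK handler).
        if multiple:
            while self._last_ack_id < delivery_tag:
                self._last_ack_id += 1
                self.events.append((kind, self._last_ack_id))
        else:
            self._last_ack_id = delivery_tag
            self.events.append((kind, self._last_ack_id))

    def recv_ack(self, delivery_tag, multiple=False):
        self._dispatch("ACK", delivery_tag, multiple)

    def recv_nack(self, delivery_tag, multiple=False):
        self._dispatch("NACK", delivery_tag, multiple)


counter = ConfirmCounter()
counter.recv_ack(2)                  # broker ACKs tag 2 first
counter.recv_nack(1, multiple=True)  # then NACKs tag 1 with multiple set
print(counter.events)                # only the ACK for tag 2 is recorded
```

Under that assumption, the only recorded event is the ACK for tag 2; nothing is ever reported for tag 1.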

The following program reproduces the issue: not a single NACK is printed, even though the NACKs are clearly visible in Wireshark.

#!/usr/bin/env python2

from __future__ import print_function

import sys

import gevent

import haigha
from haigha.connections.rabbit_connection import RabbitConnection

def main():
    conn = RabbitConnection(
        synchronous=True,
        transport='gevent',
    )

    chan = conn.channel()

    def handle_ack(delivery_tag):
        print("ACK", repr(delivery_tag))

    def handle_nack(delivery_tag, requeue):
        print("NACK", repr(delivery_tag), requeue)

    chan.confirm.select(nowait=False)

    chan.basic.set_nack_listener(handle_nack)
    chan.basic.set_ack_listener(handle_ack)

    # create queue with low limits
    queue_name, _, _ = chan.queue.declare(
        "",
        durable=False,
        nowait=False,
        auto_delete=True,
        exclusive=True,
        arguments={
            "x-max-length": 1,
            "x-overflow": "reject-publish",
        },
    )
    print("Declared queue", queue_name, file=sys.stderr)

    chan.queue.bind(queue_name, 'amq.topic', "key1", nowait=False)

    def reader():
        while not chan.closed:
            conn.read_frames()
    g = gevent.spawn(reader)

    for i in range(10):
        print(i)
        chan.basic.publish(
            haigha.message.Message(""),
            exchange='amq.topic',
            routing_key='key1'
        )
        chan.basic.publish(
            haigha.message.Message(""),
            exchange='amq.topic',
            routing_key='key2'
        )
        gevent.sleep(0.1)

    gevent.sleep(1)

if __name__ == "__main__":
    main()
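One possible direction for a fix, sketched here under assumptions rather than as a patch to haigha, is to track the set of unconfirmed delivery tags instead of a single counter. The `OutstandingConfirms` class and its method names are hypothetical. The sketch treats the multiple bit as "every still-outstanding tag up to and including this one", so confirms that arrive out of order are still reported.

```python
class OutstandingConfirms(object):
    """Hypothetical tracker of unconfirmed tags; not part of haigha."""

    def __init__(self):
        self._next_tag = 0
        self._pending = set()

    def published(self):
        # In confirm mode the first published message gets delivery tag 1.
        self._next_tag += 1
        self._pending.add(self._next_tag)
        return self._next_tag

    def confirm(self, delivery_tag, multiple):
        """Return the sorted tags settled by one ack or nack frame."""
        if multiple:
            settled = sorted(t for t in self._pending if t <= delivery_tag)
        elif delivery_tag in self._pending:
            settled = [delivery_tag]
        else:
            settled = []
        # Settled tags are no longer outstanding.
        self._pending.difference_update(settled)
        return settled


tracker = OutstandingConfirms()
tracker.published()  # tag 1
tracker.published()  # tag 2
acked = tracker.confirm(2, multiple=False)  # ACK for tag 2 arrives first
nacked = tracker.confirm(1, multiple=True)  # NACK for tag 1, multiple set
print("ACK", acked, "NACK", nacked)
```

A channel could call the ack or nack listener once for each tag returned by `confirm`. In the scenario above, tag 1 is then reported as nacked even though tag 2 was acked earlier.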