SubscribeAsync / UnsubscribeAsync does not check PacketIdentifier of response, just uses first received package #222

Closed
@JaggerJo

Description

🐛 Bug Report

There appears to be no check that the SubAckPacket received is the response to the SubscribePacket that was sent. From what I can see, the first SubAckPacket to arrive is accepted, whichever subscribe call it belongs to:

public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
{
    // Fire the corresponding event
    this.BeforeSubscribeEventLauncher(options);

    // FIXME: We should only ever have one subscribe in flight at any time (for now)
    // Construct the MQTT Subscribe packet
    var packetIdentifier = await this.Connection.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false);
    var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier);

    // Setup the task completion source to wait for the SUBACK
    var taskCompletionSource = new TaskCompletionSource<SubAckPacket>();
    void TaskHandler(object? sender, OnSubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.SubAckPacket);
    EventHandler<OnSubAckReceivedEventArgs> eventHandler = TaskHandler;
    this.OnSubAckReceived += eventHandler;

    // Queue the constructed packet to be sent on the wire
    this.Connection.SendQueue.Enqueue(subscribePacket);

    SubAckPacket subAck;
    SubscribeResult subscribeResult;
    try
    {
        subAck = await taskCompletionSource.Task
            .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
            .ConfigureAwait(false);
    }
    catch (TimeoutException)
    {
        Logger.Error("Subscribe timeout. No SUBACK response received in time.");
        throw;
    }
    finally
    {
        // Remove the event handler
        this.OnSubAckReceived -= eventHandler;
    }

    subscribeResult = new SubscribeResult(options, subAck);

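As a result, a SubAckPacket is sometimes paired with the SubscribeOptions of a different call. When that SubAckPacket carries more reason codes than the options have topic filters, indexing options.TopicFilters[i] throws an ArgumentOutOfRangeException here: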

internal SubscribeResult(SubscribeOptions options, SubAckPacket subAckPacket)
{
    this.Options = options;
    this.Properties = subAckPacket.Properties;
    this.Subscriptions = new List<Subscription>();

    for (var i = 0; i < subAckPacket.ReasonCodes.Count; i++)
    {
        var reasonCode = subAckPacket.ReasonCodes[i];
        var topicFilter = options.TopicFilters[i];
        var subscription = new Subscription(topicFilter)
        {
            SubscribeReasonCode = reasonCode,
        };

        this.Subscriptions.Add(subscription);
    }
}
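
One possible way to pair each SUBACK with its own subscribe call is to key the pending waiters by packet identifier. The following is only a minimal, self-contained sketch of that idea, not the client's actual code: `FakeSubAck`, `PendingSubAcks`, `Register`, `Complete`, and `Cancel` are hypothetical names made up for illustration, and `FakeSubAck` stands in for the real SubAckPacket with just the two fields needed here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for SubAckPacket: only the identifier and the
// number of reason codes matter for this illustration.
public sealed record FakeSubAck(ushort PacketIdentifier, int ReasonCodeCount);

// Hypothetical helper: one waiter per in-flight packet identifier.
public sealed class PendingSubAcks
{
    private readonly ConcurrentDictionary<ushort, TaskCompletionSource<FakeSubAck>> pending = new();

    // Register before the SUBSCRIBE is queued, so a fast SUBACK cannot be missed.
    public Task<FakeSubAck> Register(ushort packetIdentifier)
    {
        var tcs = new TaskCompletionSource<FakeSubAck>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!this.pending.TryAdd(packetIdentifier, tcs))
        {
            throw new InvalidOperationException($"Packet identifier {packetIdentifier} is already in flight.");
        }

        return tcs.Task;
    }

    // Called from the receive path: completes only the waiter with the same identifier.
    public bool Complete(FakeSubAck subAck) =>
        this.pending.TryRemove(subAck.PacketIdentifier, out var tcs) && tcs.TrySetResult(subAck);

    // Called on timeout so the entry for that identifier does not leak.
    public void Cancel(ushort packetIdentifier) => this.pending.TryRemove(packetIdentifier, out _);
}

public static class Demo
{
    public static async Task Main()
    {
        var acks = new PendingSubAcks();
        var first = acks.Register(1);   // a SUBSCRIBE with 2 topic filters
        var second = acks.Register(2);  // a SUBSCRIBE with 5 topic filters

        // The acknowledgements arrive in the opposite order.
        acks.Complete(new FakeSubAck(2, 5));
        acks.Complete(new FakeSubAck(1, 2));

        Console.WriteLine((await first).ReasonCodeCount);   // prints 2
        Console.WriteLine((await second).ReasonCodeCount);  // prints 5
    }
}
```

With this shape, each caller only ever sees the acknowledgement carrying its own identifier, so the reason-code count and the topic-filter count stay aligned. A smaller change with a similar effect would be to have the existing event handler ignore any SubAckPacket whose identifier differs from the one just sent.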

🔬 How To Reproduce

Steps to reproduce the behavior:

Call SubscribeAsync many times concurrently, with each call using a different number of topic filters.

class Program
{
    static async Task Main(string[] args)
    {
        var client =
            new HiveMQClient(
                new HiveMQClientOptionsBuilder()
                    .WithClientId("ConcurrentSubscribe")
                    .WithAutomaticReconnect(true)
                    .Build()
            );

        var connectResult = await client.ConnectAsync().ConfigureAwait(false);

        Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
        Assert.True(client.IsConnected());

        foreach (var worker in Enumerable.Range(0, 50))
        {
            Task.Run(async () =>
            {
                var call = 0;

                while (true)
                {
                    var topicPrefix = $"/test/topic/{worker}/{call}";

                    var allTopics = new[]
                    {
                        $"{topicPrefix}/a",
                        $"{topicPrefix}/b",
                        $"{topicPrefix}/c",
                        $"{topicPrefix}/d",
                        $"{topicPrefix}/e",
                        $"{topicPrefix}/f",
                        $"{topicPrefix}/g"
                    };

                    var randomTopics =
                        allTopics
                            .OrderBy(x => Guid.NewGuid())
                            .Take(Random.Shared.Next(1, allTopics.Length))
                            .ToArray();

                    try
                    {
                        var subscribeResult =
                            await client
                                .SubscribeAsync(new SubscribeOptions()
                                {
                                    TopicFilters = randomTopics.Select(x => new TopicFilter(x)).ToList()
                                })
                                .ConfigureAwait(false);

                        var unsubscribeResult =
                            await client
                                .UnsubscribeAsync(new UnsubscribeOptions()
                                {
                                    Subscriptions = subscribeResult.Subscriptions
                                })
                                .ConfigureAwait(false);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }

                    call += 1;
                }
            });
        }

        await Task.Delay(-1).ConfigureAwait(false);
    }
}

Environment

Where are you running/using this client?

Windows, Linux, macOS.

Reproduces with HiveMQtt 0.24.0 and current master.
