SubscribeAsync / UnsubscribeAsync does not check PacketIdentifier of response, just uses first received package #222

Closed
JaggerJo opened this issue Jan 17, 2025 · 1 comment
Labels
bug Something isn't working

Comments

@JaggerJo
Contributor

JaggerJo commented Jan 17, 2025

🐛 Bug Report

SubscribeAsync does not appear to check that the SubAckPacket it receives is the response to the SubscribePacket it sent. From what I can see, the first SubAckPacket to arrive completes the call, whatever its packet identifier is.

public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
{
    // Fire the corresponding event
    this.BeforeSubscribeEventLauncher(options);

    // FIXME: We should only ever have one subscribe in flight at any time (for now)
    // Construct the MQTT Subscribe packet
    var packetIdentifier = await this.Connection.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false);
    var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier);

    // Setup the task completion source to wait for the SUBACK
    var taskCompletionSource = new TaskCompletionSource<SubAckPacket>();
    void TaskHandler(object? sender, OnSubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.SubAckPacket);
    EventHandler<OnSubAckReceivedEventArgs> eventHandler = TaskHandler;
    this.OnSubAckReceived += eventHandler;

    // Queue the constructed packet to be sent on the wire
    this.Connection.SendQueue.Enqueue(subscribePacket);

    SubAckPacket subAck;
    SubscribeResult subscribeResult;
    try
    {
        subAck = await taskCompletionSource.Task
            .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
            .ConfigureAwait(false);
    }
    catch (TimeoutException)
    {
        Logger.Error("Subscribe timeout. No SUBACK response received in time.");
        throw;
    }
    finally
    {
        // Remove the event handler
        this.OnSubAckReceived -= eventHandler;
    }

    subscribeResult = new SubscribeResult(options, subAck);

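So when several subscribes are in flight, a SubAckPacket can end up paired with the SubscribeOptions of a different call. If that SubAckPacket carries more reason codes than the options have topic filters, the loop below indexes past the end of options.TopicFilters and throws an ArgumentOutOfRangeException: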

internal SubscribeResult(SubscribeOptions options, SubAckPacket subAckPacket)
{
    this.Options = options;
    this.Properties = subAckPacket.Properties;
    this.Subscriptions = new List<Subscription>();

    for (var i = 0; i < subAckPacket.ReasonCodes.Count; i++)
    {
        var reasonCode = subAckPacket.ReasonCodes[i];
        var topicFilter = options.TopicFilters[i];
        var subscription = new Subscription(topicFilter)
        {
            SubscribeReasonCode = reasonCode,
        };
        this.Subscriptions.Add(subscription);
    }
}
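
For illustration, here is a minimal sketch of one way to pair each SUBACK with the SUBSCRIBE that caused it: keep a table of pending requests keyed by packet identifier and complete only the matching entry. This is not the library's code and not necessarily how the fix was implemented. `SubAck`, `PendingSubAcks`, `Register`, `Complete` and `Cancel` are hypothetical names made up for this example; it only uses standard .NET types.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the library's SubAckPacket, reduced to the fields needed here.
public sealed record SubAck(ushort PacketIdentifier, int[] ReasonCodes);

// Routes each incoming SUBACK to the caller whose SUBSCRIBE used the same packet identifier.
public sealed class PendingSubAcks
{
    private readonly ConcurrentDictionary<ushort, TaskCompletionSource<SubAck>> pending = new();

    // Register before the SUBSCRIBE is queued, so a fast SUBACK cannot be missed.
    public Task<SubAck> Register(ushort packetIdentifier)
    {
        var tcs = new TaskCompletionSource<SubAck>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!this.pending.TryAdd(packetIdentifier, tcs))
        {
            throw new InvalidOperationException($"Packet identifier {packetIdentifier} is already in flight.");
        }

        return tcs.Task;
    }

    // Called by the receive loop. Returns false when no caller is waiting for this identifier.
    public bool Complete(SubAck subAck) =>
        this.pending.TryRemove(subAck.PacketIdentifier, out var tcs) && tcs.TrySetResult(subAck);

    // Called on timeout so the abandoned entry does not stay in the table.
    public void Cancel(ushort packetIdentifier) => this.pending.TryRemove(packetIdentifier, out _);
}

public static class Demo
{
    public static async Task Main()
    {
        var acks = new PendingSubAcks();
        var first = acks.Register(1);   // SUBSCRIBE with three topic filters
        var second = acks.Register(2);  // SUBSCRIBE with one topic filter

        // The broker answers out of order; each caller still gets its own SUBACK.
        acks.Complete(new SubAck(2, new[] { 0 }));
        acks.Complete(new SubAck(1, new[] { 0, 0, 0 }));

        Console.WriteLine((await first).ReasonCodes.Length);   // prints 3
        Console.WriteLine((await second).ReasonCodes.Length);  // prints 1
    }
}
```

With this kind of lookup, a SUBACK for an unknown identifier is simply ignored (`Complete` returns false) instead of completing whichever call happens to be waiting.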

🔬 How To Reproduce

Steps to reproduce the behavior:

Call SubscribeAsync many times concurrently on the same client, with each call using a different number of topic filters.

class Program
{
    static async Task Main(string[] args)
    {
        var client =
            new HiveMQClient(
                new HiveMQClientOptionsBuilder()
                    .WithClientId("ConcurrentSubscribe")
                    .WithAutomaticReconnect(true)
                    .Build()
            );

        var connectResult = await client.ConnectAsync().ConfigureAwait(false);

        Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
        Assert.True(client.IsConnected());

        foreach (var worker in Enumerable.Range(0, 50))
        {
            // Fire-and-forget: each worker loops forever, subscribing and unsubscribing.
            _ = Task.Run(async () =>
            {
                var call = 0;

                while (true)
                {
                    var topicPrefix = $"/test/topic/{worker}/{call}";

                    var allTopics = new[]
                    {
                        $"{topicPrefix}/a",
                        $"{topicPrefix}/b",
                        $"{topicPrefix}/c",
                        $"{topicPrefix}/d",
                        $"{topicPrefix}/e",
                        $"{topicPrefix}/f",
                        $"{topicPrefix}/g"
                    };

                    var randomTopics =
                        allTopics
                            .OrderBy(x => Guid.NewGuid())
                            .Take(Random.Shared.Next(1, allTopics.Length))
                            .ToArray();

                    try
                    {
                        var subscribeResult =
                            await client
                                .SubscribeAsync(new SubscribeOptions()
                                {
                                    TopicFilters = randomTopics.Select(x => new TopicFilter(x)).ToList()
                                })
                                .ConfigureAwait(false);

                        var unsubscribeResult =
                            await client
                                .UnsubscribeAsync(new UnsubscribeOptions()
                                {
                                    Subscriptions = subscribeResult.Subscriptions
                                })
                                .ConfigureAwait(false);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }

                    call += 1;
                }
            });
        }

        await Task.Delay(-1).ConfigureAwait(false);
    }
}

Environment

Where are you running/using this client?

Windows, Linux, MacOS.

Reproduces with HiveMQtt 0.24.0 and current master.

JaggerJo added the bug label on Jan 17, 2025
JaggerJo changed the title from "SubscribeResult Constructor crashes if broker does not send subAckPacket.ReasonCodes for all topics" to "SubscribeAsync / UnsubscribeAsync does not check PacketIdentifier of response, just uses first received package" on Jan 17, 2025
@pglombardo
Collaborator

Your PR #223 has been released in 0.24.2. Thanks for pointing this out (and fixing) @JaggerJo!
