diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 4307f8500..6d1baca81 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -108,6 +110,10 @@ Global {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU + {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -123,6 +129,7 @@ Global {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69} + {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/Applications/GH-1749/GH-1749.csproj b/projects/Applications/GH-1749/GH-1749.csproj new file mode 100644 index 000000000..9e44f6447 --- /dev/null +++ b/projects/Applications/GH-1749/GH-1749.csproj @@ -0,0 +1,19 @@ + + + + Exe + net8.0 + GH_1749 + enable + enable + + + + + + + + + + + diff --git a/projects/Applications/GH-1749/Program.cs b/projects/Applications/GH-1749/Program.cs new file mode 100644 index 000000000..d3bf5a327 --- /dev/null +++ b/projects/Applications/GH-1749/Program.cs @@ -0,0 +1,217 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Runtime.ExceptionServices; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace GH_1749 +{ + class GH1749Consumer : AsyncDefaultBasicConsumer + { + public GH1749Consumer(IChannel channel) : base(channel) + { + } + + protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) + { + Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", Util.Now, consumerTags[0]); + return base.OnCancelAsync(consumerTags, cancellationToken); + } + } + + static class Program + { + const string DefaultHostName = "localhost"; + const string ConnectionClientProvidedName = "GH_1749"; + static readonly CancellationTokenSource s_cancellationTokenSource = new(); + static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token; + + static Util? s_util; + + static async Task Main(string[] args) + { + string hostname = DefaultHostName; + if (args.Length > 0) + { + hostname = args[0]; + } + + s_util = new Util(hostname, "guest", "guest"); + + AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException; + + ConnectionFactory connectionFactory = new() + { + HostName = hostname, + AutomaticRecoveryEnabled = true, + UserName = "guest", + Password = "guest", + ClientProvidedName = ConnectionClientProvidedName + }; + + var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true); + await using var connection = await connectionFactory.CreateConnectionAsync(); + + connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea); + _ = CloseConnectionAsync(); + return Task.CompletedTask; + }; + + connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync; + + connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync; + connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync; + + connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync; + + connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + }; + + connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync; + connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync; + + connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync; + + await using var channel = await connection.CreateChannelAsync(options: channelOptions); + + channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync; + channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; + + QueueDeclareOk queue = await channel.QueueDeclareAsync(); + + var consumer = new GH1749Consumer(channel); + await channel.BasicConsumeAsync(queue.QueueName, true, consumer); + + _ = CloseConnectionAsync(); + + Console.WriteLine("{0} [INFO] consumer is running", Util.Now); + Console.ReadLine(); + } + + static async Task CloseConnectionAsync() + { + if (s_util is null) + { + throw new NullReferenceException("s_util"); + } + + try + { + Console.WriteLine("{0} [INFO] start closing connection: {1}", Now, ConnectionClientProvidedName); + await s_util.CloseConnectionAsync(ConnectionClientProvidedName); + Console.WriteLine("{0} [INFO] done closing connection: {1}", Now, ConnectionClientProvidedName); + } + catch (Exception ex) + { + Console.Error.WriteLine("{0} [ERROR] error while closing connection: {1}", Now, ex); + } + } + + private static string Now => Util.Now; + + private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e) + { + // Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception); + if (e.Exception is ObjectDisposedException) + { + Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception); + } + } + + private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter); + return Task.CompletedTask; + } + + private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter); + return Task.CompletedTask; + } + + private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag); + return Task.CompletedTask; + } + } +} + diff --git a/projects/Applications/GH-1749/Util.cs b/projects/Applications/GH-1749/Util.cs new file mode 100644 index 000000000..1cc2debda --- /dev/null +++ b/projects/Applications/GH-1749/Util.cs @@ -0,0 +1,140 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Globalization; +using EasyNetQ.Management.Client; + +namespace GH_1749 +{ + public class Util : IDisposable + { + private static readonly Random s_random = Random.Shared; + private readonly ManagementClient _managementClient; + + public Util() : this("localhost", "guest", "guest") + { + } + + public Util(string hostname, string managementUsername, string managementPassword) + { + if (string.IsNullOrEmpty(managementUsername)) + { + managementUsername = "guest"; + } + + if (string.IsNullOrEmpty(managementPassword)) + { + throw new ArgumentNullException(nameof(managementPassword)); + } + + var managementUri = new Uri($"http://{hostname}:15672"); + _managementClient = new ManagementClient(managementUri, managementUsername, managementPassword); + } + + public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture); + + public static Random S_Random + { + get + { + return s_random; + } + } + + public async Task CloseConnectionAsync(string connectionClientProvidedName) + { + ushort tries = 1; + EasyNetQ.Management.Client.Model.Connection? connectionToClose = null; + do + { + IReadOnlyList connections; + try + { + do + { + ushort delayMilliseconds = (ushort)(tries * 2 * 100); + if (delayMilliseconds > 1000) + { + delayMilliseconds = 1000; + } + + await Task.Delay(TimeSpan.FromMilliseconds(delayMilliseconds)); + + connections = await _managementClient.GetConnectionsAsync(); + } while (connections.Count == 0); + + connectionToClose = connections.Where(c0 => + { + if (c0.ClientProperties.ContainsKey("connection_name")) + { + object? maybeConnectionName = c0.ClientProperties["connection_name"]; + if (maybeConnectionName is string connectionNameStr) + { + return string.Equals(connectionNameStr, connectionClientProvidedName, StringComparison.InvariantCultureIgnoreCase); + } + } + + return false; + }).FirstOrDefault(); + } + catch (ArgumentNullException) + { + // Sometimes we see this in GitHub CI + tries++; + continue; + } + + if (connectionToClose != null) + { + try + { + await _managementClient.CloseConnectionAsync(connectionToClose); + return; + } + catch (UnexpectedHttpStatusCodeException) + { + tries++; + } + } + } while (tries <= 10); + + if (connectionToClose == null) + { + throw new InvalidOperationException( + $"{Now} [ERROR] could not find/delete connection: '{connectionClientProvidedName}'"); + } + } + + public void Dispose() => _managementClient.Dispose(); + } +}