From 20ddea4c4ed43ef9009b1577c5b21625289454c4 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 30 Sep 2024 13:20:00 -0700 Subject: [PATCH] PR feedback 2 --- .../Core/ClientBuilderExtensions.cs | 1 - .../Core/DefaultClientServices.cs | 1 + .../ClusterConnectionStatusObserverAdaptor.cs | 59 +++++----- .../Runtime/OutsideRuntimeClient.cs | 21 ---- .../ClientConnectionEventTests.cs | 107 ++++++++---------- 5 files changed, 77 insertions(+), 112 deletions(-) diff --git a/src/Orleans.Core/Core/ClientBuilderExtensions.cs b/src/Orleans.Core/Core/ClientBuilderExtensions.cs index c05579128b..82d87a1237 100644 --- a/src/Orleans.Core/Core/ClientBuilderExtensions.cs +++ b/src/Orleans.Core/Core/ClientBuilderExtensions.cs @@ -11,7 +11,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection.Extensions; -using Orleans.Core; namespace Orleans.Hosting { diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 51315459af..6dd179cf51 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -75,6 +75,7 @@ public static void AddDefaultServices(IClientBuilder builder) services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.AddSingleton(); services.AddFromExisting(); services.TryAddFromExisting(); services.TryAddFromExisting(); diff --git a/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs b/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs index 1a1d8a0ad1..fd8e0932d2 100644 --- a/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs +++ b/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs @@ -1,46 +1,45 @@ -using System; +using System; using System.Collections.Generic; using System.Collections.Immutable; using Microsoft.Extensions.Logging; -namespace Orleans +namespace Orleans.Runtime; + +internal sealed class ClusterConnectionStatusObserverAdaptor( + IEnumerable gatewayCountChangedHandlers, + IEnumerable connectionLostHandlers, + ILogger logger) : IClusterConnectionStatusObserver { - internal sealed class ClusterConnectionStatusObserverAdaptor( - IEnumerable gatewayCountChangedHandlers, - IEnumerable connectionLostHandlers, - ILogger logger) : IClusterConnectionStatusObserver - { - private readonly ImmutableArray _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray(); - private readonly ImmutableArray _connectionLostHandler = connectionLostHandlers.ToImmutableArray(); + private readonly ImmutableArray _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray(); + private readonly ImmutableArray _connectionLostHandler = connectionLostHandlers.ToImmutableArray(); - public void NotifyClusterConnectionLost() + public void NotifyClusterConnectionLost() + { + foreach (var handler in _connectionLostHandler) { - foreach (var handler in _connectionLostHandler) + try + { + handler(null, EventArgs.Empty); + } + catch (Exception ex) { - try - { - handler(null, EventArgs.Empty); - } - catch (Exception ex) - { - logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification."); - } + logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification."); } } + } - public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered) + public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered) + { + var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways); + foreach (var handler in _gatewayCountChangedHandlers) { - var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways); - foreach (var handler in _gatewayCountChangedHandlers) + try + { + handler(null, args); + } + catch (Exception ex) { - try - { - handler(null, args); - } - catch (Exception ex) - { - logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification."); - } + logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification."); } } } diff --git a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs index 91e20f330f..d9dff1864b 100644 --- a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs +++ b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs @@ -97,18 +97,6 @@ internal void ConsumeServices() { try { - var connectionLostHandlers = this.ServiceProvider.GetServices(); - foreach (var handler in connectionLostHandlers) - { - this.ClusterConnectionLost += handler; - } - - var gatewayCountChangedHandlers = this.ServiceProvider.GetServices(); - foreach (var handler in gatewayCountChangedHandlers) - { - this.GatewayCountChanged += handler; - } - _statusObservers = this.ServiceProvider.GetServices().ToArray(); this.InternalGrainFactory = this.ServiceProvider.GetRequiredService(); @@ -405,9 +393,6 @@ public void Dispose() Utils.SafeExecute(() => MessageCenter?.Dispose()); - this.ClusterConnectionLost = null; - this.GatewayCountChanged = null; - GC.SuppressFinalize(this); disposed = true; } @@ -426,12 +411,6 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo) public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType) => this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType); - /// - public event ConnectionToClusterLostHandler ClusterConnectionLost; - - /// - public event GatewayCountChangedHandler GatewayCountChanged; - /// public void NotifyClusterConnectionLost() { diff --git a/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs b/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs index 92a7dc05bf..4a4c4440eb 100644 --- a/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs +++ b/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs @@ -1,63 +1,48 @@ -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Orleans.Configuration; using Orleans.TestingHost; -using TestExtensions; using UnitTests.GrainInterfaces; using Xunit; -namespace Tester +namespace Tester; + +public class ClientConnectionEventTests { - public class ClientConnectionEventTests : TestClusterPerTest + [Fact, TestCategory("SlowBVT")] + public async Task EventSendWhenDisconnectedFromCluster() { - private OutsideRuntimeClient runtimeClient; - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - builder.AddClientBuilderConfigurator(); - } - - public override async Task InitializeAsync() + var semaphore = new SemaphoreSlim(0, 1); + var builder = new InProcessTestClusterBuilder(); + builder.ConfigureClient(c => { - await base.InitializeAsync(); - this.runtimeClient = this.HostedCluster.Client.ServiceProvider.GetRequiredService(); - } - - public class Configurator : IClientBuilderConfigurator + c.Configure(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5)); + c.AddClusterConnectionLostHandler((sender, args) => semaphore.Release()); + }); + await using var cluster = builder.Build(); + await cluster.DeployAsync(); + + // Burst lot of call, to be sure that we are connected to all silos + for (int i = 0; i < 100; i++) { - public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) - { - clientBuilder.Configure(options => options.GatewayListRefreshPeriod = TimeSpan.FromSeconds(1)); - } + var grain = cluster.Client.GetGrain(i); + await grain.SetLabel(i.ToString()); } - [Fact, TestCategory("SlowBVT")] - public async Task EventSendWhenDisconnectedFromCluster() - { - var runtime = this.HostedCluster.ServiceProvider.GetRequiredService(); - - var semaphore = new SemaphoreSlim(0, 1); - this.runtimeClient.ClusterConnectionLost += (sender, args) => semaphore.Release(); + await cluster.StopAllSilosAsync(); - // Burst lot of call, to be sure that we are connected to all silos - for (int i = 0; i < 100; i++) - { - var grain = GrainFactory.GetGrain(i); - await grain.SetLabel(i.ToString()); - } - - await this.HostedCluster.StopAllSilosAsync(); - - Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10))); - } + Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10))); + } - [Fact, TestCategory("SlowBVT")] - public async Task GatewayChangedEventSentOnDisconnectAndReconnect() + [Fact, TestCategory("SlowBVT")] + public async Task GatewayChangedEventSentOnDisconnectAndReconnect() + { + var regainedGatewaySemaphore = new SemaphoreSlim(0, 1); + var lostGatewaySemaphore = new SemaphoreSlim(0, 1); + var builder = new InProcessTestClusterBuilder(); + builder.ConfigureClient(c => { - var regainedGatewaySemaphore = new SemaphoreSlim(0, 1); - var lostGatewaySemaphore = new SemaphoreSlim(0, 1); - - this.runtimeClient.GatewayCountChanged += (sender, args) => + c.Configure(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5)); + c.AddGatewayCountChangedHandler((sender, args) => { if (args.NumberOfConnectedGateways == 1) { @@ -67,25 +52,27 @@ public async Task GatewayChangedEventSentOnDisconnectAndReconnect() { regainedGatewaySemaphore.Release(); } - }; + }); + }); + await using var cluster = builder.Build(); + await cluster.DeployAsync(); - var silo = this.HostedCluster.SecondarySilos[0]; - await silo.StopSiloAsync(true); + var silo = cluster.Silos[0]; + await silo.StopSiloAsync(true); - Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20))); + Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20))); - await this.HostedCluster.RestartStoppedSecondarySiloAsync(silo.Name); + await cluster.RestartStoppedSecondarySiloAsync(silo.Name); - // Clients need prodding to reconnect. - var remainingAttempts = 90; - bool reconnected; - do - { - this.Client.GetGrain(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore(); - reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1)); - } while (!reconnected && --remainingAttempts > 0); + // Clients need prodding to reconnect. + var remainingAttempts = 90; + bool reconnected; + do + { + cluster.Client.GetGrain(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore(); + reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1)); + } while (!reconnected && --remainingAttempts > 0); - Assert.True(reconnected, "Failed to reconnect to restarted gateway."); - } + Assert.True(reconnected, "Failed to reconnect to restarted gateway."); } }