Skip to content

Commit

Permalink
PR feedback 2
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Sep 30, 2024
1 parent fe82784 commit 20ddea4
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 112 deletions.
1 change: 0 additions & 1 deletion src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Orleans.Core;

namespace Orleans.Hosting
{
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<ClientGrainContext>();
services.AddSingleton<IClusterConnectionStatusObserver, ClusterConnectionStatusObserverAdaptor>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
services.TryAddFromExisting<IRuntimeClient, OutsideRuntimeClient>();
services.TryAddFromExisting<IClusterConnectionStatusListener, OutsideRuntimeClient>();
Expand Down
59 changes: 29 additions & 30 deletions src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
Original file line number Diff line number Diff line change
@@ -1,46 +1,45 @@
using System;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;

namespace Orleans
namespace Orleans.Runtime;

internal sealed class ClusterConnectionStatusObserverAdaptor(
IEnumerable<GatewayCountChangedHandler> gatewayCountChangedHandlers,
IEnumerable<ConnectionToClusterLostHandler> connectionLostHandlers,
ILogger<ClusterClient> logger) : IClusterConnectionStatusObserver
{
internal sealed class ClusterConnectionStatusObserverAdaptor(
IEnumerable<GatewayCountChangedHandler> gatewayCountChangedHandlers,
IEnumerable<ConnectionToClusterLostHandler> connectionLostHandlers,
ILogger<ClusterClient> logger) : IClusterConnectionStatusObserver
{
private readonly ImmutableArray<GatewayCountChangedHandler> _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray();
private readonly ImmutableArray<ConnectionToClusterLostHandler> _connectionLostHandler = connectionLostHandlers.ToImmutableArray();
private readonly ImmutableArray<GatewayCountChangedHandler> _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray();
private readonly ImmutableArray<ConnectionToClusterLostHandler> _connectionLostHandler = connectionLostHandlers.ToImmutableArray();

public void NotifyClusterConnectionLost()
public void NotifyClusterConnectionLost()
{
foreach (var handler in _connectionLostHandler)
{
foreach (var handler in _connectionLostHandler)
try
{
handler(null, EventArgs.Empty);
}
catch (Exception ex)
{
try
{
handler(null, EventArgs.Empty);
}
catch (Exception ex)
{
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification.");
}
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification.");
}
}
}

public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered)
public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered)
{
var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways);
foreach (var handler in _gatewayCountChangedHandlers)
{
var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways);
foreach (var handler in _gatewayCountChangedHandlers)
try
{
handler(null, args);
}
catch (Exception ex)
{
try
{
handler(null, args);
}
catch (Exception ex)
{
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
}
}
Expand Down
21 changes: 0 additions & 21 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,6 @@ internal void ConsumeServices()
{
try
{
var connectionLostHandlers = this.ServiceProvider.GetServices<ConnectionToClusterLostHandler>();
foreach (var handler in connectionLostHandlers)
{
this.ClusterConnectionLost += handler;
}

var gatewayCountChangedHandlers = this.ServiceProvider.GetServices<GatewayCountChangedHandler>();
foreach (var handler in gatewayCountChangedHandlers)
{
this.GatewayCountChanged += handler;
}

_statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
Expand Down Expand Up @@ -405,9 +393,6 @@ public void Dispose()

Utils.SafeExecute(() => MessageCenter?.Dispose());

this.ClusterConnectionLost = null;
this.GatewayCountChanged = null;

GC.SuppressFinalize(this);
disposed = true;
}
Expand All @@ -426,12 +411,6 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo)
public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType)
=> this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType);

/// <inheritdoc />
public event ConnectionToClusterLostHandler ClusterConnectionLost;

/// <inheritdoc />
public event GatewayCountChangedHandler GatewayCountChanged;

/// <inheritdoc />
public void NotifyClusterConnectionLost()
{
Expand Down
107 changes: 47 additions & 60 deletions test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs
Original file line number Diff line number Diff line change
@@ -1,63 +1,48 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;

namespace Tester
namespace Tester;

public class ClientConnectionEventTests
{
public class ClientConnectionEventTests : TestClusterPerTest
[Fact, TestCategory("SlowBVT")]
public async Task EventSendWhenDisconnectedFromCluster()
{
private OutsideRuntimeClient runtimeClient;

protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.AddClientBuilderConfigurator<Configurator>();
}

public override async Task InitializeAsync()
var semaphore = new SemaphoreSlim(0, 1);
var builder = new InProcessTestClusterBuilder();
builder.ConfigureClient(c =>
{
await base.InitializeAsync();
this.runtimeClient = this.HostedCluster.Client.ServiceProvider.GetRequiredService<OutsideRuntimeClient>();
}

public class Configurator : IClientBuilderConfigurator
c.Configure<GatewayOptions>(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5));
c.AddClusterConnectionLostHandler((sender, args) => semaphore.Release());
});
await using var cluster = builder.Build();
await cluster.DeployAsync();

// Burst lot of call, to be sure that we are connected to all silos
for (int i = 0; i < 100; i++)
{
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder.Configure<GatewayOptions>(options => options.GatewayListRefreshPeriod = TimeSpan.FromSeconds(1));
}
var grain = cluster.Client.GetGrain<ITestGrain>(i);
await grain.SetLabel(i.ToString());
}

[Fact, TestCategory("SlowBVT")]
public async Task EventSendWhenDisconnectedFromCluster()
{
var runtime = this.HostedCluster.ServiceProvider.GetRequiredService<OutsideRuntimeClient>();

var semaphore = new SemaphoreSlim(0, 1);
this.runtimeClient.ClusterConnectionLost += (sender, args) => semaphore.Release();
await cluster.StopAllSilosAsync();

// Burst lot of call, to be sure that we are connected to all silos
for (int i = 0; i < 100; i++)
{
var grain = GrainFactory.GetGrain<ITestGrain>(i);
await grain.SetLabel(i.ToString());
}

await this.HostedCluster.StopAllSilosAsync();

Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10)));
}
Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10)));
}

[Fact, TestCategory("SlowBVT")]
public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
[Fact, TestCategory("SlowBVT")]
public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
{
var regainedGatewaySemaphore = new SemaphoreSlim(0, 1);
var lostGatewaySemaphore = new SemaphoreSlim(0, 1);
var builder = new InProcessTestClusterBuilder();
builder.ConfigureClient(c =>
{
var regainedGatewaySemaphore = new SemaphoreSlim(0, 1);
var lostGatewaySemaphore = new SemaphoreSlim(0, 1);

this.runtimeClient.GatewayCountChanged += (sender, args) =>
c.Configure<GatewayOptions>(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5));
c.AddGatewayCountChangedHandler((sender, args) =>
{
if (args.NumberOfConnectedGateways == 1)
{
Expand All @@ -67,25 +52,27 @@ public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
{
regainedGatewaySemaphore.Release();
}
};
});
});
await using var cluster = builder.Build();
await cluster.DeployAsync();

var silo = this.HostedCluster.SecondarySilos[0];
await silo.StopSiloAsync(true);
var silo = cluster.Silos[0];
await silo.StopSiloAsync(true);

Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20)));
Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20)));

await this.HostedCluster.RestartStoppedSecondarySiloAsync(silo.Name);
await cluster.RestartStoppedSecondarySiloAsync(silo.Name);

// Clients need prodding to reconnect.
var remainingAttempts = 90;
bool reconnected;
do
{
this.Client.GetGrain<ITestGrain>(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore();
reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1));
} while (!reconnected && --remainingAttempts > 0);
// Clients need prodding to reconnect.
var remainingAttempts = 90;
bool reconnected;
do
{
cluster.Client.GetGrain<ITestGrain>(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore();
reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1));
} while (!reconnected && --remainingAttempts > 0);

Assert.True(reconnected, "Failed to reconnect to restarted gateway.");
}
Assert.True(reconnected, "Failed to reconnect to restarted gateway.");
}
}

0 comments on commit 20ddea4

Please sign in to comment.