Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client connection status observer #9158

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,35 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b
return builder;
}

/// <summary>
/// Registers a factory which creates a <see cref="GatewayCountChangedHandler"/> event handler.
/// </summary>
/// <param name="builder">The client builder.</param>
/// <param name="handlerFactory">The factory which receives the <see cref="IServiceProvider"/> and returns the handler.</param>
/// <returns>The same <paramref name="builder"/> instance, so that calls can be chained.</returns>
public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder builder, Func<IServiceProvider, GatewayCountChangedHandler> handlerFactory)
{
    // Spell out the service type so it is obvious that the factory overload is
    // selected, rather than registering the factory delegate itself as an instance.
    builder.ConfigureServices(
        serviceCollection => serviceCollection.AddSingleton<GatewayCountChangedHandler>(handlerFactory));

    return builder;
}

/// <summary>
/// Registers an existing cluster connection status observer instance.
/// </summary>
/// <typeparam name="TObserver">The observer type.</typeparam>
/// <param name="builder">The client builder.</param>
/// <param name="observer">The observer instance to register as a singleton <see cref="IClusterConnectionStatusObserver"/>.</param>
/// <returns>The same <paramref name="builder"/> instance, so that calls can be chained.</returns>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder, TObserver observer)
    where TObserver : IClusterConnectionStatusObserver
{
    // Convert to the interface up front so the registration is keyed by
    // IClusterConnectionStatusObserver rather than by the concrete observer type.
    IClusterConnectionStatusObserver registration = observer;
    var services = builder.Services;
    services.AddSingleton<IClusterConnectionStatusObserver>(registration);

    return builder;
}

/// <summary>
/// Registers a cluster connection status observer which is constructed by the service container.
/// </summary>
/// <typeparam name="TObserver">The observer implementation type.</typeparam>
/// <param name="builder">The client builder.</param>
/// <returns>The same <paramref name="builder"/> instance, so that calls can be chained.</returns>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder)
    where TObserver : class, IClusterConnectionStatusObserver
{
    // Registered against the interface; TObserver is the singleton implementation type.
    var services = builder.Services;
    services.AddSingleton<IClusterConnectionStatusObserver, TObserver>();

    return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
Expand All @@ -116,6 +145,18 @@ public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder
return builder;
}

/// <summary>
/// Registers a factory which creates a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
/// <param name="builder">The client builder.</param>
/// <param name="handlerFactory">The factory which receives the <see cref="IServiceProvider"/> and returns the handler.</param>
/// <returns>The same <paramref name="builder"/> instance, so that calls can be chained.</returns>
public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder builder, Func<IServiceProvider, ConnectionToClusterLostHandler> handlerFactory)
{
    // Spell out the service type so it is obvious that the factory overload is
    // selected, rather than registering the factory delegate itself as an instance.
    builder.ConfigureServices(
        serviceCollection => serviceCollection.AddSingleton<ConnectionToClusterLostHandler>(handlerFactory));

    return builder;
}

/// <summary>
/// Add <see cref="Activity.Current"/> propagation through grain calls.
/// Note: according to <see cref="ActivitySource.StartActivity(string, ActivityKind)"/> activity will be created only when any listener for activity exists <see cref="ActivitySource.HasListeners()"/> and <see cref="ActivityListener.Sample"/> returns <see cref="ActivitySamplingResult.PropagationData"/>.
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<ClientGrainContext>();
services.AddSingleton<IClusterConnectionStatusObserver, ClusterConnectionStatusObserverAdaptor>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
services.TryAddFromExisting<IRuntimeClient, OutsideRuntimeClient>();
services.TryAddFromExisting<IClusterConnectionStatusListener, OutsideRuntimeClient>();
Expand Down
24 changes: 24 additions & 0 deletions src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Orleans;

/// <summary>
/// Receives notifications about the status of a client's connection to the cluster:
/// changes in the number of connected gateways, and loss of the connection.
/// </summary>
/// <remarks>
/// Implementations are registered as services implementing <see cref="IClusterConnectionStatusObserver"/>.
/// The client runtime shown alongside this interface calls each registered observer in turn
/// and logs, rather than propagates, any exception an observer throws.
/// </remarks>
public interface IClusterConnectionStatusObserver
{
/// <summary>
/// Notifies this observer that the number of connected gateways has changed.
/// </summary>
/// <param name="currentNumberOfGateways">
/// The current number of gateways.
/// </param>
/// <param name="previousNumberOfGateways">
/// The previous number of gateways.
/// </param>
/// <param name="connectionRecovered">
/// Indicates whether a loss of connectivity has been resolved.
/// The client runtime shown alongside this interface passes <see langword="true"/> when
/// <paramref name="previousNumberOfGateways"/> was zero or less and
/// <paramref name="currentNumberOfGateways"/> is greater than zero.
/// </param>
void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered);

/// <summary>
/// Notifies this observer that the connection to the cluster has been lost.
/// </summary>
void NotifyClusterConnectionLost();
}
46 changes: 46 additions & 0 deletions src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;

namespace Orleans.Runtime;

/// <summary>
/// Adapts the delegate-based <see cref="GatewayCountChangedHandler"/> and
/// <see cref="ConnectionToClusterLostHandler"/> registrations to the
/// <see cref="IClusterConnectionStatusObserver"/> interface by forwarding each
/// notification to every registered handler.
/// </summary>
internal sealed class ClusterConnectionStatusObserverAdaptor : IClusterConnectionStatusObserver
{
    private readonly ImmutableArray<GatewayCountChangedHandler> _gatewayCountHandlers;
    private readonly ImmutableArray<ConnectionToClusterLostHandler> _connectionLostHandlers;
    private readonly ILogger<ClusterClient> _logger;

    /// <summary>
    /// Initializes a new instance, taking a snapshot of the supplied handler sequences.
    /// </summary>
    /// <param name="gatewayCountChangedHandlers">Handlers to invoke when the gateway count changes.</param>
    /// <param name="connectionLostHandlers">Handlers to invoke when the cluster connection is lost.</param>
    /// <param name="logger">Logger used to report exceptions thrown by handlers.</param>
    public ClusterConnectionStatusObserverAdaptor(
        IEnumerable<GatewayCountChangedHandler> gatewayCountChangedHandlers,
        IEnumerable<ConnectionToClusterLostHandler> connectionLostHandlers,
        ILogger<ClusterClient> logger)
    {
        _gatewayCountHandlers = gatewayCountChangedHandlers.ToImmutableArray();
        _connectionLostHandlers = connectionLostHandlers.ToImmutableArray();
        _logger = logger;
    }

    /// <summary>
    /// Invokes every connection-lost handler with a <see langword="null"/> sender and
    /// <see cref="EventArgs.Empty"/>. A handler that throws is logged and does not
    /// prevent the remaining handlers from running.
    /// </summary>
    public void NotifyClusterConnectionLost()
    {
        for (var index = 0; index < _connectionLostHandlers.Length; index++)
        {
            var callback = _connectionLostHandlers[index];
            try
            {
                callback(null, EventArgs.Empty);
            }
            catch (Exception exception)
            {
                _logger.LogError((int)ErrorCode.ClientError, exception, "Error sending cluster connection lost notification.");
            }
        }
    }

    /// <summary>
    /// Invokes every gateway-count handler with a <see langword="null"/> sender and a single shared
    /// <see cref="GatewayCountChangedEventArgs"/> instance. A handler that throws is
    /// logged and does not prevent the remaining handlers from running.
    /// </summary>
    /// <param name="currentNumberOfGateways">The current number of gateways.</param>
    /// <param name="previousNumberOfGateways">The previous number of gateways.</param>
    /// <param name="connectionRecovered">Not forwarded; the event args are built from the two counts only.</param>
    public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered)
    {
        // Built once, before any handler runs, and shared by all handlers.
        var eventArgs = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways);

        for (var index = 0; index < _gatewayCountHandlers.Length; index++)
        {
            var callback = _gatewayCountHandlers[index];
            try
            {
                callback(null, eventArgs);
            }
            catch (Exception exception)
            {
                _logger.LogError((int)ErrorCode.ClientError, exception, "Error sending gateway count changed notification.");
            }
        }
    }
}
60 changes: 26 additions & 34 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
Expand All @@ -13,11 +14,11 @@
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;

namespace Orleans
{

internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConnectionStatusListener
{
internal static bool TestOnlyThrowExceptionDuringInit { get; set; }
Expand All @@ -32,6 +33,7 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne

private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
private IClusterConnectionStatusObserver[] _statusObservers;

public IInternalGrainFactory InternalGrainFactory { get; private set; }

Expand Down Expand Up @@ -95,17 +97,7 @@ internal void ConsumeServices()
{
try
{
var connectionLostHandlers = this.ServiceProvider.GetServices<ConnectionToClusterLostHandler>();
foreach (var handler in connectionLostHandlers)
{
this.ClusterConnectionLost += handler;
}

var gatewayCountChangedHandlers = this.ServiceProvider.GetServices<GatewayCountChangedHandler>();
foreach (var handler in gatewayCountChangedHandlers)
{
this.GatewayCountChanged += handler;
}
_statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
this.messageFactory = this.ServiceProvider.GetService<MessageFactory>();
Expand Down Expand Up @@ -272,7 +264,7 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp
{
// don't set expiration for system target messages.
var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout;
message.TimeToLive = ttl;
message.TimeToLive = ttl;
}

if (!oneWay)
Expand Down Expand Up @@ -401,9 +393,6 @@ public void Dispose()

Utils.SafeExecute(() => MessageCenter?.Dispose());

this.ClusterConnectionLost = null;
this.GatewayCountChanged = null;

GC.SuppressFinalize(this);
disposed = true;
}
Expand All @@ -422,35 +411,38 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo)
public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType)
=> this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType);

/// <inheritdoc />
public event ConnectionToClusterLostHandler ClusterConnectionLost;

/// <inheritdoc />
public event GatewayCountChangedHandler GatewayCountChanged;

/// <inheritdoc />
public void NotifyClusterConnectionLost()
{
try
foreach (var observer in _statusObservers)
{
this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending cluster disconnection notification");
try
{
observer.NotifyClusterConnectionLost();
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster disconnection notification.");
}
}
}

/// <inheritdoc />
public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways)
{
try
{
this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways));
}
catch (Exception ex)
foreach (var observer in _statusObservers)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending gateway count changed notification");
try
{
observer.NotifyGatewayCountChanged(
currentNumberOfGateways,
previousNumberOfGateways,
currentNumberOfGateways > 0 && previousNumberOfGateways <= 0);
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
}
}

Expand Down
107 changes: 47 additions & 60 deletions test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs
Original file line number Diff line number Diff line change
@@ -1,63 +1,48 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;

namespace Tester
namespace Tester;

public class ClientConnectionEventTests
{
public class ClientConnectionEventTests : TestClusterPerTest
[Fact, TestCategory("SlowBVT")]
public async Task EventSendWhenDisconnectedFromCluster()
{
private OutsideRuntimeClient runtimeClient;

protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.AddClientBuilderConfigurator<Configurator>();
}

public override async Task InitializeAsync()
var semaphore = new SemaphoreSlim(0, 1);
var builder = new InProcessTestClusterBuilder();
builder.ConfigureClient(c =>
{
await base.InitializeAsync();
this.runtimeClient = this.HostedCluster.Client.ServiceProvider.GetRequiredService<OutsideRuntimeClient>();
}

public class Configurator : IClientBuilderConfigurator
c.Configure<GatewayOptions>(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5));
c.AddClusterConnectionLostHandler((sender, args) => semaphore.Release());
});
await using var cluster = builder.Build();
await cluster.DeployAsync();

// Burst lot of call, to be sure that we are connected to all silos
for (int i = 0; i < 100; i++)
{
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder.Configure<GatewayOptions>(options => options.GatewayListRefreshPeriod = TimeSpan.FromSeconds(1));
}
var grain = cluster.Client.GetGrain<ITestGrain>(i);
await grain.SetLabel(i.ToString());
}

[Fact, TestCategory("SlowBVT")]
public async Task EventSendWhenDisconnectedFromCluster()
{
var runtime = this.HostedCluster.ServiceProvider.GetRequiredService<OutsideRuntimeClient>();

var semaphore = new SemaphoreSlim(0, 1);
this.runtimeClient.ClusterConnectionLost += (sender, args) => semaphore.Release();
await cluster.StopAllSilosAsync();

// Burst lot of call, to be sure that we are connected to all silos
for (int i = 0; i < 100; i++)
{
var grain = GrainFactory.GetGrain<ITestGrain>(i);
await grain.SetLabel(i.ToString());
}

await this.HostedCluster.StopAllSilosAsync();

Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10)));
}
Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10)));
}

[Fact, TestCategory("SlowBVT")]
public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
[Fact, TestCategory("SlowBVT")]
public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
{
var regainedGatewaySemaphore = new SemaphoreSlim(0, 1);
var lostGatewaySemaphore = new SemaphoreSlim(0, 1);
var builder = new InProcessTestClusterBuilder();
builder.ConfigureClient(c =>
{
var regainedGatewaySemaphore = new SemaphoreSlim(0, 1);
var lostGatewaySemaphore = new SemaphoreSlim(0, 1);

this.runtimeClient.GatewayCountChanged += (sender, args) =>
c.Configure<GatewayOptions>(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5));
c.AddGatewayCountChangedHandler((sender, args) =>
{
if (args.NumberOfConnectedGateways == 1)
{
Expand All @@ -67,25 +52,27 @@ public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
{
regainedGatewaySemaphore.Release();
}
};
});
});
await using var cluster = builder.Build();
await cluster.DeployAsync();

var silo = this.HostedCluster.SecondarySilos[0];
await silo.StopSiloAsync(true);
var silo = cluster.Silos[0];
await silo.StopSiloAsync(true);

Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20)));
Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20)));

await this.HostedCluster.RestartStoppedSecondarySiloAsync(silo.Name);
await cluster.RestartStoppedSecondarySiloAsync(silo.Name);

// Clients need prodding to reconnect.
var remainingAttempts = 90;
bool reconnected;
do
{
this.Client.GetGrain<ITestGrain>(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore();
reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1));
} while (!reconnected && --remainingAttempts > 0);
// Clients need prodding to reconnect.
var remainingAttempts = 90;
bool reconnected;
do
{
cluster.Client.GetGrain<ITestGrain>(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore();
reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1));
} while (!reconnected && --remainingAttempts > 0);

Assert.True(reconnected, "Failed to reconnect to restarted gateway.");
}
Assert.True(reconnected, "Failed to reconnect to restarted gateway.");
}
}
Loading