Skip to content

Commit

Permalink
Add IClusterConnectionStatusObserver support
Browse files Browse the repository at this point in the history
  • Loading branch information
galvesribeiro authored and ReubenBond committed Sep 30, 2024
1 parent 9f51b16 commit 43014d0
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 1 deletion.
42 changes: 42 additions & 0 deletions src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Orleans.Core;

namespace Orleans.Hosting
{
Expand Down Expand Up @@ -104,6 +105,35 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b
return builder;
}

/// <summary>
/// Registers a <see cref="GatewayCountChangedHandler"/> event handler.
/// </summary>
public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder builder, Func<IServiceProvider, GatewayCountChangedHandler> handlerFactory)
{
builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
return builder;
}

/// <summary>
/// Registers a <see cref="IClusterConnectionStatusObserver"/>.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder, TObserver observer)
where TObserver : IClusterConnectionStatusObserver
{
builder.Services.AddSingleton<IClusterConnectionStatusObserver>(observer);
return builder;
}

/// <summary>
/// Registers a <see cref="IClusterConnectionStatusObserver"/>.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder)
where TObserver : class, IClusterConnectionStatusObserver
{
builder.Services.AddSingleton<IClusterConnectionStatusObserver, TObserver>();
return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
Expand All @@ -116,6 +146,18 @@ public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder
return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="handlerFactory">The handler factory.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder builder, Func<IServiceProvider, ConnectionToClusterLostHandler> handlerFactory)
{
builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
return builder;
}

/// <summary>
/// Add <see cref="Activity.Current"/> propagation through grain calls.
/// Note: according to <see cref="ActivitySource.StartActivity(string, ActivityKind)"/> activity will be created only when any listener for activity exists <see cref="ActivitySource.HasListeners()"/> and <see cref="ActivityListener.Sample"/> returns <see cref="ActivitySamplingResult.PropagationData"/>.
Expand Down
24 changes: 24 additions & 0 deletions src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Orleans.Core;

/// <summary>
/// Interface that receives notifications about the status of the cluster connection.
/// </summary>
public interface IClusterConnectionStatusObserver
{
/// <summary>
/// Notifies this client that the number of connected gateways has changed
/// </summary>
/// <param name="currentNumberOfGateways">
/// The current number of gateways.
/// </param>
/// <param name="previousNumberOfGateways">
/// The previous number of gateways.
/// </param>
/// <param name="connectionRecovered">Helper to detect situations where cluster connectivity was regained.</param>
void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered);

/// <summary>
/// Notifies this client that the connection to the cluster has been lost.
/// </summary>
void NotifyClusterConnectionLost();
}
30 changes: 29 additions & 1 deletion src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Orleans.ClientObservers;
using Orleans.CodeGeneration;
using Orleans.Configuration;
using Orleans.Core;
using Orleans.Messaging;
using Orleans.Runtime;
using Orleans.Serialization;
Expand Down Expand Up @@ -272,7 +273,7 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp
{
// don't set expiration for system target messages.
var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout;
message.TimeToLive = ttl;
message.TimeToLive = ttl;
}

if (!oneWay)
Expand Down Expand Up @@ -434,6 +435,18 @@ public void NotifyClusterConnectionLost()
try
{
this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty);

var statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

if (statusObservers.Length <= 0)
{
return;
}

foreach (var observer in statusObservers)
{
observer.NotifyClusterConnectionLost();
}
}
catch (Exception ex)
{
Expand All @@ -447,6 +460,21 @@ public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousN
try
{
this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways));

var statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

if (statusObservers.Length <= 0)
{
return;
}

foreach (var observer in statusObservers)
{
observer.NotifyGatewayCountChanged(
currentNumberOfGateways,
previousNumberOfGateways,
currentNumberOfGateways > 0 && previousNumberOfGateways <= 0);
}
}
catch (Exception ex)
{
Expand Down

0 comments on commit 43014d0

Please sign in to comment.