Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Sep 30, 2024
1 parent 43014d0 commit fe82784
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 37 deletions.
4 changes: 2 additions & 2 deletions src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b
}

/// <summary>
/// Registers a <see cref="IClusterConnectionStatusObserver"/>.
/// Registers a cluster connection status observer.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder, TObserver observer)
where TObserver : IClusterConnectionStatusObserver
Expand All @@ -125,7 +125,7 @@ public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this
}

/// <summary>
/// Registers a <see cref="IClusterConnectionStatusObserver"/>.
/// Registers a cluster connection status observer.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder)
where TObserver : class, IClusterConnectionStatusObserver
Expand Down
8 changes: 4 additions & 4 deletions src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
namespace Orleans.Core;
namespace Orleans;

/// <summary>
/// Interface that receives notifications about the status of the cluster connection.
/// </summary>
public interface IClusterConnectionStatusObserver
{
/// <summary>
/// Notifies this client that the number of connected gateways has changed
/// Notifies this observer that the number of connected gateways has changed.
/// </summary>
/// <param name="currentNumberOfGateways">
/// The current number of gateways.
/// </param>
/// <param name="previousNumberOfGateways">
/// The previous number of gateways.
/// </param>
/// <param name="connectionRecovered">Helper to detect situations where cluster connectivity was regained.</param>
/// <param name="connectionRecovered">Indicates whether a loss of connectivity has been resolved.</param>
void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered);

/// <summary>
/// Notifies this client that the connection to the cluster has been lost.
/// Notifies this observer that the connection to the cluster has been lost.
/// </summary>
void NotifyClusterConnectionLost();
}
47 changes: 47 additions & 0 deletions src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;

namespace Orleans
{
internal sealed class ClusterConnectionStatusObserverAdaptor(
IEnumerable<GatewayCountChangedHandler> gatewayCountChangedHandlers,
IEnumerable<ConnectionToClusterLostHandler> connectionLostHandlers,
ILogger<ClusterClient> logger) : IClusterConnectionStatusObserver
{
private readonly ImmutableArray<GatewayCountChangedHandler> _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray();
private readonly ImmutableArray<ConnectionToClusterLostHandler> _connectionLostHandler = connectionLostHandlers.ToImmutableArray();

public void NotifyClusterConnectionLost()
{
foreach (var handler in _connectionLostHandler)
{
try
{
handler(null, EventArgs.Empty);
}
catch (Exception ex)
{
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification.");
}
}
}

public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered)
{
var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways);
foreach (var handler in _gatewayCountChangedHandlers)
{
try
{
handler(null, args);
}
catch (Exception ex)
{
logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
}
}
}
}
47 changes: 16 additions & 31 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
Expand All @@ -9,16 +10,15 @@
using Orleans.ClientObservers;
using Orleans.CodeGeneration;
using Orleans.Configuration;
using Orleans.Core;
using Orleans.Messaging;
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;

namespace Orleans
{

internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConnectionStatusListener
{
internal static bool TestOnlyThrowExceptionDuringInit { get; set; }
Expand All @@ -33,6 +33,7 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne

private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
private IClusterConnectionStatusObserver[] _statusObservers;

public IInternalGrainFactory InternalGrainFactory { get; private set; }

Expand Down Expand Up @@ -108,6 +109,8 @@ internal void ConsumeServices()
this.GatewayCountChanged += handler;
}

_statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
this.messageFactory = this.ServiceProvider.GetService<MessageFactory>();
this.localObjects = new InvokableObjectManager(
Expand Down Expand Up @@ -432,53 +435,35 @@ public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType)
/// <inheritdoc />
public void NotifyClusterConnectionLost()
{
try
foreach (var observer in _statusObservers)
{
this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty);

var statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

if (statusObservers.Length <= 0)
try
{
return;
observer.NotifyClusterConnectionLost();
}

foreach (var observer in statusObservers)
catch (Exception ex)
{
observer.NotifyClusterConnectionLost();
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster disconnection notification.");
}
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending cluster disconnection notification");
}
}

/// <inheritdoc />
public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways)
{
try
foreach (var observer in _statusObservers)
{
this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways));

var statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();

if (statusObservers.Length <= 0)
{
return;
}

foreach (var observer in statusObservers)
try
{
observer.NotifyGatewayCountChanged(
currentNumberOfGateways,
previousNumberOfGateways,
currentNumberOfGateways > 0 && previousNumberOfGateways <= 0);
}
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending gateway count changed notification");
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
}
}

Expand Down

0 comments on commit fe82784

Please sign in to comment.