Skip to content

Commit

Permalink
Revert "Add IClusterConnectionStatusObserver support" until InProcess…
Browse files Browse the repository at this point in the history
…TestCluster is merged (#9157)

Revert "Add IClusterConnectionStatusObserver support (#9145)"

This reverts commit 546b739.
  • Loading branch information
ReubenBond authored Oct 1, 2024
1 parent 546b739 commit 9907c0b
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 1,604 deletions.
41 changes: 0 additions & 41 deletions src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,6 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b
return builder;
}

/// <summary>
/// Registers a <see cref="GatewayCountChangedHandler"/> event handler.
/// </summary>
public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder builder, Func<IServiceProvider, GatewayCountChangedHandler> handlerFactory)
{
builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
return builder;
}

/// <summary>
/// Registers a cluster connection status observer.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder, TObserver observer)
where TObserver : IClusterConnectionStatusObserver
{
builder.Services.AddSingleton<IClusterConnectionStatusObserver>(observer);
return builder;
}

/// <summary>
/// Registers a cluster connection status observer.
/// </summary>
public static IClientBuilder AddClusterConnectionStatusObserver<TObserver>(this IClientBuilder builder)
where TObserver : class, IClusterConnectionStatusObserver
{
builder.Services.AddSingleton<IClusterConnectionStatusObserver, TObserver>();
return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
Expand All @@ -145,18 +116,6 @@ public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder
return builder;
}

/// <summary>
/// Registers a <see cref="ConnectionToClusterLostHandler"/> event handler.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="handlerFactory">The handler factory.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder builder, Func<IServiceProvider, ConnectionToClusterLostHandler> handlerFactory)
{
builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
return builder;
}

/// <summary>
/// Add <see cref="Activity.Current"/> propagation through grain calls.
/// Note: according to <see cref="ActivitySource.StartActivity(string, ActivityKind)"/> activity will be created only when any listener for activity exists <see cref="ActivitySource.HasListeners()"/> and <see cref="ActivityListener.Sample"/> returns <see cref="ActivitySamplingResult.PropagationData"/>.
Expand Down
1 change: 0 additions & 1 deletion src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<ClientGrainContext>();
services.AddSingleton<IClusterConnectionStatusObserver, ClusterConnectionStatusObserverAdaptor>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
services.TryAddFromExisting<IRuntimeClient, OutsideRuntimeClient>();
services.TryAddFromExisting<IClusterConnectionStatusListener, OutsideRuntimeClient>();
Expand Down
24 changes: 0 additions & 24 deletions src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs

This file was deleted.

46 changes: 0 additions & 46 deletions src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs

This file was deleted.

60 changes: 34 additions & 26 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
Expand All @@ -14,11 +13,11 @@
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;

namespace Orleans
{

internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConnectionStatusListener
{
internal static bool TestOnlyThrowExceptionDuringInit { get; set; }
Expand All @@ -33,7 +32,6 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne

private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
private IClusterConnectionStatusObserver[] _statusObservers;

public IInternalGrainFactory InternalGrainFactory { get; private set; }

Expand Down Expand Up @@ -97,7 +95,17 @@ internal void ConsumeServices()
{
try
{
_statusObservers = this.ServiceProvider.GetServices<IClusterConnectionStatusObserver>().ToArray();
var connectionLostHandlers = this.ServiceProvider.GetServices<ConnectionToClusterLostHandler>();
foreach (var handler in connectionLostHandlers)
{
this.ClusterConnectionLost += handler;
}

var gatewayCountChangedHandlers = this.ServiceProvider.GetServices<GatewayCountChangedHandler>();
foreach (var handler in gatewayCountChangedHandlers)
{
this.GatewayCountChanged += handler;
}

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
this.messageFactory = this.ServiceProvider.GetService<MessageFactory>();
Expand Down Expand Up @@ -264,7 +272,7 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp
{
// don't set expiration for system target messages.
var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout;
message.TimeToLive = ttl;
message.TimeToLive = ttl;
}

if (!oneWay)
Expand Down Expand Up @@ -393,6 +401,9 @@ public void Dispose()

Utils.SafeExecute(() => MessageCenter?.Dispose());

this.ClusterConnectionLost = null;
this.GatewayCountChanged = null;

GC.SuppressFinalize(this);
disposed = true;
}
Expand All @@ -411,38 +422,35 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo)
public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType)
=> this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType);

/// <inheritdoc />
public event ConnectionToClusterLostHandler ClusterConnectionLost;

/// <inheritdoc />
public event GatewayCountChangedHandler GatewayCountChanged;

/// <inheritdoc />
public void NotifyClusterConnectionLost()
{
foreach (var observer in _statusObservers)
try
{
try
{
observer.NotifyClusterConnectionLost();
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster disconnection notification.");
}
this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending cluster disconnection notification");
}
}

/// <inheritdoc />
public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways)
{
foreach (var observer in _statusObservers)
try
{
try
{
observer.NotifyGatewayCountChanged(
currentNumberOfGateways,
previousNumberOfGateways,
currentNumberOfGateways > 0 && previousNumberOfGateways <= 0);
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
}
this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways));
}
catch (Exception ex)
{
this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending gateway count changed notification");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,4 @@ public override async ValueTask DisposeAsync()

_connectionClosedTokenSource.Dispose();
}

public override string ToString() => $"InMem({LocalEndPoint}<->{RemoteEndPoint})";
}
Loading

0 comments on commit 9907c0b

Please sign in to comment.