From 054910d651464b69a9c958a9d5395e13d30b6a54 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 8 Oct 2024 12:33:20 -0700 Subject: [PATCH] ChaoticCluster playground + instrumentation --- Directory.Packages.props | 7 + Orleans.sln | 7 + .../ChaoticCluster.ServiceDefaults.csproj | 22 +++ .../Extensions.cs | 111 +++++++++++++ .../ChaoticCluster.Silo.csproj | 5 +- .../ChaoticCluster.Silo/Program.cs | 153 +++++++++++++++++- .../SiloBuilderConfigurator.cs | 26 +++ .../Diagnostics/MessagingTrace.cs | 53 ------ .../Metrics/DirectoryInstruments.cs | 4 +- .../Diagnostics/Metrics/InstrumentNames.cs | 2 + src/Orleans.Runtime/Catalog/ActivationData.cs | 3 +- src/Orleans.Runtime/Catalog/Catalog.cs | 59 ++++--- .../GrainDirectory/GrainDirectoryReplica.cs | 2 + .../GrainDirectory/GrainDirectoryResolver.cs | 2 +- .../LocalSiloHealthMonitor.cs | 1 + .../GrainDirectoryResilienceTests.cs | 14 +- 16 files changed, 368 insertions(+), 103 deletions(-) create mode 100644 playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/ChaoticCluster.ServiceDefaults.csproj create mode 100644 playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/Extensions.cs create mode 100644 playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 22b1d4b5bf..a084e077cc 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -55,6 +55,13 @@ + + + + + + + diff --git a/Orleans.sln b/Orleans.sln index 1c8f2559e9..d5e8786f4a 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -248,6 +248,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaoticCluster.AppHost", "p EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaoticCluster.Silo", "playground\ChaoticCluster\ChaoticCluster.Silo\ChaoticCluster.Silo.csproj", "{76A549FA-69F1-4967-82B6-161A8B52C86B}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaoticCluster.ServiceDefaults", "playground\ChaoticCluster\ChaoticCluster.ServiceDefaults\ChaoticCluster.ServiceDefaults.csproj", "{4004A79F-B6BB-4472-891B-AD1348AE3E93}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestSerializerExternalModels", "test\Misc\TestSerializerExternalModels\TestSerializerExternalModels.csproj", "{5D587DDE-036D-4694-A314-8DDF270AC031}" EndProject Global @@ -648,6 +650,10 @@ Global {76A549FA-69F1-4967-82B6-161A8B52C86B}.Debug|Any CPU.Build.0 = Debug|Any CPU {76A549FA-69F1-4967-82B6-161A8B52C86B}.Release|Any CPU.ActiveCfg = Release|Any CPU {76A549FA-69F1-4967-82B6-161A8B52C86B}.Release|Any CPU.Build.0 = Release|Any CPU + {4004A79F-B6BB-4472-891B-AD1348AE3E93}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4004A79F-B6BB-4472-891B-AD1348AE3E93}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4004A79F-B6BB-4472-891B-AD1348AE3E93}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4004A79F-B6BB-4472-891B-AD1348AE3E93}.Release|Any CPU.Build.0 = Release|Any CPU {5D587DDE-036D-4694-A314-8DDF270AC031}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5D587DDE-036D-4694-A314-8DDF270AC031}.Debug|Any CPU.Build.0 = Debug|Any CPU {5D587DDE-036D-4694-A314-8DDF270AC031}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -771,6 +777,7 @@ Global {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} = {A41DE3D1-F8AA-4234-BE6F-3C9646A1507A} {4E79EC4B-2DC4-41E3-9AE6-17C1FFF17B02} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} {76A549FA-69F1-4967-82B6-161A8B52C86B} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} + {4004A79F-B6BB-4472-891B-AD1348AE3E93} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} {5D587DDE-036D-4694-A314-8DDF270AC031} = {70BCC54E-1618-4742-A079-07588065E361} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution diff --git a/playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/ChaoticCluster.ServiceDefaults.csproj b/playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/ChaoticCluster.ServiceDefaults.csproj new file mode 100644 index 0000000000..2388aea655 --- /dev/null +++ b/playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/ChaoticCluster.ServiceDefaults.csproj @@ -0,0 +1,22 @@ + + + + net8.0 + enable + enable + true + + + + + + + + + + + + + + + diff --git a/playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/Extensions.cs b/playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/Extensions.cs new file mode 100644 index 0000000000..29dcb42871 --- /dev/null +++ b/playground/ChaoticCluster/ChaoticCluster.ServiceDefaults/Extensions.cs @@ -0,0 +1,111 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Diagnostics.HealthChecks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using OpenTelemetry; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +namespace Microsoft.Extensions.Hosting; +// Adds common .NET Aspire services: service discovery, resilience, health checks, and OpenTelemetry. +// This project should be referenced by each service project in your solution. +// To learn more about using this project, see https://aka.ms/dotnet/aspire/service-defaults +public static class Extensions +{ + public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder) + { + builder.ConfigureOpenTelemetry(); + + builder.AddDefaultHealthChecks(); + + builder.Services.AddServiceDiscovery(); + + builder.Services.ConfigureHttpClientDefaults(http => + { + // Turn on resilience by default + http.AddStandardResilienceHandler(); + + // Turn on service discovery by default + http.AddServiceDiscovery(); + }); + + // Uncomment the following to restrict the allowed schemes for service discovery. + // builder.Services.Configure(options => + // { + // options.AllowedSchemes = ["https"]; + // }); + + return builder; + } + + public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicationBuilder builder) + { + builder.Logging.AddOpenTelemetry(logging => + { + logging.IncludeFormattedMessage = true; + logging.IncludeScopes = true; + }); + + builder.Services.AddOpenTelemetry() + .WithMetrics(metrics => + { + metrics.AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation() + .AddRuntimeInstrumentation() + .AddMeter("System.Runtime") + .AddMeter("Microsoft.Orleans"); + }); + + builder.AddOpenTelemetryExporters(); + + return builder; + } + + private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostApplicationBuilder builder) + { + var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]); + + if (useOtlpExporter) + { + builder.Services.AddOpenTelemetry().UseOtlpExporter(); + } + + // Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.AspNetCore package) + //if (!string.IsNullOrEmpty(builder.Configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"])) + //{ + // builder.Services.AddOpenTelemetry() + // .UseAzureMonitor(); + //} + + return builder; + } + + public static IHostApplicationBuilder AddDefaultHealthChecks(this IHostApplicationBuilder builder) + { + builder.Services.AddHealthChecks() + // Add a default liveness check to ensure app is responsive + .AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]); + + return builder; + } + + public static WebApplication MapDefaultEndpoints(this WebApplication app) + { + // Adding health checks endpoints to applications in non-development environments has security implications. + // See https://aka.ms/dotnet/aspire/healthchecks for details before enabling these endpoints in non-development environments. + if (app.Environment.IsDevelopment()) + { + // All health checks must pass for app to be considered ready to accept traffic after starting + app.MapHealthChecks("/health"); + + // Only health checks tagged with the "live" tag must pass for app to be considered alive + app.MapHealthChecks("/alive", new HealthCheckOptions + { + Predicate = r => r.Tags.Contains("live") + }); + } + + return app; + } +} diff --git a/playground/ChaoticCluster/ChaoticCluster.Silo/ChaoticCluster.Silo.csproj b/playground/ChaoticCluster/ChaoticCluster.Silo/ChaoticCluster.Silo.csproj index 17a1f45a96..6dfb9074aa 100644 --- a/playground/ChaoticCluster/ChaoticCluster.Silo/ChaoticCluster.Silo.csproj +++ b/playground/ChaoticCluster/ChaoticCluster.Silo/ChaoticCluster.Silo.csproj @@ -1,16 +1,19 @@ - + Exe net8.0 enable enable + true + true + diff --git a/playground/ChaoticCluster/ChaoticCluster.Silo/Program.cs b/playground/ChaoticCluster/ChaoticCluster.Silo/Program.cs index 3751555cbd..01869077fd 100644 --- a/playground/ChaoticCluster/ChaoticCluster.Silo/Program.cs +++ b/playground/ChaoticCluster/ChaoticCluster.Silo/Program.cs @@ -1,2 +1,151 @@ -// See https://aka.ms/new-console-template for more information -Console.WriteLine("Hello, World!"); +using System.Diagnostics; +using ChaoticCluster.Silo; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Orleans.TestingHost; + +var builder = Host.CreateApplicationBuilder(args); +builder.AddServiceDefaults(); // Configure OTel +using var app = builder.Build(); +await app.StartAsync(); + +var testClusterBuilder = new InProcessTestClusterBuilder(1); +testClusterBuilder.ConfigureSilo((options, siloBuilder) => new SiloBuilderConfigurator().Configure(siloBuilder)); +testClusterBuilder.ConfigureSiloHost((options, hostBuilder) => +{ + foreach (var provider in app.Services.GetServices()) + { + hostBuilder.Logging.AddProvider(provider); + } +}); + +testClusterBuilder.ConfigureClientHost(hostBuilder => +{ + foreach (var provider in app.Services.GetServices()) + { + hostBuilder.Logging.AddProvider(provider); + } +}); + +var testCluster = testClusterBuilder.Build(); +await testCluster.DeployAsync(); +var log = testCluster.Client.ServiceProvider.GetRequiredService>(); +log.LogInformation($"ServiceId: {testCluster.Options.ServiceId}"); +log.LogInformation($"ClusterId: {testCluster.Options.ClusterId}"); + +var cts = new CancellationTokenSource(TimeSpan.FromMinutes(15)); +var reconfigurationTimer = Stopwatch.StartNew(); +var upperLimit = 10; +var lowerLimit = 1; // Membership is kept on the primary, so we can't go below 1 +var target = upperLimit; +var idBase = 0L; +var client = testCluster.Silos[0].ServiceProvider.GetRequiredService(); +const int CallsPerIteration = 100; +const int MaxGrains = 524_288; // 2**19; + +var loadTask = Task.Run(async () => +{ + while (!cts.IsCancellationRequested) + { + var time = Stopwatch.StartNew(); + var tasks = Enumerable.Range(0, CallsPerIteration).Select(i => client.GetGrain((idBase + i) % MaxGrains).Ping().AsTask()).ToList(); + var workTask = Task.WhenAll(tasks); + using var delayCancellation = new CancellationTokenSource(); + var delay = TimeSpan.FromMilliseconds(90_000); + var delayTask = Task.Delay(delay, delayCancellation.Token); + await Task.WhenAny(workTask, delayTask); + + try + { + await workTask; + } + catch (SiloUnavailableException sue) + { + log.LogInformation(sue, "Swallowed transient exception."); + } + catch (OrleansMessageRejectionException omre) + { + log.LogInformation(omre, "Swallowed rejection."); + } + catch (Exception exception) + { + log.LogError(exception, "Unhandled exception."); + throw; + } + + delayCancellation.Cancel(); + idBase += CallsPerIteration; + } +}); + +var chaosTask = Task.Run(async () => +{ + var clusterOperation = Task.CompletedTask; + while (!cts.IsCancellationRequested) + { + try + { + var remaining = TimeSpan.FromSeconds(10) - reconfigurationTimer.Elapsed; + if (remaining <= TimeSpan.Zero) + { + reconfigurationTimer.Restart(); + await clusterOperation; + + clusterOperation = Task.Run(async () => + { + var currentCount = testCluster.Silos.Count; + + if (currentCount > target) + { + // Stop or kill a random silo, but not the primary (since that hosts cluster membership) + var victim = testCluster.Silos[Random.Shared.Next(1, testCluster.Silos.Count - 1)]; + if (currentCount % 2 == 0) + { + log.LogInformation($"Stopping '{victim.SiloAddress}'."); + await testCluster.StopSiloAsync(victim); + log.LogInformation($"Stopped '{victim.SiloAddress}'."); + } + else + { + log.LogInformation($"Killing '{victim.SiloAddress}'."); + await testCluster.KillSiloAsync(victim); + log.LogInformation($"Killed '{victim.SiloAddress}'."); + } + } + else if (currentCount < target) + { + log.LogInformation("Starting new silo."); + var result = await testCluster.StartAdditionalSiloAsync(); + log.LogInformation($"Started '{result.SiloAddress}'."); + } + + if (currentCount <= lowerLimit) + { + target = upperLimit; + } + else if (currentCount >= upperLimit) + { + target = lowerLimit; + } + }); + } + else + { + await Task.Delay(remaining); + } + } + catch (Exception exception) + { + log.LogInformation(exception, "Ignoring chaos exception."); + } + } +}); + +await await Task.WhenAny(loadTask, chaosTask); +cts.Cancel(); +await Task.WhenAll(loadTask, chaosTask); +await testCluster.StopAllSilosAsync(); +await testCluster.DisposeAsync(); + +await app.StopAsync(); \ No newline at end of file diff --git a/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs b/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs new file mode 100644 index 0000000000..aac181b83f --- /dev/null +++ b/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.TestingHost; + +namespace ChaoticCluster.Silo; + +class SiloBuilderConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder siloBuilder) + { +#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. + } + } + +internal interface IMyTestGrain : IGrainWithIntegerKey +{ + ValueTask Ping(); +} + +[CollectionAgeLimit(Minutes = 1.01)] +internal class MyTestGrain : Grain, IMyTestGrain +{ + public ValueTask Ping() => default; +} diff --git a/src/Orleans.Core/Diagnostics/MessagingTrace.cs b/src/Orleans.Core/Diagnostics/MessagingTrace.cs index 48b0c6b700..9f44a421a9 100644 --- a/src/Orleans.Core/Diagnostics/MessagingTrace.cs +++ b/src/Orleans.Core/Diagnostics/MessagingTrace.cs @@ -218,57 +218,4 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except this.log.Log(logLevel, eventId, state, exception, formatter); } } - -public static partial class DumpCapture -{ - internal static FileInfo CreateMiniDump(string infix) => CreateMiniDump(Process.GetCurrentProcess(), infix); - internal static FileInfo CreateMiniDump(Process process, string infix, MiniDumpType dumpType = MiniDumpType.MiniDumpWithFullMemory) - { - var dumpFileName = $@"{process.ProcessName}-{infix}-{DateTime.UtcNow.ToString("yyyy-MM-dd-HH-mm-ss-fffZ", CultureInfo.InvariantCulture)}.dmp"; - - using (var stream = File.Create(dumpFileName)) - { - var result = MiniDumpWriteDump( - process.Handle, - process.Id, - stream.SafeFileHandle.DangerousGetHandle(), - dumpType, - IntPtr.Zero, - IntPtr.Zero, - IntPtr.Zero); - } - - return new FileInfo(dumpFileName); - } - - [System.Runtime.InteropServices.DllImport("Dbghelp.dll")] - private static extern bool MiniDumpWriteDump( - IntPtr hProcess, - int processId, - IntPtr hFile, - MiniDumpType dumpType, - IntPtr exceptionParam, - IntPtr userStreamParam, - IntPtr callbackParam); - - internal enum MiniDumpType - { - MiniDumpNormal = 0x00000000, - MiniDumpWithDataSegs = 0x00000001, - MiniDumpWithFullMemory = 0x00000002, - MiniDumpWithHandleData = 0x00000004, - MiniDumpFilterMemory = 0x00000008, - MiniDumpScanMemory = 0x00000010, - MiniDumpWithUnloadedModules = 0x00000020, - MiniDumpWithIndirectlyReferencedMemory = 0x00000040, - MiniDumpFilterModulePaths = 0x00000080, - MiniDumpWithProcessThreadData = 0x00000100, - MiniDumpWithPrivateReadWriteMemory = 0x00000200, - MiniDumpWithoutOptionalData = 0x00000400, - MiniDumpWithFullMemoryInfo = 0x00000800, - MiniDumpWithThreadInfo = 0x00001000, - MiniDumpWithCodeSegs = 0x00002000, - MiniDumpWithoutManagedState = 0x00004000, - } -} } diff --git a/src/Orleans.Core/Diagnostics/Metrics/DirectoryInstruments.cs b/src/Orleans.Core/Diagnostics/Metrics/DirectoryInstruments.cs index 263130add9..e59e3a9dde 100644 --- a/src/Orleans.Core/Diagnostics/Metrics/DirectoryInstruments.cs +++ b/src/Orleans.Core/Diagnostics/Metrics/DirectoryInstruments.cs @@ -21,8 +21,10 @@ internal static class DirectoryInstruments internal static readonly Counter ValidationsCacheSent = Instruments.Meter.CreateCounter(InstrumentNames.DIRECTORY_VALIDATIONS_CACHE_SENT); internal static readonly Counter ValidationsCacheReceived = Instruments.Meter.CreateCounter(InstrumentNames.DIRECTORY_VALIDATIONS_CACHE_RECEIVED); + internal static readonly Counter SnapshotTransferCount = Instruments.Meter.CreateCounter(InstrumentNames.DIRECTORY_RANGE_SNAPSHOT_TRANSFER_COUNT); internal static readonly Histogram SnapshotTransferDuration = Instruments.Meter.CreateHistogram(InstrumentNames.DIRECTORY_RANGE_SNAPSHOT_TRANSFER_DURATION); - internal static readonly Histogram RangeRecoveryDuration = Instruments.Meter.CreateHistogram(InstrumentNames.DIRECTORY_RANGE_SNAPSHOT_TRANSFER_DURATION); + internal static readonly Counter RangeRecoveryCount = Instruments.Meter.CreateCounter(InstrumentNames.DIRECTORY_RANGE_RECOVERY_COUNT); + internal static readonly Histogram RangeRecoveryDuration = Instruments.Meter.CreateHistogram(InstrumentNames.DIRECTORY_RANGE_RECOVERY_DURATION); internal static readonly Histogram RangeLockHeldDuration = Instruments.Meter.CreateHistogram(InstrumentNames.DIRECTORY_RANGE_LOCK_HELD_DURATION); internal static ObservableGauge DirectoryPartitionSize; diff --git a/src/Orleans.Core/Diagnostics/Metrics/InstrumentNames.cs b/src/Orleans.Core/Diagnostics/Metrics/InstrumentNames.cs index bad327c9a5..cf1a8ec643 100644 --- a/src/Orleans.Core/Diagnostics/Metrics/InstrumentNames.cs +++ b/src/Orleans.Core/Diagnostics/Metrics/InstrumentNames.cs @@ -83,7 +83,9 @@ internal static class InstrumentNames public const string DIRECTORY_UNREGISTRATIONS_MANY_REMOTE_SENT = "orleans-directory-unregistrations-many-remote-sent"; public const string DIRECTORY_UNREGISTRATIONS_MANY_REMOTE_RECEIVED = "orleans-directory-unregistrations-many-remote-received"; + public const string DIRECTORY_RANGE_SNAPSHOT_TRANSFER_COUNT = "orleans-directory-snapshot-transfer-count"; public const string DIRECTORY_RANGE_SNAPSHOT_TRANSFER_DURATION = "orleans-directory-snapshot-transfer-duration"; + public const string DIRECTORY_RANGE_RECOVERY_COUNT = "orleans-directory-recovery-count"; public const string DIRECTORY_RANGE_RECOVERY_DURATION = "orleans-directory-recovery-duration"; public const string DIRECTORY_RANGE_LOCK_HELD_DURATION = "orleans-directory-range-lock-held-duration"; diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index 6c05ce0db1..e072c525e2 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -1798,8 +1798,9 @@ private async Task FinishDeactivating(ActivationState previousState, Cancellatio // If the instance is being deactivated due to a directory failure, we should not unregister it. var isDirectoryFailure = DeactivationReason.ReasonCode is DeactivationReasonCode.DirectoryFailure; + var isShuttingDown = DeactivationReason.ReasonCode is DeactivationReasonCode.ShuttingDown; - if (!migrated && IsUsingGrainDirectory && !cancellationToken.IsCancellationRequested && !isDirectoryFailure) + if (!migrated && IsUsingGrainDirectory && !cancellationToken.IsCancellationRequested && !isDirectoryFailure && !isShuttingDown) { // Unregister from directory. // If the grain was migrated, the new activation will perform a check-and-set on the registration itself. diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 602c237990..d7b91c7cf8 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -261,26 +261,11 @@ internal async Task DeactivateActivations(DeactivationReason reason, List(list.Count); - foreach (var activation in list) + await Parallel.ForEachAsync(list, cancellationToken, (activation, ct) => { - activation.Deactivate(reason, cancellationToken); - tasks.Add(activation.Deactivated); - } - - await Task.WhenAll(tasks); - } - - internal void StartDeactivatingActivations(DeactivationReason reason, List list, CancellationToken cancellationToken) - { - if (list == null || list.Count == 0) return; - - if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("DeactivateActivations: {Count} activations.", list.Count); - - foreach (var activation in list) - { - activation.Deactivate(reason, cancellationToken); - } + activation.Deactivate(reason, ct); + return new (activation.Deactivated); + }); } public async Task DeactivateAllActivations(CancellationToken cancellationToken) @@ -290,14 +275,14 @@ public async Task DeactivateAllActivations(CancellationToken cancellationToken) logger.LogDebug((int)ErrorCode.Catalog_DeactivateAllActivations, "DeactivateAllActivations."); } - var activationsToShutdown = new List(); - foreach (var pair in activations) - { - activationsToShutdown.Add(pair.Value); - } - + if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("DeactivateActivations: {Count} activations.", activations.Count); var reason = new DeactivationReason(DeactivationReasonCode.ShuttingDown, "This process is terminating."); - await DeactivateActivations(reason, activationsToShutdown, cancellationToken).WaitAsync(cancellationToken); + await Parallel.ForEachAsync(activations, cancellationToken, (kv, ct) => + { + var activation = kv.Value; + activation.Deactivate(reason, ct); + return new (activation.Deactivated); + }); } public SiloStatus LocalSiloStatus @@ -308,20 +293,20 @@ public SiloStatus LocalSiloStatus } } - public Task DeleteActivations(List addresses, DeactivationReasonCode reasonCode, string reasonText) + public async Task DeleteActivations(List addresses, DeactivationReasonCode reasonCode, string reasonText) { var tasks = new List(addresses.Count); var deactivationReason = new DeactivationReason(reasonCode, reasonText); - foreach (var activationAddress in addresses) + await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) => { if (TryGetGrainContext(activationAddress.GrainId, out var grainContext)) { grainContext.Deactivate(deactivationReason); - tasks.Add(grainContext.Deactivated); + return new ValueTask(grainContext.Deactivated); } - } - return Task.WhenAll(tasks); + return ValueTask.CompletedTask; + }); } // TODO move this logic in the LocalGrainDirectory @@ -388,6 +373,18 @@ internal void OnSiloStatusChange(SiloAddress updatedSilo, SiloStatus status) StartDeactivatingActivations(reason, activationsToShutdown, CancellationToken.None); } } + + void StartDeactivatingActivations(DeactivationReason reason, List list, CancellationToken cancellationToken) + { + if (list == null || list.Count == 0) return; + + if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("DeactivateActivations: {Count} activations.", list.Count); + + foreach (var activation in list) + { + activation.Deactivate(reason, cancellationToken); + } + } } } } diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs index b3dc5a4a34..00a28cba4d 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs @@ -559,6 +559,7 @@ private async Task TransferSnapshotAsync(DirectoryMembershipSnapshot curre _logger.LogDebug("Transferred '{Count}' entries for range '{Range}' from '{PreviousOwner}'.", snapshot.GrainAddresses.Count, addedRange, previousOwner); } + DirectoryInstruments.SnapshotTransferCount.Add(1); DirectoryInstruments.SnapshotTransferDuration.Record((long)stopwatch.Elapsed.TotalMilliseconds); return true; @@ -598,6 +599,7 @@ private async Task RecoverPartitionRange(DirectoryMembershipSnapshot current, Ri } } + DirectoryInstruments.RangeRecoveryCount.Add(1); DirectoryInstruments.RangeRecoveryDuration.Record((long)stopwatch.Elapsed.TotalMilliseconds); if (_logger.IsEnabled(LogLevel.Debug)) { diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs index c608686e73..00f60a3b34 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs @@ -29,7 +29,7 @@ public GrainDirectoryResolver( var services = serviceProvider.GetGrainDirectories(); foreach (var svc in services) { - this.directoryPerName.Add(svc.Name, serviceProvider.GetRequiredKeyedService(svc.Name)); + this.directoryPerName[svc.Name] = serviceProvider.GetRequiredKeyedService(svc.Name); } this.directoryPerName.TryGetValue(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, out var defaultDirectory); diff --git a/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs b/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs index 0a93c7058e..1f6a639796 100644 --- a/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs +++ b/src/Orleans.Runtime/MembershipService/LocalSiloHealthMonitor.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; diff --git a/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs b/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs index fa98a98e4b..58b8c3824c 100644 --- a/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs +++ b/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs @@ -57,17 +57,7 @@ public async Task ElasticChaos() var time = Stopwatch.StartNew(); var tasks = Enumerable.Range(0, CallsPerIteration).Select(i => client.GetGrain(idBase + i).Ping().AsTask()).ToList(); var workTask = Task.WhenAll(tasks); - using var delayCancellation = new CancellationTokenSource(); - var delay = TimeSpan.FromMilliseconds(90_000); - var delayTask = Task.Delay(delay, delayCancellation.Token); - await Task.WhenAny(workTask, delayTask); - if (delayTask.IsCompleted) - { - log.LogError("SLOW CALL."); - DumpCapture.CreateMiniDump("delayed"); - Assert.False(delayTask.IsCompleted, $"Request took longer than {delay.TotalSeconds}s to complete."); - } - + try { await workTask; @@ -83,11 +73,9 @@ public async Task ElasticChaos() catch (Exception exception) { log.LogError(exception, "Unhandled exception."); - DumpCapture.CreateMiniDump("unexpected"); throw; } - delayCancellation.Cancel(); idBase += CallsPerIteration; } });