diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index d02140c36..dd351c1fe 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -150,13 +150,13 @@ public DurableTaskExtension( this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver)); this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService)); - this.ResolveAppSettingOptions(); + DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver); ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName); this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents); this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper(); - this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories); + this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories); this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider(); this.isOptionsConfigured = true; @@ -249,6 +249,8 @@ public string HubName internal DurableTaskOptions Options { get; } + internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider; + internal HttpApiHandler HttpApiHandler { get; private set; } internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; } @@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault); } - private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable orchestrationServiceFactories) + internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable orchestrationServiceFactories) { bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType); @@ -578,32 +580,13 @@ private void StopLocalGrpcServer() } #endif - private void ResolveAppSettingOptions() - { - if (this.Options == null) - { - throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings."); - } - - if (this.nameResolver == null) - { - throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings."); - } - - if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName)) - { - // use the resolved task hub name - this.Options.HubName = taskHubName; - } - } - private void InitializeForFunctionsV1(ExtensionConfigContext context) { #if FUNCTIONS_V1 context.ApplyConfig(this.Options, "DurableTask"); this.nameResolver = context.Config.NameResolver; this.loggerFactory = context.Config.LoggerFactory; - this.ResolveAppSettingOptions(); + DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver); ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName); this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents); this.connectionInfoResolver = new WebJobsConnectionInfoProvider(); @@ -1573,108 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat activity.AddTag("DurableFunctionsRuntimeStatus", statusStr); } } - - internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName) - { - if (this.defaultDurabilityProvider.TryGetScaleMonitor( - functionId, - functionName.Name, - this.Options.HubName, - connectionName, - out IScaleMonitor scaleMonitor)) - { - return scaleMonitor; - } - else - { - // the durability provider does not support runtime scaling. - // Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on). - return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower(), functionId); - } - } -#endif -#if FUNCTIONS_V3_OR_GREATER - - internal ITargetScaler GetTargetScaler(string functionId, FunctionName functionName, string connectionName) - { - if (this.defaultDurabilityProvider.TryGetTargetScaler( - functionId, - functionName.Name, - this.Options.HubName, - connectionName, - out ITargetScaler targetScaler)) - { - return targetScaler; - } - else - { - // the durability provider does not support target-based scaling. - // Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on). - return new NoOpTargetScaler(functionId); - } - } - - private sealed class NoOpTargetScaler : ITargetScaler - { - /// - /// Construct a placeholder target scaler. - /// - /// The function ID. - public NoOpTargetScaler(string functionId) - { - this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId); - } - - public TargetScalerDescriptor TargetScalerDescriptor { get; } - - public Task GetScaleResultAsync(TargetScalerContext context) - { - throw new NotImplementedException("The current DurableTask backend configuration does not support target-based scaling"); - } - } -#endif - -#if !FUNCTIONS_V1 - /// - /// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. - /// This is required to allow operation of those providers even if runtime scaling is turned off - /// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. - /// - private sealed class NoOpScaleMonitor : IScaleMonitor - { - /// - /// Construct a placeholder scale monitor. - /// - /// A descriptive name. - /// The function ID. - public NoOpScaleMonitor(string name, string functionId) - { -#if FUNCTIONS_V3_OR_GREATER - this.Descriptor = new ScaleMonitorDescriptor(name, functionId); -#else -#pragma warning disable CS0618 // Type or member is obsolete - this.Descriptor = new ScaleMonitorDescriptor(name); -#pragma warning restore CS0618 // Type or member is obsolete -#endif - } - - /// - /// A descriptive name. - /// - public ScaleMonitorDescriptor Descriptor { get; private set; } - - /// - Task IScaleMonitor.GetMetricsAsync() - { - throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); - } - - /// - ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) - { - throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); - } - } #endif } } diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs index 93c7bff07..bd793c17a 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs @@ -2,14 +2,19 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; +using System.Collections.Generic; +using System.Linq; #if !FUNCTIONS_V1 using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth; using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; #else using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; @@ -107,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder) return builder; } +#if FUNCTIONS_V3_OR_GREATER + /// + /// Adds the and providers for the Durable Triggers. + /// + /// The to configure. + /// Returns the provided . + internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata) + { + // this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756 + DurableTaskTriggersScaleProvider provider = null; + builder.Services.AddSingleton(serviceProvider => + { + provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService>(), serviceProvider.GetService(), serviceProvider.GetService(), serviceProvider.GetService>(), triggerMetadata); + return provider; + }); + builder.Services.AddSingleton(serviceProvider => serviceProvider.GetServices().Single(x => x == provider)); + builder.Services.AddSingleton(serviceProvider => serviceProvider.GetServices().Single(x => x == provider)); + return builder; + } +#endif + /// /// Adds the Durable Task extension to the provided . /// diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs index 3e01cd3fe..4fa87cc07 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs @@ -4,6 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale; using Microsoft.Azure.WebJobs.Host.Listeners; #if !FUNCTIONS_V1 using Microsoft.Azure.WebJobs.Host.Scale; @@ -51,21 +52,25 @@ public DurableTaskListener( this.functionName = functionName; this.functionType = functionType; this.connectionName = connectionName; -#if !FUNCTIONS_V1 +#if !FUNCTIONS_V1 this.scaleMonitor = new Lazy(() => - this.config.GetScaleMonitor( + ScaleUtils.GetScaleMonitor( + this.config.DefaultDurabilityProvider, this.functionId, this.functionName, - this.connectionName)); + this.connectionName, + this.config.Options.HubName)); #endif #if FUNCTIONS_V3_OR_GREATER this.targetScaler = new Lazy(() => - this.config.GetTargetScaler( + ScaleUtils.GetTargetScaler( + this.config.DefaultDurabilityProvider, this.functionId, this.functionName, - this.connectionName)); + this.connectionName, + this.config.Options.HubName)); #endif } diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs index bed7f549b..4c05b3df0 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs @@ -51,7 +51,7 @@ public DurableTaskScaleMonitor( // We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2. // Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4. this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower()); -#pragma warning restore CS0618 // Type or member is obsolete +#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1 #endif } diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs index 786b5fd08..c1faa94f4 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. #if FUNCTIONS_V3_OR_GREATER +#nullable enable using System; using System.Collections.Generic; using System.Linq; @@ -38,46 +39,55 @@ public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider met public async Task GetScaleResultAsync(TargetScalerContext context) { - // This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process. - var metrics = await this.metricsProvider.GetMetricsAsync(); - - // compute activityWorkers: the number of workers we need to process all activity messages - var workItemQueueLength = metrics.WorkItemQueueLength; - double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities); - - var serializedControlQueueLengths = metrics.ControlQueueLengths; - var controlQueueLengths = JsonConvert.DeserializeObject>(serializedControlQueueLengths); - - var controlQueueMessages = controlQueueLengths.Sum(); - var activeControlQueues = controlQueueLengths.Count(x => x > 0); - - // compute orchestratorWorkers: the number of workers we need to process all orchestrator messages. - // We bound this result to be no larger than the partition count - var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators); - var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers); - - int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers); - this.scaleResult.TargetWorkerCount = numWorkersToRequest; - - // When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table. - // This works because this code does not execute in the Functions Host process, but in the ScaleController process, - // and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto. - var scaleControllerLog = $"Target worker count for {this.functionId}: {numWorkersToRequest}. " + - $"Metrics used: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " + - $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; - - // target worker count should never be negative - if (numWorkersToRequest < 0) + DurableTaskTriggerMetrics? metrics = null; + try { - scaleControllerLog = "Tried to request a negative worker count." + scaleControllerLog; - this.logger.LogError(scaleControllerLog); - - // Throw exception so ScaleController can handle the error. - throw new Exception(scaleControllerLog); + // This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process. + metrics = await this.metricsProvider.GetMetricsAsync(); + + // compute activityWorkers: the number of workers we need to process all activity messages + var workItemQueueLength = metrics.WorkItemQueueLength; + double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities); + + var serializedControlQueueLengths = metrics.ControlQueueLengths; + var controlQueueLengths = JsonConvert.DeserializeObject>(serializedControlQueueLengths); + + var controlQueueMessages = controlQueueLengths.Sum(); + var activeControlQueues = controlQueueLengths.Count(x => x > 0); + + // compute orchestratorWorkers: the number of workers we need to process all orchestrator messages. + // We bound this result to be no larger than the partition count + var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators); + var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers); + + int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers); + this.scaleResult.TargetWorkerCount = numWorkersToRequest; + + // When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table. + // This works because this code does not execute in the Functions Host process, but in the ScaleController process, + // and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto. + var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " + + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; + var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " + + metricsLog; + + // target worker count should never be negative + if (numWorkersToRequest < 0) + { + throw new InvalidOperationException("Number of workers to request cannot be negative"); + } + + this.logger.LogInformation(scaleControllerLog); + return this.scaleResult; + } + catch (Exception ex) + { + // We want to augment the exception with metrics information for investigation purposes + var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " + + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; + var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog; + throw new Exception(errorLog, ex); } - - this.logger.LogDebug(scaleControllerLog); - return this.scaleResult; } } } diff --git a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml index 0113cfbf7..f76c0ad61 100644 --- a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml +++ b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml @@ -2634,31 +2634,6 @@ This metadata will show up in Application Insights, if enabled. - - - A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. - This is required to allow operation of those providers even if runtime scaling is turned off - see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. - - - - - Construct a placeholder scale monitor. - - A descriptive name. - The function ID. - - - - A descriptive name. - - - - - - - - Extension for registering a Durable Functions configuration with JobHostConfiguration. @@ -4851,6 +4826,31 @@ The delegate to handle exception to determine if retries should proceed. + + + A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. + This is required to allow operation of those providers even if runtime scaling is turned off + see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. + + + + + Construct a placeholder scale monitor. + + A descriptive name. + The function ID. + + + + A descriptive name. + + + + + + + + Connection info provider which resolves connection information from a standard application (Non WebJob). diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs index 47249848a..e5eac789f 100644 --- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs +++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Net.Http; using DurableTask.AzureStorage.Partitioning; +using Microsoft.Azure.WebJobs.Host; using Newtonsoft.Json; using Newtonsoft.Json.Converters; using Newtonsoft.Json.Linq; @@ -229,6 +230,25 @@ public string HubName // to mock the value from ExtensionConfigContext. It should not be used in production code paths. internal Func WebhookUriProviderOverride { get; set; } + internal static void ResolveAppSettingOptions(DurableTaskOptions options, INameResolver nameResolver) + { + if (options == null) + { + throw new InvalidOperationException($"{nameof(options)} must be set before resolving app settings."); + } + + if (nameResolver == null) + { + throw new InvalidOperationException($"{nameof(nameResolver)} must be set before resolving app settings."); + } + + if (nameResolver.TryResolveWholeString(options.HubName, out string taskHubName)) + { + // use the resolved task hub name + options.HubName = taskHubName; + } + } + /// /// Sets HubName to a value that is considered a default value. /// diff --git a/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs b/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs new file mode 100644 index 000000000..fd50f9e38 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs @@ -0,0 +1,129 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. +#nullable enable +#if FUNCTIONS_V3_OR_GREATER + +using System; +using System.Collections.Generic; +using System.ComponentModel; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale +{ + internal class DurableTaskTriggersScaleProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private readonly IScaleMonitor monitor; + private readonly ITargetScaler targetScaler; + private readonly DurableTaskOptions options; + private readonly INameResolver nameResolver; + private readonly ILoggerFactory loggerFactory; + private readonly IEnumerable durabilityProviderFactories; + + public DurableTaskTriggersScaleProvider( + IOptions durableTaskOptions, + INameResolver nameResolver, + ILoggerFactory loggerFactory, + IEnumerable durabilityProviderFactories, + TriggerMetadata triggerMetadata) + { + this.options = durableTaskOptions.Value; + this.nameResolver = nameResolver; + this.loggerFactory = loggerFactory; + this.durabilityProviderFactories = durabilityProviderFactories; + + string functionId = triggerMetadata.FunctionName; + FunctionName functionName = new FunctionName(functionId); + + this.GetOptions(triggerMetadata); + + IDurabilityProviderFactory durabilityProviderFactory = this.GetDurabilityProviderFactory(); + DurabilityProvider defaultDurabilityProvider = durabilityProviderFactory.GetDurabilityProvider(); + + string? connectionName = durabilityProviderFactory is AzureStorageDurabilityProviderFactory azureStorageDurabilityProviderFactory + ? azureStorageDurabilityProviderFactory.DefaultConnectionName + : null; + + this.targetScaler = ScaleUtils.GetTargetScaler( + defaultDurabilityProvider, + functionId, + functionName, + connectionName, + this.options.HubName); + + this.monitor = ScaleUtils.GetScaleMonitor( + defaultDurabilityProvider, + functionId, + functionName, + connectionName, + this.options.HubName); + } + + private void GetOptions(TriggerMetadata triggerMetadata) + { + // the metadata is the sync triggers payload + var metadata = triggerMetadata.Metadata.ToObject(); + + // The property `taskHubName` is always expected in the SyncTriggers payload + this.options.HubName = metadata?.TaskHubName ?? throw new InvalidOperationException($"Expected `taskHubName` property in SyncTriggers payload but found none. Payload: {triggerMetadata.Metadata}"); + if (metadata?.MaxConcurrentActivityFunctions != null) + { + this.options.MaxConcurrentActivityFunctions = metadata?.MaxConcurrentActivityFunctions; + } + + if (metadata?.MaxConcurrentOrchestratorFunctions != null) + { + this.options.MaxConcurrentOrchestratorFunctions = metadata?.MaxConcurrentOrchestratorFunctions; + } + + if (metadata?.StorageProvider != null) + { + this.options.StorageProvider = metadata?.StorageProvider; + } + + DurableTaskOptions.ResolveAppSettingOptions(this.options, this.nameResolver); + } + + private IDurabilityProviderFactory GetDurabilityProviderFactory() + { + var logger = this.loggerFactory.CreateLogger(); + var durabilityProviderFactory = DurableTaskExtension.GetDurabilityProviderFactory(this.options, logger, this.durabilityProviderFactories); + return durabilityProviderFactory; + } + + public IScaleMonitor GetMonitor() + { + return this.monitor; + } + + public ITargetScaler GetTargetScaler() + { + return this.targetScaler; + } + + /// + /// Captures the relevant DF SyncTriggers JSON properties for making scaling decisions. + /// + internal class DurableTaskMetadata + { + [JsonProperty] + public string? TaskHubName { get; set; } + + [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(null)] + public int? MaxConcurrentOrchestratorFunctions { get; set; } + + [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(null)] + public int? MaxConcurrentActivityFunctions { get; set; } + + [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(null)] + public IDictionary? StorageProvider { get; set; } + } + } +} +#endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs b/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs new file mode 100644 index 000000000..0d60da23f --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs @@ -0,0 +1,120 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. +#nullable enable + +using System; +using System.Threading.Tasks; + +#if !FUNCTIONS_V1 +using Microsoft.Azure.WebJobs.Host.Scale; +#endif + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale +{ + internal static class ScaleUtils + { +#if !FUNCTIONS_V1 + internal static IScaleMonitor GetScaleMonitor(DurabilityProvider durabilityProvider, string functionId, FunctionName functionName, string? connectionName, string hubName) + { + if (durabilityProvider.TryGetScaleMonitor( + functionId, + functionName.Name, + hubName, + connectionName, + out IScaleMonitor scaleMonitor)) + { + return scaleMonitor; + } + else + { + // the durability provider does not support runtime scaling. + // Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on). + return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{hubName}".ToLower(), functionId); + } + } + + /// + /// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. + /// This is required to allow operation of those providers even if runtime scaling is turned off + /// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. + /// + internal sealed class NoOpScaleMonitor : IScaleMonitor + { + /// + /// Construct a placeholder scale monitor. + /// + /// A descriptive name. + /// The function ID. + public NoOpScaleMonitor(string name, string functionId) + { +#if FUNCTIONS_V3_OR_GREATER + this.Descriptor = new ScaleMonitorDescriptor(name, functionId); +#else +#pragma warning disable CS0618 // Type or member is obsolete + this.Descriptor = new ScaleMonitorDescriptor(name); +#pragma warning restore CS0618 // Type or member is obsolete +#endif + } + + /// + /// A descriptive name. + /// + public ScaleMonitorDescriptor Descriptor { get; private set; } + + /// + Task IScaleMonitor.GetMetricsAsync() + { + throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); + } + + /// + ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + { + throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); + } + } +#endif + +#if FUNCTIONS_V3_OR_GREATER +#pragma warning disable SA1201 // Elements should appear in the correct order + internal static ITargetScaler GetTargetScaler(DurabilityProvider durabilityProvider, string functionId, FunctionName functionName, string? connectionName, string hubName) +#pragma warning restore SA1201 // Elements should appear in the correct order + { + if (durabilityProvider.TryGetTargetScaler( + functionId, + functionName.Name, + hubName, + connectionName, + out ITargetScaler targetScaler)) + { + return targetScaler; + } + else + { + // the durability provider does not support target-based scaling. + // Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on). + return new NoOpTargetScaler(functionId); + } + } + + internal sealed class NoOpTargetScaler : ITargetScaler + { + /// + /// Construct a placeholder target scaler. + /// + /// The function ID. + public NoOpTargetScaler(string functionId) + { + this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId); + } + + public TargetScalerDescriptor TargetScalerDescriptor { get; } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotSupportedException("The current DurableTask backend configuration does not support target-based scaling"); + } + } +#endif + } +} diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index f06495b10..b057bfdba 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -89,7 +89,7 @@ - + diff --git a/test/Common/TestHelpers.cs b/test/Common/TestHelpers.cs index ec1fd043c..dafa10e5d 100644 --- a/test/Common/TestHelpers.cs +++ b/test/Common/TestHelpers.cs @@ -13,9 +13,8 @@ using Microsoft.ApplicationInsights.Channel; #if !FUNCTIONS_V1 using Microsoft.Extensions.Hosting; +using Microsoft.Azure.WebJobs.Host.Scale; #endif -using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -67,6 +66,9 @@ public static ITestHost GetJobHost( int entityMessageReorderWindowInMinutes = 30, string exactTaskHubName = null, bool addDurableClientFactory = false, +#if !FUNCTIONS_V1 + Action configureScaleOptions = null, +#endif Type[] types = null) { switch (storageProviderType) @@ -160,6 +162,7 @@ public static ITestHost GetJobHost( #if !FUNCTIONS_V1 addDurableClientFactory: addDurableClientFactory, types: types, + configureScaleOptions: configureScaleOptions, #endif durabilityProviderFactoryType: durabilityProviderFactoryType); } @@ -175,6 +178,9 @@ public static ITestHost GetJobHostWithOptions( Action onSend = null, Type durabilityProviderFactoryType = null, bool addDurableClientFactory = false, +#if !FUNCTIONS_V1 + Action configureScaleOptions = null, +#endif Type[] types = null) { if (serializerSettings == null) @@ -198,6 +204,7 @@ public static ITestHost GetJobHostWithOptions( durabilityProviderFactoryType: durabilityProviderFactoryType, addDurableClientFactory: addDurableClientFactory, typeLocator: typeLocator, + configureScaleOptions: configureScaleOptions, #endif loggerProvider: loggerProvider, nameResolver: testNameResolver, diff --git a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs index 5ff08d579..57566da29 100644 --- a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs +++ b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs @@ -54,8 +54,7 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output) this.storageAccount, logger, metricsProvider, - this.performanceMonitor.Object - ); + this.performanceMonitor.Object); } [Fact] diff --git a/test/FunctionsV2/DurableTaskTargetScalerTests.cs b/test/FunctionsV2/DurableTaskTargetScalerTests.cs index d0ada8b51..cc2272e16 100644 --- a/test/FunctionsV2/DurableTaskTargetScalerTests.cs +++ b/test/FunctionsV2/DurableTaskTargetScalerTests.cs @@ -1,19 +1,25 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See LICENSE in the project root for license information. +using System; +using System.Linq; using System.Threading.Tasks; using DurableTask.AzureStorage.Monitoring; using DurableTask.Core; using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.WindowsAzure.Storage; using Moq; using Xunit; using Xunit.Abstractions; +using static Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale.ScaleUtils; +using static Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests.PlatformSpecificHelpers; namespace WebJobs.Extensions.DurableTask.Tests.V2 { @@ -24,14 +30,17 @@ public class DurableTaskTargetScalerTests private readonly Mock metricsProviderMock; private readonly Mock triggerMetricsMock; private readonly Mock orchestrationServiceMock; + private readonly Mock durabilityProviderMock; + private readonly TestLoggerProvider loggerProvider; + private readonly ITestOutputHelper output; public DurableTaskTargetScalerTests(ITestOutputHelper output) { this.scalerContext = new TargetScalerContext(); - + this.output = output; var loggerFactory = new LoggerFactory(); - var loggerProvider = new TestLoggerProvider(output); - loggerFactory.AddProvider(loggerProvider); + this.loggerProvider = new TestLoggerProvider(this.output); + loggerFactory.AddProvider(this.loggerProvider); ILogger logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask")); DisconnectedPerformanceMonitor nullPerformanceMonitorMock = null; @@ -47,7 +56,7 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output) this.triggerMetricsMock = new Mock(MockBehavior.Strict); this.orchestrationServiceMock = new Mock(MockBehavior.Strict); - var durabilityProviderMock = new Mock( + this.durabilityProviderMock = new Mock( MockBehavior.Strict, "storageProviderName", this.orchestrationServiceMock.Object, @@ -57,7 +66,7 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output) this.targetScaler = new DurableTaskTargetScaler( "FunctionId", this.metricsProviderMock.Object, - durabilityProviderMock.Object, + this.durabilityProviderMock.Object, logger); } @@ -84,5 +93,87 @@ public async Task TestTargetScaler(int maxConcurrentActivities, int maxConcurren var targetWorkerCount = scaleResult.TargetWorkerCount; Assert.Equal(expectedWorkerCount, targetWorkerCount); } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true)] + [InlineData(false)] + public void TestGetTargetScaler(bool supportsTBS) + { + ITargetScaler targetScaler = new Mock().Object; + this.durabilityProviderMock.Setup(m => m.TryGetTargetScaler(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out targetScaler)) + .Returns(supportsTBS); + + var scaler = ScaleUtils.GetTargetScaler(this.durabilityProviderMock.Object, "FunctionId", new FunctionName("FunctionName"), "connectionName", "HubName"); + if (!supportsTBS) + { + Assert.IsType(scaler); + Assert.ThrowsAsync(() => scaler.GetScaleResultAsync(context: null)); + } + else + { + Assert.Equal(targetScaler, scaler); + } + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true)] + [InlineData(false)] + public void TestGetScaleMonitor(bool supportsScaleMonitor) + { + IScaleMonitor scaleMonitor = new Mock().Object; + this.durabilityProviderMock.Setup(m => m.TryGetScaleMonitor(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out scaleMonitor)) + .Returns(supportsScaleMonitor); + + var monitor = ScaleUtils.GetScaleMonitor(this.durabilityProviderMock.Object, "FunctionId", new FunctionName("FunctionName"), "connectionName", "HubName"); + if (!supportsScaleMonitor) + { + Assert.IsType(monitor); + Assert.Throws(() => monitor.GetScaleStatus(context: null)); + } + else + { + Assert.Equal(scaleMonitor, monitor); + } + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true)] + [InlineData(false)] + public async void ScaleHostE2ETest(bool isTbsEnabled) + { + Action configureScaleOptions = (scaleOptions) => + { + scaleOptions.IsTargetScalingEnabled = isTbsEnabled; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1); + }; + using (FunctionsV2HostWrapper host = (FunctionsV2HostWrapper)TestHelpers.GetJobHost(this.loggerProvider, nameof(this.ScaleHostE2ETest), enableExtendedSessions: false, configureScaleOptions: configureScaleOptions)) + { + await host.StartAsync(); + + IScaleStatusProvider scaleManager = host.InnerHost.Services.GetService(); + var client = await host.StartOrchestratorAsync(nameof(TestOrchestrations.FanOutFanIn), 50, this.output); + var status = await client.WaitForCompletionAsync(this.output, timeout: TimeSpan.FromSeconds(400)); + var scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext()); + await host.StopAsync(); + Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus); + + // We inspect the Host's logs for evidence that the Host is correctly sampling our scaling requests. + // the expected logs depend on whether TBS is enabled or not + var expectedSubString = "scale monitors to sample"; + if (isTbsEnabled) + { + expectedSubString = "target scalers to sample"; + } + + bool containsExpectedLog = this.loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage ?? "").Any(p => p.Contains(expectedSubString)); + Assert.True(containsExpectedLog); + } + } } } diff --git a/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs b/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs index bfd72124f..4b2d46456 100644 --- a/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs +++ b/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs @@ -8,7 +8,7 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Channel; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; @@ -37,7 +37,8 @@ public static ITestHost CreateJobHost( IMessageSerializerSettingsFactory serializerSettingsFactory, Action onSend, bool addDurableClientFactory, - ITypeLocator typeLocator) + ITypeLocator typeLocator, + Action configureScaleOptions = null) { // Unless otherwise specified, use legacy partition management for tests as it makes the task hubs start up faster. // These tests run on a single task hub workers, so they don't test partition management anyways, and that is tested @@ -47,7 +48,7 @@ public static ITestHost CreateJobHost( options.Value.StorageProvider.Add(nameof(AzureStorageOptions.UseLegacyPartitionManagement), true); } - IHost host = new HostBuilder() + var hostBuilder = new HostBuilder() .ConfigureLogging( loggingBuilder => { @@ -97,9 +98,22 @@ public static ITestHost CreateJobHost( return telemetryActivator; }); } - }) - .Build(); + }); + + // if a configureScaleOptions action is provided, then we're probably trying to test the host's scaling logic + // we configure WebJobsScale and set the minimum logging level to `Debug`, as scaling logs are usually at the `Debug` level + if (configureScaleOptions != null) + { + hostBuilder.ConfigureWebJobsScale( + (context, builder) => + { + // ignore + }, + configureScaleOptions) + .ConfigureLogging(builder => builder.SetMinimumLevel(LogLevel.Debug)); + } + var host = hostBuilder.Build(); return new FunctionsV2HostWrapper(host, options, nameResolver); } @@ -216,7 +230,7 @@ private static IWebJobsBuilder AddEmulatorDurableTask(this IWebJobsBuilder build internal class FunctionsV2HostWrapper : ITestHost { - private readonly IHost innerHost; + internal readonly IHost InnerHost; private readonly JobHost innerWebJobsHost; private readonly DurableTaskOptions options; private readonly INameResolver nameResolver; @@ -226,8 +240,8 @@ public FunctionsV2HostWrapper( IOptions options, INameResolver nameResolver) { - this.innerHost = innerHost ?? throw new ArgumentNullException(nameof(innerHost)); - this.innerWebJobsHost = (JobHost)this.innerHost.Services.GetService(); + this.InnerHost = innerHost ?? throw new ArgumentNullException(nameof(innerHost)); + this.innerWebJobsHost = (JobHost)this.InnerHost.Services.GetService(); this.options = options.Value; this.nameResolver = nameResolver; } @@ -236,8 +250,8 @@ internal FunctionsV2HostWrapper( IHost innerHost, IOptions options) { - this.innerHost = innerHost; - this.innerWebJobsHost = (JobHost)this.innerHost.Services.GetService(); + this.InnerHost = innerHost; + this.innerWebJobsHost = (JobHost)this.InnerHost.Services.GetService(); this.options = options.Value; } @@ -249,16 +263,16 @@ public Task CallAsync(MethodInfo method, IDictionary args) public void Dispose() { - this.innerHost.Dispose(); + this.InnerHost.Dispose(); } - public Task StartAsync() => this.innerHost.StartAsync(); + public Task StartAsync() => this.InnerHost.StartAsync(); public async Task StopAsync() { try { - await this.innerHost.StopAsync(); + await this.InnerHost.StopAsync(); } catch (OperationCanceledException) { diff --git a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj index e47d188ea..ec2d5dd68 100644 --- a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj +++ b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj @@ -16,7 +16,7 @@ - +