diff --git a/.github/ISSUE_TEMPLATE/new-release-template.md b/.github/ISSUE_TEMPLATE/new-release-template.md index eb472a05c..1a52e32a1 100644 --- a/.github/ISSUE_TEMPLATE/new-release-template.md +++ b/.github/ISSUE_TEMPLATE/new-release-template.md @@ -14,6 +14,7 @@ _Due: <2-3-business-days-before-release>_ - [ ] Delete DTFx test packages from the [ADO feed](https://dev.azure.com/durabletaskframework/Durable%20Task%20Framework%20CI/_artifacts/feed/durabletask). - [ ] Run the [DTFx release pipeline](https://durabletaskframework.visualstudio.com/Durable%20Task%20Framework%20CI/_build?definitionId=21) ([defined here](https://github.com/Azure/durabletask/blob/main/azure-pipelines-release.yml)) to obtain new packages. - [ ] Publish DTFx packages to the [ADO feed](https://dev.azure.com/durabletaskframework/Durable%20Task%20Framework%20CI/_artifacts/feed/durabletask) for testing. +- [ ] Keep branch `azure-storage-v12` updated with branch `main`. **Prep Release (assigned to: )** _Due: <2-business-days-before-release>_ @@ -23,6 +24,7 @@ _Due: <2-business-days-before-release>_ - [ ] Add the Durable Functions package to the [ADO test feed](https://dev.azure.com/durabletaskframework/Durable%20Task%20Framework%20CI/_artifacts/feed/durabletask-test). - [ ] Check for package size, make sure it's not surprisingly heavier than a previous release. - [ ] Merge (**choose create a merge commit, NOT squash merge**) dev into main. Person performing validation must approve PR. +- [ ] Keep branch `v3.x` updated with branch `dev`. Do not merge PRs that are specific to Durable Functions v2. **Validation** _Due: <1-business-days-before-release>_ diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index f1f890c4a..eedf52e97 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -20,4 +20,6 @@ resolves #issue_for_this_pr * [ ] My changes **do not** change the version of the WebJobs.Extensions.DurableTask package * [ ] Otherwise: major or minor version updates are reflected in `/src/Worker.Extensions.DurableTask/AssemblyInfo.cs` * [ ] My changes **do not** add EventIds to our EventSource logs - * [ ] Otherwise: Ensure the EventIds are within the supported range in our existing Windows infrastructure. You may validate this with a deployed app's telemetry. You may also extend the range by completing a PR such as [this one](https://msazure.visualstudio.com/One/_git/AAPT-Antares-Websites/pullrequest/7463263?_a=files). \ No newline at end of file + * [ ] Otherwise: Ensure the EventIds are within the supported range in our existing Windows infrastructure. You may validate this with a deployed app's telemetry. You may also extend the range by completing a PR such as [this one](https://msazure.visualstudio.com/One/_git/AAPT-Antares-Websites/pullrequest/7463263?_a=files). +* [ ] My changes **should** be added to v3.x branch. + * [ ] Otherwise: This change only applies to Durable Functions v2.x and **will not** be merged to branch v3.x. diff --git a/.github/workflows/smoketest-netherite-inproc-v4.yml b/.github/workflows/smoketest-netherite-inproc-v4.yml new file mode 100644 index 000000000..607e2ad71 --- /dev/null +++ b/.github/workflows/smoketest-netherite-inproc-v4.yml @@ -0,0 +1,22 @@ +name: Smoke Test - .NET in-proc w/ Netherite on Functions V4 + +on: + push: + branches: [ main, dev ] + paths: + - 'src/**' + - 'test/SmokeTests/BackendSmokeTests/Netherite/**' + pull_request: + branches: [ main, dev ] + paths: + - 'src/**' + - 'test/SmokeTests/BackendSmokeTests/Netherite/**' + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Run V4 .NET in-proc w/ Netherite Smoke Test + run: test/SmokeTests/e2e-test.ps1 -DockerfilePath test/SmokeTests/BackendSmokeTests/Netherite/Dockerfile -HttpStartPath api/DurableFunctionsHttpStart -ContainerName NetheriteApp + shell: pwsh diff --git a/.nuget/NuGet.Config b/.nuget/NuGet.Config deleted file mode 100644 index fe953401c..000000000 --- a/.nuget/NuGet.Config +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - - - - diff --git a/README.md b/README.md index 7aac43a27..92e1d4214 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Package Name | NuGet ---|--- Microsoft.Azure.WebJobs.Extensions.DurableTask | [![NuGet](https://img.shields.io/nuget/v/Microsoft.Azure.WebJobs.Extensions.DurableTask.svg)](https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.DurableTask) Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers (C# only) | [![NuGet](https://img.shields.io/nuget/v/Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers.svg)](https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers) +Microsoft.Azure.Functions.Worker.Extensions.DurableTask | [![NuGet](https://img.shields.io/nuget/v/Microsoft.Azure.Functions.Worker.Extensions.DurableTask.svg)](https://www.nuget.org/packages/Microsoft.Azure.Functions.Worker.Extensions.DurableTask) ## Language support diff --git a/WebJobs.Extensions.DurableTask.sln b/WebJobs.Extensions.DurableTask.sln index d821ef64a..8efe24eee 100644 --- a/WebJobs.Extensions.DurableTask.sln +++ b/WebJobs.Extensions.DurableTask.sln @@ -9,11 +9,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{7EC858EE-348 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{78BCF152-C22C-408F-9FB1-0F8C99B154B5}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{51D8DCEA-EA91-410E-AA6A-F42473F0C691}" - ProjectSection(SolutionItems) = preProject - .nuget\NuGet.Config = .nuget\NuGet.Config - EndProjectSection -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extensions.DurableTask.Tests.V2", "test\FunctionsV2\WebJobs.Extensions.DurableTask.Tests.V2.csproj", "{F2A5DABB-36D4-4152-AF49-2570149899E1}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extensions.DurableTask.Tests.V1", "test\FunctionsV1\WebJobs.Extensions.DurableTask.Tests.V1.csproj", "{F99CA746-553C-43B4-943B-59A5D190459A}" @@ -23,6 +18,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution .editorconfig = .editorconfig azure-pipelines-release-dotnet-isolated.yml = azure-pipelines-release-dotnet-isolated.yml azure-pipelines-release.yml = azure-pipelines-release.yml + nuget.config = nuget.config README.md = README.md release_notes.md = release_notes.md .stylecop\stylecop.json = .stylecop\stylecop.json diff --git a/azure-pipelines-analyzer-release.yml b/azure-pipelines-analyzer-release.yml index 6b0d8974d..56fb72fc6 100644 --- a/azure-pipelines-analyzer-release.yml +++ b/azure-pipelines-analyzer-release.yml @@ -38,7 +38,7 @@ steps: command: restore projects: '**/**/*.csproj' feedsToUse: config - nugetConfigPath: '.nuget/nuget.config' + nugetConfigPath: 'nuget.config' # Build durable-analyzer - task: VSBuild@1 diff --git a/azure-pipelines-release.yml b/azure-pipelines-release.yml index b9ae7f45e..404ce9508 100644 --- a/azure-pipelines-release.yml +++ b/azure-pipelines-release.yml @@ -38,7 +38,7 @@ steps: command: restore projects: '**/**/*.csproj' feedsToUse: config - nugetConfigPath: '.nuget/nuget.config' + nugetConfigPath: 'nuget.config' # Build durable-extension - task: VSBuild@1 diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 17cf3fdc0..68ef305bf 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -23,7 +23,7 @@ jobs: command: 'restore' projects: 'test/FunctionsV1/*.csproj' feedsToUse: 'config' - nugetConfigPath: '.nuget/nuget.config' + nugetConfigPath: 'nuget.config' - task: DotNetCoreCLI@2 inputs: @@ -72,7 +72,7 @@ jobs: command: 'restore' projects: 'test/FunctionsV2/*.csproj' feedsToUse: 'config' - nugetConfigPath: '.nuget/nuget.config' + nugetConfigPath: 'nuget.config' - task: DotNetCoreCLI@2 inputs: @@ -119,7 +119,7 @@ jobs: command: 'restore' projects: 'test/WebJobs.Extensions.DurableTask.Analyzers.Test/*.csproj' feedsToUse: 'config' - nugetConfigPath: '.nuget/nuget.config' + nugetConfigPath: 'nuget.config' - task: DotNetCoreCLI@2 inputs: diff --git a/nuget.config b/nuget.config index 52ed3a3be..652118ea6 100644 --- a/nuget.config +++ b/nuget.config @@ -3,8 +3,7 @@ - - + diff --git a/release_notes.md b/release_notes.md index b0f5bd2a9..2d1644667 100644 --- a/release_notes.md +++ b/release_notes.md @@ -4,7 +4,7 @@ ### Bug Fixes -- Address input issues when using .NET isolated (#2581)[https://github.com/Azure/azure-functions-durable-extension/issues/2581] +- Fix support for distributed tracing v2 in dotnet-isolated and Java (https://github.com/Azure/azure-functions-durable-extension/pull/2634) ### Breaking Changes diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs index f395c23de..38151e3cc 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs @@ -10,6 +10,7 @@ using DurableTask.AzureStorage.Tracking; using DurableTask.Core; using Microsoft.Extensions.Logging; +using Microsoft.WindowsAzure.Storage; using Newtonsoft.Json; using Newtonsoft.Json.Converters; using Newtonsoft.Json.Linq; @@ -200,6 +201,16 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC } #if !FUNCTIONS_V1 + + internal DurableTaskMetricsProvider GetMetricsProvider( + string functionName, + string hubName, + CloudStorageAccount storageAccount, + ILogger logger) + { + return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount); + } + /// public override bool TryGetScaleMonitor( string functionId, @@ -208,12 +219,31 @@ public override bool TryGetScaleMonitor( string connectionName, out IScaleMonitor scaleMonitor) { + CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(); + DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger); scaleMonitor = new DurableTaskScaleMonitor( functionId, functionName, hubName, - this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(), - this.logger); + storageAccount, + this.logger, + metricsProvider); + return true; + } + +#endif +#if FUNCTIONS_V3_OR_GREATER + public override bool TryGetTargetScaler( + string functionId, + string functionName, + string hubName, + string connectionName, + out ITargetScaler targetScaler) + { + // This is only called by the ScaleController, it doesn't run in the Functions Host process. + CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(); + DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger); + targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger); return true; } #endif diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index 2bc6f22e2..529a7adab 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -565,5 +565,27 @@ public virtual bool TryGetScaleMonitor( return false; } #endif + +#if FUNCTIONS_V3_OR_GREATER + /// + /// Tries to obtain a scaler for target based scaling. + /// + /// Function id. + /// Function name. + /// Task hub name. + /// The name of the storage-specific connection settings. + /// The target-based scaler. + /// True if target-based scaling is supported, false otherwise. + public virtual bool TryGetTargetScaler( + string functionId, + string functionName, + string hubName, + string connectionName, + out ITargetScaler targetScaler) + { + targetScaler = null; + return false; + } +#endif } } diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index cc1e63976..dd351c1fe 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -150,13 +150,13 @@ public DurableTaskExtension( this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver)); this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService)); - this.ResolveAppSettingOptions(); + DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver); ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName); this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents); this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper(); - this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories); + this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories); this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider(); this.isOptionsConfigured = true; @@ -249,6 +249,8 @@ public string HubName internal DurableTaskOptions Options { get; } + internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider; + internal HttpApiHandler HttpApiHandler { get; private set; } internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; } @@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault); } - private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable orchestrationServiceFactories) + internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable orchestrationServiceFactories) { bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType); @@ -578,32 +580,13 @@ private void StopLocalGrpcServer() } #endif - private void ResolveAppSettingOptions() - { - if (this.Options == null) - { - throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings."); - } - - if (this.nameResolver == null) - { - throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings."); - } - - if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName)) - { - // use the resolved task hub name - this.Options.HubName = taskHubName; - } - } - private void InitializeForFunctionsV1(ExtensionConfigContext context) { #if FUNCTIONS_V1 context.ApplyConfig(this.Options, "DurableTask"); this.nameResolver = context.Config.NameResolver; this.loggerFactory = context.Config.LoggerFactory; - this.ResolveAppSettingOptions(); + DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver); ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName); this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents); this.connectionInfoResolver = new WebJobsConnectionInfoProvider(); @@ -1573,59 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat activity.AddTag("DurableFunctionsRuntimeStatus", statusStr); } } - - internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName) - { - if (this.defaultDurabilityProvider.TryGetScaleMonitor( - functionId, - functionName.Name, - this.Options.HubName, - connectionName, - out IScaleMonitor scaleMonitor)) - { - return scaleMonitor; - } - else - { - // the durability provider does not support runtime scaling. - // Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on). - return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower()); - } - } - - /// - /// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. - /// This is required to allow operation of those providers even if runtime scaling is turned off - /// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. - /// - private sealed class NoOpScaleMonitor : IScaleMonitor - { - /// - /// Construct a placeholder scale monitor. - /// - /// A descriptive name. - public NoOpScaleMonitor(string name) - { - this.Descriptor = new ScaleMonitorDescriptor(name); - } - - /// - /// A descriptive name. - /// - public ScaleMonitorDescriptor Descriptor { get; private set; } - - /// - Task IScaleMonitor.GetMetricsAsync() - { - throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); - } - - /// - ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) - { - throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); - } - } #endif } } diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs index ac98d8632..bd793c17a 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs @@ -2,16 +2,19 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; -using System.Net.Http; -using System.Threading; +using System.Collections.Generic; +using System.Linq; #if !FUNCTIONS_V1 using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth; using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; #else using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; @@ -109,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder) return builder; } +#if FUNCTIONS_V3_OR_GREATER + /// + /// Adds the and providers for the Durable Triggers. + /// + /// The to configure. + /// Returns the provided . + internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata) + { + // this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756 + DurableTaskTriggersScaleProvider provider = null; + builder.Services.AddSingleton(serviceProvider => + { + provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService>(), serviceProvider.GetService(), serviceProvider.GetService(), serviceProvider.GetService>(), triggerMetadata); + return provider; + }); + builder.Services.AddSingleton(serviceProvider => serviceProvider.GetServices().Single(x => x == provider)); + builder.Services.AddSingleton(serviceProvider => serviceProvider.GetServices().Single(x => x == provider)); + return builder; + } +#endif + /// /// Adds the Durable Task extension to the provided . /// diff --git a/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs b/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs index 2c5085698..e730d6e71 100644 --- a/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs +++ b/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs @@ -116,8 +116,8 @@ public void FunctionScheduled( isReplay); this.logger.LogInformation( - "{instanceId}: Function '{functionName} ({functionType})' scheduled. Reason: {reason}. IsReplay: {isReplay}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", - instanceId, functionName, functionType, reason, isReplay, FunctionState.Scheduled, hubName, + "{instanceId}: Function '{functionName} ({functionType})' scheduled. Reason: {reason}. IsReplay: {isReplay}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + instanceId, functionName, functionType, reason, isReplay, FunctionState.Scheduled, OrchestrationRuntimeStatus.Pending, hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } } @@ -146,8 +146,8 @@ public void FunctionStarting( isReplay); this.logger.LogInformation( - "{instanceId}: Function '{functionName} ({functionType})' started. IsReplay: {isReplay}. Input: {input}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", - instanceId, functionName, functionType, isReplay, input, FunctionState.Started, hubName, + "{instanceId}: Function '{functionName} ({functionType})' started. IsReplay: {isReplay}. Input: {input}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", + instanceId, functionName, functionType, isReplay, input, FunctionState.Started, OrchestrationRuntimeStatus.Running, hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); } } @@ -233,9 +233,9 @@ public void FunctionCompleted( isReplay); this.logger.LogInformation( - "{instanceId}: Function '{functionName} ({functionType})' completed. ContinuedAsNew: {continuedAsNew}. IsReplay: {isReplay}. Output: {output}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", - instanceId, functionName, functionType, continuedAsNew, isReplay, output, FunctionState.Completed, - hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); + "{instanceId}: Function '{functionName} ({functionType})' completed. ContinuedAsNew: {continuedAsNew}. IsReplay: {isReplay}. Output: {output}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", + instanceId, functionName, functionType, continuedAsNew, isReplay, output, FunctionState.Completed, OrchestrationRuntimeStatus.Completed, hubName, + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); } } @@ -279,9 +279,9 @@ public void FunctionTerminated( IsReplay: false); this.logger.LogWarning( - "{instanceId}: Function '{functionName} ({functionType})' was terminated. Reason: {reason}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", - instanceId, functionName, functionType, reason, FunctionState.Terminated, hubName, LocalAppName, - LocalSlotName, ExtensionVersion, this.sequenceNumber++); + "{instanceId}: Function '{functionName} ({functionType})' was terminated. Reason: {reason}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + instanceId, functionName, functionType, reason, FunctionState.Terminated, OrchestrationRuntimeStatus.Terminated, hubName, + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } public void SuspendingOrchestration( @@ -304,9 +304,9 @@ public void SuspendingOrchestration( IsReplay: false); this.logger.LogInformation( - "{instanceId}: Suspending function '{functionName} ({functionType})'. Reason: {reason}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", - instanceId, functionName, functionType, reason, FunctionState.Suspended, hubName, LocalAppName, - LocalSlotName, ExtensionVersion, this.sequenceNumber++); + "{instanceId}: Suspending function '{functionName} ({functionType})'. Reason: {reason}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + instanceId, functionName, functionType, reason, FunctionState.Suspended, OrchestrationRuntimeStatus.Suspended, hubName, + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } public void ResumingOrchestration( @@ -329,9 +329,9 @@ public void ResumingOrchestration( IsReplay: false); this.logger.LogInformation( - "{instanceId}: Resuming function '{functionName} ({functionType})'. Reason: {reason}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", - instanceId, functionName, functionType, reason, FunctionState.Scheduled, hubName, LocalAppName, - LocalSlotName, ExtensionVersion, this.sequenceNumber++); + "{instanceId}: Resuming function '{functionName} ({functionType})'. Reason: {reason}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + instanceId, functionName, functionType, reason, FunctionState.Scheduled, OrchestrationRuntimeStatus.Running, hubName, + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } public void FunctionRewound( @@ -355,8 +355,8 @@ public void FunctionRewound( this.logger.LogWarning( "{instanceId}: Function '{functionName} ({functionType})' was rewound. Reason: {reason}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", - instanceId, functionName, functionType, reason, FunctionState.Rewound, hubName, LocalAppName, - LocalSlotName, ExtensionVersion, this.sequenceNumber++); + instanceId, functionName, functionType, reason, FunctionState.Rewound, hubName, + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } public void FunctionFailed( @@ -383,8 +383,8 @@ public void FunctionFailed( isReplay); this.logger.LogError( - "{instanceId}: Function '{functionName} ({functionType})' failed with an error. Reason: {reason}. IsReplay: {isReplay}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", - instanceId, functionName, functionType, reason, isReplay, FunctionState.Failed, hubName, + "{instanceId}: Function '{functionName} ({functionType})' failed with an error. Reason: {reason}. IsReplay: {isReplay}. State: {state}. RuntimeStatus: {runtimeStatus}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", + instanceId, functionName, functionType, reason, isReplay, FunctionState.Failed, OrchestrationRuntimeStatus.Failed, hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); } } @@ -953,8 +953,8 @@ public void TimerExpired( this.logger.LogInformation( "{instanceId}: Function '{functionName} ({functionType})' was resumed by a timer scheduled for '{expirationTime}'. IsReplay: {isReplay}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", - instanceId, functionName, functionType, expirationTimeString, isReplay, FunctionState.TimerExpired, - hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); + instanceId, functionName, functionType, expirationTimeString, isReplay, FunctionState.TimerExpired, hubName, + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } } diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs index e764ae623..4fa87cc07 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs @@ -2,12 +2,9 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; -using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; -using DurableTask.AzureStorage.Monitoring; -using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale; using Microsoft.Azure.WebJobs.Host.Listeners; #if !FUNCTIONS_V1 using Microsoft.Azure.WebJobs.Host.Scale; @@ -15,7 +12,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { -#if !FUNCTIONS_V1 +#if FUNCTIONS_V3_OR_GREATER + internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider, ITargetScalerProvider +#elif FUNCTIONS_V2_OR_GREATER internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider #else internal sealed class DurableTaskListener : IListener @@ -26,10 +25,15 @@ internal sealed class DurableTaskListener : IListener private readonly FunctionName functionName; private readonly FunctionType functionType; private readonly string connectionName; + #if !FUNCTIONS_V1 private readonly Lazy scaleMonitor; #endif +#if FUNCTIONS_V3_OR_GREATER + private readonly Lazy targetScaler; +#endif + public DurableTaskListener( DurableTaskExtension config, string functionId, @@ -48,12 +52,25 @@ public DurableTaskListener( this.functionName = functionName; this.functionType = functionType; this.connectionName = connectionName; + #if !FUNCTIONS_V1 this.scaleMonitor = new Lazy(() => - this.config.GetScaleMonitor( + ScaleUtils.GetScaleMonitor( + this.config.DefaultDurabilityProvider, this.functionId, this.functionName, - this.connectionName)); + this.connectionName, + this.config.Options.HubName)); + +#endif +#if FUNCTIONS_V3_OR_GREATER + this.targetScaler = new Lazy(() => + ScaleUtils.GetTargetScaler( + this.config.DefaultDurabilityProvider, + this.functionId, + this.functionName, + this.connectionName, + this.config.Options.HubName)); #endif } @@ -98,5 +115,12 @@ public IScaleMonitor GetMonitor() return this.scaleMonitor.Value; } #endif + +#if FUNCTIONS_V3_OR_GREATER + public ITargetScaler GetTargetScaler() + { + return this.targetScaler.Value; + } +#endif } } diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs new file mode 100644 index 000000000..e3565d169 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs @@ -0,0 +1,79 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#if !FUNCTIONS_V1 +using System; +using System.Threading.Tasks; +using DurableTask.AzureStorage.Monitoring; +using Microsoft.Extensions.Logging; +using Microsoft.WindowsAzure.Storage; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + internal class DurableTaskMetricsProvider + { + private readonly string functionName; + private readonly string hubName; + private readonly ILogger logger; + private readonly CloudStorageAccount storageAccount; + + private DisconnectedPerformanceMonitor performanceMonitor; + + public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount) + { + this.functionName = functionName; + this.hubName = hubName; + this.logger = logger; + this.performanceMonitor = performanceMonitor; + this.storageAccount = storageAccount; + } + + public virtual async Task GetMetricsAsync() + { + DurableTaskTriggerMetrics metrics = new DurableTaskTriggerMetrics(); + + // Durable stores its own metrics, so we just collect them here + PerformanceHeartbeat heartbeat = null; + try + { + DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor(); + heartbeat = await performanceMonitor.PulseAsync(); + } + catch (StorageException e) + { + this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName); + } + + if (heartbeat != null) + { + metrics.PartitionCount = heartbeat.PartitionCount; + metrics.ControlQueueLengths = JsonConvert.SerializeObject(heartbeat.ControlQueueLengths); + metrics.ControlQueueLatencies = JsonConvert.SerializeObject(heartbeat.ControlQueueLatencies); + metrics.WorkItemQueueLength = heartbeat.WorkItemQueueLength; + if (heartbeat.WorkItemQueueLatency > TimeSpan.Zero) + { + metrics.WorkItemQueueLatency = heartbeat.WorkItemQueueLatency.ToString(); + } + } + + return metrics; + } + + internal DisconnectedPerformanceMonitor GetPerformanceMonitor() + { + if (this.performanceMonitor == null) + { + if (this.storageAccount == null) + { + throw new ArgumentNullException(nameof(this.storageAccount)); + } + + this.performanceMonitor = new DisconnectedPerformanceMonitor(this.storageAccount, this.hubName); + } + + return this.performanceMonitor; + } + } +} +#endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs index 3bf18598d..4c05b3df0 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs @@ -22,6 +22,7 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor IScaleMonitor.GetMetricsAsync() @@ -72,33 +75,7 @@ async Task IScaleMonitor.GetMetricsAsync() public async Task GetMetricsAsync() { - DurableTaskTriggerMetrics metrics = new DurableTaskTriggerMetrics(); - - // Durable stores its own metrics, so we just collect them here - PerformanceHeartbeat heartbeat = null; - try - { - DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor(); - heartbeat = await performanceMonitor.PulseAsync(); - } - catch (StorageException e) - { - this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName); - } - - if (heartbeat != null) - { - metrics.PartitionCount = heartbeat.PartitionCount; - metrics.ControlQueueLengths = JsonConvert.SerializeObject(heartbeat.ControlQueueLengths); - metrics.ControlQueueLatencies = JsonConvert.SerializeObject(heartbeat.ControlQueueLatencies); - metrics.WorkItemQueueLength = heartbeat.WorkItemQueueLength; - if (heartbeat.WorkItemQueueLatency > TimeSpan.Zero) - { - metrics.WorkItemQueueLatency = heartbeat.WorkItemQueueLatency.ToString(); - } - } - - return metrics; + return await this.durableTaskMetricsProvider.GetMetricsAsync(); } ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) @@ -151,7 +128,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric } } - DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor(); + DisconnectedPerformanceMonitor performanceMonitor = this.durableTaskMetricsProvider.GetPerformanceMonitor(); var scaleRecommendation = performanceMonitor.MakeScaleRecommendation(workerCount, heartbeats.ToArray()); bool writeToUserLogs = false; diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs new file mode 100644 index 000000000..c1faa94f4 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs @@ -0,0 +1,94 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#if FUNCTIONS_V3_OR_GREATER +#nullable enable +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + internal class DurableTaskTargetScaler : ITargetScaler + { + private readonly DurableTaskMetricsProvider metricsProvider; + private readonly TargetScalerResult scaleResult; + private readonly DurabilityProvider durabilityProvider; + private readonly ILogger logger; + private readonly string functionId; + + public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger) + { + this.functionId = functionId; + this.metricsProvider = metricsProvider; + this.scaleResult = new TargetScalerResult(); + this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId); + this.durabilityProvider = durabilityProvider; + this.logger = logger; + } + + public TargetScalerDescriptor TargetScalerDescriptor { get; } + + private int MaxConcurrentActivities => this.durabilityProvider.MaxConcurrentTaskActivityWorkItems; + + private int MaxConcurrentOrchestrators => this.durabilityProvider.MaxConcurrentTaskOrchestrationWorkItems; + + public async Task GetScaleResultAsync(TargetScalerContext context) + { + DurableTaskTriggerMetrics? metrics = null; + try + { + // This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process. + metrics = await this.metricsProvider.GetMetricsAsync(); + + // compute activityWorkers: the number of workers we need to process all activity messages + var workItemQueueLength = metrics.WorkItemQueueLength; + double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities); + + var serializedControlQueueLengths = metrics.ControlQueueLengths; + var controlQueueLengths = JsonConvert.DeserializeObject>(serializedControlQueueLengths); + + var controlQueueMessages = controlQueueLengths.Sum(); + var activeControlQueues = controlQueueLengths.Count(x => x > 0); + + // compute orchestratorWorkers: the number of workers we need to process all orchestrator messages. + // We bound this result to be no larger than the partition count + var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators); + var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers); + + int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers); + this.scaleResult.TargetWorkerCount = numWorkersToRequest; + + // When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table. + // This works because this code does not execute in the Functions Host process, but in the ScaleController process, + // and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto. + var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " + + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; + var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " + + metricsLog; + + // target worker count should never be negative + if (numWorkersToRequest < 0) + { + throw new InvalidOperationException("Number of workers to request cannot be negative"); + } + + this.logger.LogInformation(scaleControllerLog); + return this.scaleResult; + } + catch (Exception ex) + { + // We want to augment the exception with metrics information for investigation purposes + var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " + + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; + var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog; + throw new Exception(errorLog, ex); + } + } + } +} +#endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs index 43008bbd9..68fa8a10c 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See LICENSE in the project root for license information. #if !FUNCTIONS_V1 +using System.Collections.Generic; using Microsoft.Azure.WebJobs.Host.Scale; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask @@ -10,13 +11,13 @@ internal class DurableTaskTriggerMetrics : ScaleMetrics /// /// The number of partitions in the task hub. /// - public int PartitionCount { get; set; } + public virtual int PartitionCount { get; set; } /// /// The number of messages across control queues. This will /// be in the form of a serialized array of ints, e.g. "[1,2,3,4]". /// - public string ControlQueueLengths { get; set; } + public virtual string ControlQueueLengths { get; set; } /// /// The latency of messages across control queues. This will @@ -28,7 +29,7 @@ internal class DurableTaskTriggerMetrics : ScaleMetrics /// /// The number of messages in the work-item queue. /// - public int WorkItemQueueLength { get; set; } + public virtual int WorkItemQueueLength { get; set; } /// /// The approximate age of the first work-item queue message. This diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs index 37ff88ddf..caa5f04a5 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs @@ -5,11 +5,13 @@ using System; using System.Collections.Generic; using System.IO; +using System.Reflection.Metadata.Ecma335; using System.Threading; using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Core.History; using DurableTask.Core.Query; +using DurableTask.Core.Serializing.Internal; using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Microsoft.Extensions.Hosting; @@ -142,83 +144,50 @@ public override Task Hello(Empty request, ServerCallContext context) public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) { - var instance = new OrchestrationInstance - { - InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), - ExecutionId = Guid.NewGuid().ToString(), - }; - try { - await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( - new TaskMessage - { - Event = new ExecutionStartedEvent(-1, request.Input) - { - Name = request.Name, - Version = request.Version, - OrchestrationInstance = instance, - ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), - }, - OrchestrationInstance = instance, - }, - this.GetStatusesNotToOverride()); - + string instanceId = await this.GetClient(context).StartNewAsync( + request.Name, request.InstanceId, Raw(request.Input)); return new P.CreateInstanceResponse { - InstanceId = instance.InstanceId, + InstanceId = instanceId, }; } catch (InvalidOperationException) { - throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {instance.InstanceId} already exists.")); + throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); } } public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) { - await this.GetDurabilityProvider(context).SendTaskOrchestrationMessageAsync( - new TaskMessage - { - Event = new EventRaisedEvent(-1, request.Input) - { - Name = request.Name, - }, - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = request.InstanceId, - }, - }); - - // No fields in the response + await this.GetClient(context).RaiseEventAsync(request.InstanceId, request.Name, Raw(request.Input)); return new P.RaiseEventResponse(); } public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) { - await this.GetDurabilityProvider(context).ForceTerminateTaskOrchestrationAsync( - request.InstanceId, - request.Output); - - // No fields in the response + await this.GetClient(context).TerminateAsync(request.InstanceId, request.Output); return new P.TerminateResponse(); } public async override Task SuspendInstance(P.SuspendRequest request, ServerCallContext context) { - await this.GetDurabilityProvider(context).SuspendTaskOrchestrationAsync(request.InstanceId, request.Reason); + await this.GetClient(context).SuspendAsync(request.InstanceId, request.Reason); return new P.SuspendResponse(); } public async override Task ResumeInstance(P.ResumeRequest request, ServerCallContext context) { - await this.GetDurabilityProvider(context).ResumeTaskOrchestrationAsync(request.InstanceId, request.Reason); + await this.GetClient(context).ResumeAsync(request.InstanceId, request.Reason); return new P.ResumeResponse(); } public async override Task RewindInstance(P.RewindInstanceRequest request, ServerCallContext context) { - await this.GetDurabilityProvider(context).RewindAsync(request.InstanceId, request.Reason); +#pragma warning disable CS0618 // Type or member is obsolete + await this.GetClient(context).RewindAsync(request.InstanceId, request.Reason); +#pragma warning restore CS0618 // Type or member is obsolete return new P.RewindInstanceResponse(); } @@ -303,6 +272,13 @@ await this.GetDurabilityProvider(context).ForceTerminateTaskOrchestrationAsync( return CreateGetInstanceResponse(state, request); } +#pragma warning disable CS0618 // Type or member is obsolete -- 'internal' usage. + private static RawInput Raw(string input) + { + return new RawInput(input); + } +#pragma warning restore CS0618 // Type or member is obsolete + private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, P.GetInstanceRequest request) { return new P.GetInstanceResponse @@ -338,18 +314,21 @@ private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationStat }; } - private DurabilityProvider GetDurabilityProvider(ServerCallContext context) + private DurableClientAttribute GetAttribute(ServerCallContext context) { string? taskHub = context.RequestHeaders.GetValue("Durable-TaskHub"); string? connectionName = context.RequestHeaders.GetValue("Durable-ConnectionName"); - var attribute = new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; - return this.extension.GetDurabilityProvider(attribute); + return new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; + } + + private DurabilityProvider GetDurabilityProvider(ServerCallContext context) + { + return this.extension.GetDurabilityProvider(this.GetAttribute(context)); } - private OrchestrationStatus[] GetStatusesNotToOverride() + private IDurableClient GetClient(ServerCallContext context) { - OverridableStates overridableStates = this.extension.Options.OverridableExistingInstanceStates; - return overridableStates.ToDedupeStatuses(); + return this.extension.GetClient(this.GetAttribute(context)); } } } diff --git a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml index 518e1305d..f76c0ad61 100644 --- a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml +++ b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml @@ -2634,30 +2634,6 @@ This metadata will show up in Application Insights, if enabled. - - - A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. - This is required to allow operation of those providers even if runtime scaling is turned off - see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. - - - - - Construct a placeholder scale monitor. - - A descriptive name. - - - - A descriptive name. - - - - - - - - Extension for registering a Durable Functions configuration with JobHostConfiguration. @@ -4850,6 +4826,31 @@ The delegate to handle exception to determine if retries should proceed. + + + A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. + This is required to allow operation of those providers even if runtime scaling is turned off + see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. + + + + + Construct a placeholder scale monitor. + + A descriptive name. + The function ID. + + + + A descriptive name. + + + + + + + + Connection info provider which resolves connection information from a standard application (Non WebJob). diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs index 47249848a..e5eac789f 100644 --- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs +++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Net.Http; using DurableTask.AzureStorage.Partitioning; +using Microsoft.Azure.WebJobs.Host; using Newtonsoft.Json; using Newtonsoft.Json.Converters; using Newtonsoft.Json.Linq; @@ -229,6 +230,25 @@ public string HubName // to mock the value from ExtensionConfigContext. It should not be used in production code paths. internal Func WebhookUriProviderOverride { get; set; } + internal static void ResolveAppSettingOptions(DurableTaskOptions options, INameResolver nameResolver) + { + if (options == null) + { + throw new InvalidOperationException($"{nameof(options)} must be set before resolving app settings."); + } + + if (nameResolver == null) + { + throw new InvalidOperationException($"{nameof(nameResolver)} must be set before resolving app settings."); + } + + if (nameResolver.TryResolveWholeString(options.HubName, out string taskHubName)) + { + // use the resolved task hub name + options.HubName = taskHubName; + } + } + /// /// Sets HubName to a value that is considered a default value. /// diff --git a/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs b/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs new file mode 100644 index 000000000..fd50f9e38 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs @@ -0,0 +1,129 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. +#nullable enable +#if FUNCTIONS_V3_OR_GREATER + +using System; +using System.Collections.Generic; +using System.ComponentModel; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale +{ + internal class DurableTaskTriggersScaleProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private readonly IScaleMonitor monitor; + private readonly ITargetScaler targetScaler; + private readonly DurableTaskOptions options; + private readonly INameResolver nameResolver; + private readonly ILoggerFactory loggerFactory; + private readonly IEnumerable durabilityProviderFactories; + + public DurableTaskTriggersScaleProvider( + IOptions durableTaskOptions, + INameResolver nameResolver, + ILoggerFactory loggerFactory, + IEnumerable durabilityProviderFactories, + TriggerMetadata triggerMetadata) + { + this.options = durableTaskOptions.Value; + this.nameResolver = nameResolver; + this.loggerFactory = loggerFactory; + this.durabilityProviderFactories = durabilityProviderFactories; + + string functionId = triggerMetadata.FunctionName; + FunctionName functionName = new FunctionName(functionId); + + this.GetOptions(triggerMetadata); + + IDurabilityProviderFactory durabilityProviderFactory = this.GetDurabilityProviderFactory(); + DurabilityProvider defaultDurabilityProvider = durabilityProviderFactory.GetDurabilityProvider(); + + string? connectionName = durabilityProviderFactory is AzureStorageDurabilityProviderFactory azureStorageDurabilityProviderFactory + ? azureStorageDurabilityProviderFactory.DefaultConnectionName + : null; + + this.targetScaler = ScaleUtils.GetTargetScaler( + defaultDurabilityProvider, + functionId, + functionName, + connectionName, + this.options.HubName); + + this.monitor = ScaleUtils.GetScaleMonitor( + defaultDurabilityProvider, + functionId, + functionName, + connectionName, + this.options.HubName); + } + + private void GetOptions(TriggerMetadata triggerMetadata) + { + // the metadata is the sync triggers payload + var metadata = triggerMetadata.Metadata.ToObject(); + + // The property `taskHubName` is always expected in the SyncTriggers payload + this.options.HubName = metadata?.TaskHubName ?? throw new InvalidOperationException($"Expected `taskHubName` property in SyncTriggers payload but found none. Payload: {triggerMetadata.Metadata}"); + if (metadata?.MaxConcurrentActivityFunctions != null) + { + this.options.MaxConcurrentActivityFunctions = metadata?.MaxConcurrentActivityFunctions; + } + + if (metadata?.MaxConcurrentOrchestratorFunctions != null) + { + this.options.MaxConcurrentOrchestratorFunctions = metadata?.MaxConcurrentOrchestratorFunctions; + } + + if (metadata?.StorageProvider != null) + { + this.options.StorageProvider = metadata?.StorageProvider; + } + + DurableTaskOptions.ResolveAppSettingOptions(this.options, this.nameResolver); + } + + private IDurabilityProviderFactory GetDurabilityProviderFactory() + { + var logger = this.loggerFactory.CreateLogger(); + var durabilityProviderFactory = DurableTaskExtension.GetDurabilityProviderFactory(this.options, logger, this.durabilityProviderFactories); + return durabilityProviderFactory; + } + + public IScaleMonitor GetMonitor() + { + return this.monitor; + } + + public ITargetScaler GetTargetScaler() + { + return this.targetScaler; + } + + /// + /// Captures the relevant DF SyncTriggers JSON properties for making scaling decisions. + /// + internal class DurableTaskMetadata + { + [JsonProperty] + public string? TaskHubName { get; set; } + + [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(null)] + public int? MaxConcurrentOrchestratorFunctions { get; set; } + + [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(null)] + public int? MaxConcurrentActivityFunctions { get; set; } + + [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(null)] + public IDictionary? StorageProvider { get; set; } + } + } +} +#endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs b/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs new file mode 100644 index 000000000..0d60da23f --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs @@ -0,0 +1,120 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. +#nullable enable + +using System; +using System.Threading.Tasks; + +#if !FUNCTIONS_V1 +using Microsoft.Azure.WebJobs.Host.Scale; +#endif + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale +{ + internal static class ScaleUtils + { +#if !FUNCTIONS_V1 + internal static IScaleMonitor GetScaleMonitor(DurabilityProvider durabilityProvider, string functionId, FunctionName functionName, string? connectionName, string hubName) + { + if (durabilityProvider.TryGetScaleMonitor( + functionId, + functionName.Name, + hubName, + connectionName, + out IScaleMonitor scaleMonitor)) + { + return scaleMonitor; + } + else + { + // the durability provider does not support runtime scaling. + // Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on). + return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{hubName}".ToLower(), functionId); + } + } + + /// + /// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling. + /// This is required to allow operation of those providers even if runtime scaling is turned off + /// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018. + /// + internal sealed class NoOpScaleMonitor : IScaleMonitor + { + /// + /// Construct a placeholder scale monitor. + /// + /// A descriptive name. + /// The function ID. + public NoOpScaleMonitor(string name, string functionId) + { +#if FUNCTIONS_V3_OR_GREATER + this.Descriptor = new ScaleMonitorDescriptor(name, functionId); +#else +#pragma warning disable CS0618 // Type or member is obsolete + this.Descriptor = new ScaleMonitorDescriptor(name); +#pragma warning restore CS0618 // Type or member is obsolete +#endif + } + + /// + /// A descriptive name. + /// + public ScaleMonitorDescriptor Descriptor { get; private set; } + + /// + Task IScaleMonitor.GetMetricsAsync() + { + throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); + } + + /// + ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + { + throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling"); + } + } +#endif + +#if FUNCTIONS_V3_OR_GREATER +#pragma warning disable SA1201 // Elements should appear in the correct order + internal static ITargetScaler GetTargetScaler(DurabilityProvider durabilityProvider, string functionId, FunctionName functionName, string? connectionName, string hubName) +#pragma warning restore SA1201 // Elements should appear in the correct order + { + if (durabilityProvider.TryGetTargetScaler( + functionId, + functionName.Name, + hubName, + connectionName, + out ITargetScaler targetScaler)) + { + return targetScaler; + } + else + { + // the durability provider does not support target-based scaling. + // Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on). + return new NoOpTargetScaler(functionId); + } + } + + internal sealed class NoOpTargetScaler : ITargetScaler + { + /// + /// Construct a placeholder target scaler. + /// + /// The function ID. + public NoOpTargetScaler(string functionId) + { + this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId); + } + + public TargetScalerDescriptor TargetScalerDescriptor { get; } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotSupportedException("The current DurableTask backend configuration does not support target-based scaling"); + } + } +#endif + } +} diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index 8ee4cb106..83dd33581 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -6,7 +6,7 @@ Microsoft.Azure.WebJobs.Extensions.DurableTask 2 11 - 3 + 4 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 @@ -74,7 +74,7 @@ - + @@ -89,7 +89,7 @@ - + @@ -104,7 +104,7 @@ - + diff --git a/test/Common/TestHelpers.cs b/test/Common/TestHelpers.cs index ec1fd043c..83259db08 100644 --- a/test/Common/TestHelpers.cs +++ b/test/Common/TestHelpers.cs @@ -13,9 +13,8 @@ using Microsoft.ApplicationInsights.Channel; #if !FUNCTIONS_V1 using Microsoft.Extensions.Hosting; +using Microsoft.Azure.WebJobs.Host.Scale; #endif -using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -67,6 +66,9 @@ public static ITestHost GetJobHost( int entityMessageReorderWindowInMinutes = 30, string exactTaskHubName = null, bool addDurableClientFactory = false, +#if !FUNCTIONS_V1 + Action configureScaleOptions = null, +#endif Type[] types = null) { switch (storageProviderType) @@ -160,6 +162,7 @@ public static ITestHost GetJobHost( #if !FUNCTIONS_V1 addDurableClientFactory: addDurableClientFactory, types: types, + configureScaleOptions: configureScaleOptions, #endif durabilityProviderFactoryType: durabilityProviderFactoryType); } @@ -175,6 +178,9 @@ public static ITestHost GetJobHostWithOptions( Action onSend = null, Type durabilityProviderFactoryType = null, bool addDurableClientFactory = false, +#if !FUNCTIONS_V1 + Action configureScaleOptions = null, +#endif Type[] types = null) { if (serializerSettings == null) @@ -198,6 +204,7 @@ public static ITestHost GetJobHostWithOptions( durabilityProviderFactoryType: durabilityProviderFactoryType, addDurableClientFactory: addDurableClientFactory, typeLocator: typeLocator, + configureScaleOptions: configureScaleOptions, #endif loggerProvider: loggerProvider, nameResolver: testNameResolver, @@ -608,9 +615,9 @@ private static List GetLogs_OrchestrationEventGridApiReturnBadStatus(str { var list = new List() { - $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' scheduled. Reason: NewInstance. IsReplay: False. State: Scheduled.", - $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' started. IsReplay: False. Input: \"World\". State: Started.", - $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' completed. ContinuedAsNew: False. IsReplay: False. Output: \"Hello, World!\". State: Completed.", + $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' scheduled. Reason: NewInstance. IsReplay: False. State: Scheduled. RuntimeStatus: Pending.", + $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' started. IsReplay: False. Input: \"World\". State: Started. RuntimeStatus: Running.", + $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' completed. ContinuedAsNew: False. IsReplay: False. Output: \"Hello, World!\". State: Completed. RuntimeStatus: Completed.", $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' failed to send a 'Started' notification event to Azure Event Grid. Status code: 500. Details: {{\"message\":\"Exception has been thrown\"}}. ", $"{messageId}: Function '{functionNames[0]} ({FunctionType.Orchestrator})' failed to send a 'Completed' notification event to Azure Event Grid. Status code: 500. Details: {{\"message\":\"Exception has been thrown\"}}. ", }; diff --git a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs index 930ff4a1c..57566da29 100644 --- a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs +++ b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs @@ -40,6 +40,12 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output) ILogger logger = this.loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask")); this.traceHelper = new EndToEndTraceHelper(logger, false); this.performanceMonitor = new Mock(MockBehavior.Strict, this.storageAccount, this.hubName, (int?)null); + var metricsProvider = new DurableTaskMetricsProvider( + this.functionName.Name, + this.hubName, + logger, + this.performanceMonitor.Object, + this.storageAccount); this.scaleMonitor = new DurableTaskScaleMonitor( this.functionId, @@ -47,6 +53,7 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output) this.hubName, this.storageAccount, logger, + metricsProvider, this.performanceMonitor.Object); } diff --git a/test/FunctionsV2/DurableTaskTargetScalerTests.cs b/test/FunctionsV2/DurableTaskTargetScalerTests.cs new file mode 100644 index 000000000..cc2272e16 --- /dev/null +++ b/test/FunctionsV2/DurableTaskTargetScalerTests.cs @@ -0,0 +1,179 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System; +using System.Linq; +using System.Threading.Tasks; +using DurableTask.AzureStorage.Monitoring; +using DurableTask.Core; +using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.WindowsAzure.Storage; +using Moq; +using Xunit; +using Xunit.Abstractions; +using static Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale.ScaleUtils; +using static Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests.PlatformSpecificHelpers; + +namespace WebJobs.Extensions.DurableTask.Tests.V2 +{ + public class DurableTaskTargetScalerTests + { + private readonly DurableTaskTargetScaler targetScaler; + private readonly TargetScalerContext scalerContext; + private readonly Mock metricsProviderMock; + private readonly Mock triggerMetricsMock; + private readonly Mock orchestrationServiceMock; + private readonly Mock durabilityProviderMock; + private readonly TestLoggerProvider loggerProvider; + private readonly ITestOutputHelper output; + + public DurableTaskTargetScalerTests(ITestOutputHelper output) + { + this.scalerContext = new TargetScalerContext(); + this.output = output; + var loggerFactory = new LoggerFactory(); + this.loggerProvider = new TestLoggerProvider(this.output); + loggerFactory.AddProvider(this.loggerProvider); + ILogger logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask")); + + DisconnectedPerformanceMonitor nullPerformanceMonitorMock = null; + CloudStorageAccount nullCloudStorageAccountMock = null; + this.metricsProviderMock = new Mock( + MockBehavior.Strict, + "FunctionName", + "HubName", + logger, + nullPerformanceMonitorMock, + nullCloudStorageAccountMock); + + this.triggerMetricsMock = new Mock(MockBehavior.Strict); + this.orchestrationServiceMock = new Mock(MockBehavior.Strict); + + this.durabilityProviderMock = new Mock( + MockBehavior.Strict, + "storageProviderName", + this.orchestrationServiceMock.Object, + new Mock().Object, + "connectionName"); + + this.targetScaler = new DurableTaskTargetScaler( + "FunctionId", + this.metricsProviderMock.Object, + this.durabilityProviderMock.Object, + logger); + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(1, 10, 10, "[1, 1, 1, 1]", 10)] + [InlineData(1, 10, 0, "[0, 0, 0, 0]", 0)] + [InlineData(1, 10, 0, "[2, 2, 3, 3]", 1)] + [InlineData(1, 10, 0, "[9999, 0, 0, 0]", 1)] + [InlineData(1, 10, 0, "[9999, 0, 0, 1]", 2)] + [InlineData(10, 10, 10, "[2, 2, 3, 3 ]", 1)] + [InlineData(10, 10, 30, "[10, 10, 10, 1]", 4)] + public async Task TestTargetScaler(int maxConcurrentActivities, int maxConcurrentOrchestrators, int workItemQueueLength, string controlQueueLengths, int expectedWorkerCount) + { + this.orchestrationServiceMock.SetupGet(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentActivities); + this.orchestrationServiceMock.SetupGet(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentOrchestrators); + + this.triggerMetricsMock.SetupGet(m => m.WorkItemQueueLength).Returns(workItemQueueLength); + this.triggerMetricsMock.SetupGet(m => m.ControlQueueLengths).Returns(controlQueueLengths); + + this.metricsProviderMock.Setup(m => m.GetMetricsAsync()).ReturnsAsync(this.triggerMetricsMock.Object); + + var scaleResult = await this.targetScaler.GetScaleResultAsync(this.scalerContext); + var targetWorkerCount = scaleResult.TargetWorkerCount; + Assert.Equal(expectedWorkerCount, targetWorkerCount); + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true)] + [InlineData(false)] + public void TestGetTargetScaler(bool supportsTBS) + { + ITargetScaler targetScaler = new Mock().Object; + this.durabilityProviderMock.Setup(m => m.TryGetTargetScaler(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out targetScaler)) + .Returns(supportsTBS); + + var scaler = ScaleUtils.GetTargetScaler(this.durabilityProviderMock.Object, "FunctionId", new FunctionName("FunctionName"), "connectionName", "HubName"); + if (!supportsTBS) + { + Assert.IsType(scaler); + Assert.ThrowsAsync(() => scaler.GetScaleResultAsync(context: null)); + } + else + { + Assert.Equal(targetScaler, scaler); + } + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true)] + [InlineData(false)] + public void TestGetScaleMonitor(bool supportsScaleMonitor) + { + IScaleMonitor scaleMonitor = new Mock().Object; + this.durabilityProviderMock.Setup(m => m.TryGetScaleMonitor(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out scaleMonitor)) + .Returns(supportsScaleMonitor); + + var monitor = ScaleUtils.GetScaleMonitor(this.durabilityProviderMock.Object, "FunctionId", new FunctionName("FunctionName"), "connectionName", "HubName"); + if (!supportsScaleMonitor) + { + Assert.IsType(monitor); + Assert.Throws(() => monitor.GetScaleStatus(context: null)); + } + else + { + Assert.Equal(scaleMonitor, monitor); + } + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [InlineData(true)] + [InlineData(false)] + public async void ScaleHostE2ETest(bool isTbsEnabled) + { + Action configureScaleOptions = (scaleOptions) => + { + scaleOptions.IsTargetScalingEnabled = isTbsEnabled; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1); + }; + using (FunctionsV2HostWrapper host = (FunctionsV2HostWrapper)TestHelpers.GetJobHost(this.loggerProvider, nameof(this.ScaleHostE2ETest), enableExtendedSessions: false, configureScaleOptions: configureScaleOptions)) + { + await host.StartAsync(); + + IScaleStatusProvider scaleManager = host.InnerHost.Services.GetService(); + var client = await host.StartOrchestratorAsync(nameof(TestOrchestrations.FanOutFanIn), 50, this.output); + var status = await client.WaitForCompletionAsync(this.output, timeout: TimeSpan.FromSeconds(400)); + var scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext()); + await host.StopAsync(); + Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus); + + // We inspect the Host's logs for evidence that the Host is correctly sampling our scaling requests. + // the expected logs depend on whether TBS is enabled or not + var expectedSubString = "scale monitors to sample"; + if (isTbsEnabled) + { + expectedSubString = "target scalers to sample"; + } + + bool containsExpectedLog = this.loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage ?? "").Any(p => p.Contains(expectedSubString)); + Assert.True(containsExpectedLog); + } + } + } +} diff --git a/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs b/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs index bfd72124f..4b2d46456 100644 --- a/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs +++ b/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs @@ -8,7 +8,7 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Channel; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; @@ -37,7 +37,8 @@ public static ITestHost CreateJobHost( IMessageSerializerSettingsFactory serializerSettingsFactory, Action onSend, bool addDurableClientFactory, - ITypeLocator typeLocator) + ITypeLocator typeLocator, + Action configureScaleOptions = null) { // Unless otherwise specified, use legacy partition management for tests as it makes the task hubs start up faster. // These tests run on a single task hub workers, so they don't test partition management anyways, and that is tested @@ -47,7 +48,7 @@ public static ITestHost CreateJobHost( options.Value.StorageProvider.Add(nameof(AzureStorageOptions.UseLegacyPartitionManagement), true); } - IHost host = new HostBuilder() + var hostBuilder = new HostBuilder() .ConfigureLogging( loggingBuilder => { @@ -97,9 +98,22 @@ public static ITestHost CreateJobHost( return telemetryActivator; }); } - }) - .Build(); + }); + + // if a configureScaleOptions action is provided, then we're probably trying to test the host's scaling logic + // we configure WebJobsScale and set the minimum logging level to `Debug`, as scaling logs are usually at the `Debug` level + if (configureScaleOptions != null) + { + hostBuilder.ConfigureWebJobsScale( + (context, builder) => + { + // ignore + }, + configureScaleOptions) + .ConfigureLogging(builder => builder.SetMinimumLevel(LogLevel.Debug)); + } + var host = hostBuilder.Build(); return new FunctionsV2HostWrapper(host, options, nameResolver); } @@ -216,7 +230,7 @@ private static IWebJobsBuilder AddEmulatorDurableTask(this IWebJobsBuilder build internal class FunctionsV2HostWrapper : ITestHost { - private readonly IHost innerHost; + internal readonly IHost InnerHost; private readonly JobHost innerWebJobsHost; private readonly DurableTaskOptions options; private readonly INameResolver nameResolver; @@ -226,8 +240,8 @@ public FunctionsV2HostWrapper( IOptions options, INameResolver nameResolver) { - this.innerHost = innerHost ?? throw new ArgumentNullException(nameof(innerHost)); - this.innerWebJobsHost = (JobHost)this.innerHost.Services.GetService(); + this.InnerHost = innerHost ?? throw new ArgumentNullException(nameof(innerHost)); + this.innerWebJobsHost = (JobHost)this.InnerHost.Services.GetService(); this.options = options.Value; this.nameResolver = nameResolver; } @@ -236,8 +250,8 @@ internal FunctionsV2HostWrapper( IHost innerHost, IOptions options) { - this.innerHost = innerHost; - this.innerWebJobsHost = (JobHost)this.innerHost.Services.GetService(); + this.InnerHost = innerHost; + this.innerWebJobsHost = (JobHost)this.InnerHost.Services.GetService(); this.options = options.Value; } @@ -249,16 +263,16 @@ public Task CallAsync(MethodInfo method, IDictionary args) public void Dispose() { - this.innerHost.Dispose(); + this.InnerHost.Dispose(); } - public Task StartAsync() => this.innerHost.StartAsync(); + public Task StartAsync() => this.InnerHost.StartAsync(); public async Task StopAsync() { try { - await this.innerHost.StopAsync(); + await this.InnerHost.StopAsync(); } catch (OperationCanceledException) { diff --git a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj index 4840ca88a..ec2d5dd68 100644 --- a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj +++ b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj @@ -16,7 +16,7 @@ - + diff --git a/test/SmokeTests/BackendSmokeTests/Netherite/.gitignore b/test/SmokeTests/BackendSmokeTests/Netherite/.gitignore new file mode 100644 index 000000000..ff5b00c50 --- /dev/null +++ b/test/SmokeTests/BackendSmokeTests/Netherite/.gitignore @@ -0,0 +1,264 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +# Azure Functions localsettings file +local.settings.json + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# DNX +project.lock.json +project.fragment.lock.json +artifacts/ + +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.pch +*.pdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# TODO: Comment the next line if you want to checkin your web deploy settings +# but database connection strings (with potential passwords) will be unencrypted +#*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/packages/* +# except build/, which is used as an MSBuild target. +!**/packages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/packages/repositories.config +# NuGet v3's project.json files produces more ignoreable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +node_modules/ +orleans.codegen.cs + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm + +# SQL Server files +*.mdf +*.ldf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush +.cr/ + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc \ No newline at end of file diff --git a/test/SmokeTests/BackendSmokeTests/Netherite/Dockerfile b/test/SmokeTests/BackendSmokeTests/Netherite/Dockerfile new file mode 100644 index 000000000..82adb8e27 --- /dev/null +++ b/test/SmokeTests/BackendSmokeTests/Netherite/Dockerfile @@ -0,0 +1,16 @@ +# Copyright (c) .NET Foundation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root for license information. + +FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build-env + +# Build the app +COPY . /root +RUN cd /root/test/SmokeTests/BackendSmokeTests/Netherite && \ + mkdir -p /home/site/wwwroot && \ + dotnet publish -c Release --output /home/site/wwwroot + +# Deploy the app +FROM mcr.microsoft.com/azure-functions/dotnet:4 +ENV AzureWebJobsScriptRoot=/home/site/wwwroot \ + AzureFunctionsJobHost__Logging__Console__IsEnabled=true +COPY --from=build-env ["/home/site/wwwroot", "/home/site/wwwroot"] diff --git a/test/SmokeTests/BackendSmokeTests/Netherite/DurableFunctionsOrchestrationCSharp.cs b/test/SmokeTests/BackendSmokeTests/Netherite/DurableFunctionsOrchestrationCSharp.cs new file mode 100644 index 000000000..5dc6d7c3e --- /dev/null +++ b/test/SmokeTests/BackendSmokeTests/Netherite/DurableFunctionsOrchestrationCSharp.cs @@ -0,0 +1,50 @@ +using System.Collections.Generic; +using System.Net.Http; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs; +using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Microsoft.Azure.WebJobs.Extensions.Http; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Extensions.Logging; + +namespace Company.Function +{ + public static class DurableFunctionsOrchestrationCSharp + { + [FunctionName("DurableFunctionsOrchestrationCSharp")] + public static async Task> RunOrchestrator( + [OrchestrationTrigger] IDurableOrchestrationContext context) + { + var outputs = new List(); + + // Replace "hello" with the name of your Durable Activity Function. + outputs.Add(await context.CallActivityAsync(nameof(SayHello), "Tokyo")); + outputs.Add(await context.CallActivityAsync(nameof(SayHello), "Seattle")); + outputs.Add(await context.CallActivityAsync(nameof(SayHello), "London")); + + // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] + return outputs; + } + + [FunctionName(nameof(SayHello))] + public static string SayHello([ActivityTrigger] string name, ILogger log) + { + log.LogInformation("Saying hello to {name}.", name); + return $"Hello {name}!"; + } + + [FunctionName("DurableFunctionsHttpStart")] + public static async Task HttpStart( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req, + [DurableClient] IDurableOrchestrationClient starter, + ILogger log) + { + // Function input comes from the request content. + string instanceId = await starter.StartNewAsync("DurableFunctionsOrchestrationCSharp", null); + + log.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); + + return starter.CreateCheckStatusResponse(req, instanceId); + } + } +} \ No newline at end of file diff --git a/test/SmokeTests/BackendSmokeTests/Netherite/Netherite.csproj b/test/SmokeTests/BackendSmokeTests/Netherite/Netherite.csproj new file mode 100644 index 000000000..f1d8b2660 --- /dev/null +++ b/test/SmokeTests/BackendSmokeTests/Netherite/Netherite.csproj @@ -0,0 +1,20 @@ + + + net6.0 + v4 + + + + + + + + + PreserveNewest + + + PreserveNewest + Never + + + diff --git a/test/SmokeTests/BackendSmokeTests/Netherite/Nuget.config b/test/SmokeTests/BackendSmokeTests/Netherite/Nuget.config new file mode 100644 index 000000000..440b9b697 --- /dev/null +++ b/test/SmokeTests/BackendSmokeTests/Netherite/Nuget.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/test/SmokeTests/BackendSmokeTests/Netherite/host.json b/test/SmokeTests/BackendSmokeTests/Netherite/host.json new file mode 100644 index 000000000..35cc1adde --- /dev/null +++ b/test/SmokeTests/BackendSmokeTests/Netherite/host.json @@ -0,0 +1,23 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + }, + "enableLiveMetricsFilters": true + } + }, + "extensions": { + "durableTask": { + "useGracefulShutdown": true, + "storageProvider": { + "type": "Netherite", + "partitionCount": 12, + "StorageConnectionName": "AzureWebJobsStorage", + "EventHubsConnectionName": "SingleHost" + } + } + } +} \ No newline at end of file diff --git a/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj b/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj index 365d9c3e6..9dfbda77f 100644 --- a/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj +++ b/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj @@ -11,14 +11,19 @@ - + + + - + PreserveNewest diff --git a/test/SmokeTests/e2e-test.ps1 b/test/SmokeTests/e2e-test.ps1 index 842ec7535..44c1279b8 100644 --- a/test/SmokeTests/e2e-test.ps1 +++ b/test/SmokeTests/e2e-test.ps1 @@ -9,7 +9,7 @@ param( [string]$ContainerName="app", [switch]$NoSetup=$false, [switch]$NoValidation=$false, - [string]$AzuriteVersion="3.20.1", + [string]$AzuriteVersion="3.26.0", [int]$Sleep=30 ) @@ -92,4 +92,4 @@ try { throw } -Write-Host "Success!" -ForegroundColor Green \ No newline at end of file +Write-Host "Success!" -ForegroundColor Green