Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry Pick] Make scalers into per-task hub singletons #2967 #2984

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

#if !FUNCTIONS_V1
private DurableTaskScaleMonitor singletonScaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private DurableTaskTargetScaler singletonTargetScaler;
#endif

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageServiceClientProviderFactory clientProviderFactory,
Expand Down Expand Up @@ -226,12 +236,18 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger)
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger)
{
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccountClientProvider);
}

// Common routine for getting the scaler ID. Note that we MUST use the same ID for both the
// scale monitor and the target scaler.
private static string GetScalerUniqueId(string hubName)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccountClientProvider);
return $"DurableTask-AzureStorage:{hubName ?? "default"}";
}

/// <inheritdoc/>
Expand All @@ -242,16 +258,28 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccountClientProvider,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
if (this.singletonScaleMonitor == null)
{
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(
hubName,
this.clientProviderFactory.GetClientProvider(connectionName),
this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = GetScalerUniqueId(hubName);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
id,
hubName,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

#endif
Expand All @@ -263,11 +291,25 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(
hubName,
this.clientProviderFactory.GetClientProvider(connectionName),
this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = GetScalerUniqueId(hubName);
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly StorageAccountClientProvider storageAccountClientProvider;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, StorageAccountClientProvider storageAccountClientProvider)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
StorageAccountClientProvider storageAccountClientProvider)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -43,7 +45,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (Exception e) when (e.InnerException is RequestFailedException)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure;
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
Expand All @@ -17,42 +15,29 @@
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly StorageAccountClientProvider storageAccountClientProvider;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
private readonly ILogger logger;
private readonly DurableTaskMetricsProvider durableTaskMetricsProvider;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string id,
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
DurableTaskMetricsProvider durableTaskMetricsProvider)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccountClientProvider = storageAccountClientProvider;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

#if FUNCTIONS_V3_OR_GREATER
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
#pragma warning disable CS0618 // Type or member is obsolete.

// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id);

Check warning on line 40 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / build

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'

Check warning on line 40 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / build

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'

Check warning on line 40 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / build

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'
#endif
}

Expand Down Expand Up @@ -151,9 +136,10 @@
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -68,7 +72,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -85,7 +89,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<MinorVersion>0</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix>rc.2</VersionSuffix>
<VersionSuffix>rc.3</VersionSuffix>
<FileVersion>$(MajorVersion).$(MinorVersion).$(PatchVersion)</FileVersion>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<Company>Microsoft Corporation</Company>
Expand Down
12 changes: 8 additions & 4 deletions test/Common/TestEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,14 @@ public static async Task HttpEntity(

private static async Task<int> CallHttpAsync(string requestUri)
{
using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri))
{
return (int)response.StatusCode;
}
////using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri))
////{
//// return (int)response.StatusCode;
////}

// Making real calls to HTTP endpoints is not reliable in tests.
await Task.Delay(100);
return 200;
}

//-------------- an entity that uses custom deserialization
Expand Down
2 changes: 1 addition & 1 deletion test/Common/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
using DurableTask.AzureStorage;
using Microsoft.ApplicationInsights.Channel;
#if !FUNCTIONS_V1
using Microsoft.Extensions.Hosting;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Hosting;
#endif
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage;
using Microsoft.Azure.WebJobs.Host.TestCommon;
Expand Down
7 changes: 2 additions & 5 deletions test/FunctionsV2/DurableTaskListenerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Linq;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
Expand Down Expand Up @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue()
IScaleMonitor scaleMonitor = this.listener.GetMonitor();

Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType());
Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTask-AzureStorage:DurableTaskHub", scaleMonitor.Descriptor.Id);

var scaleMonitor2 = this.listener.GetMonitor();
IScaleMonitor scaleMonitor2 = this.listener.GetMonitor();

Assert.Same(scaleMonitor, scaleMonitor2);
}
Expand Down
17 changes: 7 additions & 10 deletions test/FunctionsV2/DurableTaskScaleMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableTaskScaleMonitorTests
{
private readonly string functionId = "DurableTaskTriggerFunctionId";
private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName");
private readonly string hubName = "DurableTaskTriggerHubName";
private readonly StorageAccountClientProvider clientProvider = new StorageAccountClientProvider(TestHelpers.GetStorageConnectionString());
private readonly ITestOutputHelper output;
Expand All @@ -39,32 +37,31 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
this.loggerFactory.AddProvider(this.loggerProvider);
ILogger logger = this.loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask"));
this.traceHelper = new EndToEndTraceHelper(logger, false);
this.performanceMonitor = new Mock<DisconnectedPerformanceMonitor>(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings{
this.performanceMonitor = new Mock<DisconnectedPerformanceMonitor>(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings
{
StorageAccountClientProvider = this.clientProvider,
TaskHubName = this.hubName,
});
var metricsProvider = new DurableTaskMetricsProvider(
this.functionName.Name,
this.hubName,
logger,
this.performanceMonitor.Object,
this.clientProvider);

string scalerId = $"DurableTask-AzureStorage:{this.hubName}";

this.scaleMonitor = new DurableTaskScaleMonitor(
this.functionId,
this.functionName.Name,
scalerId,
this.hubName,
this.clientProvider,
logger,
metricsProvider,
this.performanceMonitor.Object);
metricsProvider);
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void ScaleMonitorDescriptor_ReturnsExpectedValue()
{
Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTask-AzureStorage:{this.hubName}", this.scaleMonitor.Descriptor.Id);
}

[Fact]
Expand Down
1 change: 0 additions & 1 deletion test/FunctionsV2/DurableTaskTargetScalerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output)
StorageAccountClientProvider storageAccountClientProvider = null;
this.metricsProviderMock = new Mock<DurableTaskMetricsProvider>(
MockBehavior.Strict,
"FunctionName",
"HubName",
logger,
nullPerformanceMonitorMock,
Expand Down
Loading