Skip to content

Commit

Permalink
Make scalers into per-task hub singletons (#2967)
Browse files Browse the repository at this point in the history
This addresses an issue where the costs associated with polling storage resources are substantially higher for apps that have a large number of durable-trigger functions than for apps that have only a small number.
  • Loading branch information
cgillum authored Nov 21, 2024
1 parent ac1a650 commit 7801ed8
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

#if !FUNCTIONS_V1
private DurableTaskScaleMonitor singletonScaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private DurableTaskTargetScaler singletonTargetScaler;
#endif

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageAccountProvider storageAccountProvider,
Expand Down Expand Up @@ -226,12 +236,11 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccount);
}

/// <inheritdoc/>
Expand All @@ -242,16 +251,22 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccount,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
if (this.singletonScaleMonitor == null)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
hubName,
storageAccount,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

#endif
Expand All @@ -263,11 +278,23 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController; it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController; it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly CloudStorageAccount storageAccount;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
CloudStorageAccount storageAccount)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -42,7 +44,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (StorageException e)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly CloudStorageAccount storageAccount;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
Expand All @@ -27,31 +25,27 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTrigger
private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccount = storageAccount;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

string id = $"DurableTaskTrigger-{this.hubName}".ToLower();
#if FUNCTIONS_V3_OR_GREATER
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
#pragma warning disable CS0618 // Type or member is obsolete.

// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id);
#endif
}

Expand Down Expand Up @@ -150,9 +144,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -68,7 +72,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting its own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -85,7 +89,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down
7 changes: 2 additions & 5 deletions test/FunctionsV2/DurableTaskListenerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Linq;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
Expand Down Expand Up @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue()
IScaleMonitor scaleMonitor = this.listener.GetMonitor();

Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType());
Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);

var scaleMonitor2 = this.listener.GetMonitor();
IScaleMonitor scaleMonitor2 = this.listener.GetMonitor();

Assert.Same(scaleMonitor, scaleMonitor2);
}
Expand Down
7 changes: 1 addition & 6 deletions test/FunctionsV2/DurableTaskScaleMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableTaskScaleMonitorTests
{
private readonly string functionId = "DurableTaskTriggerFunctionId";
private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName");
private readonly string hubName = "DurableTaskTriggerHubName";
private readonly CloudStorageAccount storageAccount = CloudStorageAccount.Parse(TestHelpers.GetStorageConnectionString());
private readonly ITestOutputHelper output;
Expand All @@ -41,15 +39,12 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
this.traceHelper = new EndToEndTraceHelper(logger, false);
this.performanceMonitor = new Mock<DisconnectedPerformanceMonitor>(MockBehavior.Strict, this.storageAccount, this.hubName, (int?)null);
var metricsProvider = new DurableTaskMetricsProvider(
this.functionName.Name,
this.hubName,
logger,
this.performanceMonitor.Object,
this.storageAccount);

this.scaleMonitor = new DurableTaskScaleMonitor(
this.functionId,
this.functionName.Name,
this.hubName,
this.storageAccount,
logger,
Expand All @@ -61,7 +56,7 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void ScaleMonitorDescriptor_ReturnsExpectedValue()
{
Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
}

[Fact]
Expand Down
1 change: 0 additions & 1 deletion test/FunctionsV2/DurableTaskTargetScalerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output)
CloudStorageAccount nullCloudStorageAccountMock = null;
this.metricsProviderMock = new Mock<DurableTaskMetricsProvider>(
MockBehavior.Strict,
"FunctionName",
"HubName",
logger,
nullPerformanceMonitorMock,
Expand Down

0 comments on commit 7801ed8

Please sign in to comment.