Skip to content

Commit

Permalink
Make scalers into per-task hub singletons
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum committed Nov 15, 2024
1 parent ac1a650 commit 8359496
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

#if !FUNCTIONS_V1
private DurableTaskScaleMonitor singletonScaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private DurableTaskTargetScaler singletonTargetScaler;
#endif

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageAccountProvider storageAccountProvider,
Expand Down Expand Up @@ -226,12 +236,11 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccount);
}

/// <inheritdoc/>
Expand All @@ -242,16 +251,22 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccount,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
if (this.singletonScaleMonitor == null)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
hubName,
storageAccount,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

#endif
Expand All @@ -263,11 +278,23 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController; it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly CloudStorageAccount storageAccount;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
CloudStorageAccount storageAccount)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -42,7 +44,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (StorageException e)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly CloudStorageAccount storageAccount;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
Expand All @@ -27,30 +25,29 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTrigger
private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccount = storageAccount;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

#if FUNCTIONS_V3_OR_GREATER
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
#pragma warning disable CS0618 // Type or member is obsolete.

// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
#endif
}
Expand Down Expand Up @@ -150,9 +147,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -68,7 +72,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting its own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -85,7 +89,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down

0 comments on commit 8359496

Please sign in to comment.