Skip to content

Commit

Permalink
Add ScaleController V3 integration (#2462)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid authored Oct 6, 2023
1 parent 8d03b9a commit 731d809
Show file tree
Hide file tree
Showing 15 changed files with 520 additions and 218 deletions.
131 changes: 6 additions & 125 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ public DurableTaskExtension(
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService));
this.ResolveAppSettingOptions();
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);

ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName);

this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper();
this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
this.isOptionsConfigured = true;

Expand Down Expand Up @@ -249,6 +249,8 @@ public string HubName

internal DurableTaskOptions Options { get; }

internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider;

internal HttpApiHandler HttpApiHandler { get; private set; }

internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; }
Expand Down Expand Up @@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
{
bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType);

Expand Down Expand Up @@ -578,32 +580,13 @@ private void StopLocalGrpcServer()
}
#endif

private void ResolveAppSettingOptions()
{
if (this.Options == null)
{
throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings.");
}

if (this.nameResolver == null)
{
throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings.");
}

if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName))
{
// use the resolved task hub name
this.Options.HubName = taskHubName;
}
}

private void InitializeForFunctionsV1(ExtensionConfigContext context)
{
#if FUNCTIONS_V1
context.ApplyConfig(this.Options, "DurableTask");
this.nameResolver = context.Config.NameResolver;
this.loggerFactory = context.Config.LoggerFactory;
this.ResolveAppSettingOptions();
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName);
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.connectionInfoResolver = new WebJobsConnectionInfoProvider();
Expand Down Expand Up @@ -1573,108 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat
activity.AddTag("DurableFunctionsRuntimeStatus", statusStr);
}
}

internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName)
{
if (this.defaultDurabilityProvider.TryGetScaleMonitor(
functionId,
functionName.Name,
this.Options.HubName,
connectionName,
out IScaleMonitor scaleMonitor))
{
return scaleMonitor;
}
else
{
// the durability provider does not support runtime scaling.
// Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower(), functionId);
}
}
#endif
#if FUNCTIONS_V3_OR_GREATER

internal ITargetScaler GetTargetScaler(string functionId, FunctionName functionName, string connectionName)
{
if (this.defaultDurabilityProvider.TryGetTargetScaler(
functionId,
functionName.Name,
this.Options.HubName,
connectionName,
out ITargetScaler targetScaler))
{
return targetScaler;
}
else
{
// the durability provider does not support target-based scaling.
// Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on).
return new NoOpTargetScaler(functionId);
}
}

private sealed class NoOpTargetScaler : ITargetScaler
{
/// <summary>
/// Construct a placeholder target scaler.
/// </summary>
/// <param name="functionId">The function ID.</param>
public NoOpTargetScaler(string functionId)
{
this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId);
}

public TargetScalerDescriptor TargetScalerDescriptor { get; }

public Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
{
throw new NotImplementedException("The current DurableTask backend configuration does not support target-based scaling");
}
}
#endif

#if !FUNCTIONS_V1
/// <summary>
/// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
/// This is required to allow operation of those providers even if runtime scaling is turned off
/// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
/// </summary>
private sealed class NoOpScaleMonitor : IScaleMonitor
{
/// <summary>
/// Construct a placeholder scale monitor.
/// </summary>
/// <param name="name">A descriptive name.</param>
/// <param name="functionId">The function ID.</param>
public NoOpScaleMonitor(string name, string functionId)
{
#if FUNCTIONS_V3_OR_GREATER
this.Descriptor = new ScaleMonitorDescriptor(name, functionId);
#else
#pragma warning disable CS0618 // Type or member is obsolete
this.Descriptor = new ScaleMonitorDescriptor(name);
#pragma warning restore CS0618 // Type or member is obsolete
#endif
}

/// <summary>
/// A descriptive name.
/// </summary>
public ScaleMonitorDescriptor Descriptor { get; private set; }

/// <inheritdoc/>
Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
}

/// <inheritdoc/>
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
}
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
#else
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
Expand Down Expand Up @@ -107,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
return builder;
}

#if FUNCTIONS_V3_OR_GREATER
/// <summary>
/// Adds the <see cref="IScaleMonitor"/> and <see cref="ITargetScaler"/> providers for the Durable Triggers.
/// </summary>
/// <param name="builder">The <see cref="IWebJobsBuilder"/> to configure.</param>
/// <returns>Returns the provided <see cref="IWebJobsBuilder"/>.</returns>
internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
// this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756
DurableTaskTriggersScaleProvider provider = null;
builder.Services.AddSingleton(serviceProvider =>
{
provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService<IOptions<DurableTaskOptions>>(), serviceProvider.GetService<INameResolver>(), serviceProvider.GetService<ILoggerFactory>(), serviceProvider.GetService<IEnumerable<IDurabilityProviderFactory>>(), triggerMetadata);
return provider;
});
builder.Services.AddSingleton<IScaleMonitorProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
builder.Services.AddSingleton<ITargetScalerProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
return builder;
}
#endif

/// <summary>
/// Adds the Durable Task extension to the provided <see cref="IWebJobsBuilder"/>.
/// </summary>
Expand Down
15 changes: 10 additions & 5 deletions src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Listeners;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
Expand Down Expand Up @@ -51,21 +52,25 @@ public DurableTaskListener(
this.functionName = functionName;
this.functionType = functionType;
this.connectionName = connectionName;
#if !FUNCTIONS_V1

#if !FUNCTIONS_V1
this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
this.config.GetScaleMonitor(
ScaleUtils.GetScaleMonitor(
this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
this.connectionName));
this.connectionName,
this.config.Options.HubName));

#endif
#if FUNCTIONS_V3_OR_GREATER
this.targetScaler = new Lazy<ITargetScaler>(() =>
this.config.GetTargetScaler(
ScaleUtils.GetTargetScaler(
this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
this.connectionName));
this.connectionName,
this.config.Options.HubName));
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public DurableTaskScaleMonitor(
// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

#if FUNCTIONS_V3_OR_GREATER
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -38,46 +39,55 @@ public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider met

public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
{
// This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process.
var metrics = await this.metricsProvider.GetMetricsAsync();

// compute activityWorkers: the number of workers we need to process all activity messages
var workItemQueueLength = metrics.WorkItemQueueLength;
double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities);

var serializedControlQueueLengths = metrics.ControlQueueLengths;
var controlQueueLengths = JsonConvert.DeserializeObject<IReadOnlyList<int>>(serializedControlQueueLengths);

var controlQueueMessages = controlQueueLengths.Sum();
var activeControlQueues = controlQueueLengths.Count(x => x > 0);

// compute orchestratorWorkers: the number of workers we need to process all orchestrator messages.
// We bound this result to be no larger than the partition count
var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators);
var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers);

int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers);
this.scaleResult.TargetWorkerCount = numWorkersToRequest;

// When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table.
// This works because this code does not execute in the Functions Host process, but in the ScaleController process,
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var scaleControllerLog = $"Target worker count for {this.functionId}: {numWorkersToRequest}. " +
$"Metrics used: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";

// target worker count should never be negative
if (numWorkersToRequest < 0)
DurableTaskTriggerMetrics? metrics = null;
try
{
scaleControllerLog = "Tried to request a negative worker count." + scaleControllerLog;
this.logger.LogError(scaleControllerLog);

// Throw exception so ScaleController can handle the error.
throw new Exception(scaleControllerLog);
// This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process.
metrics = await this.metricsProvider.GetMetricsAsync();

// compute activityWorkers: the number of workers we need to process all activity messages
var workItemQueueLength = metrics.WorkItemQueueLength;
double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities);

var serializedControlQueueLengths = metrics.ControlQueueLengths;
var controlQueueLengths = JsonConvert.DeserializeObject<IReadOnlyList<int>>(serializedControlQueueLengths);

var controlQueueMessages = controlQueueLengths.Sum();
var activeControlQueues = controlQueueLengths.Count(x => x > 0);

// compute orchestratorWorkers: the number of workers we need to process all orchestrator messages.
// We bound this result to be no larger than the partition count
var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators);
var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers);

int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers);
this.scaleResult.TargetWorkerCount = numWorkersToRequest;

// When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table.
// This works because this code does not execute in the Functions Host process, but in the ScaleController process,
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
if (numWorkersToRequest < 0)
{
throw new InvalidOperationException("Number of workers to request cannot be negative");
}

this.logger.LogInformation(scaleControllerLog);
return this.scaleResult;
}
catch (Exception ex)
{
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}

this.logger.LogDebug(scaleControllerLog);
return this.scaleResult;
}
}
}
Expand Down
Loading

0 comments on commit 731d809

Please sign in to comment.