Skip to content

Commit

Permalink
addressed remaning comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ledjon Behluli authored and Ledjon Behluli committed Sep 25, 2024
1 parent a907ae1 commit 86ce2bc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,37 @@ public sealed class ActivationRebalancerOptions
/// <summary>
/// Determines whether <see cref="AllowedEntropyDeviation"/> should be scaled dynamically
/// based on the total number of activations. When set to <see langword="true"/>, the allowed entropy
/// deviation will increase logarithmically after reaching an activation threshold (10,000 activations),
/// and will cap at the maximum (0.1 deviation).
/// deviation will increase logarithmically after reaching <see cref="ScaledEntropyDeviationActivationThreshold"/>,
/// and will cap at <see cref="MAX_SCALED_ENTROPY_DEVIATION"/>.
/// </summary>
/// <remarks>This is in place because a deviation of say 10 activations has far lesser
/// impact on a total of 100,000 activations than it does for say 1,000 activations.</remarks>
public bool ScaleAllowedEntropyDeviation { get; set; } = DEFAULT_SCALE_DEFAULT_ALLOWED_ENTROPY_DEVIATION;
public bool ScaleAllowedEntropyDeviation { get; set; } = DEFAULT_SCALE_ALLOWED_ENTROPY_DEVIATION;

/// <summary>
/// The default value of <see cref="ScaleAllowedEntropyDeviation"/>.
/// </summary>
public const bool DEFAULT_SCALE_DEFAULT_ALLOWED_ENTROPY_DEVIATION = true;
public const bool DEFAULT_SCALE_ALLOWED_ENTROPY_DEVIATION = true;

/// <summary>
/// The maximum value allowed when <see cref="ScaleAllowedEntropyDeviation"/> is <see langword="true"/>.
/// </summary>
public const double MAX_SCALED_ENTROPY_DEVIATION = 0.1d;

/// <summary>
/// Determines the number of activations that must be active during any rebalancing cycle, in order for <see cref="ScaleAllowedEntropyDeviation"/>
/// (if, and only if, its <see langword="true"/>) to begin scaling the <see cref="AllowedEntropyDeviation"/>.
/// </summary>
/// <remarks>
/// <para>Allowed range: [1000-∞)</para>
/// <para><strong>Values lower than 5000 are highly discouraged.</strong></para>
/// </remarks>
public int ScaledEntropyDeviationActivationThreshold { get; set; }

/// <summary>
/// The default value of <see cref="ScaledEntropyDeviationActivationThreshold"/>.
/// </summary>
public const int DEFAULT_SCALED_ENTROPY_DEVIATION_ACTIVATION_THRESHOLD = 10_000;

/// <summary>
/// <para>Represents the weight that is given to the number of rebalancing cycles that have passed during a rebalancing session.</para>
Expand Down Expand Up @@ -163,5 +183,10 @@ public void ValidateConfiguration()
{
throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.ActivationMigrationCountLimit)} must be greater than 0");
}

if (_options.ScaledEntropyDeviationActivationThreshold < 1_000)
{
throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.ScaledEntropyDeviationActivationThreshold)} must be greater than or equal to 1000");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -12,7 +11,6 @@
using Orleans.Configuration;
using Orleans.Placement;
using Orleans.Placement.Rebalancing;
using Orleans.Statistics;

#nullable enable

Expand Down Expand Up @@ -46,9 +44,9 @@ private enum StopReason
/// </summary>
SessionStarting,
/// <summary>
/// Current session has failed.
/// Current session has stagnated.
/// </summary>
SessionFailed,
SessionStagnated,
/// <summary>
/// Current session has completed successfully till end
/// </summary>
Expand Down Expand Up @@ -255,7 +253,7 @@ private async Task RunRebalancingCycle(CancellationToken cancellationToken)
if (_stagnantCycles >= _options.MaxStagnantCycles)
{
LogMaxStagnantCyclesReached(_stagnantCycles);
StopSession(StopReason.SessionFailed);
StopSession(StopReason.SessionStagnated);

return;
}
Expand Down Expand Up @@ -323,15 +321,12 @@ private async Task RunRebalancingCycle(CancellationToken cancellationToken)
var addressPairs = FormSiloPairs(snapshot);
var migrationTasks = new List<Task>();

Debug.Assert(addressPairs.Count % 2 == 0);

for (var i = 0; i < addressPairs.Count; i++)
{
(var lowSilo, var highSilo) = addressPairs[i];

if (lowSilo.IsSameLogicalSilo(highSilo))
{
continue;
}

var difference = Math.Abs(
(snapshot[lowSilo].ActivationCount - idealDistributions[lowSilo]) -
(snapshot[highSilo].ActivationCount - idealDistributions[highSilo]));
Expand Down Expand Up @@ -439,20 +434,17 @@ private static double ComputeHarmonicMean(Dictionary<SiloAddress, ResourceStatis

private double ComputeAllowedEntropyDeviation(int totalActivations)
{
const int ActivationThreshold = 10_000;
const double MaxAllowedEntropyDeviation = 0.1d;

if (!_options.ScaleAllowedEntropyDeviation || totalActivations < ActivationThreshold)
if (!_options.ScaleAllowedEntropyDeviation || totalActivations < _options.ScaledEntropyDeviationActivationThreshold)
{
return _options.AllowedEntropyDeviation;
}

Debug.Assert(totalActivations > 0);

var logFactor = (int)Math.Log10(totalActivations / ActivationThreshold);
var logFactor = (int)Math.Log10(totalActivations / _options.ScaledEntropyDeviationActivationThreshold);
var adjustedDeviation = _options.AllowedEntropyDeviation * Math.Pow(10, logFactor);

return Math.Min(adjustedDeviation, MaxAllowedEntropyDeviation);
return Math.Min(adjustedDeviation, ActivationRebalancerOptions.MAX_SCALED_ENTROPY_DEVIATION);
}

private double ComputeAdaptiveScaling(int siloCount, int rebalancingCycle)
Expand Down Expand Up @@ -482,11 +474,6 @@ private double ComputeAdaptiveScaling(int siloCount, int rebalancingCycle)
right--;
}

if (left == right) // Odd number of silos
{
pairs.Add((sorted[left].Key, sorted[left].Key)); // Pair this silo with itself
}

return pairs;
}

Expand Down Expand Up @@ -519,7 +506,7 @@ private void StopSession(StopReason reason, TimeSpan? duration = null)
_suspendedUntilTs = 0;
}
break;
case StopReason.SessionFailed:
case StopReason.SessionStagnated:
{
_failedSessions++;
SuspendFor(backoffProvider.Next(_failedSessions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,32 @@ public void ConstantsShouldNotChange()
Assert.Equal(0.0001d, ActivationRebalancerOptions.DEFAULT_ALLOWED_ENTROPY_DEVIATION);
Assert.Equal(0.1d, ActivationRebalancerOptions.DEFAULT_CYCLE_NUMBER_WEIGHT);
Assert.Equal(0.1d, ActivationRebalancerOptions.DEFAULT_SILO_NUMBER_WEIGHT);
Assert.Equal(0.1d, ActivationRebalancerOptions.MAX_SCALED_ENTROPY_DEVIATION);
Assert.Equal(10_000, ActivationRebalancerOptions.DEFAULT_SCALED_ENTROPY_DEVIATION_ACTIVATION_THRESHOLD);
Assert.Equal(int.MaxValue, ActivationRebalancerOptions.DEFAULT_ACTIVATION_MIGRATION_COUNT_LIMIT);
Assert.True(ActivationRebalancerOptions.DEFAULT_SCALE_DEFAULT_ALLOWED_ENTROPY_DEVIATION);
Assert.True(ActivationRebalancerOptions.DEFAULT_SCALE_ALLOWED_ENTROPY_DEVIATION);
}

[Theory]
[InlineData(999, 1, 0.1, 0.5, 0.5, 0.5, 2, 500)]
[InlineData(1000, 1, 0.1, 0.5, 0.5, 0.5, 2, 500)]
[InlineData(1000, 1, 0, 0.5, 0.5, 0.5, 2, 500)]
[InlineData(1000, 1, 0.1, -0.001, 0.5, 0.5, 2, 500)]
[InlineData(1000, 1, 0.1, 0.011, 0.5, 0.5, 2, 500)]
[InlineData(1000, 1, 0.1, 0.001, -0.1, 0.5, 1.1, 500)]
[InlineData(1000, 1, 0.1, 0.001, 1.1, 0.5, 2, 500)]
[InlineData(1000, 1, 0.1, 0.001, 0.5, 0, 2, 500)]
[InlineData(1000, 1, 0.1, 0.001, 0.5, 1.1, 2, 500)]
[InlineData(1000, 1, 0.1, 0.001, 0.5, 0.5, -0.1, 500)]
[InlineData(1000, 1, 0.1, 0.001, 0.5, 0.5, 1.1, 1)]
[InlineData(1000, 1, 0.1, 0.001, 0.5, 0.5, 1.1, 0)]
[InlineData(1000, 1, 0, 0.2, 0.2, 0, -0.1, 0, 999)]
[InlineData(2000, 2, -1, 0.05, 0.05, 0.5, 1.1, 10, 500)]
[InlineData(1000, 1, 2, 0, 0.05, 0.5, 0.5, 10, 999)]
[InlineData(1000, 1, 2, 0.05, 0, 0.5, 0.5, 10, 999)]
[InlineData(1000, 1, 2, 0.05, 0.05, -0.1, 0.5, 10, 999)]
[InlineData(1000, 1, 2, 0.05, 0.05, 0.5, 1.1, 10, 999)]
[InlineData(1000, 1, 2, 0.05, 0.05, 0.5, 0.5, 0, 999)]
[InlineData(1000, 1, 2, 0.05, 0.05, 0.5, 0.5, 10, 999)]

public void InvalidOptionsShouldThrow(
double sessionCyclePeriodMilliseconds,
double publisherRefreshTimeSeconds,
int sessionCyclePeriodMilliseconds,
int publisherRefreshTimeSeconds,
int maxStagnantCycles,
double entropyQuantum,
double allowedEntropyDeviation,
double cycleNumberWeight,
double siloNumberWeight,
int activationMigrationCountLimit)
int activationMigrationCountLimit,
int scaledEntropyDeviationActivationThreshold)
{
var publisherOptions = new DeploymentLoadPublisherOptions
{
Expand All @@ -57,7 +57,8 @@ public void InvalidOptionsShouldThrow(
AllowedEntropyDeviation = allowedEntropyDeviation,
CycleNumberWeight = cycleNumberWeight,
SiloNumberWeight = siloNumberWeight,
ActivationMigrationCountLimit = activationMigrationCountLimit
ActivationMigrationCountLimit = activationMigrationCountLimit,
ScaledEntropyDeviationActivationThreshold = scaledEntropyDeviationActivationThreshold
};

var validator = new ActivationRebalancerOptionsValidator(Options.Create(options), Options.Create(publisherOptions));
Expand Down

0 comments on commit 86ce2bc

Please sign in to comment.