Skip to content

Commit

Permalink
Read Logical Time Manager timestamp from the actor state (#356)
Browse files Browse the repository at this point in the history
* Add AnyCPU configuration in code.sln

* Read the logical time manager value from the state provider

Make StateProviderInitRetryDelayMilliseconds a const field

Wait for Logical Time Manager to start before loading reminders

Rename InitializeAndStartLogicalTimeManager

* Remove extra ReleaseAssert

* minor variable name change

* Read LogicalTimeManager Timestamp from state for VolatileActorStateProvider also

* KVS: Better exception handling; wait for read status

* Volatile: Better exception handling; wait for read status

* Add traces when waiting for WriteStatus to be Granted

* resolve some comments
  • Loading branch information
yashagarwal23 committed Jan 31, 2024
1 parent d736d01 commit 28982e1
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace Microsoft.ServiceFabric.Actors.Runtime
using CopyCompletionCallback = System.Action<System.Fabric.KeyValueStoreEnumerator>;
using DataLossCallback = System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<bool>>;
using FabricDirectory = Microsoft_ServiceFabric_Internal::System.Fabric.Common.FabricDirectory;
using ReleaseAssert = Microsoft_ServiceFabric_Internal::System.Fabric.Common.ReleaseAssert;
using ReplicationCallback = System.Action<System.Collections.Generic.IEnumerator<System.Fabric.KeyValueStoreNotification>>;
using Requires = Microsoft_ServiceFabric_Internal::System.Fabric.Common.Requires;
using RestoreCompletedCallback = System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>;
Expand All @@ -47,6 +48,7 @@ public abstract class KvsActorStateProviderBase
private const string BackupRootFolderPrefix = "kvsasp_";
private const string KvsHealthSourceId = "KvsActorStateProvider";
private const string BackupCallbackSlowCancellationHealthProperty = "BackupCallbackSlowCancellation";
private const int StateProviderInitRetryDelayMilliseconds = 500;
private static readonly byte[] ActorPresenceValue = { byte.MinValue };

private readonly DataContractSerializer reminderSerializer;
Expand All @@ -61,7 +63,6 @@ public abstract class KvsActorStateProviderBase
/// Used to synchronize between backup callback invocation and replica close/abort
/// </summary>
private readonly SemaphoreSlim backupCallbackLock;

private ReplicaRole replicaRole;
private IStatefulServicePartition partition;
private string traceId;
Expand All @@ -83,6 +84,9 @@ public abstract class KvsActorStateProviderBase
private CancellationTokenSource backupCallbackCts;
private Task<bool> backupCallbackTask;
private bool isClosingOrAborting;
private bool isLogicalTimeManagerInitialized;
private CancellationTokenSource stateProviderInitCts;
private Task stateProviderInitTask;

internal KvsActorStateProviderBase(ReplicatorSettings replicatorSettings)
{
Expand All @@ -105,6 +109,9 @@ internal KvsActorStateProviderBase(ReplicatorSettings replicatorSettings)
this.backupCallbackCts = null;
this.backupCallbackTask = null;
this.isClosingOrAborting = false;
this.isLogicalTimeManagerInitialized = false;
this.stateProviderInitCts = null;
this.stateProviderInitTask = null;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -246,13 +253,15 @@ Task IActorStateProvider.ActorActivatedAsync(ActorId actorId, CancellationToken
/// <returns>
/// A task that represents the asynchronous reminder callback completed notification processing.
/// </returns>
Task IActorStateProvider.ReminderCallbackCompletedAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken)
async Task IActorStateProvider.ReminderCallbackCompletedAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken)
{
await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken);

var key = ActorStateProviderHelper.CreateReminderCompletedStorageKey(actorId, reminder.Name);
var data = new ReminderCompletedData(this.logicalTimeManager.CurrentLogicalTime, DateTime.UtcNow);
var buffer = this.SerializeReminderCompletedData(data);

return this.actorStateProviderHelper.ExecuteWithRetriesAsync(
await this.actorStateProviderHelper.ExecuteWithRetriesAsync(
() =>
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -445,6 +454,8 @@ Task<PagedResult<ActorId>> IActorStateProvider.GetActorsAsync(
/// <inheritdoc/>
async Task<ReminderPagedResult<KeyValuePair<ActorId, List<ActorReminderState>>>> IActorStateProvider.GetRemindersAsync(int numItemsToReturn, ActorId actorId, ContinuationToken continuationToken, CancellationToken cancellationToken)
{
await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken);

return await this.actorStateProviderHelper.ExecuteWithRetriesAsync(
async () =>
{
Expand Down Expand Up @@ -548,15 +559,17 @@ async Task<ReminderPagedResult<KeyValuePair<ActorId, List<ActorReminderState>>>>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous save operation.</returns>
/// <exception cref="OperationCanceledException">The operation was canceled.</exception>
Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken)
async Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken)
{
await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken);

var reminderKey = CreateReminderStorageKey(actorId, reminder.Name);
var data = new ActorReminderData(actorId, reminder, this.logicalTimeManager.CurrentLogicalTime);
var buffer = this.SerializeReminder(data);

var reminderCompletedKey = ActorStateProviderHelper.CreateReminderCompletedStorageKey(actorId, reminder.Name);

return this.actorStateProviderHelper.ExecuteWithRetriesAsync(
await this.actorStateProviderHelper.ExecuteWithRetriesAsync(
() =>
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -611,9 +624,11 @@ Task IActorStateProvider.DeleteRemindersAsync(IReadOnlyDictionary<ActorId, IRead
/// parameter is a collection of all actor reminders contained in the actor state provider.
/// </returns>
/// <exception cref="OperationCanceledException">The operation was canceled.</exception>
Task<IActorReminderCollection> IActorStateProvider.LoadRemindersAsync(CancellationToken cancellationToken)
async Task<IActorReminderCollection> IActorStateProvider.LoadRemindersAsync(CancellationToken cancellationToken)
{
return this.actorStateProviderHelper.ExecuteWithRetriesAsync(
await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken);

return await this.actorStateProviderHelper.ExecuteWithRetriesAsync(
() => this.EnumerateReminderAsync(cancellationToken),
"LoadRemindersAsync",
cancellationToken);
Expand Down Expand Up @@ -685,11 +700,12 @@ async Task IStateProviderReplica.ChangeRoleAsync(ReplicaRole newRole, Cancellati
switch (newRole)
{
case ReplicaRole.Primary:
this.logicalTimeManager.Start();
this.stateProviderInitCts = new CancellationTokenSource();
this.stateProviderInitTask = this.StartStateProviderInitializationAsync(this.stateProviderInitCts.Token);
break;

default:
this.logicalTimeManager.Stop();
await this.CancelStateProviderInitializationAsync();
break;
}

Expand All @@ -713,6 +729,7 @@ async Task IStateProviderReplica.CloseAsync(CancellationToken cancellationToken)
// with actual ESE backup finishing with error. However, if ESE backup has finished successfully and
// backup callback is in-flight, it does not wait for the backup callback to finish, .
await this.CancelAndAwaitBackupCallbackIfAnyAsync();
await this.CancelStateProviderInitializationAsync();
}

/// <summary>
Expand All @@ -728,6 +745,9 @@ void IStateProviderReplica.Abort()
this.CancelAndAwaitBackupCallbackIfAnyAsync().ContinueWith(
t => t.Exception,
TaskContinuationOptions.OnlyOnFaulted);
this.CancelStateProviderInitializationAsync().ContinueWith(
t => t.Exception,
TaskContinuationOptions.OnlyOnFaulted);
}

/// <summary>
Expand Down Expand Up @@ -972,6 +992,162 @@ private static object Deserialize(DataContractSerializer serializer, byte[] data
}
}

private async Task StartStateProviderInitializationAsync(CancellationToken cancellationToken)
{
Exception unexpectedException = null;

try
{
cancellationToken.ThrowIfCancellationRequested();

await this.actorStateProviderHelper.ExecuteWithRetriesAsync(
async () =>
{
await this.InitializeAndStartLogicalTimeManagerAsync(cancellationToken);
},
"StartStateProviderInitializationAsync",
cancellationToken);
}
catch (OperationCanceledException opEx)
{
if (!cancellationToken.IsCancellationRequested)
{
unexpectedException = opEx;
}
}
catch (FabricObjectClosedException)
{
// This can happen when replica is closing. CancellationToken should get signaled.
// Fall through and let the task check for CancellationToken.
}
catch (FabricNotPrimaryException)
{
// This replica is no more primary. CancellationToken should get signaled.
// Fall through and let the task check for CancellationToken.
}
catch (Exception ex)
{
unexpectedException = ex;
}

if (unexpectedException != null)
{
var mssgFormat = "StartStateProviderInitializationAsync() failed due to " +
"an unexpected Exception causing replica to fault: {0}";

ActorTrace.Source.WriteErrorWithId(
TraceType,
this.traceId,
string.Format(mssgFormat, unexpectedException.ToString()));

this.partition.ReportFault(FaultType.Transient);
}
}

private async Task CancelStateProviderInitializationAsync()
{
if (this.stateProviderInitCts != null &&
this.stateProviderInitCts.IsCancellationRequested == false)
{
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Canceling state provider initialization...");

this.stateProviderInitCts.Cancel();

try
{
await this.stateProviderInitTask;
}
catch (Exception ex)
{
// Code should never come here.
ReleaseAssert.Failfast(
"CancelStateProviderInitializationAsync() unexpected exception: {0}.",
ex.ToString());
}
finally
{
this.stateProviderInitCts = null;
this.stateProviderInitTask = null;
}
}

this.StopLogicalTimeManager();
}

private async Task InitializeAndStartLogicalTimeManagerAsync(CancellationToken cancellationToken)
{
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Initializing logical time manager...");

if (this.isLogicalTimeManagerInitialized == true)
{
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Logical time manager already initialized...");
return;
}

// wait for read status
await this.WaitForReadStatusAsync(cancellationToken);

using (var tx = this.storeReplica.CreateTransaction())
{
var enumerator = this.storeReplica.Enumerate(tx, LogicalTimestampKey);

while (enumerator.MoveNext())
{
var item = enumerator.Current;
this.TryDeserializeAndApplyLogicalTimestamp(item.Metadata.Key, item.Value);
}
}

this.logicalTimeManager.Start();
Volatile.Write(ref this.isLogicalTimeManagerInitialized, true);

ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Initializing logical time manager SUCCEEDED.");
}

private void StopLogicalTimeManager()
{
// Stop logical timer if it is running
if (this.isLogicalTimeManagerInitialized == true)
{
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Stopping logical time manager...");

this.logicalTimeManager.Stop();
this.isLogicalTimeManagerInitialized = false;
}
}

private async Task WaitForReadStatusAsync(CancellationToken cancellationToken)
{
var retryCount = 0;

while (!cancellationToken.IsCancellationRequested &&
this.partition.ReadStatus != PartitionAccessStatus.Granted)
{
retryCount++;
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for Read Status to be Granted");
await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken);
}
}

private async Task EnsureLogicalTimeManagerInitializedAsync(CancellationToken cancellationToken)
{
var retryCount = 0;

while (this.replicaRole == ReplicaRole.Primary && !this.isLogicalTimeManagerInitialized)
{
retryCount++;
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for logical Time manager to be initialized");
await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken);
}

ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Logical Time Manager is initialized");

if (this.replicaRole != ReplicaRole.Primary)
{
throw new FabricNotPrimaryException(FabricErrorCode.NotPrimary);
}
}

private void OnCopyComplete(KeyValueStoreEnumerator enumerator)
{
var inner = enumerator.Enumerate(LogicalTimestampKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ private async Task WaitForWriteStatusAsync(CancellationToken cancellationToken)
this.servicePartition.WriteStatus != PartitionAccessStatus.Granted)
{
retryCount++;
ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for Write Status to be Granted");
await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken);
}
}
Expand Down
Loading

0 comments on commit 28982e1

Please sign in to comment.