diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 0162cdddc..27d36a287 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -254,6 +254,7 @@ static BlobLeaseManager GetBlobLeaseManager(
return new BlobLeaseManager(
settings,
leaseContainerName: settings.TaskHubName.ToLowerInvariant() + "-leases",
+ blobPrefix: string.Empty,
leaseType: leaseType,
storageClient: account.CreateCloudBlobClient(),
skipBlobContainerCreation: false,
diff --git a/src/DurableTask.AzureStorage/Partitioning/BlobLease.cs b/src/DurableTask.AzureStorage/Partitioning/BlobLease.cs
index 23e74c80f..afd4dee8a 100644
--- a/src/DurableTask.AzureStorage/Partitioning/BlobLease.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/BlobLease.cs
@@ -13,82 +13,34 @@
namespace DurableTask.AzureStorage.Partitioning
{
- using System.IO;
- using System.Text;
- using System.Threading.Tasks;
- using DurableTask.AzureStorage.Monitoring;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
class BlobLease : Lease
{
- private readonly string accountName;
- private readonly AzureStorageOrchestrationServiceStats stats;
- private readonly AzureStorageOrchestrationServiceSettings settings;
-
- public BlobLease() { }
-
- public BlobLease(BlobLease blobLease)
+ public BlobLease()
+ : base()
{
- this.PartitionId = blobLease.PartitionId;
- this.Blob = blobLease.Blob;
- this.Owner = blobLease.Owner;
- this.Epoch = blobLease.Epoch;
- this.Token = blobLease.Token;
}
- public BlobLease(
- string partitionId,
- CloudBlobDirectory leaseDirectory,
- string accountName,
- AzureStorageOrchestrationServiceStats stats,
- AzureStorageOrchestrationServiceSettings settings)
+ public BlobLease(CloudBlockBlob leaseBlob)
+ : this()
{
- this.PartitionId = partitionId;
- this.Blob = leaseDirectory.GetBlockBlobReference(partitionId);
- this.accountName = accountName;
- this.stats = stats;
- this.settings = settings;
+ this.Blob = leaseBlob;
}
- /// Determines whether the lease is expired.
- /// true if the lease is expired; otherwise, false.
- public override bool IsExpired() => this.Blob.Properties.LeaseState != LeaseState.Leased;
-
+ public BlobLease(BlobLease source)
+ : base(source)
+ {
+ this.Blob = source.Blob;
+ }
- // This property is a reference to the blob itself, so we do not want to serialize it when
- // writing/reading from the blob content.
[JsonIgnore]
- internal CloudBlockBlob Blob { get; private set; }
+ internal CloudBlockBlob Blob { get; set; }
- public override async Task DownloadLeaseAsync()
+ public override bool IsExpired()
{
-
- var serializedLease = await TimeoutHandler.ExecuteWithTimeout(
- operationName: "DownloadBlobLease",
- account: this.accountName,
- settings: this.settings,
- operation: async (context, cancelToken) =>
- {
- // We use DownloadToStreamAsync() because unlike many of the other Download*() APIs, this fetches
- // the attributes without a second API call. See https://stackoverflow.com/a/23749639/9035640
- using (var memoryStream = new MemoryStream())
- {
- await this.Blob.DownloadToStreamAsync(memoryStream, null, null, context, cancelToken);
- memoryStream.Position = 0;
- using (StreamReader reader = new StreamReader(memoryStream, Encoding.UTF8))
- {
- return await reader.ReadToEndAsync();
- }
- }
- });
-
- this.stats.StorageRequests.Increment();
-
- BlobLease lease = JsonConvert.DeserializeObject(serializedLease);
- this.Epoch = lease.Epoch;
- this.Owner = lease.Owner;
- this.Token = lease.Token;
+ return this.Blob.Properties.LeaseState != LeaseState.Leased;
}
}
}
diff --git a/src/DurableTask.AzureStorage/Partitioning/BlobLeaseManager.cs b/src/DurableTask.AzureStorage/Partitioning/BlobLeaseManager.cs
index d94e44432..dc920ba77 100644
--- a/src/DurableTask.AzureStorage/Partitioning/BlobLeaseManager.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/BlobLeaseManager.cs
@@ -34,6 +34,7 @@ sealed class BlobLeaseManager : ILeaseManager
readonly string storageAccountName;
readonly string taskHubName;
readonly string workerName;
+ readonly string blobPrefix;
readonly string leaseContainerName;
readonly string leaseType;
readonly bool skipBlobContainerCreation;
@@ -45,11 +46,11 @@ sealed class BlobLeaseManager : ILeaseManager
CloudBlobContainer taskHubContainer;
CloudBlobDirectory leaseDirectory;
CloudBlockBlob taskHubInfoBlob;
- List blobLeases;
public BlobLeaseManager(
AzureStorageOrchestrationServiceSettings settings,
string leaseContainerName,
+ string blobPrefix,
string leaseType,
CloudBlobClient storageClient,
bool skipBlobContainerCreation,
@@ -60,6 +61,7 @@ public BlobLeaseManager(
this.taskHubName = settings.TaskHubName;
this.workerName = settings.WorkerId;
this.leaseContainerName = leaseContainerName;
+ this.blobPrefix = blobPrefix;
this.leaseType = leaseType;
this.storageClient = storageClient;
this.leaseInterval = settings.LeaseInterval;
@@ -96,25 +98,51 @@ public async Task CreateLeaseStoreIfNotExistsAsync(TaskHubInfo eventHubInf
return result;
}
- public async Task> ListLeasesAsync(bool downloadLeases)
+ public async Task> ListLeasesAsync()
{
- if (this.blobLeases == null)
- {
- throw new InvalidOperationException($"{nameof(ListLeasesAsync)} cannot be called without first calling {nameof(Initialize)}");
- }
+ var blobLeases = new List();
- if (downloadLeases)
+ BlobContinuationToken continuationToken = null;
+ do
{
- await Task.WhenAll(this.blobLeases.Select(lease => lease.DownloadLeaseAsync()));
+ BlobResultSegment segment = await TimeoutHandler.ExecuteWithTimeout("ListLeases", this.storageAccountName, this.settings, (context, timeoutToken) =>
+ {
+ return this.leaseDirectory.ListBlobsSegmentedAsync(
+ useFlatBlobListing: true,
+ blobListingDetails: BlobListingDetails.Metadata,
+ maxResults: null,
+ currentToken: continuationToken,
+ options: null,
+ operationContext: context,
+ cancellationToken: timeoutToken);
+ });
+
+ continuationToken = segment.ContinuationToken;
+
+ var downloadTasks = new List>();
+ foreach (IListBlobItem blob in segment.Results)
+ {
+ CloudBlockBlob lease = blob as CloudBlockBlob;
+ if (lease != null)
+ {
+ downloadTasks.Add(this.DownloadLeaseBlob(lease));
+ }
+ }
+
+ await Task.WhenAll(downloadTasks);
+
+ blobLeases.AddRange(downloadTasks.Select(t => t.Result));
}
+ while (continuationToken != null);
- return this.blobLeases;
+ return blobLeases;
}
public async Task CreateLeaseIfNotExistAsync(string partitionId)
{
CloudBlockBlob leaseBlob = this.leaseDirectory.GetBlockBlobReference(partitionId);
- string serializedLease = JsonConvert.SerializeObject(new { PartitionId = partitionId });
+ BlobLease lease = new BlobLease(leaseBlob) { PartitionId = partitionId };
+ string serializedLease = JsonConvert.SerializeObject(lease);
try
{
this.settings.Logger.PartitionManagerInfo(
@@ -124,10 +152,11 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId)
partitionId,
string.Format(
CultureInfo.InvariantCulture,
- "CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}",
+ "CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}. blobPrefix: {3}",
this.leaseContainerName,
this.leaseType,
- partitionId));
+ partitionId,
+ this.blobPrefix ?? string.Empty));
await leaseBlob.UploadTextAsync(serializedLease, null, AccessCondition.GenerateIfNoneMatchCondition("*"), null, null);
}
@@ -144,10 +173,11 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId)
partitionId,
string.Format(
CultureInfo.InvariantCulture,
- "CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}, exception: {3}",
+ "CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}, blobPrefix: {3}, exception: {4}",
this.leaseContainerName,
this.leaseType,
partitionId,
+ this.blobPrefix ?? string.Empty,
se.Message));
}
}
@@ -157,6 +187,17 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId)
}
}
+ public async Task GetLeaseAsync(string paritionId)
+ {
+ CloudBlockBlob leaseBlob = this.leaseDirectory.GetBlockBlobReference(paritionId);
+ if (await leaseBlob.ExistsAsync())
+ {
+ return await this.DownloadLeaseBlob(leaseBlob);
+ }
+
+ return null;
+ }
+
public async Task RenewAsync(BlobLease lease)
{
CloudBlockBlob leaseBlob = lease.Blob;
@@ -359,21 +400,19 @@ private bool IsStale(TaskHubInfo currentTaskHubInfo, TaskHubInfo newTaskHubInfo)
void Initialize()
{
this.storageClient.DefaultRequestOptions.MaximumExecutionTime = StorageMaximumExecutionTime;
+
this.taskHubContainer = this.storageClient.GetContainerReference(this.leaseContainerName);
- this.leaseDirectory = this.taskHubContainer.GetDirectoryReference(this.leaseType);
- this.taskHubInfoBlob = this.taskHubContainer.GetBlockBlobReference(TaskHubInfoBlobName);
- this.blobLeases = new List(this.settings.PartitionCount);
- for (int i = 0; i < this.settings.PartitionCount; i++)
- {
- this.blobLeases.Add(
- new BlobLease(
- partitionId: $"{this.taskHubName.ToLowerInvariant()}-control-{i:00}",
- leaseDirectory: this.leaseDirectory,
- accountName: this.storageAccountName,
- stats: this.stats,
- settings: this.settings)
- );
- }
+
+ string leaseDirectoryName = string.IsNullOrWhiteSpace(this.blobPrefix)
+ ? this.leaseType
+ : this.blobPrefix + this.leaseType;
+ this.leaseDirectory = this.taskHubContainer.GetDirectoryReference(leaseDirectoryName);
+
+ string taskHubInfoBlobFileName = string.IsNullOrWhiteSpace(this.blobPrefix)
+ ? TaskHubInfoBlobName
+ : this.blobPrefix + TaskHubInfoBlobName;
+
+ this.taskHubInfoBlob = this.taskHubContainer.GetBlockBlobReference(taskHubInfoBlobFileName);
}
async Task GetTaskHubInfoAsync()
@@ -391,6 +430,19 @@ async Task GetTaskHubInfoAsync()
return null;
}
+ async Task DownloadLeaseBlob(CloudBlockBlob blob)
+ {
+ string serializedLease = await blob.DownloadTextAsync();
+ this.stats.StorageRequests.Increment();
+ BlobLease deserializedLease = JsonConvert.DeserializeObject(serializedLease);
+ deserializedLease.Blob = blob;
+
+ // Workaround: for some reason storage client reports incorrect blob properties after downloading the blob
+ await blob.FetchAttributesAsync();
+ this.stats.StorageRequests.Increment();
+ return deserializedLease;
+ }
+
static Exception HandleStorageException(Lease lease, StorageException storageException, bool ignoreLeaseLost = false)
{
if (storageException.RequestInformation.HttpStatusCode == (int)HttpStatusCode.Conflict
diff --git a/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs b/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs
index 95786a1e9..1ad5b0c82 100644
--- a/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs
@@ -22,10 +22,12 @@ interface ILeaseManager where T : Lease
Task CreateLeaseStoreIfNotExistsAsync(TaskHubInfo eventHubInfo, bool checkIfStale = true);
- Task> ListLeasesAsync(bool downloadLeases);
+ Task> ListLeasesAsync();
Task CreateLeaseIfNotExistAsync(string partitionId);
+ Task GetLeaseAsync(string partitionId);
+
Task RenewAsync(T lease);
Task AcquireAsync(T lease, string owner);
@@ -38,4 +40,4 @@ interface ILeaseManager where T : Lease
Task UpdateAsync(T lease);
}
-}
\ No newline at end of file
+}
diff --git a/src/DurableTask.AzureStorage/Partitioning/Lease.cs b/src/DurableTask.AzureStorage/Partitioning/Lease.cs
index 9ad17d867..b51d5cc77 100644
--- a/src/DurableTask.AzureStorage/Partitioning/Lease.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/Lease.cs
@@ -13,8 +13,6 @@
namespace DurableTask.AzureStorage.Partitioning
{
- using System.Threading.Tasks;
-
/// Contains partition ownership information.
class Lease
{
@@ -23,9 +21,9 @@ public Lease()
{
}
- /// Initializes a new instance of the
- /// class with the specified
- /// value as reference.
+ /// Initializes a new instance of the
+ /// class with the specified
+ /// value as reference.
/// The specified instance where its property values will be copied from.
public Lease(Lease source)
{
@@ -43,13 +41,13 @@ public Lease(Lease source)
/// The host owner of the partition.
public string Owner { get; set; }
- /// Gets or sets the lease token that manages concurrency between hosts. You can use this token to guarantee single access to any resource needed by the
- /// object.
+ /// Gets or sets the lease token that manages concurrency between hosts. You can use this token to guarantee single access to any resource needed by the
+ /// object.
/// The lease token.
public string Token { get; set; }
- /// Gets or sets the epoch year of the lease, which is a value
- /// you can use to determine the most recent owner of a partition between competing nodes.
+ /// Gets or sets the epoch year of the lease, which is a value
+ /// you can use to determine the most recent owner of a partition between competing nodes.
/// The epoch year of the lease.
public long Epoch { get; set; }
@@ -60,15 +58,6 @@ public virtual bool IsExpired()
return false;
}
- ///
- /// Used to download fresh data from the lease.
- ///
- public virtual Task DownloadLeaseAsync()
- {
- return Task.CompletedTask;
- }
-
-
/// Determines whether this instance is equal to the specified object.
/// The object to compare.
/// true if this instance is equal to the specified object; otherwise, false.
diff --git a/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancer.cs b/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancer.cs
index b0e5ffb7e..c04d9c286 100644
--- a/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancer.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancer.cs
@@ -34,7 +34,7 @@ sealed class LeaseCollectionBalancer where T : Lease
readonly ConcurrentDictionary currentlyOwnedShards;
readonly ConcurrentDictionary keepRenewingDuringClose;
readonly LeaseObserverManager leaseObserverManager;
- readonly Func shouldAcquireLeaseDelegate;
+ readonly Func shouldAquireLeaseDelegate;
readonly Func shouldRenewLeaseDelegate;
int isStarted;
@@ -48,9 +48,9 @@ public LeaseCollectionBalancer(
string leaseType,
AzureStorageOrchestrationServiceSettings settings,
string accountName,
- ILeaseManager leaseManager,
+ ILeaseManager leaseManager,
LeaseCollectionBalancerOptions options,
- Func shouldAcquireLeaseDelegate = null,
+ Func shouldAquireLeaseDelegate = null,
Func shouldRenewLeaseDelegate = null)
{
@@ -62,7 +62,7 @@ public LeaseCollectionBalancer(
this.options = options;
this.settings = settings;
- this.shouldAcquireLeaseDelegate = shouldAcquireLeaseDelegate ?? DefaultLeaseDecisionDelegate;
+ this.shouldAquireLeaseDelegate = shouldAquireLeaseDelegate ?? DefaultLeaseDecisionDelegate;
this.shouldRenewLeaseDelegate = shouldRenewLeaseDelegate ?? DefaultLeaseDecisionDelegate;
this.currentlyOwnedShards = new ConcurrentDictionary();
@@ -83,8 +83,7 @@ public ConcurrentDictionary GetCurrentlyOwnedLeases()
public async Task InitializeAsync()
{
var leases = new List();
- var leasesToInitialize = await this.leaseManager.ListLeasesAsync(downloadLeases: true);
- foreach (T lease in leasesToInitialize)
+ foreach (T lease in await this.leaseManager.ListLeasesAsync())
{
if (string.Compare(lease.Owner, this.workerName, StringComparison.OrdinalIgnoreCase) == 0)
{
@@ -333,14 +332,20 @@ async Task> TakeLeasesAsync()
var workerToShardCount = new Dictionary();
var expiredLeases = new List();
- // We wait to download leases until after we have filtered out leases we should
- // not acquire
- var allLeases = await this.leaseManager.ListLeasesAsync(downloadLeases: false);
- var acquirableLeases = allLeases.Where(lease => this.shouldAcquireLeaseDelegate(lease.PartitionId)).ToList();
- await Task.WhenAll(acquirableLeases.Select(blobLease => blobLease.DownloadLeaseAsync()));
-
- foreach (T lease in acquirableLeases)
+ var allLeases = await this.leaseManager.ListLeasesAsync();
+ foreach (T lease in allLeases)
{
+ if (!this.shouldAquireLeaseDelegate(lease.PartitionId))
+ {
+ this.settings.Logger.PartitionManagerInfo(
+ this.accountName,
+ this.taskHub,
+ this.workerName,
+ string.Empty /* partitionId */,
+ $"Skiping {this.leaseType} lease aquiring for {lease.PartitionId}");
+ continue;
+ }
+
allShards.Add(lease.PartitionId, lease);
if (lease.IsExpired() || string.IsNullOrWhiteSpace(lease.Owner))
{
@@ -382,7 +387,7 @@ async Task> TakeLeasesAsync()
if (moreShardsNeeded > 0)
{
- var shardsToAcquire = new HashSet();
+ HashSet shardsToAcquire = new HashSet();
if (expiredLeases.Count > 0)
{
foreach (T leaseToTake in expiredLeases)
diff --git a/src/DurableTask.AzureStorage/Partitioning/LegacyPartitionManager.cs b/src/DurableTask.AzureStorage/Partitioning/LegacyPartitionManager.cs
index 16c14b368..650275c00 100644
--- a/src/DurableTask.AzureStorage/Partitioning/LegacyPartitionManager.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/LegacyPartitionManager.cs
@@ -41,6 +41,7 @@ public LegacyPartitionManager(
this.leaseManager = new BlobLeaseManager(
settings,
settings.TaskHubName.ToLowerInvariant() + "-leases",
+ string.Empty,
"default",
account.CreateCloudBlobClient(),
skipBlobContainerCreation: false,
@@ -93,7 +94,7 @@ Task IPartitionManager.DeleteLeases()
Task> IPartitionManager.GetOwnershipBlobLeases()
{
- return this.leaseManager.ListLeasesAsync(downloadLeases: true);
+ return this.leaseManager.ListLeasesAsync();
}
async Task IPartitionManager.StartAsync()
diff --git a/src/DurableTask.AzureStorage/Partitioning/SafePartitionManager.cs b/src/DurableTask.AzureStorage/Partitioning/SafePartitionManager.cs
index 0bb23b40a..d63e4c3f4 100644
--- a/src/DurableTask.AzureStorage/Partitioning/SafePartitionManager.cs
+++ b/src/DurableTask.AzureStorage/Partitioning/SafePartitionManager.cs
@@ -49,6 +49,7 @@ public SafePartitionManager(
this.intentLeaseManager = new BlobLeaseManager(
settings,
settings.TaskHubName.ToLowerInvariant() + "-leases",
+ string.Empty,
"intent",
account.CreateCloudBlobClient(),
skipBlobContainerCreation: false,
@@ -71,6 +72,7 @@ public SafePartitionManager(
this.ownershipLeaseManager = new BlobLeaseManager(
settings,
settings.TaskHubName.ToLowerInvariant() + "-leases",
+ string.Empty,
"ownership",
account.CreateCloudBlobClient(),
skipBlobContainerCreation: false,
@@ -83,12 +85,12 @@ public SafePartitionManager(
this.ownershipLeaseManager,
new LeaseCollectionBalancerOptions
{
- AcquireInterval = settings.LeaseAcquireInterval,
- RenewInterval = settings.LeaseRenewInterval,
- LeaseInterval = settings.LeaseInterval,
+ AcquireInterval = TimeSpan.FromSeconds(5),
+ RenewInterval = TimeSpan.FromSeconds(10),
+ LeaseInterval = TimeSpan.FromSeconds(15),
ShouldStealLeases = false
},
- shouldAcquireLeaseDelegate: leaseKey => currentlyOwnedIntentLeases.ContainsKey(leaseKey),
+ shouldAquireLeaseDelegate: leaseKey => currentlyOwnedIntentLeases.ContainsKey(leaseKey),
shouldRenewLeaseDelegate: leaseKey => currentlyOwnedIntentLeases.ContainsKey(leaseKey)
|| this.sessionManager.IsControlQueueReceivingMessages(leaseKey)
|| this.sessionManager.IsControlQueueProcessingMessages(leaseKey));
@@ -96,7 +98,7 @@ public SafePartitionManager(
Task> IPartitionManager.GetOwnershipBlobLeases()
{
- return this.ownershipLeaseManager.ListLeasesAsync(downloadLeases: true);
+ return this.ownershipLeaseManager.ListLeasesAsync();
}
Task IPartitionManager.CreateLeaseStore()
diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
index 23fd272b6..7166a4e07 100644
--- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
@@ -430,7 +430,7 @@ public async Task PartitionLost_AbandonPrefetchedSession()
await TestHelpers.WaitFor(
condition: () => service.OwnedControlQueues.Any(),
- timeout: TimeSpan.FromSeconds(20));
+ timeout: TimeSpan.FromSeconds(10));
ControlQueue controlQueue = service.OwnedControlQueues.Single();
List messages = Enumerable.Range(0, 100).Select(i => new TaskMessage
diff --git a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
index e207ca4a0..f39f089ba 100644
--- a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
+++ b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
@@ -35,7 +35,6 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
ExtendedSessionsEnabled = enableExtendedSessions,
ExtendedSessionIdleTimeout = TimeSpan.FromSeconds(extendedSessionTimeoutInSeconds),
FetchLargeMessageDataEnabled = fetchLargeMessages,
- UseLegacyPartitionManagement = true,
// Setting up a logger factory to enable the new DurableTask.Core logs
// TODO: Add a logger provider so we can collect these logs in memory.