Skip to content

Commit

Permalink
Revert "Reduce storage IO costs for partition management" (#523)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid authored Mar 17, 2021
1 parent 638a9ed commit 1abc088
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ static BlobLeaseManager GetBlobLeaseManager(
return new BlobLeaseManager(
settings,
leaseContainerName: settings.TaskHubName.ToLowerInvariant() + "-leases",
blobPrefix: string.Empty,
leaseType: leaseType,
storageClient: account.CreateCloudBlobClient(),
skipBlobContainerCreation: false,
Expand Down
74 changes: 13 additions & 61 deletions src/DurableTask.AzureStorage/Partitioning/BlobLease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,82 +13,34 @@

namespace DurableTask.AzureStorage.Partitioning
{
using System.IO;
using System.Text;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;

class BlobLease : Lease
{
private readonly string accountName;
private readonly AzureStorageOrchestrationServiceStats stats;
private readonly AzureStorageOrchestrationServiceSettings settings;

public BlobLease() { }

public BlobLease(BlobLease blobLease)
public BlobLease()
: base()
{
this.PartitionId = blobLease.PartitionId;
this.Blob = blobLease.Blob;
this.Owner = blobLease.Owner;
this.Epoch = blobLease.Epoch;
this.Token = blobLease.Token;
}

public BlobLease(
string partitionId,
CloudBlobDirectory leaseDirectory,
string accountName,
AzureStorageOrchestrationServiceStats stats,
AzureStorageOrchestrationServiceSettings settings)
public BlobLease(CloudBlockBlob leaseBlob)
: this()
{
this.PartitionId = partitionId;
this.Blob = leaseDirectory.GetBlockBlobReference(partitionId);
this.accountName = accountName;
this.stats = stats;
this.settings = settings;
this.Blob = leaseBlob;
}

/// <summary>Determines whether the lease is expired.</summary>
/// <returns>true if the lease is expired; otherwise, false.</returns>
public override bool IsExpired() => this.Blob.Properties.LeaseState != LeaseState.Leased;

public BlobLease(BlobLease source)
: base(source)
{
this.Blob = source.Blob;
}

// This property is a reference to the blob itself, so we do not want to serialize it when
// writing/reading from the blob content.
[JsonIgnore]
internal CloudBlockBlob Blob { get; private set; }
internal CloudBlockBlob Blob { get; set; }

public override async Task DownloadLeaseAsync()
public override bool IsExpired()
{

var serializedLease = await TimeoutHandler.ExecuteWithTimeout(
operationName: "DownloadBlobLease",
account: this.accountName,
settings: this.settings,
operation: async (context, cancelToken) =>
{
// We use DownloadToStreamAsync() because unlike many of the other Download*() APIs, this fetches
// the attributes without a second API call. See https://stackoverflow.com/a/23749639/9035640
using (var memoryStream = new MemoryStream())
{
await this.Blob.DownloadToStreamAsync(memoryStream, null, null, context, cancelToken);
memoryStream.Position = 0;
using (StreamReader reader = new StreamReader(memoryStream, Encoding.UTF8))
{
return await reader.ReadToEndAsync();
}
}
});

this.stats.StorageRequests.Increment();

BlobLease lease = JsonConvert.DeserializeObject<BlobLease>(serializedLease);
this.Epoch = lease.Epoch;
this.Owner = lease.Owner;
this.Token = lease.Token;
return this.Blob.Properties.LeaseState != LeaseState.Leased;
}
}
}
106 changes: 79 additions & 27 deletions src/DurableTask.AzureStorage/Partitioning/BlobLeaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sealed class BlobLeaseManager : ILeaseManager<BlobLease>
readonly string storageAccountName;
readonly string taskHubName;
readonly string workerName;
readonly string blobPrefix;
readonly string leaseContainerName;
readonly string leaseType;
readonly bool skipBlobContainerCreation;
Expand All @@ -45,11 +46,11 @@ sealed class BlobLeaseManager : ILeaseManager<BlobLease>
CloudBlobContainer taskHubContainer;
CloudBlobDirectory leaseDirectory;
CloudBlockBlob taskHubInfoBlob;
List<BlobLease> blobLeases;

public BlobLeaseManager(
AzureStorageOrchestrationServiceSettings settings,
string leaseContainerName,
string blobPrefix,
string leaseType,
CloudBlobClient storageClient,
bool skipBlobContainerCreation,
Expand All @@ -60,6 +61,7 @@ public BlobLeaseManager(
this.taskHubName = settings.TaskHubName;
this.workerName = settings.WorkerId;
this.leaseContainerName = leaseContainerName;
this.blobPrefix = blobPrefix;
this.leaseType = leaseType;
this.storageClient = storageClient;
this.leaseInterval = settings.LeaseInterval;
Expand Down Expand Up @@ -96,25 +98,51 @@ public async Task<bool> CreateLeaseStoreIfNotExistsAsync(TaskHubInfo eventHubInf
return result;
}

public async Task<IEnumerable<BlobLease>> ListLeasesAsync(bool downloadLeases)
public async Task<IEnumerable<BlobLease>> ListLeasesAsync()
{
if (this.blobLeases == null)
{
throw new InvalidOperationException($"{nameof(ListLeasesAsync)} cannot be called without first calling {nameof(Initialize)}");
}
var blobLeases = new List<BlobLease>();

if (downloadLeases)
BlobContinuationToken continuationToken = null;
do
{
await Task.WhenAll(this.blobLeases.Select(lease => lease.DownloadLeaseAsync()));
BlobResultSegment segment = await TimeoutHandler.ExecuteWithTimeout("ListLeases", this.storageAccountName, this.settings, (context, timeoutToken) =>
{
return this.leaseDirectory.ListBlobsSegmentedAsync(
useFlatBlobListing: true,
blobListingDetails: BlobListingDetails.Metadata,
maxResults: null,
currentToken: continuationToken,
options: null,
operationContext: context,
cancellationToken: timeoutToken);
});

continuationToken = segment.ContinuationToken;

var downloadTasks = new List<Task<BlobLease>>();
foreach (IListBlobItem blob in segment.Results)
{
CloudBlockBlob lease = blob as CloudBlockBlob;
if (lease != null)
{
downloadTasks.Add(this.DownloadLeaseBlob(lease));
}
}

await Task.WhenAll(downloadTasks);

blobLeases.AddRange(downloadTasks.Select(t => t.Result));
}
while (continuationToken != null);

return this.blobLeases;
return blobLeases;
}

public async Task CreateLeaseIfNotExistAsync(string partitionId)
{
CloudBlockBlob leaseBlob = this.leaseDirectory.GetBlockBlobReference(partitionId);
string serializedLease = JsonConvert.SerializeObject(new { PartitionId = partitionId });
BlobLease lease = new BlobLease(leaseBlob) { PartitionId = partitionId };
string serializedLease = JsonConvert.SerializeObject(lease);
try
{
this.settings.Logger.PartitionManagerInfo(
Expand All @@ -124,10 +152,11 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId)
partitionId,
string.Format(
CultureInfo.InvariantCulture,
"CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}",
"CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}. blobPrefix: {3}",
this.leaseContainerName,
this.leaseType,
partitionId));
partitionId,
this.blobPrefix ?? string.Empty));

await leaseBlob.UploadTextAsync(serializedLease, null, AccessCondition.GenerateIfNoneMatchCondition("*"), null, null);
}
Expand All @@ -144,10 +173,11 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId)
partitionId,
string.Format(
CultureInfo.InvariantCulture,
"CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}, exception: {3}",
"CreateLeaseIfNotExistAsync - leaseContainerName: {0}, leaseType: {1}, partitionId: {2}, blobPrefix: {3}, exception: {4}",
this.leaseContainerName,
this.leaseType,
partitionId,
this.blobPrefix ?? string.Empty,
se.Message));
}
}
Expand All @@ -157,6 +187,17 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId)
}
}

/// <summary>
/// Looks up the lease blob for a partition in the lease directory and, when the blob
/// exists, downloads and returns it.
/// </summary>
/// <param name="paritionId">Name of the lease blob, relative to the lease directory.</param>
/// <returns>
/// The downloaded <see cref="BlobLease"/>, or <c>null</c> when there is no blob for the partition.
/// </returns>
public async Task<BlobLease> GetLeaseAsync(string paritionId)
{
    CloudBlockBlob leaseBlob = this.leaseDirectory.GetBlockBlobReference(paritionId);

    bool leaseExists = await leaseBlob.ExistsAsync();

    // The existence check is a storage request in its own right, so count it the same
    // way DownloadLeaseBlob counts the requests it issues.
    this.stats.StorageRequests.Increment();

    if (!leaseExists)
    {
        return null;
    }

    try
    {
        return await this.DownloadLeaseBlob(leaseBlob);
    }
    catch (StorageException storageException)
    {
        // The blob can be deleted between the existence check and the download.
        // Report that the same way as a blob that was never there instead of
        // surfacing a 404 to the caller.
        if (storageException.RequestInformation != null
            && storageException.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
        {
            return null;
        }

        throw;
    }
}

public async Task<bool> RenewAsync(BlobLease lease)
{
CloudBlockBlob leaseBlob = lease.Blob;
Expand Down Expand Up @@ -359,21 +400,19 @@ private bool IsStale(TaskHubInfo currentTaskHubInfo, TaskHubInfo newTaskHubInfo)
void Initialize()
{
this.storageClient.DefaultRequestOptions.MaximumExecutionTime = StorageMaximumExecutionTime;

this.taskHubContainer = this.storageClient.GetContainerReference(this.leaseContainerName);
this.leaseDirectory = this.taskHubContainer.GetDirectoryReference(this.leaseType);
this.taskHubInfoBlob = this.taskHubContainer.GetBlockBlobReference(TaskHubInfoBlobName);
this.blobLeases = new List<BlobLease>(this.settings.PartitionCount);
for (int i = 0; i < this.settings.PartitionCount; i++)
{
this.blobLeases.Add(
new BlobLease(
partitionId: $"{this.taskHubName.ToLowerInvariant()}-control-{i:00}",
leaseDirectory: this.leaseDirectory,
accountName: this.storageAccountName,
stats: this.stats,
settings: this.settings)
);
}

string leaseDirectoryName = string.IsNullOrWhiteSpace(this.blobPrefix)
? this.leaseType
: this.blobPrefix + this.leaseType;
this.leaseDirectory = this.taskHubContainer.GetDirectoryReference(leaseDirectoryName);

string taskHubInfoBlobFileName = string.IsNullOrWhiteSpace(this.blobPrefix)
? TaskHubInfoBlobName
: this.blobPrefix + TaskHubInfoBlobName;

this.taskHubInfoBlob = this.taskHubContainer.GetBlockBlobReference(taskHubInfoBlobFileName);
}

async Task<TaskHubInfo> GetTaskHubInfoAsync()
Expand All @@ -391,6 +430,19 @@ async Task<TaskHubInfo> GetTaskHubInfoAsync()
return null;
}

/// <summary>
/// Reads a lease blob's text content, deserializes it into a <see cref="BlobLease"/>,
/// attaches the blob reference to the result, and then re-fetches the blob's attributes.
/// Each of the two storage calls is recorded in the storage request counter.
/// </summary>
/// <param name="blob">The lease blob to download.</param>
/// <returns>The deserialized lease, with its Blob property set to <paramref name="blob"/>.</returns>
async Task<BlobLease> DownloadLeaseBlob(CloudBlockBlob blob)
{
    string leaseJson = await blob.DownloadTextAsync();
    this.stats.StorageRequests.Increment();

    // The blob reference is not part of the serialized content, so attach it by hand.
    BlobLease lease = JsonConvert.DeserializeObject<BlobLease>(leaseJson);
    lease.Blob = blob;

    // Workaround: for reasons not yet understood, the storage client reports incorrect
    // blob properties after a download, so the attributes are fetched again explicitly.
    await blob.FetchAttributesAsync();
    this.stats.StorageRequests.Increment();

    return lease;
}

static Exception HandleStorageException(Lease lease, StorageException storageException, bool ignoreLeaseLost = false)
{
if (storageException.RequestInformation.HttpStatusCode == (int)HttpStatusCode.Conflict
Expand Down
6 changes: 4 additions & 2 deletions src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ interface ILeaseManager<T> where T : Lease

Task<bool> CreateLeaseStoreIfNotExistsAsync(TaskHubInfo eventHubInfo, bool checkIfStale = true);

Task<IEnumerable<T>> ListLeasesAsync(bool downloadLeases);
Task<IEnumerable<T>> ListLeasesAsync();

Task CreateLeaseIfNotExistAsync(string partitionId);

Task<T> GetLeaseAsync(string partitionId);

Task<bool> RenewAsync(T lease);

Task<bool> AcquireAsync(T lease, string owner);
Expand All @@ -38,4 +40,4 @@ interface ILeaseManager<T> where T : Lease

Task<bool> UpdateAsync(T lease);
}
}
}
25 changes: 7 additions & 18 deletions src/DurableTask.AzureStorage/Partitioning/Lease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

namespace DurableTask.AzureStorage.Partitioning
{
using System.Threading.Tasks;

/// <summary>Contains partition ownership information.</summary>
class Lease
{
Expand All @@ -23,9 +21,9 @@ public Lease()
{
}

/// <summary>Initializes a new instance of the
/// <see cref="DurableTask.AzureStorage.Partitioning.Lease" /> class with the specified
/// <see cref="DurableTask.AzureStorage.Partitioning.Lease(DurableTask.AzureStorage.Partitioning.Lease)" /> value as reference.</summary>
/// <summary>Initializes a new instance of the
/// <see cref="DurableTask.AzureStorage.Partitioning.Lease" /> class with the specified
/// <see cref="DurableTask.AzureStorage.Partitioning.Lease(DurableTask.AzureStorage.Partitioning.Lease)" /> value as reference.</summary>
/// <param name="source">The specified <see cref="DurableTask.AzureStorage.Partitioning.Lease(DurableTask.AzureStorage.Partitioning.Lease)" /> instance where its property values will be copied from.</param>
public Lease(Lease source)
{
Expand All @@ -43,13 +41,13 @@ public Lease(Lease source)
/// <value>The host owner of the partition.</value>
public string Owner { get; set; }

/// <summary>Gets or sets the lease token that manages concurrency between hosts. You can use this token to guarantee single access to any resource needed by the
/// <see cref="DurableTask.AzureStorage.AzureStorageOrchestrationService" /> object.</summary>
/// <summary>Gets or sets the lease token that manages concurrency between hosts. You can use this token to guarantee single access to any resource needed by the
/// <see cref="DurableTask.AzureStorage.AzureStorageOrchestrationService" /> object.</summary>
/// <value>The lease token.</value>
public string Token { get; set; }

/// <summary>Gets or sets the epoch year of the lease, which is a value
/// you can use to determine the most recent owner of a partition between competing nodes.</summary>
/// <summary>Gets or sets the epoch year of the lease, which is a value
/// you can use to determine the most recent owner of a partition between competing nodes.</summary>
/// <value>The epoch year of the lease.</value>
public long Epoch { get; set; }

Expand All @@ -60,15 +58,6 @@ public virtual bool IsExpired()
return false;
}

/// <summary>
/// Used to download fresh data from the lease.
/// </summary>
public virtual Task DownloadLeaseAsync()
{
return Task.CompletedTask;
}


/// <summary>Determines whether this instance is equal to the specified object.</summary>
/// <param name="obj">The object to compare.</param>
/// <returns>true if this instance is equal to the specified object; otherwise, false.</returns>
Expand Down
Loading

0 comments on commit 1abc088

Please sign in to comment.