diff --git a/build/version.cake b/build/version.cake index b040d186e8e..7207bb51f52 100644 --- a/build/version.cake +++ b/build/version.cake @@ -41,10 +41,6 @@ public class BuildVersion suffix += $".{buildNumber}"; metadata = $"+sha.{commitHash}"; } - else - { - metadata = $"+build.{buildNumber}.sha.{commitHash}"; - } } diff --git a/src/MassTransit/Configuration/AuditConfigurationExtensions.cs b/src/MassTransit/Configuration/AuditConfigurationExtensions.cs index ce0869421a3..39e3992cebc 100644 --- a/src/MassTransit/Configuration/AuditConfigurationExtensions.cs +++ b/src/MassTransit/Configuration/AuditConfigurationExtensions.cs @@ -18,10 +18,9 @@ public static class AuditConfigurationExtensions /// The bus /// Audit store /// Filter configuration delegate - /// Message metadata factory. If omited, the default one will be used. + /// Message metadata factory. If omitted, the default one will be used. public static ConnectHandle ConnectSendAuditObservers(this T connector, IMessageAuditStore store, - Action configureFilter = null, - ISendMetadataFactory metadataFactory = null) + Action configureFilter = null, ISendMetadataFactory metadataFactory = null) where T : ISendObserverConnector, IPublishObserverConnector { var specification = new SendMessageFilterSpecification(); @@ -41,7 +40,7 @@ public static ConnectHandle ConnectSendAuditObservers(this T connector, IMess /// The bus or endpoint /// The audit store /// Filter configuration delegate - /// Message metadata factory. If omited, the default one will be used. + /// Message metadata factory. If omitted, the default one will be used. public static ConnectHandle ConnectConsumeAuditObserver(this IConsumeObserverConnector connector, IMessageAuditStore store, Action configureFilter = null, IConsumeMetadataFactory metadataFactory = null) { diff --git a/src/Persistence/MassTransit.Azure.Table/Audit/AuditRecord.cs b/src/Persistence/MassTransit.Azure.Table/Audit/AuditRecord.cs index 935bdb32aa6..4368e937eeb 100644 --- a/src/Persistence/MassTransit.Azure.Table/Audit/AuditRecord.cs +++ b/src/Persistence/MassTransit.Azure.Table/Audit/AuditRecord.cs @@ -1,15 +1,23 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Audit { using System; - using System.Collections.Generic; using System.Text; - using Audit; + using MassTransit.Audit; + using Metadata; using Microsoft.Azure.Cosmos.Table; using Newtonsoft.Json; - public class AuditRecord : TableEntity + public class AuditRecord : + TableEntity { + static readonly char[] _disallowedCharacters; + + static AuditRecord() + { + _disallowedCharacters = new[] {'/', '\\', '#', '?'}; + } + public Guid? MessageId { get; set; } public Guid? ConversationId { get; set; } public Guid? CorrelationId { get; set; } @@ -27,14 +35,12 @@ public class AuditRecord : TableEntity public string Headers { get; set; } public string Message { get; set; } - internal static AuditRecord Create(T message, string messageType, MessageAuditMetadata metadata, - Func partitionKeyStrategy) + internal static AuditRecord Create(T message, MessageAuditMetadata metadata, IPartitionKeyFormatter partitionKeyFormatter) where T : class { var record = new AuditRecord { - RowKey = - $"{DateTime.MaxValue.Subtract(metadata.SentTime ?? DateTime.UtcNow).TotalMilliseconds}", + RowKey = $"{DateTime.MaxValue.Subtract(metadata.SentTime ?? DateTime.UtcNow).TotalMilliseconds}", ContextType = metadata.ContextType, MessageId = metadata.MessageId, ConversationId = metadata.ConversationId, @@ -50,30 +56,35 @@ internal static AuditRecord Create(T message, string messageType, MessageAudi Headers = JsonConvert.SerializeObject(metadata.Headers), Custom = JsonConvert.SerializeObject(metadata.Custom), Message = JsonConvert.SerializeObject(message), - MessageType = messageType + MessageType = TypeMetadataCache.ShortName }; - record.PartitionKey = CleanDisallowedPartitionKeyCharacters(partitionKeyStrategy.Invoke(messageType, record)); + + record.PartitionKey = SanitizePartitionKey(partitionKeyFormatter.Format(record)); + return record; } - /// - /// - /// - /// - static string CleanDisallowedPartitionKeyCharacters(string candidatePartitionKey) + static string SanitizePartitionKey(string partitionKey) { - var disallowedCharacters = new HashSet - { - '/', - '\\', - '#', - '?' - }; - var key = new StringBuilder(); - foreach (var character in candidatePartitionKey) + if (partitionKey.IndexOfAny(_disallowedCharacters) < 0) + return partitionKey; + + var key = new StringBuilder(partitionKey.Length); + + foreach (var c in partitionKey) { - if (!disallowedCharacters.Contains(character)) - key.Append(character); + switch (c) + { + case '/': + case '\\': + case '#': + case '?': + continue; + + default: + key.Append(c); + break; + } } return key.ToString(); diff --git a/src/Persistence/MassTransit.Azure.Table/Configuration/Audit/AzureTableAuditBusObserver.cs b/src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditBusObserver.cs similarity index 78% rename from src/Persistence/MassTransit.Azure.Table/Configuration/Audit/AzureTableAuditBusObserver.cs rename to src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditBusObserver.cs index 0e520925afd..a4f2e62b853 100644 --- a/src/Persistence/MassTransit.Azure.Table/Configuration/Audit/AzureTableAuditBusObserver.cs +++ b/src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditBusObserver.cs @@ -1,4 +1,4 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Audit { using System; using System.Threading.Tasks; @@ -11,24 +11,24 @@ public class AzureTableAuditBusObserver : IBusObserver { readonly Action _filter; - readonly Func _partitionKeyStrategy; + readonly IPartitionKeyFormatter _partitionKeyFormatter; readonly CloudTable _table; - public AzureTableAuditBusObserver(CloudTable table, Action filter, - Func partitionKeyStrategy) + public AzureTableAuditBusObserver(CloudTable table, Action filter, IPartitionKeyFormatter partitionKeyFormatter) { _table = table; - _partitionKeyStrategy = partitionKeyStrategy; _filter = filter; + _partitionKeyFormatter = partitionKeyFormatter; } public async Task PostCreate(IBus bus) { - LogContext.Debug?.Log($"Connecting Azure Table Audit Store against table {_table.Name}"); - var auditStore = new AzureTableAuditStore(_table, _partitionKeyStrategy); + LogContext.Debug?.Log($"Connecting Azure Table Audit Store: {_table.Name}"); + + var auditStore = new AzureTableAuditStore(_table, _partitionKeyFormatter); + bus.ConnectSendAuditObservers(auditStore, _filter); bus.ConnectConsumeAuditObserver(auditStore, _filter); - LogContext.Debug?.Log($"Azure Table Audit store connected. {_table.Name}"); } public Task CreateFaulted(Exception exception) diff --git a/src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditStore.cs b/src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditStore.cs index 9f424fe9ab2..61279bccc98 100644 --- a/src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditStore.cs +++ b/src/Persistence/MassTransit.Azure.Table/Audit/AzureTableAuditStore.cs @@ -1,26 +1,25 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Audit { - using System; using System.Threading.Tasks; - using Audit; - using Metadata; + using MassTransit.Audit; using Microsoft.Azure.Cosmos.Table; - public class AzureTableAuditStore : IMessageAuditStore + public class AzureTableAuditStore : + IMessageAuditStore { - readonly Func _partitionKeyStrategy; + readonly IPartitionKeyFormatter _partitionKeyFormatter; readonly CloudTable _table; - public AzureTableAuditStore(CloudTable table, Func partitionKeyStrategy) + public AzureTableAuditStore(CloudTable table, IPartitionKeyFormatter partitionKeyFormatter) { _table = table; - _partitionKeyStrategy = partitionKeyStrategy; + _partitionKeyFormatter = partitionKeyFormatter; } Task IMessageAuditStore.StoreMessage(T message, MessageAuditMetadata metadata) { - var auditRecord = AuditRecord.Create(message, TypeMetadataCache.ShortName, metadata, _partitionKeyStrategy); + var auditRecord = AuditRecord.Create(message, metadata, _partitionKeyFormatter); var insertOrMergeOperation = TableOperation.InsertOrMerge(auditRecord); return _table.ExecuteAsync(insertOrMergeOperation); } diff --git a/src/Persistence/MassTransit.Azure.Table/Audit/DefaultPartitionKeyFormatter.cs b/src/Persistence/MassTransit.Azure.Table/Audit/DefaultPartitionKeyFormatter.cs new file mode 100644 index 00000000000..fff6d0ce83d --- /dev/null +++ b/src/Persistence/MassTransit.Azure.Table/Audit/DefaultPartitionKeyFormatter.cs @@ -0,0 +1,12 @@ +namespace MassTransit.Azure.Table.Audit +{ + public class DefaultPartitionKeyFormatter : + IPartitionKeyFormatter + { + public string Format(AuditRecord record) + where T : class + { + return record.ContextType; + } + } +} diff --git a/src/Persistence/MassTransit.Azure.Table/Audit/IPartitionKeyFormatter.cs b/src/Persistence/MassTransit.Azure.Table/Audit/IPartitionKeyFormatter.cs new file mode 100644 index 00000000000..8e3642b6f65 --- /dev/null +++ b/src/Persistence/MassTransit.Azure.Table/Audit/IPartitionKeyFormatter.cs @@ -0,0 +1,8 @@ +namespace MassTransit.Azure.Table.Audit +{ + public interface IPartitionKeyFormatter + { + string Format(AuditRecord record) + where T : class; + } +} diff --git a/src/Persistence/MassTransit.Azure.Table/Configuration/Audit/AzureTableAuditStoreConfiguratorExtensions.cs b/src/Persistence/MassTransit.Azure.Table/Configuration/AzureTableAuditStoreConfiguratorExtensions.cs similarity index 66% rename from src/Persistence/MassTransit.Azure.Table/Configuration/Audit/AzureTableAuditStoreConfiguratorExtensions.cs rename to src/Persistence/MassTransit.Azure.Table/Configuration/AzureTableAuditStoreConfiguratorExtensions.cs index 1abe5b9467c..658b09898dd 100644 --- a/src/Persistence/MassTransit.Azure.Table/Configuration/Audit/AzureTableAuditStoreConfiguratorExtensions.cs +++ b/src/Persistence/MassTransit.Azure.Table/Configuration/AzureTableAuditStoreConfiguratorExtensions.cs @@ -1,7 +1,8 @@ namespace MassTransit { using System; - using Azure.Table; + using Azure.Table.Audit; + using BusConfigurators; using Microsoft.Azure.Cosmos.Table; @@ -18,7 +19,8 @@ public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configur var tableClient = storageAccount.CreateCloudTableClient(); var table = tableClient.GetTableReference(auditTableName); table.CreateIfNotExists(); - ConfigureAuditStore(configurator, table, null, DefaultPartitionKeyStrategy); + + ConfigureAuditStore(configurator, table); } /// @@ -34,59 +36,66 @@ public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configur var tableClient = storageAccount.CreateCloudTableClient(); var table = tableClient.GetTableReference(auditTableName); table.CreateIfNotExists(); - ConfigureAuditStore(configurator, table, configureFilter, DefaultPartitionKeyStrategy); + + ConfigureAuditStore(configurator, table, configureFilter); } /// - /// Supply your storage account, table name and partition key strategy based on the message type and audit information. No Filters will be applied. + /// Supply your storage account, table name and partition key strategy based on the message type and audit information. No Filters will be applied. /// /// /// Your cloud storage account. /// The name of the table for which the Audit Logs will be persisted. - /// Using the message type and audit information or otherwise, specify the partition key strategy + /// + /// Using the message type and audit information or otherwise, specify the partition key strategy + /// public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudStorageAccount storageAccount, string auditTableName, - Func partitionKeyStrategy) + IPartitionKeyFormatter partitionKeyFormatter) { var tableClient = storageAccount.CreateCloudTableClient(); var table = tableClient.GetTableReference(auditTableName); table.CreateIfNotExists(); - ConfigureAuditStore(configurator, table, null, partitionKeyStrategy); + + ConfigureAuditStore(configurator, table, null, partitionKeyFormatter); } /// - /// Supply your storage account, table name, partition key strategy and message filter to be applied. + /// Supply your storage account, table name, partition key strategy and message filter to be applied. /// /// /// Your cloud storage account. /// The name of the table for which the Audit Logs will be persisted. - /// Using the message type and audit information or otherwise, specify the partition key strategy + /// + /// Using the message type and audit information or otherwise, specify the partition key strategy + /// /// Message Filter to exclude or include messages from audit based on requirements public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudStorageAccount storageAccount, string auditTableName, - Func partitionKeyStrategy, Action configureFilter) + IPartitionKeyFormatter partitionKeyFormatter, Action configureFilter) { var tableClient = storageAccount.CreateCloudTableClient(); var table = tableClient.GetTableReference(auditTableName); table.CreateIfNotExists(); - Func pkStrategy = partitionKeyStrategy ?? DefaultPartitionKeyStrategy; - ConfigureAuditStore(configurator, table, configureFilter, pkStrategy); + + ConfigureAuditStore(configurator, table, configureFilter, partitionKeyFormatter); } /// - /// Supply your table, partition key strategy and message filter to be applied. + /// Supply your table, partition key strategy and message filter to be applied. /// /// /// Your Azure Cloud Table - /// Using the message type and audit information or otherwise, specify the partition key strategy + /// + /// Using the message type and audit information or otherwise, specify the partition key strategy + /// /// Message Filter to exclude or include messages from audit based on requirements public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table, - Func partitionKeyStrategy, Action configureFilter) + IPartitionKeyFormatter partitionKeyFormatter, Action configureFilter) { - Func pkStrategy = partitionKeyStrategy ?? DefaultPartitionKeyStrategy; - ConfigureAuditStore(configurator, table, configureFilter, pkStrategy); + ConfigureAuditStore(configurator, table, configureFilter, partitionKeyFormatter); } /// - /// Supply your table and message filter to be applied. Default Partition Key Strategy will be used. + /// Supply your table and message filter to be applied. Default Partition Key Strategy will be used. /// /// /// Your Azure Cloud Table @@ -94,35 +103,31 @@ public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configur public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table, Action configureFilter) { - ConfigureAuditStore(configurator, table, configureFilter, DefaultPartitionKeyStrategy); + ConfigureAuditStore(configurator, table, configureFilter); } /// - /// Supply your table and partition key strategy. No message filter will be applied + /// Supply your table and partition key strategy. No message filter will be applied /// /// /// Your Azure Cloud Table - /// Using the message type and audit information or otherwise, specify the partition key strategy - public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table, - Func partitionKeyStrategy) + /// + /// Using the message type and audit information or otherwise, specify the partition key strategy + /// + public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table, IPartitionKeyFormatter partitionKeyFormatter) { - ConfigureAuditStore(configurator, table, null, partitionKeyStrategy); + ConfigureAuditStore(configurator, table, null, partitionKeyFormatter); } public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table) { - ConfigureAuditStore(configurator, table, null, DefaultPartitionKeyStrategy); - } - - static void ConfigureAuditStore(IBusFactoryConfigurator configurator, CloudTable table, Action configureFilter, - Func partitionKeyStrategy) - { - configurator.ConnectBusObserver(new AzureTableAuditBusObserver(table, configureFilter, partitionKeyStrategy)); + ConfigureAuditStore(configurator, table); } - static string DefaultPartitionKeyStrategy(string messageType, AuditRecord record) + static void ConfigureAuditStore(IBusObserverConnector configurator, CloudTable table, Action configureFilter = default, + IPartitionKeyFormatter formatter = default) { - return record.ContextType; + configurator.ConnectBusObserver(new AzureTableAuditBusObserver(table, configureFilter, formatter ?? new DefaultPartitionKeyFormatter())); } } } diff --git a/src/Persistence/MassTransit.Azure.Table/Saga/AzureTableRepositoryRegistrationExtensions.cs b/src/Persistence/MassTransit.Azure.Table/Configuration/AzureTableRepositoryRegistrationExtensions.cs similarity index 66% rename from src/Persistence/MassTransit.Azure.Table/Saga/AzureTableRepositoryRegistrationExtensions.cs rename to src/Persistence/MassTransit.Azure.Table/Configuration/AzureTableRepositoryRegistrationExtensions.cs index 78ec5e7ec5b..24e7e126065 100644 --- a/src/Persistence/MassTransit.Azure.Table/Saga/AzureTableRepositoryRegistrationExtensions.cs +++ b/src/Persistence/MassTransit.Azure.Table/Configuration/AzureTableRepositoryRegistrationExtensions.cs @@ -2,6 +2,8 @@ namespace MassTransit { using System; using Azure.Table; + using Azure.Table.Configurators; + using Azure.Table.Saga; using Configurators; @@ -18,13 +20,13 @@ public static ISagaRegistrationConfigurator AzureTableRepository(this ISag Action> configure = null) where T : class, IVersionedSaga { - var cosmosTableConfigurator = new AzureTableSagaRepositoryConfigurator(); + var sagaRepositoryConfigurator = new AzureTableSagaRepositoryConfigurator(); - configure?.Invoke(cosmosTableConfigurator); + configure?.Invoke(sagaRepositoryConfigurator); - BusConfigurationResult.CompileResults(cosmosTableConfigurator.Validate()); + BusConfigurationResult.CompileResults(sagaRepositoryConfigurator.Validate()); - configurator.Repository(x => cosmosTableConfigurator.Register(x)); + configurator.Repository(x => sagaRepositoryConfigurator.Register(x)); return configurator; } diff --git a/src/Persistence/MassTransit.Azure.Table/Configuration/Saga/AzureTableSagaRepositoryConfigurator.cs b/src/Persistence/MassTransit.Azure.Table/Configuration/Configurators/AzureTableSagaRepositoryConfigurator.cs similarity index 90% rename from src/Persistence/MassTransit.Azure.Table/Configuration/Saga/AzureTableSagaRepositoryConfigurator.cs rename to src/Persistence/MassTransit.Azure.Table/Configuration/Configurators/AzureTableSagaRepositoryConfigurator.cs index 5e890bba448..764b004fe79 100644 --- a/src/Persistence/MassTransit.Azure.Table/Configuration/Saga/AzureTableSagaRepositoryConfigurator.cs +++ b/src/Persistence/MassTransit.Azure.Table/Configuration/Configurators/AzureTableSagaRepositoryConfigurator.cs @@ -1,8 +1,10 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Configurators { using System; using System.Collections.Generic; + using Contexts; using GreenPipes; + using MassTransit.Saga; using Microsoft.Azure.Cosmos.Table; using Registration; using Saga; @@ -11,13 +13,10 @@ namespace MassTransit.Azure.Table public class AzureTableSagaRepositoryConfigurator : IAzureTableSagaRepositoryConfigurator, ISpecification + where TSaga : class, ISaga { Func _connectionFactory; - public AzureTableSagaRepositoryConfigurator() - { - } - /// /// Supply factory for retrieving the Cloud Table. /// diff --git a/src/Persistence/MassTransit.Azure.Table/Configuration/Saga/IAzureTableSagaRepositoryConfigurator.cs b/src/Persistence/MassTransit.Azure.Table/Configuration/IAzureTableSagaRepositoryConfigurator.cs similarity index 89% rename from src/Persistence/MassTransit.Azure.Table/Configuration/Saga/IAzureTableSagaRepositoryConfigurator.cs rename to src/Persistence/MassTransit.Azure.Table/Configuration/IAzureTableSagaRepositoryConfigurator.cs index 130a650b120..23718569e57 100644 --- a/src/Persistence/MassTransit.Azure.Table/Configuration/Saga/IAzureTableSagaRepositoryConfigurator.cs +++ b/src/Persistence/MassTransit.Azure.Table/Configuration/IAzureTableSagaRepositoryConfigurator.cs @@ -1,9 +1,17 @@ namespace MassTransit.Azure.Table { using System; + using MassTransit.Saga; using Microsoft.Azure.Cosmos.Table; + public interface IAzureTableSagaRepositoryConfigurator : + IAzureTableSagaRepositoryConfigurator + where TSaga : class, ISaga + { + } + + public interface IAzureTableSagaRepositoryConfigurator { /// @@ -12,10 +20,4 @@ public interface IAzureTableSagaRepositoryConfigurator /// void ConnectionFactory(Func connectionFactory); } - - - public interface IAzureTableSagaRepositoryConfigurator : - IAzureTableSagaRepositoryConfigurator - { - } } diff --git a/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableDatabaseContext.cs b/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableDatabaseContext.cs index f2f04e313d2..736f3cbd56a 100644 --- a/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableDatabaseContext.cs +++ b/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableDatabaseContext.cs @@ -1,10 +1,11 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Contexts { using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Table; + using Saga; public class AzureTableDatabaseContext : @@ -22,20 +23,34 @@ public AzureTableDatabaseContext(CloudTable table) public Task Add(SagaConsumeContext context) { - return Insert(context.Saga); + return Insert(context.Saga, context.CancellationToken); } - public Task Insert(TSaga instance) + public async Task Insert(TSaga instance, CancellationToken cancellationToken) { - Guid? rowKey = instance.CorrelationId; - IDictionary dynamicEntity = TableEntity.Flatten(instance, new OperationContext()); - return _table.ExecuteAsync(TableOperation.InsertOrReplace(new DynamicTableEntity(_partitionKey, rowKey.ToString()) {Properties = dynamicEntity}), - CancellationToken.None); + IDictionary entityProperties = TableEntity.Flatten(instance, new OperationContext()); + + var operation = TableOperation.Insert(new DynamicTableEntity(_partitionKey, instance.CorrelationId.ToString()) {Properties = entityProperties}); + var result = await _table.ExecuteAsync(operation, cancellationToken).ConfigureAwait(false); + + instance.ETag = result.Etag; } - public Task Load(Guid correlationId) + public async Task Load(Guid correlationId, CancellationToken cancellationToken) { - return Get(correlationId); + var operation = TableOperation.Retrieve(_partitionKey, correlationId.ToString()); + var result = await _table.ExecuteAsync(operation, cancellationToken).ConfigureAwait(false); + + if (result.Result is DynamicTableEntity tableEntity) + { + var instance = TableEntity.ConvertBack(tableEntity.Properties, new OperationContext()); + + instance.ETag = tableEntity.ETag; + + return instance; + } + + return default; } public async Task Update(SagaConsumeContext context) @@ -44,18 +59,13 @@ public async Task Update(SagaConsumeContext context) try { - var existingInstanceTableEntity = await GetTableEntity(instance.CorrelationId).ConfigureAwait(false); - var existingInstance = ConvertTableEntityToSagaInstance(existingInstanceTableEntity); - existingInstance.ETag = existingInstanceTableEntity.ETag; - - Guid? rowKey = instance.CorrelationId; - IDictionary dynamicEntity = TableEntity.Flatten(instance, new OperationContext()); - var replacementOperation = TableOperation.Replace(new DynamicTableEntity(_partitionKey, rowKey.ToString()) + IDictionary entityProperties = TableEntity.Flatten(instance, new OperationContext()); + var operation = TableOperation.Replace(new DynamicTableEntity(_partitionKey, instance.CorrelationId.ToString()) { - Properties = dynamicEntity, - ETag = existingInstance.ETag + Properties = entityProperties, + ETag = instance.ETag }); - await _table.ExecuteAsync(replacementOperation, context.CancellationToken).ConfigureAwait(false); + await _table.ExecuteAsync(operation, context.CancellationToken).ConfigureAwait(false); } catch (Exception exception) { @@ -63,39 +73,15 @@ public async Task Update(SagaConsumeContext context) } } - public Task Delete(SagaConsumeContext context) - { - return Delete(context.Saga.CorrelationId); - } - - public ValueTask DisposeAsync() - { - return default; - } - - async Task GetTableEntity(Guid correlationId) + public async Task Delete(SagaConsumeContext context) { - Guid? rowKey = correlationId; - var op = TableOperation.Retrieve(_partitionKey, rowKey.ToString()); - var entity = await _table.ExecuteAsync(op); - return entity.Result as DynamicTableEntity; - } + var instance = context.Saga; - async Task Get(Guid correlationId) - { - var tableEntity = await GetTableEntity(correlationId).ConfigureAwait(false); - return ConvertTableEntityToSagaInstance(tableEntity); - } + IDictionary entityProperties = TableEntity.Flatten(instance, new OperationContext()); - static TSaga ConvertTableEntityToSagaInstance(DynamicTableEntity tableEntity) - { - return tableEntity != null ? TableEntity.ConvertBack(tableEntity.Properties, new OperationContext()) : null; - } + var operation = TableOperation.Delete(new DynamicTableEntity(_partitionKey, instance.CorrelationId.ToString(), instance.ETag, entityProperties)); - async Task Delete(Guid correlationId) - { - var tableEntity = await GetTableEntity(correlationId).ConfigureAwait(false); - await _table.ExecuteAsync(TableOperation.Delete(tableEntity)); + await _table.ExecuteAsync(operation, context.CancellationToken).ConfigureAwait(false); } } } diff --git a/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContext.cs b/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContext.cs index 23b3f851b5b..8022cb48c5d 100644 --- a/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContext.cs +++ b/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContext.cs @@ -1,18 +1,18 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Contexts { using System; using System.Threading; using System.Threading.Tasks; using Context; using GreenPipes; + using MassTransit.Saga; using Saga; using Util; public class AzureTableSagaRepositoryContext : ConsumeContextScope, - SagaRepositoryContext, - IAsyncDisposable + SagaRepositoryContext where TSaga : class, IVersionedSaga where TMessage : class { @@ -29,11 +29,6 @@ public AzureTableSagaRepositoryContext(DatabaseContext context, ConsumeCo _factory = factory; } - public ValueTask DisposeAsync() - { - return _context.DisposeAsync(); - } - public Task> Add(TSaga instance) { return _factory.CreateSagaConsumeContext(_context, _consumeContext, instance, SagaConsumeContextMode.Add); @@ -43,7 +38,7 @@ public async Task> Insert(TSaga instance) { try { - await _context.Insert(instance).ConfigureAwait(false); + await _context.Insert(instance, CancellationToken).ConfigureAwait(false); _consumeContext.LogInsert(instance.CorrelationId); @@ -59,7 +54,7 @@ public async Task> Insert(TSaga instance) public async Task> Load(Guid correlationId) { - var instance = await _context.Load(correlationId).ConfigureAwait(false); + var instance = await _context.Load(correlationId, CancellationToken).ConfigureAwait(false); if (instance == null) return default; @@ -96,8 +91,7 @@ public Task> CreateSagaConsumeContext(ConsumeCon public class CosmosTableSagaRepositoryContext : BasePipeContext, - SagaRepositoryContext, - IAsyncDisposable + SagaRepositoryContext where TSaga : class, IVersionedSaga { readonly DatabaseContext _context; @@ -108,11 +102,6 @@ public CosmosTableSagaRepositoryContext(DatabaseContext context, Cancella _context = context; } - public ValueTask DisposeAsync() - { - return _context.DisposeAsync(); - } - public Task> Query(ISagaQuery query, CancellationToken cancellationToken) { throw new NotImplementedByDesignException("Azure Table saga repository does not support queries"); @@ -120,7 +109,7 @@ public Task> Query(ISagaQuery query, Ca public Task Load(Guid correlationId) { - return _context.Load(correlationId); + return _context.Load(correlationId, CancellationToken); } } } diff --git a/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContextFactory.cs b/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContextFactory.cs index feb12a27c73..2059ceaa249 100644 --- a/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContextFactory.cs +++ b/src/Persistence/MassTransit.Azure.Table/Contexts/AzureTableSagaRepositoryContextFactory.cs @@ -1,9 +1,10 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Contexts { using System; using System.Threading; using System.Threading.Tasks; using GreenPipes; + using MassTransit.Saga; using Microsoft.Azure.Cosmos.Table; using Saga; @@ -43,16 +44,9 @@ public async Task Send(ConsumeContext context, IPipe(database); - try - { - var repositoryContext = new AzureTableSagaRepositoryContext(databaseContext, context, _factory); + var repositoryContext = new AzureTableSagaRepositoryContext(databaseContext, context, _factory); - await next.Send(repositoryContext).ConfigureAwait(false); - } - finally - { - await databaseContext.DisposeAsync().ConfigureAwait(false); - } + await next.Send(repositoryContext).ConfigureAwait(false); } public async Task SendQuery(ConsumeContext context, ISagaQuery query, IPipe> next) @@ -67,16 +61,9 @@ public async Task Execute(Func, Task> asyn var database = _databaseFactory(); var databaseContext = new AzureTableDatabaseContext(database); - try - { - var repositoryContext = new CosmosTableSagaRepositoryContext(databaseContext, cancellationToken); + var repositoryContext = new CosmosTableSagaRepositoryContext(databaseContext, cancellationToken); - return await asyncMethod(repositoryContext).ConfigureAwait(false); - } - finally - { - await databaseContext.DisposeAsync().ConfigureAwait(false); - } + return await asyncMethod(repositoryContext).ConfigureAwait(false); } } } diff --git a/src/Persistence/MassTransit.Azure.Table/Contexts/DatabaseContext.cs b/src/Persistence/MassTransit.Azure.Table/Contexts/DatabaseContext.cs index 38cbb64bd06..f41c510dfe4 100644 --- a/src/Persistence/MassTransit.Azure.Table/Contexts/DatabaseContext.cs +++ b/src/Persistence/MassTransit.Azure.Table/Contexts/DatabaseContext.cs @@ -1,18 +1,19 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Contexts { using System; + using System.Threading; using System.Threading.Tasks; + using Saga; - public interface DatabaseContext : - IAsyncDisposable + public interface DatabaseContext where TSaga : class, IVersionedSaga { Task Add(SagaConsumeContext context); - Task Insert(TSaga instance); + Task Insert(TSaga instance, CancellationToken cancellationToken); - Task Load(Guid correlationId); + Task Load(Guid correlationId, CancellationToken cancellationToken); Task Update(SagaConsumeContext context); diff --git a/src/Persistence/MassTransit.Azure.Table/MassTransit.Azure.Table.csproj.DotSettings b/src/Persistence/MassTransit.Azure.Table/MassTransit.Azure.Table.csproj.DotSettings new file mode 100644 index 00000000000..3f1b8e3e290 --- /dev/null +++ b/src/Persistence/MassTransit.Azure.Table/MassTransit.Azure.Table.csproj.DotSettings @@ -0,0 +1,3 @@ + + True + True \ No newline at end of file diff --git a/src/Persistence/MassTransit.Azure.Table/Saga/AzureTableSagaRepository.cs b/src/Persistence/MassTransit.Azure.Table/Saga/AzureTableSagaRepository.cs index 636c74f7ac5..6227df002ec 100644 --- a/src/Persistence/MassTransit.Azure.Table/Saga/AzureTableSagaRepository.cs +++ b/src/Persistence/MassTransit.Azure.Table/Saga/AzureTableSagaRepository.cs @@ -1,8 +1,9 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Saga { using System; + using Contexts; + using MassTransit.Saga; using Microsoft.Azure.Cosmos.Table; - using Saga; public static class AzureTableSagaRepository diff --git a/src/Persistence/MassTransit.Azure.Table/Saga/IVersionedSaga.cs b/src/Persistence/MassTransit.Azure.Table/Saga/IVersionedSaga.cs index 803ae98cbcd..b4157e552cb 100644 --- a/src/Persistence/MassTransit.Azure.Table/Saga/IVersionedSaga.cs +++ b/src/Persistence/MassTransit.Azure.Table/Saga/IVersionedSaga.cs @@ -1,6 +1,6 @@ -namespace MassTransit.Azure.Table +namespace MassTransit.Azure.Table.Saga { - using Saga; + using MassTransit.Saga; public interface IVersionedSaga : diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_ConsumeRecords_Specs.cs b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_ConsumeRecords_Specs.cs index 91a90407b34..c73df499674 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_ConsumeRecords_Specs.cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_ConsumeRecords_Specs.cs @@ -6,6 +6,7 @@ using GreenPipes.Util; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Filter_Specs .cs b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Filter_Specs .cs index 671df2dca22..a1786aa374a 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Filter_Specs .cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Filter_Specs .cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] @@ -37,7 +38,7 @@ public async Task SetUp() protected override void ConfigureInMemoryBus(IInMemoryBusFactoryConfigurator configurator) { - configurator.UseAzureTableAuditStore(TestCloudTable,builder => builder.Exclude(typeof(B))); + configurator.UseAzureTableAuditStore(TestCloudTable, builder => builder.Exclude(typeof(B))); base.ConfigureInMemoryBus(configurator); } diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_PartitionKey_Specs.cs b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_PartitionKey_Specs.cs index 47aad3bac13..38b4246bd9a 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_PartitionKey_Specs.cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_PartitionKey_Specs.cs @@ -7,6 +7,7 @@ using Microsoft.Azure.Cosmos.Table; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] @@ -32,8 +33,7 @@ public async Task SetUp() protected override void ConfigureInMemoryBus(IInMemoryBusFactoryConfigurator configurator) { - configurator.UseAzureTableAuditStore(TestCloudTable,(messageType, record) => PartitionKey); - base.ConfigureInMemoryBus(configurator); + configurator.UseAzureTableAuditStore(TestCloudTable, new ConstantPartitionKeyFormatter(PartitionKey)); } protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpointConfigurator configurator) @@ -42,6 +42,24 @@ protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpoin } + class ConstantPartitionKeyFormatter : + IPartitionKeyFormatter + { + readonly string _partitionKey; + + public ConstantPartitionKeyFormatter(string partitionKey) + { + _partitionKey = partitionKey; + } + + public string Format(AuditRecord record) + where T : class + { + return _partitionKey; + } + } + + class A { } diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_SendRecords_Specs.cs b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_SendRecords_Specs.cs index d42d7385652..0af3b0ea36a 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_SendRecords_Specs.cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_SendRecords_Specs.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Specs.cs b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Specs.cs index 86f2be665bb..c1a9c43cf39 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Specs.cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/AuditStore_Specs.cs @@ -6,6 +6,7 @@ using GreenPipes.Util; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_storage_account.cs b/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_storage_account.cs index 43d25ce1d4a..8a412f0e099 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_storage_account.cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_storage_account.cs @@ -7,6 +7,7 @@ using Microsoft.Azure.Cosmos.Table; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] diff --git a/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_table.cs b/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_table.cs index f7f248d1e9c..d019e9ac59a 100644 --- a/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_table.cs +++ b/tests/MassTransit.Azure.Table.Tests/Audit/Configure_audit_store_supply_table.cs @@ -7,6 +7,7 @@ using Microsoft.Azure.Cosmos.Table; using NUnit.Framework; using Shouldly; + using Table.Audit; [TestFixture] diff --git a/tests/MassTransit.Azure.Table.Tests/AzureTableInMemoryTestFixture.cs b/tests/MassTransit.Azure.Table.Tests/AzureTableInMemoryTestFixture.cs index 2d8f4593b1f..377d0c5d638 100644 --- a/tests/MassTransit.Azure.Table.Tests/AzureTableInMemoryTestFixture.cs +++ b/tests/MassTransit.Azure.Table.Tests/AzureTableInMemoryTestFixture.cs @@ -2,6 +2,7 @@ namespace MassTransit.Azure.Table.Tests { using System.Collections.Generic; using System.Linq; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Table; using NUnit.Framework; using TestFramework; @@ -10,14 +11,12 @@ namespace MassTransit.Azure.Table.Tests public class AzureTableInMemoryTestFixture : InMemoryTestFixture { - protected readonly string ConnectionString; protected readonly CloudTable TestCloudTable; protected readonly string TestTableName; public AzureTableInMemoryTestFixture() { - ConnectionString = Configuration.StorageAccount; TestTableName = "azuretabletests"; var storageAccount = CloudStorageAccount.Parse(ConnectionString); @@ -27,27 +26,33 @@ public AzureTableInMemoryTestFixture() public IEnumerable GetRecords() { - var query = new TableQuery(); - IEnumerable entities = TestCloudTable.ExecuteQuery(query); + IEnumerable entities = TestCloudTable.ExecuteQuery(new TableQuery()); return entities.Select(e => TableEntity.ConvertBack(e.Properties, new OperationContext())); } public IEnumerable GetTableEntities() { - var query = new TableQuery(); - return TestCloudTable.ExecuteQuery(query); + return TestCloudTable.ExecuteQuery(new TableQuery()); } [OneTimeSetUp] - public void Bring_it_up() + public async Task Bring_it_up() { TestCloudTable.CreateIfNotExists(); - } - [OneTimeTearDown] - public void Take_it_down() - { - TestCloudTable.DeleteIfExists(); + IEnumerable entities = GetTableEntities(); + + foreach (IGrouping key in entities.GroupBy(x => x.PartitionKey)) + { + // Create the batch operation. + var batchDeleteOperation = new TableBatchOperation(); + + foreach (var row in key) + batchDeleteOperation.Delete(row); + + // Execute the batch operation. + await TestCloudTable.ExecuteBatchAsync(batchDeleteOperation); + } } } } diff --git a/tests/MassTransit.Azure.Table.Tests/Configuration.cs b/tests/MassTransit.Azure.Table.Tests/Configuration.cs index 4c833c191f7..ccab40898d1 100644 --- a/tests/MassTransit.Azure.Table.Tests/Configuration.cs +++ b/tests/MassTransit.Azure.Table.Tests/Configuration.cs @@ -3,11 +3,13 @@ namespace MassTransit.Azure.Table.Tests using System; using NUnit.Framework; + public static class Configuration { public static string StorageAccount => TestContext.Parameters.Exists(nameof(StorageAccount)) ? TestContext.Parameters.Get(nameof(StorageAccount)) - : Environment.GetEnvironmentVariable("MT_AZURE_STORAGE_ACCOUNT") ?? "DefaultEndpointsProtocol=http;AccountName=localhost;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;TableEndpoint=https://localhost:8081/;"; + : Environment.GetEnvironmentVariable("MT_AZURE_STORAGE_ACCOUNT") + ?? "DefaultEndpointsProtocol=http;AccountName=localhost;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;TableEndpoint=https://localhost:8081/;"; } } diff --git a/tests/MassTransit.Azure.Table.Tests/Saga/Container_Specs.cs b/tests/MassTransit.Azure.Table.Tests/Saga/Container_Specs.cs index 29cc0c9b316..55465881a8a 100644 --- a/tests/MassTransit.Azure.Table.Tests/Saga/Container_Specs.cs +++ b/tests/MassTransit.Azure.Table.Tests/Saga/Container_Specs.cs @@ -8,6 +8,7 @@ namespace ContainerTests using GreenPipes; using Microsoft.Extensions.DependencyInjection; using NUnit.Framework; + using Table.Saga; using TestFramework.Sagas; @@ -74,7 +75,7 @@ public class TestInstance : { public string CurrentState { get; set; } public string Key { get; set; } - public string ETag { get; set; } + public string ETag { get; set; } public Guid CorrelationId { get; set; } } diff --git a/tests/MassTransit.Azure.Table.Tests/Saga/LocatingAnExistingSaga.cs b/tests/MassTransit.Azure.Table.Tests/Saga/LocatingAnExistingSaga.cs index 4aa555746a1..f9188783dd7 100644 --- a/tests/MassTransit.Azure.Table.Tests/Saga/LocatingAnExistingSaga.cs +++ b/tests/MassTransit.Azure.Table.Tests/Saga/LocatingAnExistingSaga.cs @@ -5,6 +5,7 @@ using MassTransit.Saga; using NUnit.Framework; using Shouldly; + using Table.Saga; using Testing; diff --git a/tests/MassTransit.Azure.Table.Tests/Saga/SimpleSaga.cs b/tests/MassTransit.Azure.Table.Tests/Saga/SimpleSaga.cs index 4525e254f4a..f9deac97825 100644 --- a/tests/MassTransit.Azure.Table.Tests/Saga/SimpleSaga.cs +++ b/tests/MassTransit.Azure.Table.Tests/Saga/SimpleSaga.cs @@ -3,6 +3,7 @@ using System; using System.Threading.Tasks; using MassTransit.Saga; + using Table.Saga; public class SimpleSaga : @@ -22,7 +23,7 @@ public async Task Consume(ConsumeContext context) } public Guid CorrelationId { get; set; } - public string ETag { get; set; } + public string ETag { get; set; } public async Task Consume(ConsumeContext message) {