Skip to content

Commit

Permalink
Azure Table Saga Repository refactoring, cleanup, logic optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed Jun 27, 2020
1 parent 569a07c commit b27bde4
Show file tree
Hide file tree
Showing 30 changed files with 245 additions and 211 deletions.
4 changes: 0 additions & 4 deletions build/version.cake
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ public class BuildVersion
suffix += $".{buildNumber}";
metadata = $"+sha.{commitHash}";
}
else
{
metadata = $"+build.{buildNumber}.sha.{commitHash}";
}
}


Expand Down
7 changes: 3 additions & 4 deletions src/MassTransit/Configuration/AuditConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ public static class AuditConfigurationExtensions
/// <param name="connector">The bus</param>
/// <param name="store">Audit store</param>
/// <param name="configureFilter">Filter configuration delegate</param>
/// <param name="metadataFactory">Message metadata factory. If omited, the default one will be used.</param>
/// <param name="metadataFactory">Message metadata factory. If omitted, the default one will be used.</param>
public static ConnectHandle ConnectSendAuditObservers<T>(this T connector, IMessageAuditStore store,
Action<IMessageFilterConfigurator> configureFilter = null,
ISendMetadataFactory metadataFactory = null)
Action<IMessageFilterConfigurator> configureFilter = null, ISendMetadataFactory metadataFactory = null)
where T : ISendObserverConnector, IPublishObserverConnector
{
var specification = new SendMessageFilterSpecification();
Expand All @@ -41,7 +40,7 @@ public static ConnectHandle ConnectSendAuditObservers<T>(this T connector, IMess
/// <param name="connector">The bus or endpoint</param>
/// <param name="store">The audit store</param>
/// <param name="configureFilter">Filter configuration delegate</param>
/// <param name="metadataFactory">Message metadata factory. If omited, the default one will be used.</param>
/// <param name="metadataFactory">Message metadata factory. If omitted, the default one will be used.</param>
public static ConnectHandle ConnectConsumeAuditObserver(this IConsumeObserverConnector connector, IMessageAuditStore store,
Action<IMessageFilterConfigurator> configureFilter = null, IConsumeMetadataFactory metadataFactory = null)
{
Expand Down
63 changes: 37 additions & 26 deletions src/Persistence/MassTransit.Azure.Table/Audit/AuditRecord.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
namespace MassTransit.Azure.Table
namespace MassTransit.Azure.Table.Audit
{
using System;
using System.Collections.Generic;
using System.Text;
using Audit;
using MassTransit.Audit;
using Metadata;
using Microsoft.Azure.Cosmos.Table;
using Newtonsoft.Json;


public class AuditRecord : TableEntity
public class AuditRecord :
TableEntity
{
static readonly char[] _disallowedCharacters;

static AuditRecord()
{
_disallowedCharacters = new[] {'/', '\\', '#', '?'};
}

public Guid? MessageId { get; set; }
public Guid? ConversationId { get; set; }
public Guid? CorrelationId { get; set; }
Expand All @@ -27,14 +35,12 @@ public class AuditRecord : TableEntity
public string Headers { get; set; }
public string Message { get; set; }

internal static AuditRecord Create<T>(T message, string messageType, MessageAuditMetadata metadata,
Func<string, AuditRecord, string> partitionKeyStrategy)
internal static AuditRecord Create<T>(T message, MessageAuditMetadata metadata, IPartitionKeyFormatter partitionKeyFormatter)
where T : class
{
var record = new AuditRecord
{
RowKey =
$"{DateTime.MaxValue.Subtract(metadata.SentTime ?? DateTime.UtcNow).TotalMilliseconds}",
RowKey = $"{DateTime.MaxValue.Subtract(metadata.SentTime ?? DateTime.UtcNow).TotalMilliseconds}",
ContextType = metadata.ContextType,
MessageId = metadata.MessageId,
ConversationId = metadata.ConversationId,
Expand All @@ -50,30 +56,35 @@ internal static AuditRecord Create<T>(T message, string messageType, MessageAudi
Headers = JsonConvert.SerializeObject(metadata.Headers),
Custom = JsonConvert.SerializeObject(metadata.Custom),
Message = JsonConvert.SerializeObject(message),
MessageType = messageType
MessageType = TypeMetadataCache<T>.ShortName
};
record.PartitionKey = CleanDisallowedPartitionKeyCharacters(partitionKeyStrategy.Invoke(messageType, record));

record.PartitionKey = SanitizePartitionKey(partitionKeyFormatter.Format<T>(record));

return record;
}

/// <summary>
/// </summary>
/// <param name="candidatePartitionKey"></param>
/// <returns></returns>
static string CleanDisallowedPartitionKeyCharacters(string candidatePartitionKey)
static string SanitizePartitionKey(string partitionKey)
{
var disallowedCharacters = new HashSet<char>
{
'/',
'\\',
'#',
'?'
};
var key = new StringBuilder();
foreach (var character in candidatePartitionKey)
if (partitionKey.IndexOfAny(_disallowedCharacters) < 0)
return partitionKey;

var key = new StringBuilder(partitionKey.Length);

foreach (var c in partitionKey)
{
if (!disallowedCharacters.Contains(character))
key.Append(character);
switch (c)
{
case '/':
case '\\':
case '#':
case '?':
continue;

default:
key.Append(c);
break;
}
}

return key.ToString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MassTransit.Azure.Table
namespace MassTransit.Azure.Table.Audit
{
using System;
using System.Threading.Tasks;
Expand All @@ -11,24 +11,24 @@ public class AzureTableAuditBusObserver :
IBusObserver
{
readonly Action<IMessageFilterConfigurator> _filter;
readonly Func<string, AuditRecord, string> _partitionKeyStrategy;
readonly IPartitionKeyFormatter _partitionKeyFormatter;
readonly CloudTable _table;

public AzureTableAuditBusObserver(CloudTable table, Action<IMessageFilterConfigurator> filter,
Func<string, AuditRecord, string> partitionKeyStrategy)
public AzureTableAuditBusObserver(CloudTable table, Action<IMessageFilterConfigurator> filter, IPartitionKeyFormatter partitionKeyFormatter)
{
_table = table;
_partitionKeyStrategy = partitionKeyStrategy;
_filter = filter;
_partitionKeyFormatter = partitionKeyFormatter;
}

public async Task PostCreate(IBus bus)
{
LogContext.Debug?.Log($"Connecting Azure Table Audit Store against table {_table.Name}");
var auditStore = new AzureTableAuditStore(_table, _partitionKeyStrategy);
LogContext.Debug?.Log($"Connecting Azure Table Audit Store: {_table.Name}");

var auditStore = new AzureTableAuditStore(_table, _partitionKeyFormatter);

bus.ConnectSendAuditObservers(auditStore, _filter);
bus.ConnectConsumeAuditObserver(auditStore, _filter);
LogContext.Debug?.Log($"Azure Table Audit store connected. {_table.Name}");
}

public Task CreateFaulted(Exception exception)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
namespace MassTransit.Azure.Table
namespace MassTransit.Azure.Table.Audit
{
using System;
using System.Threading.Tasks;
using Audit;
using Metadata;
using MassTransit.Audit;
using Microsoft.Azure.Cosmos.Table;


public class AzureTableAuditStore : IMessageAuditStore
public class AzureTableAuditStore :
IMessageAuditStore
{
readonly Func<string, AuditRecord, string> _partitionKeyStrategy;
readonly IPartitionKeyFormatter _partitionKeyFormatter;
readonly CloudTable _table;

public AzureTableAuditStore(CloudTable table, Func<string, AuditRecord, string> partitionKeyStrategy)
public AzureTableAuditStore(CloudTable table, IPartitionKeyFormatter partitionKeyFormatter)
{
_table = table;
_partitionKeyStrategy = partitionKeyStrategy;
_partitionKeyFormatter = partitionKeyFormatter;
}

Task IMessageAuditStore.StoreMessage<T>(T message, MessageAuditMetadata metadata)
{
var auditRecord = AuditRecord.Create(message, TypeMetadataCache<T>.ShortName, metadata, _partitionKeyStrategy);
var auditRecord = AuditRecord.Create(message, metadata, _partitionKeyFormatter);
var insertOrMergeOperation = TableOperation.InsertOrMerge(auditRecord);
return _table.ExecuteAsync(insertOrMergeOperation);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace MassTransit.Azure.Table.Audit
{
public class DefaultPartitionKeyFormatter :
IPartitionKeyFormatter
{
public string Format<T>(AuditRecord record)
where T : class
{
return record.ContextType;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace MassTransit.Azure.Table.Audit
{
public interface IPartitionKeyFormatter
{
string Format<T>(AuditRecord record)
where T : class;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
namespace MassTransit
{
using System;
using Azure.Table;
using Azure.Table.Audit;
using BusConfigurators;
using Microsoft.Azure.Cosmos.Table;


Expand All @@ -18,7 +19,8 @@ public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configur
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference(auditTableName);
table.CreateIfNotExists();
ConfigureAuditStore(configurator, table, null, DefaultPartitionKeyStrategy);

ConfigureAuditStore(configurator, table);
}

/// <summary>
Expand All @@ -34,95 +36,98 @@ public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configur
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference(auditTableName);
table.CreateIfNotExists();
ConfigureAuditStore(configurator, table, configureFilter, DefaultPartitionKeyStrategy);

ConfigureAuditStore(configurator, table, configureFilter);
}

/// <summary>
/// Supply your storage account, table name and partition key strategy based on the message type and audit information. No Filters will be applied.
/// Supply your storage account, table name and partition key strategy based on the message type and audit information. No Filters will be applied.
/// </summary>
/// <param name="configurator"></param>
/// <param name="storageAccount">Your cloud storage account.</param>
/// <param name="auditTableName">The name of the table for which the Audit Logs will be persisted.</param>
/// <param name="partitionKeyStrategy">Using the message type and audit information or otherwise, specify the partition key strategy</param>
/// <param name="partitionKeyFormatter">
/// Using the message type and audit information or otherwise, specify the partition key strategy
/// </param>
public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudStorageAccount storageAccount, string auditTableName,
Func<string, AuditRecord, string> partitionKeyStrategy)
IPartitionKeyFormatter partitionKeyFormatter)
{
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference(auditTableName);
table.CreateIfNotExists();
ConfigureAuditStore(configurator, table, null, partitionKeyStrategy);

ConfigureAuditStore(configurator, table, null, partitionKeyFormatter);
}

/// <summary>
/// Supply your storage account, table name, partition key strategy and message filter to be applied.
/// Supply your storage account, table name, partition key strategy and message filter to be applied.
/// </summary>
/// <param name="configurator"></param>
/// <param name="storageAccount">Your cloud storage account.</param>
/// <param name="auditTableName">The name of the table for which the Audit Logs will be persisted.</param>
/// <param name="partitionKeyStrategy">Using the message type and audit information or otherwise, specify the partition key strategy</param>
/// <param name="partitionKeyFormatter">
/// Using the message type and audit information or otherwise, specify the partition key strategy
/// </param>
/// <param name="configureFilter">Message Filter to exclude or include messages from audit based on requirements</param>
public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudStorageAccount storageAccount, string auditTableName,
Func<string, AuditRecord, string> partitionKeyStrategy, Action<IMessageFilterConfigurator> configureFilter)
IPartitionKeyFormatter partitionKeyFormatter, Action<IMessageFilterConfigurator> configureFilter)
{
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference(auditTableName);
table.CreateIfNotExists();
Func<string, AuditRecord, string> pkStrategy = partitionKeyStrategy ?? DefaultPartitionKeyStrategy;
ConfigureAuditStore(configurator, table, configureFilter, pkStrategy);

ConfigureAuditStore(configurator, table, configureFilter, partitionKeyFormatter);
}

/// <summary>
/// Supply your table, partition key strategy and message filter to be applied.
/// Supply your table, partition key strategy and message filter to be applied.
/// </summary>
/// <param name="configurator"></param>
/// <param name="table">Your Azure Cloud Table</param>
/// <param name="partitionKeyStrategy">Using the message type and audit information or otherwise, specify the partition key strategy</param>
/// <param name="partitionKeyFormatter">
/// Using the message type and audit information or otherwise, specify the partition key strategy
/// </param>
/// <param name="configureFilter">Message Filter to exclude or include messages from audit based on requirements</param>
public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table,
Func<string, AuditRecord, string> partitionKeyStrategy, Action<IMessageFilterConfigurator> configureFilter)
IPartitionKeyFormatter partitionKeyFormatter, Action<IMessageFilterConfigurator> configureFilter)
{
Func<string, AuditRecord, string> pkStrategy = partitionKeyStrategy ?? DefaultPartitionKeyStrategy;
ConfigureAuditStore(configurator, table, configureFilter, pkStrategy);
ConfigureAuditStore(configurator, table, configureFilter, partitionKeyFormatter);
}

/// <summary>
/// Supply your table and message filter to be applied. Default Partition Key Strategy will be used.
/// Supply your table and message filter to be applied. Default Partition Key Strategy will be used.
/// </summary>
/// <param name="configurator"></param>
/// <param name="table">Your Azure Cloud Table</param>
/// <param name="configureFilter">Message Filter to exclude or include messages from audit based on requirements</param>
public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table,
Action<IMessageFilterConfigurator> configureFilter)
{
ConfigureAuditStore(configurator, table, configureFilter, DefaultPartitionKeyStrategy);
ConfigureAuditStore(configurator, table, configureFilter);
}

/// <summary>
/// Supply your table and partition key strategy. No message filter will be applied
/// Supply your table and partition key strategy. No message filter will be applied
/// </summary>
/// <param name="configurator"></param>
/// <param name="table">Your Azure Cloud Table</param>
/// <param name="partitionKeyStrategy">Using the message type and audit information or otherwise, specify the partition key strategy</param>
public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table,
Func<string, AuditRecord, string> partitionKeyStrategy)
/// <param name="partitionKeyFormatter">
/// Using the message type and audit information or otherwise, specify the partition key strategy
/// </param>
public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table, IPartitionKeyFormatter partitionKeyFormatter)
{
ConfigureAuditStore(configurator, table, null, partitionKeyStrategy);
ConfigureAuditStore(configurator, table, null, partitionKeyFormatter);
}

public static void UseAzureTableAuditStore(this IBusFactoryConfigurator configurator, CloudTable table)
{
ConfigureAuditStore(configurator, table, null, DefaultPartitionKeyStrategy);
}

static void ConfigureAuditStore(IBusFactoryConfigurator configurator, CloudTable table, Action<IMessageFilterConfigurator> configureFilter,
Func<string, AuditRecord, string> partitionKeyStrategy)
{
configurator.ConnectBusObserver(new AzureTableAuditBusObserver(table, configureFilter, partitionKeyStrategy));
ConfigureAuditStore(configurator, table);
}

static string DefaultPartitionKeyStrategy(string messageType, AuditRecord record)
static void ConfigureAuditStore(IBusObserverConnector configurator, CloudTable table, Action<IMessageFilterConfigurator> configureFilter = default,
IPartitionKeyFormatter formatter = default)
{
return record.ContextType;
configurator.ConnectBusObserver(new AzureTableAuditBusObserver(table, configureFilter, formatter ?? new DefaultPartitionKeyFormatter()));
}
}
}
Loading

0 comments on commit b27bde4

Please sign in to comment.