Skip to content

Commit

Permalink
Allow customization of SNS and SQS serializer (#23)
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Zwarts <[email protected]>
  • Loading branch information
Kralizek and Thomas-X authored Oct 28, 2022
1 parent a827581 commit 00754f5
Show file tree
Hide file tree
Showing 21 changed files with 621 additions and 139 deletions.
36 changes: 30 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,30 @@ public interface IEventHandler<TInput>
```
[Here](https://github.com/Kralizek/AWSLambdaSharpTemplate/tree/master/samples/EventFunction) you can find a sample that shows an Event function that accepts an input string and logs it into CloudWatch logs.

## Custom serializers

You can pass a custom serializer to the Handler registration to override the default behavior of parsing all messages as JSON.
```csharp

public class CustomSerializer : ISerializer
{
public T Deserialize<T>(string input)
{
// do some fancy 'serializing'
return JsonSerializer.Deserialize<T>(input);
}
}

// Function.cs
private class DummyFunction : EventFunction<SQSEvent>
{
protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
{
services.UseSqsHandler<SomeEvent, DummyHandler>(serializer: new CustomSerializer());
}
}
```

# Creating a new function

The best way to create a new AWS Lambda that uses this structure is to use the `dotnet new` template provided via NuGet.
Expand All @@ -91,12 +115,12 @@ The best way to create a new AWS Lambda that uses this structure is to use the `

Here is a list of all the available templates

Name|Short name|Description
-----|----------|------------
Lambda Empty Event Function|lambda-template-event-empty|Creates an Event function with no extra setup
Lambda Empty RequestResponse Function|lambda-template-requestresponse-empty|Creates a RequestResponse function with no extra setup
Lambda Boilerplate Event Function|lambda-template-event-boilerplate|Creates an Event function with some boilerplate added
Lambda Boilerplate RequestResponse Function|lambda-template-requestresponse-boilerplate|Creates a RequestResponse function with some boilerplate added
|Name|Short name|Description|
|-|-|-|
|Lambda Empty Event Function|lambda-template-event-empty|Creates an Event function with no extra setup|
|Lambda Empty RequestResponse Function|lambda-template-requestresponse-empty|Creates a RequestResponse function with no extra setup|
|Lambda Boilerplate Event Function|lambda-template-event-boilerplate|Creates an Event function with some boilerplate added|
|Lambda Boilerplate RequestResponse Function|lambda-template-requestresponse-boilerplate|Creates a RequestResponse function with some boilerplate added|

All the templates support the following parameters
* `--name|-n` Name of the project. It is also used as name of the function
Expand Down
4 changes: 1 addition & 3 deletions samples/SnsEventFunctionWithParallelism/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ protected override void ConfigureLogging(ILoggingBuilder logging, IExecutionEnvi

protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
{
services.ConfigureSnsParallelExecution(4);

services.UseNotificationHandler<CustomNotification, CustomNotificationHandler>(enableParallelExecution: true);
services.UseNotificationHandler<CustomNotification, CustomNotificationHandler>().WithParallelExecution(maxDegreeOfParallelism: 4);
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/Kralizek.Lambda.Template.Sns/INotificationSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Text.Json;

namespace Kralizek.Lambda
{
public interface INotificationSerializer
{
public TMessage Deserialize<TMessage>(string input);
}

public class DefaultJsonNotificationSerializer : INotificationSerializer
{
public TMessage Deserialize<TMessage>(string input) => JsonSerializer.Deserialize<TMessage>(input);
}
}
9 changes: 6 additions & 3 deletions src/Kralizek.Lambda.Template.Sns/ParallelSnsEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ await input.Records.ForEachAsync(_options.MaxDegreeOfParallelism, async record =
using (var scope = _serviceProvider.CreateScope())
{
var message = record.Sns.Message;
var notification = JsonSerializer.Deserialize<TNotification>(message);
_logger.LogDebug($"Message received: {message}");

var serializer = _serviceProvider.GetRequiredService<INotificationSerializer>();
var notification = serializer.Deserialize<TNotification>(message);

_logger.LogDebug("Message received: {Message}", message);

var messageHandler = scope.ServiceProvider.GetService<INotificationHandler<TNotification>>();
if (messageHandler == null)
{
_logger.LogCritical($"No INotificationHandler<{typeof(TNotification).Name}> could be found.");
_logger.LogCritical("No {Handler} could be found", $"INotificationHandler<{typeof(TNotification).Name}>");
throw new InvalidOperationException($"No INotificationHandler<{typeof(TNotification).Name}> could be found.");
}

Expand Down
74 changes: 66 additions & 8 deletions src/Kralizek.Lambda.Template.Sns/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,94 @@
using System;
using Amazon.Lambda.SNSEvents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Kralizek.Lambda
{
public static class ServiceCollectionExtensions
{
[Obsolete("Use `services.UseNotificationHandler<TNotification,THandler>().UseParallelExecution(maxDegreeOfParallelism);` instead.")]
public static IServiceCollection ConfigureSnsParallelExecution(this IServiceCollection services, int maxDegreeOfParallelism)
{
services.Configure<ParallelSnsExecutionOptions>(option => option.MaxDegreeOfParallelism = maxDegreeOfParallelism);

return services;
}

public static IServiceCollection UseNotificationHandler<TNotification, THandler>(this IServiceCollection services, bool enableParallelExecution = false)
public static INotificationHandlerConfigurator<TNotification> WithParallelExecution<TNotification>(this INotificationHandlerConfigurator<TNotification> configurator, int? maxDegreeOfParallelism = null)
where TNotification : class
where THandler : class, INotificationHandler<TNotification>
{
services.AddOptions();
ArgumentNullException.ThrowIfNull(configurator);

if (enableParallelExecution)
if (maxDegreeOfParallelism <= 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), $"{nameof(maxDegreeOfParallelism)} must be greater than 1");

configurator.Services.AddTransient<IEventHandler<SNSEvent>, ParallelSnsEventHandler<TNotification>>();

if (maxDegreeOfParallelism.HasValue)
{
services.AddTransient<IEventHandler<SNSEvent>, ParallelSnsEventHandler<TNotification>>();
configurator.Services.Configure<ParallelSnsExecutionOptions>(options => options.MaxDegreeOfParallelism = maxDegreeOfParallelism.Value);
}
else

return configurator;
}

public static IServiceCollection UseCustomNotificationSerializer<TSerializer>(this IServiceCollection services, ServiceLifetime lifetime = ServiceLifetime.Singleton)
where TSerializer : INotificationSerializer
{
ArgumentNullException.ThrowIfNull(services);

services.Add(ServiceDescriptor.Describe(typeof(INotificationSerializer), typeof(TSerializer), lifetime));

return services;
}

[Obsolete("Use `services.UseNotificationHandler<TNotification,THandler>().UseParallelExecution();` instead.")]
public static IServiceCollection UseNotificationHandler<TNotification, THandler>(this IServiceCollection services, bool enableParallelExecution)
where TNotification : class
where THandler : class, INotificationHandler<TNotification>
{
var configurator = UseNotificationHandler<TNotification, THandler>(services);

if (enableParallelExecution)
{
services.AddTransient<IEventHandler<SNSEvent>, SnsEventHandler<TNotification>>();
configurator.WithParallelExecution();
}

return services;
}

public static INotificationHandlerConfigurator<TNotification> UseNotificationHandler<TNotification, THandler>(this IServiceCollection services)
where TNotification : class
where THandler : class, INotificationHandler<TNotification>
{
services.AddOptions();

services.AddTransient<IEventHandler<SNSEvent>, SnsEventHandler<TNotification>>();

services.TryAddSingleton<INotificationSerializer, DefaultJsonNotificationSerializer>();

services.AddTransient<INotificationHandler<TNotification>, THandler>();

return services;
var configurator = new NotificationHandlerConfigurator<TNotification>(services);

return configurator;
}
}

public interface INotificationHandlerConfigurator<TNotification>
where TNotification : class
{
IServiceCollection Services { get; }
}

internal sealed class NotificationHandlerConfigurator<TNotification> : INotificationHandlerConfigurator<TNotification>
where TNotification : class
{
public NotificationHandlerConfigurator(IServiceCollection services)
{
Services = services ?? throw new ArgumentNullException(nameof(services));
}

public IServiceCollection Services { get; }
}
}
7 changes: 5 additions & 2 deletions src/Kralizek.Lambda.Template.Sns/SnsEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ public async Task HandleAsync(SNSEvent input, ILambdaContext context)
using (var scope = _serviceProvider.CreateScope())
{
var message = record.Sns.Message;
var notification = JsonSerializer.Deserialize<TNotification>(message);

var serializer = _serviceProvider.GetRequiredService<INotificationSerializer>();

var notification = serializer.Deserialize<TNotification>(message);

var handler = scope.ServiceProvider.GetService<INotificationHandler<TNotification>>();

if (handler == null)
{
_logger.LogCritical($"No INotificationHandler<{typeof(TNotification).Name}> could be found.");
_logger.LogCritical("No {Handler} could be found", $"INotificationHandler<{typeof(TNotification).Name}>");
throw new InvalidOperationException($"No INotificationHandler<{typeof(TNotification).Name}> could be found.");
}

Expand Down
14 changes: 14 additions & 0 deletions src/Kralizek.Lambda.Template.Sqs/IMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Text.Json;

namespace Kralizek.Lambda
{
public interface IMessageSerializer
{
public TMessage Deserialize<TMessage>(string input);
}

public class DefaultJsonMessageSerializer : IMessageSerializer
{
public TMessage Deserialize<TMessage>(string input) => JsonSerializer.Deserialize<TMessage>(input);
}
}
20 changes: 12 additions & 8 deletions src/Kralizek.Lambda.Template.Sqs/ParallelSqsEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class ParallelSqsExecutionOptions
public int MaxDegreeOfParallelism { get; set; } = System.Environment.ProcessorCount;
}

public class ParallelSqsEventHandler<TMessage>: IEventHandler<SQSEvent> where TMessage : class
public class ParallelSqsEventHandler<TMessage>: IEventHandler<SQSEvent> where TMessage : class
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
Expand All @@ -24,28 +24,32 @@ public class ParallelSqsEventHandler<TMessage>: IEventHandler<SQSEvent> where T
public ParallelSqsEventHandler(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ParallelSqsExecutionOptions> options)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_logger = loggerFactory?.CreateLogger("SqsForEachAsyncEventHandler") ?? throw new ArgumentNullException(nameof(loggerFactory));
_logger = loggerFactory?.CreateLogger("SqsForEachAsyncEventHandler") ??
throw new ArgumentNullException(nameof(loggerFactory));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
}

public async Task HandleAsync(SQSEvent input, ILambdaContext context)
{
if (input.Records.Any())
{

await input.Records.ForEachAsync(_options.MaxDegreeOfParallelism, async singleSqsMessage =>
{
using (var scope = _serviceProvider.CreateScope())
{
var sqsMessage = singleSqsMessage.Body;
_logger.LogDebug($"Message received: {sqsMessage}");
_logger.LogDebug("Message received: {Message}", sqsMessage);

var serializer = _serviceProvider.GetRequiredService<IMessageSerializer>();
var message = serializer != null
? serializer.Deserialize<TMessage>(sqsMessage)
: JsonSerializer.Deserialize<TMessage>(sqsMessage);

var message = JsonSerializer.Deserialize<TMessage>(sqsMessage);

var messageHandler = scope.ServiceProvider.GetService<IMessageHandler<TMessage>>();

if (messageHandler == null)
{
_logger.LogError($"No IMessageHandler<{typeof(TMessage).Name}> could be found.");
_logger.LogError("No {Handler} could be found", $"IMessageHandler<{typeof(TMessage).Name}>");
throw new InvalidOperationException($"No IMessageHandler<{typeof(TMessage).Name}> could be found.");
}

Expand All @@ -55,4 +59,4 @@ await input.Records.ForEachAsync(_options.MaxDegreeOfParallelism, async singleSq
}
}
}
}
}
73 changes: 66 additions & 7 deletions src/Kralizek.Lambda.Template.Sqs/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,35 +1,94 @@
using System;
using Amazon.Lambda.SQSEvents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Kralizek.Lambda
{
public static class ServiceCollectionExtensions
{
[Obsolete("Use `services.UseQueueMessageHandler<TMessage,THandler>().UseParallelExecution(maxDegreeOfParallelism);` instead.")]
public static IServiceCollection ConfigureSnsParallelExecution(this IServiceCollection services, int maxDegreeOfParallelism)
{
services.Configure<ParallelSqsExecutionOptions>(option => option.MaxDegreeOfParallelism = maxDegreeOfParallelism);

return services;
}

public static IMessageHandlerConfigurator<TMessage> WithParallelExecution<TMessage>(this IMessageHandlerConfigurator<TMessage> configurator, int? maxDegreeOfParallelism = null)
where TMessage : class
{
ArgumentNullException.ThrowIfNull(configurator);

if (maxDegreeOfParallelism <= 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), $"{nameof(maxDegreeOfParallelism)} must be greater than 1");

configurator.Services.AddTransient<IEventHandler<SQSEvent>, ParallelSqsEventHandler<TMessage>>();

if (maxDegreeOfParallelism.HasValue)
{
configurator.Services.Configure<ParallelSqsExecutionOptions>(options => options.MaxDegreeOfParallelism = maxDegreeOfParallelism.Value);
}

return configurator;
}

public static IServiceCollection UseCustomMessageSerializer<TSerializer>(this IServiceCollection services, ServiceLifetime lifetime = ServiceLifetime.Singleton)
where TSerializer : IMessageSerializer
{
ArgumentNullException.ThrowIfNull(services);

services.Add(ServiceDescriptor.Describe(typeof(IMessageSerializer), typeof(TSerializer), lifetime));

return services;
}

[Obsolete("Use `services.UseQueueMessageHandler<TMessage, THandler>();` instead.")]
public static IServiceCollection UseSqsHandler<TMessage, THandler>(this IServiceCollection services, bool enableParallelExecution = false)
where TMessage : class
where THandler : class, IMessageHandler<TMessage>
{
services.AddOptions();
var configurator = UseQueueMessageHandler<TMessage, THandler>(services);

if (enableParallelExecution)
{
services.AddTransient<IEventHandler<SQSEvent>, ParallelSqsEventHandler<TMessage>>();
}
else
{
services.AddTransient<IEventHandler<SQSEvent>, SqsEventHandler<TMessage>>();
configurator.WithParallelExecution();
}

return services;
}

public static IMessageHandlerConfigurator<TMessage> UseQueueMessageHandler<TMessage, THandler>(this IServiceCollection services)
where TMessage : class
where THandler : class, IMessageHandler<TMessage>
{
services.AddOptions();

services.AddTransient<IEventHandler<SQSEvent>, SqsEventHandler<TMessage>>();

services.TryAddSingleton<IMessageSerializer, DefaultJsonMessageSerializer>();

services.AddTransient<IMessageHandler<TMessage>, THandler>();

return services;
var configurator = new MessageHandlerConfigurator<TMessage>(services);

return configurator;
}
}

public interface IMessageHandlerConfigurator<TMessage>
where TMessage : class
{
IServiceCollection Services { get; }
}

internal sealed class MessageHandlerConfigurator<TMessage> : IMessageHandlerConfigurator<TMessage>
where TMessage : class
{
public MessageHandlerConfigurator(IServiceCollection services)
{
Services = services ?? throw new ArgumentNullException(nameof(services));
}

public IServiceCollection Services { get; }
}
}
Loading

0 comments on commit 00754f5

Please sign in to comment.