Skip to content

Commit

Permalink
Add Publish Only
Browse files Browse the repository at this point in the history
tomohisa committed Dec 28, 2024

Verified

This commit was signed with the committer’s verified signature.
kisepichu きせ
1 parent ca06779 commit a928b93
Showing 2 changed files with 258 additions and 35 deletions.
195 changes: 160 additions & 35 deletions src/Sekiban.Pure/Class1.cs
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ public ResultBox<Aggregate> Project(IEvent ev, IAggregateProjector projector) =>
{
Payload = projector.Project(Payload, ev),
LastSortableUniqueId = ev.SortableUniqueId,
Version = ev.Version
Version = Version + 1
};
public ResultBox<Aggregate> Project(List<IEvent> events, IAggregateProjector projector) => ResultBox
.FromValue(events)
@@ -75,17 +75,30 @@ public interface IAggregateProjector
public IAggregatePayload Project(IAggregatePayload payload, IEvent ev);
public virtual string GetVersion() => "initial";
}
public interface ICommandContext<TAggregatePayload> where TAggregatePayload : IAggregatePayload
public class NoneAggregateProjector : IAggregateProjector
{
public List<IEvent> Events { get; }
internal IAggregate GetAggregateCommon();
public EventOrNone AppendEvent(IEventPayload eventPayload);
internal CommandExecuted GetCommandExecuted(List<IEvent> producedEvents) => new(
public static NoneAggregateProjector Empty => new();
public IAggregatePayload Project(IAggregatePayload payload, IEvent ev) => payload;
}
public interface ICommandContext<TAggregatePayload> : ICommandContextWithoutState
where TAggregatePayload : IAggregatePayload
{
CommandExecuted ICommandContextWithoutState.GetCommandExecuted(List<IEvent> producedEvents) => new(
GetAggregateCommon().PartitionKeys,
GetAggregateCommon().LastSortableUniqueId,
producedEvents);
internal IAggregate GetAggregateCommon();
public ResultBox<Aggregate<TAggregatePayload>> GetAggregate();
}
public interface ICommandContextWithoutState
{
public List<IEvent> Events { get; }
public PartitionKeys GetPartitionKeys();
public int GetNextVersion();
public int GetCurrentVersion();
internal CommandExecuted GetCommandExecuted(List<IEvent> producedEvents);
public ResultBox<EventOrNone> AppendEvent(IEventPayload eventPayload);
}
public interface ICommand;
public interface ICommandWithAggregateRestriction<TAggregatePayload> : ICommand
where TAggregatePayload : IAggregatePayload;
@@ -327,23 +340,54 @@ public Task<ResultBox<CommandResponse>> ExecuteGeneral<TCommand, TInject, TAggre
Delegate handler) where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
specifyPartitionKeys(command)
.ToResultBox()
.Combine(keys => Repository.Load(keys, projector))
.Verify((keys, aggregate) => VerifyAggregateType<TCommand, TAggregatePayload>(command, aggregate))
.Combine(
(partitionKeys, aggregate) => ResultBox.FromValue(
new CommandContext<TAggregatePayload>(aggregate, projector, EventTypes)))
keys => CreateCommandContextWithoutState<TCommand, TAggregatePayload, TInject>(
command,
keys,
projector,
handler))
.Combine(
(partitionKeys, aggregate, context) => RunHandler(command, context, inject, handler)
.Conveyor(eventOrNone => EventToCommandExecuted(context, eventOrNone)))
.Conveyor(values => Repository.Save(values.Value4.ProducedEvents).Remap(_ => values))
(partitionKeys, context) =>
RunHandler<TCommand, TInject, TAggregatePayload>(command, context, inject, handler)
.Conveyor(eventOrNone => EventToCommandExecuted(context, eventOrNone)))
.Conveyor(values => Repository.Save(values.Value3.ProducedEvents).Remap(_ => values))
.Conveyor(
(partitionKeys, aggregate, context, executed) => ResultBox.FromValue(
(partitionKeys, context, executed) => ResultBox.FromValue(
new CommandResponse(
partitionKeys,
executed.ProducedEvents,
executed.ProducedEvents.Count > 0
? executed.ProducedEvents.Last().Version
: aggregate.Version)));
: context.GetCurrentVersion())));
private ResultBox<ICommandContextWithoutState>
CreateCommandContextWithoutState<TCommand, TAggregatePayload, TInject>(
TCommand command,
PartitionKeys partitionKeys,
IAggregateProjector projector,
Delegate handler) where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
handler switch
{
Func<TCommand, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
Func<TCommand, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
Func<TCommand, TInject, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
Func<TCommand, TInject, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
_ => ResultBox
.Start
.Conveyor(keys => Repository.Load(partitionKeys, projector))
.Verify(aggregate => VerifyAggregateType<TCommand, TAggregatePayload>(command, aggregate))
.Conveyor(
aggregate => ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContext<TAggregatePayload>(aggregate, projector, EventTypes)))
};

private ExceptionOrNone VerifyAggregateType<TCommand, TAggregatePayload>(TCommand command, Aggregate aggregate)
where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
aggregate.GetPayload() is not TAggregatePayload
@@ -356,38 +400,44 @@ private ExceptionOrNone VerifyAggregateType<TCommand, TAggregatePayload>(TComman

private Task<ResultBox<EventOrNone>> RunHandler<TCommand, TInject, TAggregatePayload>(
TCommand command,
ICommandContext<TAggregatePayload> context,
ICommandContextWithoutState context,
OptionalValue<TInject> inject,
Delegate handler) where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
(handler, inject.HasValue) switch
(handler, inject.HasValue, context) switch
{
(Func<TCommand, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, _) => handler1(
command,
context)
.ToTask(),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, true) =>
handler1(command, inject.GetValue(), context).ToTask(),
(Func<TCommand, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, _) => handler1(
command,
context),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, true)
=> handler1(command, inject.GetValue(), context),
(Func<TCommand, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1, _, { } contextWithout) =>
Task.FromResult(handler1(command, contextWithout)),
(Func<TCommand, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1, _, { } contextWithout)
=> handler1(command, contextWithout),
(Func<TCommand, TInject, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1, true, {
} contextWithout) => Task.FromResult(handler1(command, inject.GetValue(), contextWithout)),
(Func<TCommand, TInject, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1, true, {
} contextWithout) => handler1(command, inject.GetValue(), contextWithout),
(Func<TCommand, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, _,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, stateContext).ToTask(),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, true,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, inject.GetValue(), stateContext)
.ToTask(),
(Func<TCommand, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, _,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, stateContext),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, true,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, inject.GetValue(), stateContext),
_ => ResultBox<EventOrNone>
.FromException(
new SekibanCommandHandlerNotMatchException(
$"{handler.GetType().Name} does not match as command handler"))
.ToTask()
};
private ResultBox<CommandExecuted> EventToCommandExecuted<TAggregatePayload>(
ICommandContext<TAggregatePayload> commandContext,
EventOrNone eventOrNone) where TAggregatePayload : IAggregatePayload =>
private ResultBox<CommandExecuted> EventToCommandExecuted(
ICommandContextWithoutState commandContext,
EventOrNone eventOrNone) =>
(eventOrNone.HasEvent
? EventTypes
.GenerateTypedEvent(
eventOrNone.GetValue(),
commandContext.GetAggregateCommon().PartitionKeys,
commandContext.GetPartitionKeys(),
SortableUniqueIdValue.Generate(SekibanDateProducer.GetRegistered().UtcNow, Guid.NewGuid()),
commandContext.GetAggregateCommon().Version + 1)
commandContext.GetNextVersion())
.Remap(ev => commandContext.Events.Append(ev).ToList())
: ResultBox.FromValue(commandContext.Events)).Remap(commandContext.GetCommandExecuted);
#endregion
@@ -402,8 +452,12 @@ public class CommandContext<TAggregatePayload>(
public IAggregateProjector Projector { get; } = projector;
public IEventTypes EventTypes { get; } = eventTypes;
public List<IEvent> Events { get; } = new();
public PartitionKeys GetPartitionKeys() => Aggregate.PartitionKeys;
public int GetNextVersion() => Aggregate.Version + 1;
public int GetCurrentVersion() => Aggregate.Version;
public IAggregate GetAggregateCommon() => Aggregate;
public EventOrNone AppendEvent(IEventPayload eventPayload)
public ResultBox<Aggregate<TAggregatePayload>> GetAggregate() => Aggregate.ToTypedPayload<TAggregatePayload>();
public ResultBox<EventOrNone> AppendEvent(IEventPayload eventPayload)
{
var toAdd = EventTypes.GenerateTypedEvent(
eventPayload,
@@ -418,7 +472,26 @@ public EventOrNone AppendEvent(IEventPayload eventPayload)
Events.Add(ev);
return EventOrNone.Empty;
}
public ResultBox<Aggregate<TAggregatePayload>> GetAggregate() => Aggregate.ToTypedPayload<TAggregatePayload>();
}
public record CommandContextWithoutState(PartitionKeys PartitionKeys, IEventTypes EventTypes)
: ICommandContextWithoutState
{
public List<IEvent> Events { get; } = new();
public PartitionKeys GetPartitionKeys() => PartitionKeys;
public int GetNextVersion() => 0;
public int GetCurrentVersion() => 0;
public CommandExecuted GetCommandExecuted(List<IEvent> producedEvents) => new(
PartitionKeys,
SortableUniqueIdValue.Generate(SekibanDateProducer.GetRegistered().UtcNow, Guid.NewGuid()),
producedEvents);
public ResultBox<EventOrNone> AppendEvent(IEventPayload eventPayload) => EventTypes
.GenerateTypedEvent(
eventPayload,
PartitionKeys,
SortableUniqueIdValue.Generate(SekibanDateProducer.GetRegistered().UtcNow, Guid.NewGuid()),
0)
.Do(ev => Events.Add(ev))
.Remap(_ => EventOrNone.Empty);
}
public class Repository
{
@@ -572,3 +645,55 @@ public record CommandResourceWithInjectTask<TCommand, TProjector, TAggregatePayl
public Delegate GetHandler() => Handler;
public OptionalValue<Type> GetAggregatePayloadType() => typeof(TAggregatePayload);
}
public record CommandResourcePublishOnly<TCommand>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
Func<TCommand, ICommandContextWithoutState, ResultBox<EventOrNone>> Handler) : ICommandResource<TCommand>
where TCommand : ICommand, IEquatable<TCommand>
{

public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => null;
public Delegate GetHandler() => Handler;
}
public record CommandResourcePublishOnlyWithInject<TCommand, TInject>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
TInject? Injection,
Func<TCommand, TInject, ICommandContextWithoutState, ResultBox<EventOrNone>> Handler) : ICommandResource<TCommand>
where TCommand : ICommand, IEquatable<TCommand>
{

public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => Injection;
public Delegate GetHandler() => Handler;
}
public record CommandResourcePublishOnlyTask<TCommand>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
Func<TCommand, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> Handler) : ICommandResource<TCommand>
where TCommand : ICommand, IEquatable<TCommand>
{
public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => null;
public Delegate GetHandler() => Handler;
}
public record CommandResourcePublishOnlyWithInjectTask<TCommand, TInject>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
TInject? Injection,
Func<TCommand, TInject, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> Handler)
: ICommandResource<TCommand> where TCommand : ICommand, IEquatable<TCommand>
{
public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => Injection;
public Delegate GetHandler() => Handler;
}
98 changes: 98 additions & 0 deletions tests/Pure.Domain.Test/UnitTest1.cs
Original file line number Diff line number Diff line change
@@ -415,4 +415,102 @@ public async Task ExecuteWithoutGeneric()
var result = await executor.Execute(createCommand);
Assert.True(result.IsSuccess);
}
[Fact]
public async Task ExecuteWithResource1()
{
var executor = new CommandExecutor { EventTypes = new PureDomainEventTypes() };
var createCommand = new RegisterBranch("a");
var partitionKeys = PartitionKeys.Generate();
var result = await executor.ExecuteWithResource(
createCommand,
new CommandResource<RegisterBranch, BranchProjector>(
command => partitionKeys,
(command, _) => EventOrNone.Event(new BranchCreated(command.Name))));
Assert.True(result.IsSuccess);
var aggregate = Repository.Load<BranchProjector>(partitionKeys);
Assert.NotNull(aggregate);
Assert.IsType<Branch>(aggregate.GetValue().GetPayload());
var branch = aggregate.GetValue().GetPayload() as Branch ?? throw new ApplicationException();
Assert.Equal("a", branch.Name);
Assert.Equal(1, aggregate.GetValue().Version);
}

[Fact]
public async Task ExecuteWithResourcePublishOnly()
{
var executor = new CommandExecutor { EventTypes = new PureDomainEventTypes() };
var createCommand = new RegisterBranch("a");
var partitionKeys = PartitionKeys.Generate();
var result = await executor.ExecuteWithResource(
createCommand,
new CommandResourcePublishOnly<RegisterBranch>(
command => partitionKeys,
(command, _) => EventOrNone.Event(new BranchCreated(command.Name))));
Assert.True(result.IsSuccess);
var aggregate = Repository.Load<BranchProjector>(partitionKeys);
Assert.NotNull(aggregate);
Assert.IsType<Branch>(aggregate.GetValue().GetPayload());
var branch = aggregate.GetValue().GetPayload() as Branch ?? throw new ApplicationException();
Assert.Equal("a", branch.Name);
Assert.Equal(1, aggregate.GetValue().Version);
}
[Fact]
public async Task ExecuteWithResourcePublishOnlyTask()
{
var executor = new CommandExecutor { EventTypes = new PureDomainEventTypes() };
var createCommand = new RegisterBranch("a");
var partitionKeys = PartitionKeys.Generate();
var result = await executor.ExecuteWithResource(
createCommand,
new CommandResourcePublishOnlyTask<RegisterBranch>(
command => partitionKeys,
(command, _) => Task.FromResult(EventOrNone.Event(new BranchCreated(command.Name)))));
Assert.True(result.IsSuccess);
var aggregate = Repository.Load<BranchProjector>(partitionKeys);
Assert.NotNull(aggregate);
Assert.IsType<Branch>(aggregate.GetValue().GetPayload());
var branch = aggregate.GetValue().GetPayload() as Branch ?? throw new ApplicationException();
Assert.Equal("a", branch.Name);
Assert.Equal(1, aggregate.GetValue().Version);
}
[Fact]
public async Task ExecuteWithResourcePublishOnlyInject()
{
var executor = new CommandExecutor { EventTypes = new PureDomainEventTypes() };
var createCommand = new RegisterBranch("a");
var partitionKeys = PartitionKeys.Generate();
var result = await executor.ExecuteWithResource(
createCommand,
new CommandResourcePublishOnlyWithInject<RegisterBranch, Func<string, bool>>(
command => partitionKeys,
_ => false,
(command, _, _) => EventOrNone.Event(new BranchCreated(command.Name))));
Assert.True(result.IsSuccess);
var aggregate = Repository.Load<BranchProjector>(partitionKeys);
Assert.NotNull(aggregate);
Assert.IsType<Branch>(aggregate.GetValue().GetPayload());
var branch = aggregate.GetValue().GetPayload() as Branch ?? throw new ApplicationException();
Assert.Equal("a", branch.Name);
Assert.Equal(1, aggregate.GetValue().Version);
}
[Fact]
public async Task ExecuteWithResourcePublishOnlyWithInjectTask()
{
var executor = new CommandExecutor { EventTypes = new PureDomainEventTypes() };
var createCommand = new RegisterBranch("a");
var partitionKeys = PartitionKeys.Generate();
var result = await executor.ExecuteWithResource(
createCommand,
new CommandResourcePublishOnlyWithInjectTask<RegisterBranch, Func<string, bool>>(
command => partitionKeys,
_ => false,
(command, _, _) => Task.FromResult(EventOrNone.Event(new BranchCreated(command.Name)))));
Assert.True(result.IsSuccess);
var aggregate = Repository.Load<BranchProjector>(partitionKeys);
Assert.NotNull(aggregate);
Assert.IsType<Branch>(aggregate.GetValue().GetPayload());
var branch = aggregate.GetValue().GetPayload() as Branch ?? throw new ApplicationException();
Assert.Equal("a", branch.Name);
Assert.Equal(1, aggregate.GetValue().Version);
}
}

0 comments on commit a928b93

Please sign in to comment.