Skip to content

Commit

Permalink
Merge pull request #475 from J-Tech-Japan/474-pure-add-saveonly-command
Browse files Browse the repository at this point in the history
Add Publish Only
  • Loading branch information
tomohisa authored Dec 28, 2024
2 parents ca06779 + a928b93 commit d7fa2d5
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 35 deletions.
195 changes: 160 additions & 35 deletions src/Sekiban.Pure/Class1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ResultBox<Aggregate> Project(IEvent ev, IAggregateProjector projector) =>
{
Payload = projector.Project(Payload, ev),
LastSortableUniqueId = ev.SortableUniqueId,
Version = ev.Version
Version = Version + 1
};
public ResultBox<Aggregate> Project(List<IEvent> events, IAggregateProjector projector) => ResultBox
.FromValue(events)
Expand Down Expand Up @@ -75,17 +75,30 @@ public interface IAggregateProjector
public IAggregatePayload Project(IAggregatePayload payload, IEvent ev);
public virtual string GetVersion() => "initial";
}
public interface ICommandContext<TAggregatePayload> where TAggregatePayload : IAggregatePayload
public class NoneAggregateProjector : IAggregateProjector
{
public List<IEvent> Events { get; }
internal IAggregate GetAggregateCommon();
public EventOrNone AppendEvent(IEventPayload eventPayload);
internal CommandExecuted GetCommandExecuted(List<IEvent> producedEvents) => new(
public static NoneAggregateProjector Empty => new();
public IAggregatePayload Project(IAggregatePayload payload, IEvent ev) => payload;
}
public interface ICommandContext<TAggregatePayload> : ICommandContextWithoutState
where TAggregatePayload : IAggregatePayload
{
CommandExecuted ICommandContextWithoutState.GetCommandExecuted(List<IEvent> producedEvents) => new(
GetAggregateCommon().PartitionKeys,
GetAggregateCommon().LastSortableUniqueId,
producedEvents);
internal IAggregate GetAggregateCommon();
public ResultBox<Aggregate<TAggregatePayload>> GetAggregate();
}
public interface ICommandContextWithoutState
{
public List<IEvent> Events { get; }
public PartitionKeys GetPartitionKeys();
public int GetNextVersion();
public int GetCurrentVersion();
internal CommandExecuted GetCommandExecuted(List<IEvent> producedEvents);
public ResultBox<EventOrNone> AppendEvent(IEventPayload eventPayload);
}
public interface ICommand;
public interface ICommandWithAggregateRestriction<TAggregatePayload> : ICommand
where TAggregatePayload : IAggregatePayload;
Expand Down Expand Up @@ -327,23 +340,54 @@ public Task<ResultBox<CommandResponse>> ExecuteGeneral<TCommand, TInject, TAggre
Delegate handler) where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
specifyPartitionKeys(command)
.ToResultBox()
.Combine(keys => Repository.Load(keys, projector))
.Verify((keys, aggregate) => VerifyAggregateType<TCommand, TAggregatePayload>(command, aggregate))
.Combine(
(partitionKeys, aggregate) => ResultBox.FromValue(
new CommandContext<TAggregatePayload>(aggregate, projector, EventTypes)))
keys => CreateCommandContextWithoutState<TCommand, TAggregatePayload, TInject>(
command,
keys,
projector,
handler))
.Combine(
(partitionKeys, aggregate, context) => RunHandler(command, context, inject, handler)
.Conveyor(eventOrNone => EventToCommandExecuted(context, eventOrNone)))
.Conveyor(values => Repository.Save(values.Value4.ProducedEvents).Remap(_ => values))
(partitionKeys, context) =>
RunHandler<TCommand, TInject, TAggregatePayload>(command, context, inject, handler)
.Conveyor(eventOrNone => EventToCommandExecuted(context, eventOrNone)))
.Conveyor(values => Repository.Save(values.Value3.ProducedEvents).Remap(_ => values))
.Conveyor(
(partitionKeys, aggregate, context, executed) => ResultBox.FromValue(
(partitionKeys, context, executed) => ResultBox.FromValue(
new CommandResponse(
partitionKeys,
executed.ProducedEvents,
executed.ProducedEvents.Count > 0
? executed.ProducedEvents.Last().Version
: aggregate.Version)));
: context.GetCurrentVersion())));
private ResultBox<ICommandContextWithoutState>
CreateCommandContextWithoutState<TCommand, TAggregatePayload, TInject>(
TCommand command,
PartitionKeys partitionKeys,
IAggregateProjector projector,
Delegate handler) where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
handler switch
{
Func<TCommand, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
Func<TCommand, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
Func<TCommand, TInject, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
Func<TCommand, TInject, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1 =>
ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContextWithoutState(partitionKeys, EventTypes)),
_ => ResultBox
.Start
.Conveyor(keys => Repository.Load(partitionKeys, projector))
.Verify(aggregate => VerifyAggregateType<TCommand, TAggregatePayload>(command, aggregate))
.Conveyor(
aggregate => ResultBox<ICommandContextWithoutState>.FromValue(
new CommandContext<TAggregatePayload>(aggregate, projector, EventTypes)))
};

private ExceptionOrNone VerifyAggregateType<TCommand, TAggregatePayload>(TCommand command, Aggregate aggregate)
where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
aggregate.GetPayload() is not TAggregatePayload
Expand All @@ -356,38 +400,44 @@ private ExceptionOrNone VerifyAggregateType<TCommand, TAggregatePayload>(TComman

private Task<ResultBox<EventOrNone>> RunHandler<TCommand, TInject, TAggregatePayload>(
TCommand command,
ICommandContext<TAggregatePayload> context,
ICommandContextWithoutState context,
OptionalValue<TInject> inject,
Delegate handler) where TCommand : ICommand where TAggregatePayload : IAggregatePayload =>
(handler, inject.HasValue) switch
(handler, inject.HasValue, context) switch
{
(Func<TCommand, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, _) => handler1(
command,
context)
.ToTask(),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, true) =>
handler1(command, inject.GetValue(), context).ToTask(),
(Func<TCommand, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, _) => handler1(
command,
context),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, true)
=> handler1(command, inject.GetValue(), context),
(Func<TCommand, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1, _, { } contextWithout) =>
Task.FromResult(handler1(command, contextWithout)),
(Func<TCommand, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1, _, { } contextWithout)
=> handler1(command, contextWithout),
(Func<TCommand, TInject, ICommandContextWithoutState, ResultBox<EventOrNone>> handler1, true, {
} contextWithout) => Task.FromResult(handler1(command, inject.GetValue(), contextWithout)),
(Func<TCommand, TInject, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> handler1, true, {
} contextWithout) => handler1(command, inject.GetValue(), contextWithout),
(Func<TCommand, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, _,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, stateContext).ToTask(),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, ResultBox<EventOrNone>> handler1, true,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, inject.GetValue(), stateContext)
.ToTask(),
(Func<TCommand, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, _,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, stateContext),
(Func<TCommand, TInject, ICommandContext<TAggregatePayload>, Task<ResultBox<EventOrNone>>> handler1, true,
ICommandContext<TAggregatePayload> stateContext) => handler1(command, inject.GetValue(), stateContext),
_ => ResultBox<EventOrNone>
.FromException(
new SekibanCommandHandlerNotMatchException(
$"{handler.GetType().Name} does not match as command handler"))
.ToTask()
};
private ResultBox<CommandExecuted> EventToCommandExecuted<TAggregatePayload>(
ICommandContext<TAggregatePayload> commandContext,
EventOrNone eventOrNone) where TAggregatePayload : IAggregatePayload =>
private ResultBox<CommandExecuted> EventToCommandExecuted(
ICommandContextWithoutState commandContext,
EventOrNone eventOrNone) =>
(eventOrNone.HasEvent
? EventTypes
.GenerateTypedEvent(
eventOrNone.GetValue(),
commandContext.GetAggregateCommon().PartitionKeys,
commandContext.GetPartitionKeys(),
SortableUniqueIdValue.Generate(SekibanDateProducer.GetRegistered().UtcNow, Guid.NewGuid()),
commandContext.GetAggregateCommon().Version + 1)
commandContext.GetNextVersion())
.Remap(ev => commandContext.Events.Append(ev).ToList())
: ResultBox.FromValue(commandContext.Events)).Remap(commandContext.GetCommandExecuted);
#endregion
Expand All @@ -402,8 +452,12 @@ public class CommandContext<TAggregatePayload>(
public IAggregateProjector Projector { get; } = projector;
public IEventTypes EventTypes { get; } = eventTypes;
public List<IEvent> Events { get; } = new();
public PartitionKeys GetPartitionKeys() => Aggregate.PartitionKeys;
public int GetNextVersion() => Aggregate.Version + 1;
public int GetCurrentVersion() => Aggregate.Version;
public IAggregate GetAggregateCommon() => Aggregate;
public EventOrNone AppendEvent(IEventPayload eventPayload)
public ResultBox<Aggregate<TAggregatePayload>> GetAggregate() => Aggregate.ToTypedPayload<TAggregatePayload>();
public ResultBox<EventOrNone> AppendEvent(IEventPayload eventPayload)
{
var toAdd = EventTypes.GenerateTypedEvent(
eventPayload,
Expand All @@ -418,7 +472,26 @@ public EventOrNone AppendEvent(IEventPayload eventPayload)
Events.Add(ev);
return EventOrNone.Empty;
}
public ResultBox<Aggregate<TAggregatePayload>> GetAggregate() => Aggregate.ToTypedPayload<TAggregatePayload>();
}
public record CommandContextWithoutState(PartitionKeys PartitionKeys, IEventTypes EventTypes)
: ICommandContextWithoutState
{
public List<IEvent> Events { get; } = new();
public PartitionKeys GetPartitionKeys() => PartitionKeys;
public int GetNextVersion() => 0;
public int GetCurrentVersion() => 0;
public CommandExecuted GetCommandExecuted(List<IEvent> producedEvents) => new(
PartitionKeys,
SortableUniqueIdValue.Generate(SekibanDateProducer.GetRegistered().UtcNow, Guid.NewGuid()),
producedEvents);
public ResultBox<EventOrNone> AppendEvent(IEventPayload eventPayload) => EventTypes
.GenerateTypedEvent(
eventPayload,
PartitionKeys,
SortableUniqueIdValue.Generate(SekibanDateProducer.GetRegistered().UtcNow, Guid.NewGuid()),
0)
.Do(ev => Events.Add(ev))
.Remap(_ => EventOrNone.Empty);
}
public class Repository
{
Expand Down Expand Up @@ -572,3 +645,55 @@ public record CommandResourceWithInjectTask<TCommand, TProjector, TAggregatePayl
public Delegate GetHandler() => Handler;
public OptionalValue<Type> GetAggregatePayloadType() => typeof(TAggregatePayload);
}
public record CommandResourcePublishOnly<TCommand>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
Func<TCommand, ICommandContextWithoutState, ResultBox<EventOrNone>> Handler) : ICommandResource<TCommand>
where TCommand : ICommand, IEquatable<TCommand>
{

public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => null;
public Delegate GetHandler() => Handler;
}
public record CommandResourcePublishOnlyWithInject<TCommand, TInject>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
TInject? Injection,
Func<TCommand, TInject, ICommandContextWithoutState, ResultBox<EventOrNone>> Handler) : ICommandResource<TCommand>
where TCommand : ICommand, IEquatable<TCommand>
{

public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => Injection;
public Delegate GetHandler() => Handler;
}
public record CommandResourcePublishOnlyTask<TCommand>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
Func<TCommand, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> Handler) : ICommandResource<TCommand>
where TCommand : ICommand, IEquatable<TCommand>
{
public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => null;
public Delegate GetHandler() => Handler;
}
public record CommandResourcePublishOnlyWithInjectTask<TCommand, TInject>(
Func<TCommand, PartitionKeys> SpecifyPartitionKeys,
TInject? Injection,
Func<TCommand, TInject, ICommandContextWithoutState, Task<ResultBox<EventOrNone>>> Handler)
: ICommandResource<TCommand> where TCommand : ICommand, IEquatable<TCommand>
{
public Func<TCommand, PartitionKeys> GetSpecifyPartitionKeysFunc() => SpecifyPartitionKeys;
public OptionalValue<Type> GetAggregatePayloadType() => OptionalValue<Type>.Empty;
public Type GetCommandType() => typeof(TCommand);
public IAggregateProjector GetProjector() => NoneAggregateProjector.Empty;
public object? GetInjection() => Injection;
public Delegate GetHandler() => Handler;
}
Loading

0 comments on commit d7fa2d5

Please sign in to comment.