load(I id) {
* The method loads only the recent history of the aggregate.
*
*
The current {@link #snapshotTrigger} is used as a batch size of the read operation,
- * so the method can perform sub-optimally for some time
+ * so the method can perform suboptimally for some time
* after the {@link #snapshotTrigger} change.
*
* @param id
diff --git a/server/src/main/java/io/spine/server/aggregate/AggregateStorage.java b/server/src/main/java/io/spine/server/aggregate/AggregateStorage.java
index 8c84becf634..a4ff39ccfaf 100644
--- a/server/src/main/java/io/spine/server/aggregate/AggregateStorage.java
+++ b/server/src/main/java/io/spine/server/aggregate/AggregateStorage.java
@@ -69,10 +69,10 @@
* its further querying.
*
*
- *
Storing Aggregate events
+ * Storing Aggregate events
*
* Each Aggregate is an event-sourced Entity. To load an Aggregate instance, one plays all
- * of the events emitted by it, eventually obtaining the last known state. While the Event Store
+ * the events emitted by it, eventually obtaining the last known state. While the Event Store
* of a Bounded Context, to which some Aggregate belongs, stores all domain events, using it
* for the sake of loading an Aggregate is inefficient in most cases. An overwhelming number of
* the domain events emitted in a Bounded Context and the restrictions applied by an underlying
@@ -91,7 +91,7 @@
* created. It persists data as Protobuf message records in its pre-configured
* {@link io.spine.server.storage.RecordStorage RecordStorage}.
*
- *
Storing and querying the latest Aggregate states
+ * Storing and querying the latest Aggregate states
*
* End-users of the framework are able to set the visibility level for each Aggregate state
* by using an {@linkplain io.spine.server.entity.EntityVisibility (entity).visibility} option
@@ -108,7 +108,7 @@
*
However, even if the Aggregate visibility is set to
* {@link io.spine.option.EntityOption.Visibility#NONE NONE}, the storage still persists
* the essential bits of Aggregate as-an-Entity. Namely, its identifier, its lifecycle flags
- * and version. Such a behavior allows to speed up the execution of calls such
+ * and version. Such a behavior allows speeding up the execution of calls such
* as {@linkplain #index() obtaining an index} of Aggregate identifiers, which otherwise would
* involve major scans of event storage, with {@code DISTINCT} group operation applied.
*
@@ -208,7 +208,7 @@ public Iterator index() {
/**
* Forms and returns an {@link AggregateHistory} based on the
- * {@linkplain #historyBackward(Object, int)} aggregate history}.
+ * {@linkplain #historyBackward(Object, int) aggregate history}.
*
* @param id
* the identifier of the aggregate for which to return the history
diff --git a/server/src/main/java/io/spine/server/aggregate/AggregateTransaction.java b/server/src/main/java/io/spine/server/aggregate/AggregateTransaction.java
index 60d415b15fd..6ac037f4b7d 100644
--- a/server/src/main/java/io/spine/server/aggregate/AggregateTransaction.java
+++ b/server/src/main/java/io/spine/server/aggregate/AggregateTransaction.java
@@ -41,7 +41,7 @@
*
* @param the type of aggregate IDs
* @param the type of aggregate state
- * @param the type of a {@code ValidatingBuilder} for the aggregate state
+ * @param the type of {@code ValidatingBuilder} for the aggregate state
*/
@Internal
public class AggregateTransaction aggregate, EventEnve
return aggregate.invokeApplier(event);
}
- /**
- * {@inheritDoc}
- *
- *
Additionally, notifies the {@code Aggregate} instance that the event has been played.
- */
- @Override
- public DispatchOutcome play(EventEnvelope event) {
- var outcome = super.play(event);
- entity().onAfterEventPlayed(event);
- return outcome;
- }
-
@Override
protected VersionIncrement createVersionIncrement(EventEnvelope event) {
return VersionIncrement.fromEvent(event);
diff --git a/server/src/main/java/io/spine/server/aggregate/UncommittedHistory.java b/server/src/main/java/io/spine/server/aggregate/UncommittedHistory.java
index 46cedcd15a7..54a11c5e01a 100644
--- a/server/src/main/java/io/spine/server/aggregate/UncommittedHistory.java
+++ b/server/src/main/java/io/spine/server/aggregate/UncommittedHistory.java
@@ -28,21 +28,17 @@
import com.google.common.collect.ImmutableList;
import io.spine.core.Event;
-import io.spine.server.type.EventEnvelope;
-import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
/**
* Uncommitted events and snapshots created for this aggregate during the dispatching.
*
- *
Watches how the events {@linkplain Aggregate#invokeApplier(EventEnvelope) are sent}
- * to the {@link Aggregate} applier methods. Remembers all such events as uncommitted.
+ * Remembers all successfully applied events as uncommitted.
*
*
Once an aggregate is loaded from the storage, the {@code UncommittedHistory}
* {@linkplain #onAggregateRestored(AggregateHistory) remembers} the event count
@@ -51,13 +47,7 @@
*
If during the dispatching of events the number of events since the last snapshot exceeds
* the snapshot trigger, a snapshot is created and remembered as a part of uncommitted history.
*
- *
In order to ignore the events fed to the aggregate when it's being loaded from the storage,
- * the {@code UncommittedHistory}'s tracking is only {@linkplain #startTracking(int) activated}
- * when the new and truly un-yet-committed events are dispatched to the applier methods.
- * The tracking {@linkplain #stopTracking() stops} after all the new events have been played
- * on the aggregate instance.
- *
- * @see Aggregate#apply(List, int) on activation and deactivation of event tracking
+ * @see Aggregate#apply(List, int) remembering successfully applied events
* @see Aggregate#replay(AggregateHistory) on supplying the history stats when loading aggregate
* instances from the storage
*/
@@ -66,10 +56,7 @@ final class UncommittedHistory {
private final Supplier makeSnapshot;
private final List historySegments = new ArrayList<>();
private final List currentSegment = new ArrayList<>();
-
private int eventCountAfterLastSnapshot;
- private @Nullable Integer snapshotTrigger = null;
- private boolean enabled = false;
/**
* Creates an instance of the uncommitted history.
@@ -82,65 +69,27 @@ final class UncommittedHistory {
}
/**
- * Enables the tracking of events.
- *
- * All events {@linkplain #track(EventEnvelope) sent} to this instance will now be counted
- * as new and uncommitted events in the aggregate's history.
- *
- * @param snapshotTrigger
- * the snapshot trigger to consider each time a new event is tracked
- */
- void startTracking(int snapshotTrigger) {
- enabled = true;
- this.snapshotTrigger = snapshotTrigger;
- }
-
- /**
- * Stops the tracking of the events.
- *
- *
All the events {@linkplain #track(EventEnvelope) sent to the tracking} will be counted
- * as the events already present in the aggregate storage.
- */
- void stopTracking() {
- enabled = false;
- snapshotTrigger = null;
- }
-
- /**
- * Tracks the event dispatched to the Aggregate's applier.
- *
- *
If the tracking is not {@linkplain #startTracking(int) started}, the event is considered
- * an old one and such as not requiring storage and tracking. In this case, this method
- * does nothing.
- *
- *
If the event is a new one, it is remembered as a part of the uncommitted history.
+ * Tracks the events, successfully dispatched to the Aggregate's applier.
*
*
If the number of events since the last snapshot equals or exceeds the snapshot trigger,
* a new snapshot is made and saved to the uncommitted history.
*
- * @param envelope
- * an event to track
+ * @param events
+ * successfully applied events
+ * @param snapshotTrigger
+ * number of tracked events, exceeding which, triggers a new snapshot creation
*/
- void track(EventEnvelope envelope) {
- if (!enabled) {
- return;
- }
- requireNonNull(snapshotTrigger,
- "The snapshot trigger must be set" +
- " to track the events applied to an `Aggregate`.");
-
- var event = envelope.outerObject();
- if (event.isRejection()) {
- return;
- }
- currentSegment.add(event);
- var eventsInSegment = currentSegment.size();
- if (eventCountAfterLastSnapshot + eventsInSegment >= snapshotTrigger) {
- var snapshot = makeSnapshot.get();
- var completedSegment = historyFrom(currentSegment, snapshot);
- historySegments.add(completedSegment);
- currentSegment.clear();
- eventCountAfterLastSnapshot = 0;
+ void track(List events, int snapshotTrigger) {
+ for (var event : events) {
+ currentSegment.add(event);
+ var eventsInSegment = currentSegment.size();
+ if (eventCountAfterLastSnapshot + eventsInSegment >= snapshotTrigger) {
+ var snapshot = makeSnapshot.get();
+ var completedSegment = historyFrom(currentSegment, snapshot);
+ historySegments.add(completedSegment);
+ currentSegment.clear();
+ eventCountAfterLastSnapshot = 0;
+ }
}
}
@@ -165,8 +114,7 @@ ImmutableList get() {
*/
UncommittedEvents events() {
var events = get().stream()
- .flatMap(segment -> segment.getEventList()
- .stream())
+ .flatMap(segment -> segment.getEventList().stream())
.collect(toList());
return UncommittedEvents.ofNone()
.append(events);
diff --git a/server/src/main/java/io/spine/server/delivery/InboxMessageMixin.java b/server/src/main/java/io/spine/server/delivery/InboxMessageMixin.java
index 5bb0aa06e7c..97d0640fa22 100644
--- a/server/src/main/java/io/spine/server/delivery/InboxMessageMixin.java
+++ b/server/src/main/java/io/spine/server/delivery/InboxMessageMixin.java
@@ -54,7 +54,7 @@ default TenantId tenant() {
}
/**
- * Generates a new {@code InboxMessageId} with a auto-generated UUID and the given shard
+ * Generates a new {@code InboxMessageId} with an auto-generated UUID and the given shard
* index as parts.
*/
static InboxMessageId generateIdWith(ShardIndex index) {
diff --git a/server/src/main/java/io/spine/server/entity/EventPlayingTransaction.java b/server/src/main/java/io/spine/server/entity/EventPlayingTransaction.java
index a2c755803e4..e7ff9efbc9a 100644
--- a/server/src/main/java/io/spine/server/entity/EventPlayingTransaction.java
+++ b/server/src/main/java/io/spine/server/entity/EventPlayingTransaction.java
@@ -45,7 +45,7 @@
* @param
* the type of entity state
* @param
- * the type of a {@code ValidatingBuilder} for the entity state
+ * the type of {@code ValidatingBuilder} for the entity state
*/
@Internal
public abstract
diff --git a/server/src/main/java/io/spine/server/event/EventBus.java b/server/src/main/java/io/spine/server/event/EventBus.java
index 97e2d286de9..216d4a8c6c6 100644
--- a/server/src/main/java/io/spine/server/event/EventBus.java
+++ b/server/src/main/java/io/spine/server/event/EventBus.java
@@ -142,12 +142,12 @@ public static Builder newBuilder() {
}
@VisibleForTesting
- final Set extends EventDispatcher> dispatchersOf(EventClass eventClass) {
+ Set extends EventDispatcher> dispatchersOf(EventClass eventClass) {
return registry().dispatchersOf(eventClass);
}
@VisibleForTesting
- final boolean hasDispatchers(EventClass eventClass) {
+ boolean hasDispatchers(EventClass eventClass) {
Set> dispatchers = dispatchersOf(eventClass);
return !dispatchers.isEmpty();
}
@@ -159,7 +159,7 @@ final boolean hasDispatchers(EventClass eventClass) {
*
* @return a set of classes of supported events
*/
- public final Set registeredEventClasses() {
+ public Set registeredEventClasses() {
return registry().registeredMessageClasses();
}
@@ -195,13 +195,13 @@ public EventStore eventStore() {
* Posts the event for handling.
*
* Performs the same action as the
- * {@linkplain io.spine.server.bus.Bus#post(Signal, StreamObserver)} parent method},
+ * {@linkplain io.spine.server.bus.Bus#post(Signal, StreamObserver) parent method},
* but does not require any response observer.
*
* @param event the event to be handled
* @see io.spine.server.bus.Bus#post(Signal, StreamObserver)
*/
- public final void post(Event event) {
+ public void post(Event event) {
post(event, observer());
}
@@ -209,15 +209,15 @@ public final void post(Event event) {
* Posts the events for handling.
*
*
Performs the same action as the
- * {@linkplain io.spine.server.bus.Bus#post(Iterable, StreamObserver)} parent method}
+ * {@linkplain io.spine.server.bus.Bus#post(Iterable, StreamObserver) parent method}
* but does not require any response observer.
*
- *
This method should be used if the callee does not care about the events acknowledgement.
+ *
This method should be used if the callee does not care about the events' acknowledgement.
*
* @param events the events to be handled
* @see io.spine.server.bus.Bus#post(Signal, StreamObserver)
*/
- public final void post(Iterable events) {
+ public void post(Iterable events) {
post(events, observer());
}
@@ -230,7 +230,7 @@ StreamObserver observer() {
* Obtains the {@code EventEnricher} used by this Event Bus.
*/
@VisibleForTesting
- public final Optional enricher() {
+ public Optional enricher() {
return Optional.ofNullable(enricher);
}
diff --git a/server/src/main/java/io/spine/server/event/model/EventReactorClass.java b/server/src/main/java/io/spine/server/event/model/EventReactorClass.java
index 7440cb126eb..bdcd65e64b8 100644
--- a/server/src/main/java/io/spine/server/event/model/EventReactorClass.java
+++ b/server/src/main/java/io/spine/server/event/model/EventReactorClass.java
@@ -69,7 +69,7 @@ private EventReactorClass(Class extends S> cls) {
@SuppressWarnings("unchecked")
var result = (EventReactorClass)
get(cls, EventReactorClass.class, () -> new EventReactorClass<>(cls));
- return (result);
+ return result;
}
@Override
diff --git a/server/src/test/java/io/spine/server/aggregate/AbstractAggregateResilienceTest.java b/server/src/test/java/io/spine/server/aggregate/AbstractAggregateResilienceTest.java
new file mode 100644
index 00000000000..7884e7a0a32
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/AbstractAggregateResilienceTest.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.spine.base.CommandMessage;
+import io.spine.client.ResponseFormat;
+import io.spine.core.Command;
+import io.spine.core.Event;
+import io.spine.grpc.MemoizingObserver;
+import io.spine.server.BoundedContext;
+import io.spine.server.BoundedContextBuilder;
+import io.spine.server.aggregate.given.aggregate.AggregateTestEnv;
+import io.spine.server.aggregate.given.employee.Employee;
+import io.spine.server.aggregate.given.employee.EmployeeAgg;
+import io.spine.server.aggregate.given.employee.EmployeeId;
+import io.spine.server.aggregate.given.employee.DispatchExhaust;
+import io.spine.server.aggregate.given.employee.PersonEmployed;
+import io.spine.server.aggregate.given.employee.SalaryIncreased;
+import io.spine.server.event.EventStreamQuery;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.spine.server.aggregate.given.AbstractAggregateResilienceTestEnv.assertEvents;
+import static io.spine.server.aggregate.given.AbstractAggregateResilienceTestEnv.eventTypes;
+import static io.spine.server.aggregate.given.employee.EmployeeId.generate;
+import static io.spine.server.aggregate.given.employee.Employees.decreaseSalary;
+import static io.spine.server.aggregate.given.employee.Employees.decreaseSalaryThreeTimes;
+import static io.spine.server.aggregate.given.employee.Employees.employ;
+import static io.spine.server.aggregate.given.employee.Employees.increaseSalary;
+
+/**
+ * Tests how {@code Aggregate} handles the case when one of events, emitted by a command,
+ * corrupts the {@code Aggregate}'s state.
+ *
+ * @see AggregateResilienceTest
+ * @see CachedAggregateResilienceTest
+ */
+@DisplayName("Abstract resilient `Aggregate` should")
+abstract class AbstractAggregateResilienceTest {
+
+ private final EmployeeId jack = generate();
+ private AggregateRepository repository;
+ private BoundedContext context;
+
+ @BeforeEach
+ void setUp() {
+ repository = new DefaultAggregateRepository<>(EmployeeAgg.class);
+ context = BoundedContextBuilder.assumingTests()
+ .add(repository)
+ .build();
+ repository.aggregateStorage().enableStateQuerying();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ context.close();
+ }
+
+ @Nested
+ @DisplayName("not store and post events emitted by a command if one of them was not applied successfully")
+ class NotStoreAndPostEventsBundle {
+
+ @Nested
+ @DisplayName("when a faulty command emitted")
+ class WhenCommandEmitted {
+
+ @Test
+ @DisplayName("a single event")
+ void singleEvent() {
+ dispatch(
+ employ(jack, 250),
+ increaseSalary(jack, 15),
+
+ // This command emits the event that will corrupt the aggregate's state
+ // as no employee can be paid less than 200.
+ //
+ // This event should neither be stored nor posted.
+ decreaseSalary(jack, 500),
+
+ increaseSalary(jack, 500)
+ );
+
+ var exhaust = collectExhaust();
+ var expectedEvents = eventTypes(
+ PersonEmployed.class,
+ SalaryIncreased.class,
+ SalaryIncreased.class
+ );
+
+ assertEvents(exhaust.storedEvents(), expectedEvents);
+ assertEvents(exhaust.postedEvents(), expectedEvents);
+ assertThat(exhaust.state().getSalary())
+ .isEqualTo(250 + 15 + 500);
+ }
+
+ @Test
+ @DisplayName("multiple events")
+ void multipleEvents() {
+ dispatch(
+ employ(jack, 250),
+ increaseSalary(jack, 200),
+
+ // This command emits three events. Second one will corrupt
+ // the aggregate's state as no employee can be paid less than 200.
+ //
+ // These events should neither be stored nor posted.
+ decreaseSalaryThreeTimes(jack, 200),
+
+ increaseSalary(jack, 100)
+ );
+
+ var exhaust = collectExhaust();
+ var expectedEvents = eventTypes(
+ PersonEmployed.class,
+ SalaryIncreased.class,
+ SalaryIncreased.class
+ );
+
+ assertEvents(exhaust.storedEvents(), expectedEvents);
+ assertEvents(exhaust.postedEvents(), expectedEvents);
+ assertThat(exhaust.state().getSalary())
+ .isEqualTo(250 + 200 + 100);
+ }
+
+ private void dispatch(CommandMessage... messages) {
+ var commands = Arrays.stream(messages)
+ .map(AggregateTestEnv::command)
+ .collect(Collectors.toList());
+ AbstractAggregateResilienceTest.this.dispatch(commands, context);
+ }
+
+ private DispatchExhaust collectExhaust() {
+ var observer = new MemoizingObserver();
+ context.eventBus()
+ .eventStore()
+ .read(EventStreamQuery.getDefaultInstance(), observer);
+ var postedEvents = observer.responses();
+
+ var storedEvents = repository.aggregateStorage()
+ .read(jack)
+ .orElseThrow()
+ .getEventList();
+
+ var rawState = repository.aggregateStorage()
+ .readStates(ResponseFormat.getDefaultInstance())
+ .next()
+ .getState();
+
+ try {
+ var state = rawState.unpack(Employee.class);
+ return new DispatchExhaust(storedEvents, postedEvents, state);
+
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Dispatches the passed commands to the passed context.
+ *
+ * The way we dispatch commands determines whether
+ * an {@code Aggregate} would be cached or not.
+ */
+ abstract void dispatch(List commands, BoundedContext context);
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/AggregateResilienceTest.java b/server/src/test/java/io/spine/server/aggregate/AggregateResilienceTest.java
new file mode 100644
index 00000000000..186a851bb47
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/AggregateResilienceTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate;
+
+import io.spine.core.Command;
+import io.spine.grpc.StreamObservers;
+import io.spine.server.BoundedContext;
+import org.junit.jupiter.api.DisplayName;
+
+import java.util.List;
+
+/**
+ * Tests how {@code Aggregate} handles the case when one of events, emitted by a command,
+ * corrupts the {@code Aggregate}'s state.
+ *
+ * @see AbstractAggregateResilienceTest
+ * @see CachedAggregateResilienceTest
+ */
+@DisplayName("Resilient `Aggregate` should")
+final class AggregateResilienceTest extends AbstractAggregateResilienceTest {
+
+ /**
+ * @inheritDoc
+ *
+ * This method dispatches the passed commands directly to the context's
+ * {@code CommandBus} one by one.
+ */
+ @Override
+ void dispatch(List commands, BoundedContext context) {
+ var commandBus = context.commandBus();
+ for (var cmd : commands) {
+ commandBus.post(cmd, StreamObservers.noOpObserver());
+ }
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/AggregateTest.java b/server/src/test/java/io/spine/server/aggregate/AggregateTest.java
index f901fae0f51..38ac9b16407 100644
--- a/server/src/test/java/io/spine/server/aggregate/AggregateTest.java
+++ b/server/src/test/java/io/spine/server/aggregate/AggregateTest.java
@@ -450,7 +450,7 @@ class ReturnEventRecords {
@Test
@DisplayName("which are uncommitted")
- void uncommitedAfterDispatch() {
+ void uncommittedAfterDispatch() {
aggregate.dispatchCommands(command(createProject),
command(addTask),
command(startProject));
@@ -493,7 +493,7 @@ class NotHaveEventRecords {
@Test
@DisplayName("which are uncommitted")
- void uncommitedByDefault() {
+ void uncommittedByDefault() {
var events = aggregate().getUncommittedEvents();
assertFalse(events.nonEmpty());
diff --git a/server/src/test/java/io/spine/server/aggregate/CachedAggregateResilienceTest.java b/server/src/test/java/io/spine/server/aggregate/CachedAggregateResilienceTest.java
new file mode 100644
index 00000000000..50018b073cf
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/CachedAggregateResilienceTest.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate;
+
+import io.spine.core.Command;
+import io.spine.environment.Tests;
+import io.spine.server.BoundedContext;
+import io.spine.server.ServerEnvironment;
+import io.spine.server.aggregate.given.employee.PreparedInboxStorage;
+import io.spine.server.aggregate.given.employee.PreparedStorageFactory;
+import io.spine.server.delivery.DeliveryStrategy;
+import io.spine.server.delivery.ShardIndex;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+
+import java.util.List;
+
+/**
+ * Tests how cached {@code Aggregate} handles the case when one of events,
+ * emitted by a command, corrupts the {@code Aggregate}'s state.
+ *
+ * An {@code Aggregate} is cached when multiple messages are dispatched from `Inbox` at once.
+ * Under the hood, they are processed as a "batch", which triggers the aggregate
+ * to be cached for their processing.
+ *
+ *
This class uses the custom {@linkplain PreparedInboxStorage InboxStorage} which allow
+ * writing messages there directly. This storage is "fed" to the delivery which then is triggered
+ * to perform dispatching.
+ *
+ * @see AbstractAggregateResilienceTest
+ * @see AggregateResilienceTest
+ */
+@DisplayName("Cached resilient `Aggregate` should")
+final class CachedAggregateResilienceTest extends AbstractAggregateResilienceTest {
+
+ private final ShardIndex shardIndex = DeliveryStrategy.newIndex(0, 1);
+ private PreparedInboxStorage inboxStorage;
+
+ @Override
+ @BeforeEach
+ void setUp() {
+ ServerEnvironment.instance().reset();
+ inboxStorage = new PreparedInboxStorage();
+ ServerEnvironment.when(Tests.class)
+ .use(PreparedStorageFactory.with(inboxStorage));
+ super.setUp();
+ }
+
+ @Override
+ @AfterEach
+ void tearDown() throws Exception {
+ ServerEnvironment.instance().reset();
+ inboxStorage.close();
+ super.tearDown();
+ }
+
+ /**
+ * @inheritDoc
+ *
+ *
This method fills the custom {@linkplain PreparedInboxStorage InboxStorage} with the passed
+ * commands and then runs delivery. The commands, dispatched this way, will be processed
+ * withing a single "batch" which would cause an {@code Aggregate} to be cached.
+ */
+ @Override
+ void dispatch(List commands, BoundedContext context) {
+ inboxStorage.write(shardIndex, commands);
+ ServerEnvironment.instance().delivery().deliverMessagesFrom(shardIndex);
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/AbstractAggregateResilienceTestEnv.java b/server/src/test/java/io/spine/server/aggregate/given/AbstractAggregateResilienceTestEnv.java
new file mode 100644
index 00000000000..2bb10ef9583
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/AbstractAggregateResilienceTestEnv.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate.given;
+
+import io.spine.base.EventMessage;
+import io.spine.core.Event;
+
+import java.util.List;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Environment for {@link io.spine.server.aggregate.AbstractAggregateResilienceTest}.
+ */
+public final class AbstractAggregateResilienceTestEnv {
+
+ private AbstractAggregateResilienceTestEnv() {
+ }
+
+ /**
+ * Wraps the "varargs-passed" event messages into a {@code List}.
+ */
+ @SafeVarargs
+ public static List>
+ eventTypes(Class extends EventMessage>... types) {
+ return List.of(types);
+ }
+
+ /**
+ * Assert that the passed events correspond to the passed types.
+ */
+ public static void assertEvents(List events, List> types) {
+ assertThat(events.size()).isEqualTo(types.size());
+ for (var i = 0; i < types.size(); i++) {
+ var messageClass = events.get(i)
+ .enclosedMessage()
+ .getClass();
+ assertThat(messageClass).isEqualTo(types.get(i));
+ }
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/employee/DispatchExhaust.java b/server/src/test/java/io/spine/server/aggregate/given/employee/DispatchExhaust.java
new file mode 100644
index 00000000000..e95dbe86949
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/employee/DispatchExhaust.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate.given.employee;
+
+import com.google.common.collect.ImmutableList;
+import io.spine.core.Event;
+
+import java.util.List;
+
+/**
+ * Result of {@code Message}s dispatching.
+ *
+ * Consists of:
+ *
+ * - events stored to {@code AggregateStorage};
+ * - events posted to {@code EventBus};
+ * - updated {@code Aggregate}'s state.
+ *
+ *
+ * A healthy {@code Aggregate} usually stores and posts the same set of events within
+ * dispatching. That consequently causes its state to be updated.
+ */
+public final class DispatchExhaust {
+
+ private final ImmutableList stored;
+ private final ImmutableList posted;
+ private final Employee state;
+
+ public DispatchExhaust(
+ List storedEvents,
+ List postedEvents,
+ Employee state
+ ) {
+ this.stored = ImmutableList.copyOf(storedEvents);
+ this.posted = ImmutableList.copyOf(postedEvents);
+ this.state = state;
+ }
+
+ public List storedEvents() {
+ return stored;
+ }
+
+ public List postedEvents() {
+ return posted;
+ }
+
+ public Employee state() {
+ return state;
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/employee/EmployeeAgg.java b/server/src/test/java/io/spine/server/aggregate/given/employee/EmployeeAgg.java
new file mode 100644
index 00000000000..ffec9026f27
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/employee/EmployeeAgg.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package io.spine.server.aggregate.given.employee;
+
+import io.spine.server.aggregate.Aggregate;
+import io.spine.server.aggregate.Apply;
+import io.spine.server.command.Assign;
+import io.spine.server.tuple.Triplet;
+
+import static io.spine.server.aggregate.given.employee.Employees.salaryDecreased;
+
+public class EmployeeAgg extends Aggregate {
+
+ public EmployeeAgg(EmployeeId id) {
+ super(id);
+ }
+
+ @Assign
+ PersonEmployed handle(Employ cmd) {
+ return PersonEmployed.newBuilder()
+ .setEmployee(cmd.getEmployee())
+ .setSalary(cmd.getSalary())
+ .vBuild();
+ }
+
+ @Apply
+ private void on(PersonEmployed event) {
+ builder()
+ .setId(event.getEmployee())
+ .setSalary(event.getSalary());
+ }
+
+ @Assign
+ SalaryIncreased handle(IncreaseSalary cmd) {
+ return SalaryIncreased.newBuilder()
+ .setEmployee(cmd.getEmployee())
+ .setAmount(cmd.getAmount())
+ .vBuild();
+ }
+
+ @Apply
+ private void on(SalaryIncreased event) {
+ builder()
+ .setId(event.getEmployee())
+ .setSalary(state().getSalary() + event.getAmount());
+ }
+
+ @Assign
+ SalaryDecreased handle(DecreaseSalary cmd) {
+ return SalaryDecreased.newBuilder()
+ .setEmployee(cmd.getEmployee())
+ .setAmount(cmd.getAmount())
+ .vBuild();
+ }
+
+ @Assign
+ Triplet
+ handle(DecreaseSalaryThreeTimes cmd) {
+ var employee = cmd.getEmployee();
+ var amount = cmd.getAmount();
+ return Triplet.of(
+ salaryDecreased(employee, amount),
+ salaryDecreased(employee, amount),
+ salaryDecreased(employee, amount)
+ );
+ }
+
+ @Apply
+ private void on(SalaryDecreased event) {
+ builder()
+ .setId(event.getEmployee())
+ .setSalary(state().getSalary() - event.getAmount());
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/employee/Employees.java b/server/src/test/java/io/spine/server/aggregate/given/employee/Employees.java
new file mode 100644
index 00000000000..1bed90bc5d4
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/employee/Employees.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate.given.employee;
+
+public class Employees {
+
+ private Employees() {
+ }
+
+ public static Employ employ(EmployeeId employee, int salary) {
+ return Employ.newBuilder()
+ .setEmployee(employee)
+ .setSalary(salary)
+ .vBuild();
+ }
+
+ public static IncreaseSalary increaseSalary(EmployeeId employee, int amount) {
+ return IncreaseSalary.newBuilder()
+ .setEmployee(employee)
+ .setAmount(amount)
+ .vBuild();
+ }
+
+ public static DecreaseSalary decreaseSalary(EmployeeId employee, int amount) {
+ return DecreaseSalary.newBuilder()
+ .setEmployee(employee)
+ .setAmount(amount)
+ .vBuild();
+ }
+
+ public static DecreaseSalaryThreeTimes
+ decreaseSalaryThreeTimes(EmployeeId employee, int amount) {
+ return DecreaseSalaryThreeTimes.newBuilder()
+ .setEmployee(employee)
+ .setAmount(amount)
+ .vBuild();
+ }
+
+ static SalaryDecreased salaryDecreased(EmployeeId employee, int amount) {
+ return SalaryDecreased.newBuilder()
+ .setEmployee(employee)
+ .setAmount(amount)
+ .vBuild();
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/employee/PreparedInboxStorage.java b/server/src/test/java/io/spine/server/aggregate/given/employee/PreparedInboxStorage.java
new file mode 100644
index 00000000000..8cd538677c4
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/employee/PreparedInboxStorage.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate.given.employee;
+
+import io.spine.base.Time;
+import io.spine.client.EntityId;
+import io.spine.core.Command;
+import io.spine.server.delivery.InboxId;
+import io.spine.server.delivery.InboxLabel;
+import io.spine.server.delivery.InboxMessage;
+import io.spine.server.delivery.InboxMessageMixin;
+import io.spine.server.delivery.InboxMessageStatus;
+import io.spine.server.delivery.InboxSignalId;
+import io.spine.server.delivery.InboxStorage;
+import io.spine.server.delivery.ShardIndex;
+import io.spine.server.route.CommandRouting;
+import io.spine.server.storage.memory.InMemoryStorageFactory;
+import io.spine.type.TypeUrl;
+
+import static io.spine.base.Identifier.pack;
+
+/**
+ * In-memory {@code InboxStorage} which allows putting commands there directly.
+ */
+public final class PreparedInboxStorage extends InboxStorage {
+
+ public PreparedInboxStorage() {
+ super(InMemoryStorageFactory.newInstance(), false);
+ }
+
+ /**
+ * Puts the passed commands into the storage.
+ */
+ public void write(ShardIndex shardIndex, Iterable commands) {
+ commands.forEach(cmd -> write(shardIndex, cmd));
+ }
+
+ private void write(ShardIndex shardIndex, Command cmd) {
+ var routing = CommandRouting.newInstance(EmployeeId.class);
+ var target = TypeUrl.of(Employee.class);
+
+ var inboxSignalId = InboxSignalId.newBuilder()
+ .setValue(cmd.messageId().getId().getValue().toString())
+ .vBuild();
+ var inboxMessage = InboxMessage.newBuilder()
+ .setId(InboxMessageMixin.generateIdWith(shardIndex))
+ .setSignalId(inboxSignalId)
+ .setInboxId(wrap(routing.apply(cmd.enclosedMessage(), cmd.getContext()), target))
+ .setCommand(cmd)
+ .setLabel(InboxLabel.HANDLE_COMMAND)
+ .setWhenReceived(Time.currentTime())
+ .setStatus(InboxMessageStatus.TO_DELIVER)
+ .vBuild();
+
+ write(inboxMessage);
+ }
+
+ private static InboxId wrap(T id, TypeUrl target) {
+ var entityId = EntityId.newBuilder()
+ .setId(pack(id))
+ .vBuild();
+ var inboxId = InboxId.newBuilder()
+ .setEntityId(entityId)
+ .setTypeUrl(target.value())
+ .vBuild();
+ return inboxId;
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/employee/PreparedStorageFactory.java b/server/src/test/java/io/spine/server/aggregate/given/employee/PreparedStorageFactory.java
new file mode 100644
index 00000000000..24330ef1079
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/employee/PreparedStorageFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.server.aggregate.given.employee;
+
+import com.google.protobuf.Message;
+import io.spine.server.ContextSpec;
+import io.spine.server.delivery.InboxStorage;
+import io.spine.server.storage.RecordSpec;
+import io.spine.server.storage.RecordStorage;
+import io.spine.server.storage.StorageFactory;
+import io.spine.server.storage.memory.InMemoryStorageFactory;
+
+/**
+ * In-memory {@code StorageFactory} which substitutes individual storages with the custom ones.
+ */
+public final class PreparedStorageFactory {
+
+ private PreparedStorageFactory() {
+ }
+
+ /**
+ * Returns in-memory {@code StorageFactory} with the custom {@code InboxStorage}.
+ */
+ public static StorageFactory with(InboxStorage inboxStorage) {
+ return new StorageFactory() {
+ @Override
+ public void close() {
+ // NO OP
+ }
+
+ @Override
+ public RecordStorage createRecordStorage(
+ ContextSpec context, RecordSpec spec) {
+ return InMemoryStorageFactory.newInstance().createRecordStorage(context, spec);
+ }
+
+ @Override
+ public InboxStorage createInboxStorage(boolean multitenant) {
+ return inboxStorage;
+ }
+ };
+ }
+}
diff --git a/server/src/test/java/io/spine/server/aggregate/given/employee/package-info.java b/server/src/test/java/io/spine/server/aggregate/given/employee/package-info.java
new file mode 100644
index 00000000000..7d7d3d3e677
--- /dev/null
+++ b/server/src/test/java/io/spine/server/aggregate/given/employee/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Environment classes for {@link io.spine.server.aggregate.AbstractAggregateResilienceTest}.
+ */
+@CheckReturnValue
+@ParametersAreNonnullByDefault
+package io.spine.server.aggregate.given.employee;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/server/src/test/proto/spine/test/aggregate/employee/commands.proto b/server/src/test/proto/spine/test/aggregate/employee/commands.proto
new file mode 100644
index 00000000000..f5f9c7e4278
--- /dev/null
+++ b/server/src/test/proto/spine/test/aggregate/employee/commands.proto
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+syntax = "proto3";
+
+package spine.test.aggregate.employee;
+
+import "spine/options.proto";
+
+option (type_url_prefix) = "type.spine.io";
+option java_package = "io.spine.server.aggregate.given.employee";
+option java_multiple_files = true;
+
+import "spine/test/aggregate/employee/employee.proto";
+
+message Employ {
+ EmployeeId employee = 1 [(required) = true];
+ int32 salary = 2 [(required) = true];
+}
+
+message IncreaseSalary {
+ EmployeeId employee = 1 [(required) = true];
+ int32 amount = 2 [(required) = true];
+}
+
+message DecreaseSalary {
+ EmployeeId employee = 1 [(required) = true];
+ int32 amount = 2 [(required) = true];
+}
+
+message DecreaseSalaryThreeTimes {
+ EmployeeId employee = 1 [(required) = true];
+ int32 amount = 2 [(required) = true];
+}
diff --git a/server/src/test/proto/spine/test/aggregate/employee/employee.proto b/server/src/test/proto/spine/test/aggregate/employee/employee.proto
new file mode 100644
index 00000000000..ee5c24a4cbd
--- /dev/null
+++ b/server/src/test/proto/spine/test/aggregate/employee/employee.proto
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+syntax = "proto3";
+
+package spine.test.aggregate.employee;
+
+import "spine/options.proto";
+
+option (type_url_prefix) = "type.spine.io";
+option java_package = "io.spine.server.aggregate.given.employee";
+option java_multiple_files = true;
+
+message EmployeeId {
+ string uuid = 1 [(required) = true];
+}
+
+message Employee {
+ option (entity).kind = AGGREGATE;
+
+ EmployeeId id = 1 [(required) = true];
+
+ // Employee's compensation for their work.
+ //
+ // Let's take 200$ as the lowest compensation
+ // which employers can legally pay their employees.
+ int32 salary = 2 [(min).value = "200"];
+}
diff --git a/server/src/test/proto/spine/test/aggregate/employee/events.proto b/server/src/test/proto/spine/test/aggregate/employee/events.proto
new file mode 100644
index 00000000000..01b995608a3
--- /dev/null
+++ b/server/src/test/proto/spine/test/aggregate/employee/events.proto
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+syntax = "proto3";
+
+package spine.test.aggregate.employee;
+
+import "spine/options.proto";
+
+option (type_url_prefix) = "type.spine.io";
+option java_package = "io.spine.server.aggregate.given.employee";
+option java_multiple_files = true;
+
+import "spine/test/aggregate/employee/employee.proto";
+
+message PersonEmployed {
+ EmployeeId employee = 1 [(required) = true];
+ int32 salary = 2 [(required) = true];
+}
+
+message SalaryIncreased {
+ EmployeeId employee = 1 [(required) = true];
+ int32 amount = 2 [(required) = true];
+}
+
+message SalaryDecreased {
+ EmployeeId employee = 1 [(required) = true];
+ int32 amount = 2 [(required) = true];
+}
diff --git a/server/src/test/proto/spine/test/aggregate/thermometer/thermometer.proto b/server/src/test/proto/spine/test/aggregate/thermometer/thermometer.proto
index 3d1f39e7030..78c7f0db0ee 100644
--- a/server/src/test/proto/spine/test/aggregate/thermometer/thermometer.proto
+++ b/server/src/test/proto/spine/test/aggregate/thermometer/thermometer.proto
@@ -1,3 +1,28 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
syntax = "proto3";
package spine.test.aggregate;
diff --git a/version.gradle.kts b/version.gradle.kts
index 8cdc6a01fab..fd1c69f35bf 100644
--- a/version.gradle.kts
+++ b/version.gradle.kts
@@ -32,4 +32,4 @@ val toolBaseVersion: String by extra("2.0.0-SNAPSHOT.84")
val mcJavaVersion: String by extra("2.0.0-SNAPSHOT.83")
/** The version of this library. */
-val versionToPublish: String by extra("2.0.0-SNAPSHOT.91")
+val versionToPublish: String by extra("2.0.0-SNAPSHOT.92")