Skip to content

Commit 62eacbd

Browse files
Prototype more tests
1 parent dc45a56 commit 62eacbd

File tree

13 files changed

+301
-53
lines changed

13 files changed

+301
-53
lines changed

server/src/main/java/io/spine/server/ServerEnvironment.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,8 @@ public void reset() {
319319
tracerFactory.reset();
320320
storageFactory.reset();
321321
delivery.reset();
322-
var currentEnv = environment().type();
323-
delivery.use(Delivery.local(), currentEnv);
322+
// var currentEnv = environment().type();
323+
// delivery.use(Delivery.local(), currentEnv);
324324
resetDeploymentType();
325325
}
326326

server/src/main/java/io/spine/server/aggregate/Aggregate.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,14 @@ final BatchDispatchOutcome apply(List<Event> events, int snapshotTrigger) {
348348
var versionedEvents = versionSequence.update(events);
349349
System.out.println("Batch size: " + events.size());
350350
uncommittedHistory.startTracking(snapshotTrigger);
351+
System.out.println("Uncommitted size before: " + uncommittedHistory.events().list().size());
351352
var result = play(versionedEvents);
352353
System.out.println("Batch successful ? " + result.getSuccessful());
353354
uncommittedHistory.stopTracking();
354-
System.out.println("Uncommitted size: " + uncommittedHistory.events().list().size());
355+
System.out.println("Uncommitted size after: " + uncommittedHistory.events().list().size());
356+
System.out.println();
357+
System.out.println();
358+
System.out.println();
355359
return result;
356360
}
357361

server/src/main/java/io/spine/server/aggregate/AggregateRepository.java

+2
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,13 @@ private void initInbox() {
185185
.withBatchListener(new BatchDeliveryListener<I>() {
186186
@Override
187187
public void onStart(I id) {
188+
System.out.println("BatchDeliveryListener started");
188189
cache.startCaching(id);
189190
}
190191

191192
@Override
192193
public void onEnd(I id) {
194+
System.out.println("BatchDeliveryListener ended");
193195
cache.stopCaching(id);
194196
}
195197
})

server/src/main/java/io/spine/server/delivery/Delivery.java

+3
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,9 @@ public Optional<DeliveryStats> deliverMessagesFrom(ShardIndex index) {
466466
private RunResult runDelivery(ShardProcessingSession session) {
467467
var index = session.shardIndex();
468468

469+
System.out.println("runDelivery");
470+
System.out.println(inboxStorage);
471+
469472
var startingPage = inboxStorage.readAll(index, pageSize);
470473
var maybePage = Optional.of(startingPage);
471474

server/src/main/java/io/spine/server/delivery/DeliveryBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,11 @@ public Delivery build() {
318318

319319
private static StorageFactory storageFactory() {
320320
var serverEnvironment = ServerEnvironment.instance();
321+
System.out.println("Getting storage factory for Delivery ...");
322+
System.out.println(serverEnvironment.type());
321323
var currentStorageFactory = serverEnvironment.optionalStorageFactory();
324+
System.out.println(currentStorageFactory.get().createInboxStorage(false));
325+
System.out.println("--------------------");
322326
return currentStorageFactory.orElseGet(InMemoryStorageFactory::newInstance);
323327
}
324328
}

server/src/main/java/io/spine/server/delivery/InboxDeliveries.java

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ final class InboxDeliveries {
4747
*/
4848
ShardedMessageDelivery<InboxMessage> get(String typeUrl) {
4949
var result = contents.get(typeUrl);
50+
contents.forEach((str, smth) -> System.out.println(str));
5051
checkState(result != null,
5152
"Cannot find the registered Inbox for the type URL `%s`.", typeUrl);
5253
return result;

server/src/main/java/io/spine/server/delivery/InboxMessageMixin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ default TenantId tenant() {
5454
}
5555

5656
/**
57-
* Generates a new {@code InboxMessageId} with a auto-generated UUID and the given shard
57+
* Generates a new {@code InboxMessageId} with an auto-generated UUID and the given shard
5858
* index as parts.
5959
*/
6060
static InboxMessageId generateIdWith(ShardIndex index) {

server/src/main/java/io/spine/server/delivery/TargetDelivery.java

+2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ private void addMessage(InboxMessage message) {
176176
private void deliverVia(BatchDeliveryListener<I> dispatcher,
177177
InboxOfCommands<I> cmdDispatcher,
178178
InboxOfEvents<I> eventDispatcher) {
179+
System.out.println("deliverVia: " + messages.size());
180+
179181
if (messages.size() > 1) {
180182
var packedId = inboxId.value()
181183
.getEntityId()

server/src/test/java/io/spine/server/aggregate/AggregateTest.java

+119-11
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@
3737
import io.spine.core.Event;
3838
import io.spine.core.MessageId;
3939
import io.spine.core.TenantId;
40+
import io.spine.environment.Tests;
4041
import io.spine.server.BoundedContext;
4142
import io.spine.server.BoundedContextBuilder;
43+
import io.spine.server.ContextSpec;
44+
import io.spine.server.ServerEnvironment;
4245
import io.spine.server.aggregate.given.Given;
4346
import io.spine.server.aggregate.given.aggregate.AggregateWithMissingApplier;
4447
import io.spine.server.aggregate.given.aggregate.AmishAggregate;
@@ -48,14 +51,23 @@
4851
import io.spine.server.aggregate.given.aggregate.TaskAggregateRepository;
4952
import io.spine.server.aggregate.given.aggregate.TestAggregate;
5053
import io.spine.server.aggregate.given.aggregate.TestAggregateRepository;
54+
import io.spine.server.aggregate.given.salary.Employee;
5155
import io.spine.server.aggregate.given.salary.EmployeeAgg;
56+
import io.spine.server.aggregate.given.salary.PreparedInboxStorage;
57+
import io.spine.server.aggregate.given.salary.event.NewEmployed;
5258
import io.spine.server.aggregate.given.thermometer.SafeThermometer;
5359
import io.spine.server.aggregate.given.thermometer.SafeThermometerRepo;
5460
import io.spine.server.aggregate.given.thermometer.Thermometer;
5561
import io.spine.server.aggregate.given.thermometer.ThermometerId;
5662
import io.spine.server.aggregate.given.thermometer.event.TemperatureChanged;
63+
import io.spine.server.delivery.DeliveryStrategy;
64+
import io.spine.server.delivery.InboxStorage;
5765
import io.spine.server.delivery.MessageEndpoint;
5866
import io.spine.server.model.ModelError;
67+
import io.spine.server.storage.RecordSpec;
68+
import io.spine.server.storage.RecordStorage;
69+
import io.spine.server.storage.StorageFactory;
70+
import io.spine.server.storage.memory.InMemoryStorageFactory;
5971
import io.spine.server.type.CommandClass;
6072
import io.spine.server.type.CommandEnvelope;
6173
import io.spine.server.type.EventClass;
@@ -80,11 +92,11 @@
8092
import io.spine.test.aggregate.event.AggUserNotified;
8193
import io.spine.test.aggregate.rejection.Rejections.AggCannotReassignUnassignedTask;
8294
import io.spine.testing.logging.mute.MuteLogging;
83-
import io.spine.testing.server.blackbox.BlackBox;
8495
import io.spine.testing.server.blackbox.ContextAwareTest;
8596
import io.spine.testing.server.model.ModelTests;
8697
import io.spine.time.testing.BackToTheFuture;
8798
import io.spine.time.testing.FrozenMadHatterParty;
99+
import io.spine.type.TypeUrl;
88100
import org.junit.jupiter.api.AfterEach;
89101
import org.junit.jupiter.api.BeforeEach;
90102
import org.junit.jupiter.api.DisplayName;
@@ -100,6 +112,7 @@
100112
import static com.google.common.collect.Lists.newArrayList;
101113
import static com.google.common.truth.Truth.assertThat;
102114
import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat;
115+
import static io.spine.base.Identifier.newUuid;
103116
import static io.spine.grpc.StreamObservers.noOpObserver;
104117
import static io.spine.protobuf.AnyPacker.unpack;
105118
import static io.spine.server.aggregate.given.Given.EventMessage.projectCreated;
@@ -113,7 +126,9 @@
113126
import static io.spine.server.aggregate.given.aggregate.AggregateTestEnv.newTenantId;
114127
import static io.spine.server.aggregate.given.aggregate.AggregateTestEnv.reassignTask;
115128
import static io.spine.server.aggregate.given.dispatch.AggregateMessageDispatcher.dispatchCommand;
129+
import static io.spine.server.aggregate.given.salary.Employees.decreaseSalary;
116130
import static io.spine.server.aggregate.given.salary.Employees.employ;
131+
import static io.spine.server.aggregate.given.salary.Employees.increaseSalary;
117132
import static io.spine.server.aggregate.given.salary.Employees.newEmployee;
118133
import static io.spine.server.aggregate.given.salary.Employees.shakeUpSalary;
119134
import static io.spine.server.aggregate.model.AggregateClass.asAggregateClass;
@@ -468,26 +483,119 @@ void addEventsToUncommittedOnlyIfApplied() {
468483
@Test
469484
@DisplayName("add events to `UncommittedHistory` only if they were successfully applied")
470485
void addEventsToUncommittedOnlyIfApplied2() {
471-
var repository = new DefaultAggregateRepository<>(EmployeeAgg.class);
472-
var context = BlackBox.from(
473-
BoundedContextBuilder.assumingTests()
474-
.add(repository)
486+
var jack = newEmployee();
487+
var shardIndex = DeliveryStrategy.newIndex(0, 1);
488+
var inboxStorage = PreparedInboxStorage.withCommands(
489+
shardIndex,
490+
TypeUrl.of(Employee.class),
491+
command(employ(jack, 250)),
492+
command(shakeUpSalary(jack))
475493
);
476-
repository.aggregateStorage().enableStateQuerying();
477494

495+
System.out.println("Setting storage factory ...");
496+
ServerEnvironment.instance().reset();
497+
ServerEnvironment.when(Tests.class)
498+
.use(new StorageFactory() {
499+
@Override
500+
public <I, R extends Message> RecordStorage<I, R> createRecordStorage(
501+
ContextSpec context, RecordSpec<I, R, ?> spec) {
502+
return InMemoryStorageFactory.newInstance().createRecordStorage(context, spec);
503+
}
504+
505+
@Override
506+
public InboxStorage createInboxStorage(boolean multitenant) {
507+
return inboxStorage;
508+
}
509+
510+
@Override
511+
public void close() {
512+
// NO OP
513+
}
514+
});
515+
516+
var repository = new DefaultAggregateRepository<>(EmployeeAgg.class);
517+
BoundedContextBuilder.assumingTests()
518+
.add(repository)
519+
.build();
520+
521+
System.out.println(ServerEnvironment.instance().type());
522+
System.out.println(ServerEnvironment.instance().storageFactory().createInboxStorage(false));
523+
524+
var stats = ServerEnvironment
525+
.instance()
526+
.delivery()
527+
.deliverMessagesFrom(shardIndex)
528+
.orElseThrow();
529+
System.out.println(stats.deliveredCount());
530+
// ServerEnvironment.instance().reset();
531+
532+
var storedEvents = repository.aggregateStorage()
533+
.read(jack)
534+
.orElseThrow()
535+
.getEventList();
536+
537+
assertThat(storedEvents.size()).isEqualTo(1);
538+
assertThat(storedEvents.get(0).enclosedMessage().getClass()).isEqualTo(NewEmployed.class);
539+
}
540+
541+
@Test
542+
@DisplayName("add events to `UncommittedHistory` only if they were successfully applied")
543+
void addEventsToUncommittedOnlyIfApplied3() {
478544
var jack = newEmployee();
479-
context.tolerateFailures()
480-
.receivesCommands(
481-
employ(jack, 2150),
482-
shakeUpSalary(jack)
483-
);
545+
var shardIndex = DeliveryStrategy.newIndex(0, 1);
546+
var inboxStorage = PreparedInboxStorage.withCommands(
547+
shardIndex,
548+
TypeUrl.of(Employee.class),
549+
command(employ(jack, 250)),
550+
command(decreaseSalary(jack, 15)),
551+
command(decreaseSalary(jack, 500)),
552+
command(increaseSalary(jack, 500))
553+
);
554+
555+
System.out.println("Setting storage factory ...");
556+
ServerEnvironment.instance().reset();
557+
ServerEnvironment.when(Tests.class)
558+
.use(new StorageFactory() {
559+
@Override
560+
public <I, R extends Message> RecordStorage<I, R> createRecordStorage(
561+
ContextSpec context, RecordSpec<I, R, ?> spec) {
562+
return InMemoryStorageFactory.newInstance().createRecordStorage(context, spec);
563+
}
564+
565+
@Override
566+
public InboxStorage createInboxStorage(boolean multitenant) {
567+
return inboxStorage;
568+
}
569+
570+
@Override
571+
public void close() {
572+
// NO OP
573+
}
574+
});
575+
576+
var repository = new DefaultAggregateRepository<>(EmployeeAgg.class);
577+
BoundedContextBuilder.assumingTests()
578+
.add(repository)
579+
.build();
580+
581+
System.out.println(ServerEnvironment.instance().type());
582+
System.out.println(ServerEnvironment.instance().storageFactory().createInboxStorage(false));
583+
584+
var stats = ServerEnvironment
585+
.instance()
586+
.delivery()
587+
.deliverMessagesFrom(shardIndex)
588+
.orElseThrow();
589+
System.out.println(stats.deliveredCount());
590+
// ServerEnvironment.instance().reset();
484591

485592
var storedEvents = repository.aggregateStorage()
486593
.read(jack)
487594
.orElseThrow()
488595
.getEventList();
489596

490597
assertThat(storedEvents.size()).isEqualTo(1);
598+
assertThat(storedEvents.get(0).enclosedMessage().getClass()).isEqualTo(NewEmployed.class);
491599
}
492600

493601
@Nested

server/src/test/java/io/spine/server/aggregate/given/salary/EmployeeAgg.java

+30-38
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,23 @@
2727

2828
import io.spine.base.CommandMessage;
2929
import io.spine.base.EventMessage;
30-
import io.spine.server.BoundedContextBuilder;
3130
import io.spine.server.aggregate.Aggregate;
3231
import io.spine.server.aggregate.Apply;
33-
import io.spine.server.aggregate.DefaultAggregateRepository;
32+
import io.spine.server.aggregate.given.command.DecreaseSalary;
3433
import io.spine.server.aggregate.given.command.Employ;
34+
import io.spine.server.aggregate.given.command.IncreaseSalary;
3535
import io.spine.server.aggregate.given.command.ShakeUpSalary;
3636
import io.spine.server.aggregate.given.dispatch.AggregateMessageDispatcher;
3737
import io.spine.server.aggregate.given.salary.event.NewEmployed;
3838
import io.spine.server.aggregate.given.salary.event.SalaryDecreased;
3939
import io.spine.server.aggregate.given.salary.event.SalaryIncreased;
4040
import io.spine.server.command.Assign;
41-
import io.spine.testing.server.blackbox.BlackBox;
42-
import org.junit.jupiter.api.DisplayName;
43-
import org.junit.jupiter.api.Test;
4441

4542
import java.util.List;
4643

47-
import static com.google.common.truth.Truth.assertThat;
4844
import static io.spine.server.aggregate.given.aggregate.AggregateTestEnv.env;
49-
import static io.spine.server.aggregate.given.salary.Employees.employ;
50-
import static io.spine.server.aggregate.given.salary.Employees.newEmployee;
51-
import static io.spine.server.aggregate.given.salary.Employees.shakeUpSalary;
45+
import static io.spine.server.aggregate.given.salary.Employees.salaryDecreased;
46+
import static io.spine.server.aggregate.given.salary.Employees.salaryIncreased;
5247

5348
public class EmployeeAgg extends Aggregate<EmployeeId, Employee, Employee.Builder> {
5449

@@ -71,49 +66,46 @@ private void on(NewEmployed event) {
7166
.setSalary(event.getSalary());
7267
}
7368

74-
@Apply
75-
private void on(SalaryIncreased event) {
76-
builder()
77-
.setId(event.getEmployee())
78-
.setSalary(state().getSalary() + event.getAmount());
79-
}
80-
81-
@Apply
82-
private void on(SalaryDecreased event) {
83-
builder()
84-
.setId(event.getEmployee())
85-
.setSalary(state().getSalary() - event.getAmount());
86-
}
87-
8869
@Assign
8970
Iterable<EventMessage> handle(ShakeUpSalary cmd) {
9071
var employee = cmd.getEmployee();
9172
return List.of(
92-
93-
// we need several events to make them processed as a batch.
9473
salaryDecreased(employee, 15),
9574
salaryIncreased(employee, 50),
9675
salaryIncreased(employee, 75),
97-
salaryIncreased(employee, 100),
9876

9977
// this one would make the aggregate's state invalid.
100-
// employee's can not be paid less than 200.
101-
salaryDecreased(employee, 1000)
78+
// employee can not be paid less than 200.
79+
salaryDecreased(employee, 1000),
80+
81+
salaryIncreased(employee, 100),
82+
salaryIncreased(employee, 75),
83+
salaryIncreased(employee, 50)
10284
);
10385
}
10486

105-
private static SalaryIncreased salaryIncreased(EmployeeId employee, int amount) {
106-
return SalaryIncreased.newBuilder()
107-
.setEmployee(employee)
108-
.setAmount(amount)
109-
.vBuild();
87+
@Assign
88+
SalaryIncreased handle(IncreaseSalary cmd) {
89+
return salaryIncreased(cmd.getEmployee(), cmd.getAmount());
11090
}
11191

112-
private static SalaryDecreased salaryDecreased(EmployeeId employee, int amount) {
113-
return SalaryDecreased.newBuilder()
114-
.setEmployee(employee)
115-
.setAmount(amount)
116-
.vBuild();
92+
@Apply
93+
private void on(SalaryIncreased event) {
94+
builder()
95+
.setId(event.getEmployee())
96+
.setSalary(state().getSalary() + event.getAmount());
97+
}
98+
99+
@Assign
100+
SalaryDecreased handle(DecreaseSalary cmd) {
101+
return salaryDecreased(cmd.getEmployee(), cmd.getAmount());
102+
}
103+
104+
@Apply
105+
private void on(SalaryDecreased event) {
106+
builder()
107+
.setId(event.getEmployee())
108+
.setSalary(state().getSalary() - event.getAmount());
117109
}
118110

119111
public void dispatchCommands(CommandMessage... commands) {

0 commit comments

Comments
 (0)