This is an EventStore driver for Java that uses Netty for network communication and GSON for serializing and deserializing objects to and from JSON (e.g. stream metadata and cluster information DTOs). The client logic is implemented the same way as in the original client for the .NET platform.
- Java 8
- EventStore Server >= 3.2.0 (tested with 3.3.1 - 3.8.1)
<dependency>
<groupId>com.github.msemys</groupId>
<artifactId>esjc</artifactId>
<version>1.5.0</version>
</dependency>
There are two ways to create a new client instance. The examples below show how to create a default client with single-node and cluster-node configurations in both ways.
- create a client using the builder class
// single-node configuration
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1113)
.userCredentials("admin", "changeit")
.build();
// cluster-node configuration (discovery from gossip seeds)
EventStore eventstore = EventStoreBuilder.newBuilder()
.clusterNodeDiscoveryFromGossipSeeds(asList(
new InetSocketAddress("127.0.0.1", 2113),
new InetSocketAddress("127.0.0.1", 2213),
new InetSocketAddress("127.0.0.1", 2313)))
.userCredentials("admin", "changeit")
.build();
- create a client by calling the constructor and passing a settings instance
// single-node configuration
EventStore eventstore = new EventStore(Settings.newBuilder()
.nodeSettings(StaticNodeSettings.newBuilder()
.address("127.0.0.1", 1113)
.build())
.userCredentials("admin", "changeit")
.build());
// cluster-node configuration (discovery from gossip seeds)
EventStore eventstore = new EventStore(Settings.newBuilder()
.nodeSettings(ClusterNodeSettings.forGossipSeedDiscoverer()
.gossipSeedEndpoints(asList(
new InetSocketAddress("127.0.0.1", 2113),
new InetSocketAddress("127.0.0.1", 2213),
new InetSocketAddress("127.0.0.1", 2313)))
.build())
.userCredentials("admin", "changeit")
.build());
The driver uses a full-duplex communication channel to the server. It is recommended to create only one client instance per application.
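One way to follow the one-instance recommendation is to create the client lazily and share it through a small holder. The sketch below uses only JDK classes; `SharedInstance` is a hypothetical helper and not part of the driver, so treat it as one possible approach rather than the recommended one.

```java
import java.util.function.Supplier;

/** Hypothetical helper (not part of the driver): creates one instance on first use and reuses it. */
final class SharedInstance<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, invoking the factory at most once even under concurrent access. */
    T get() {
        T result = instance;
        if (result == null) {
            synchronized (this) {
                result = instance;
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }
}
```

The supplier could wrap any of the builder calls shown above, with the holder kept in a static field or registered in the application's dependency injection container.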
To use a secure channel between the client and the server, first enable SSL on the server side by providing a secure TCP port and a server certificate.
- create a private key file and a self-signed certificate (for testing purposes; because of the -x509 option, domain.csr holds a certificate rather than a signing request)
openssl req \
-x509 -sha256 -nodes -days 365 -subj "/CN=test.com" \
-newkey rsa:2048 -keyout domain.pem -out domain.csr
- export the private key and the self-signed certificate to a PKCS#12 archive
openssl pkcs12 -export -inkey domain.pem -in domain.csr -out domain.p12
- start the server with an encrypted TCP connection
./run-node.sh --ext-secure-tcp-port 1119 --certificate-file domain.p12
Now we are ready to connect to a single node or a cluster over a secure channel. On the client side we can either verify the server certificate (checking its CN and expiration date) or accept any server certificate without verification.
// creates a client with secure connection to server whose certificate Common Name (CN) matches 'test.com'
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1119)
.useSslConnection("test.com")
.userCredentials("admin", "changeit")
.build();
// creates a client with secure connection to server without certificate verification
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1119)
.useSslConnection()
.userCredentials("admin", "changeit")
.build();
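To make the verification option more concrete, here is a minimal sketch of the two checks mentioned above (CN match and validity period), written against plain JDK classes. It is not the driver's implementation: the names `CertificateChecks`, `commonName` and `isAcceptable` are hypothetical, and the subject name is assumed to be in RFC 2253 form (for example `CN=test.com`).

```java
import java.time.Instant;
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

/** Hypothetical illustration of a CN and expiration check; not taken from the driver. */
final class CertificateChecks {
    private CertificateChecks() {
    }

    /** Returns the CN value of an RFC 2253 subject name, or null if it has no CN attribute. */
    static String commonName(String subjectDn) {
        try {
            for (Rdn rdn : new LdapName(subjectDn).getRdns()) {
                if ("CN".equalsIgnoreCase(rdn.getType())) {
                    return String.valueOf(rdn.getValue());
                }
            }
            return null;
        } catch (InvalidNameException e) {
            throw new IllegalArgumentException("Malformed subject name: " + subjectDn, e);
        }
    }

    /** Accepts the certificate only if 'now' lies within its validity period and the CN matches. */
    static boolean isAcceptable(String subjectDn, Instant notBefore, Instant notAfter,
                                String expectedCn, Instant now) {
        boolean withinValidity = !now.isBefore(notBefore) && !now.isAfter(notAfter);
        return withinValidity && expectedCn.equalsIgnoreCase(commonName(subjectDn));
    }
}
```

With a `java.security.cert.X509Certificate` in hand, the inputs could be taken from `cert.getSubjectX500Principal().getName()`, `cert.getNotBefore().toInstant()` and `cert.getNotAfter().toInstant()`.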
All operations are handled fully asynchronously and return CompletableFuture<T>. To handle the result asynchronously, use the whenComplete((result, throwable) -> { ... }) or thenAccept(result -> { ... }) methods on the returned future. To handle the result synchronously, simply call get() or join() on the future.
// handles result asynchronously
eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build(),
EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build())
).thenAccept(r -> System.out.println(r.logPosition));
// handles result synchronously
eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build(),
EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build())
).thenAccept(r -> System.out.println(r.logPosition)).get();
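The examples above only cover the success path. As a rough sketch of how failures surface in both styles, the following uses stand-in futures instead of real driver calls; `FutureHandling` and its methods are hypothetical names, and the `Long` result merely stands in for something like a log position.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical illustration of result and error handling on a CompletableFuture. */
final class FutureHandling {
    private FutureHandling() {
    }

    /** Asynchronous style: the whenComplete callback receives either a result or a throwable. */
    static CompletableFuture<String> describeAsync(CompletableFuture<Long> operation) {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        operation.whenComplete((position, throwable) -> {
            if (throwable != null) {
                outcome.complete("failed: " + throwable.getMessage());
            } else {
                outcome.complete("written at " + position);
            }
        });
        return outcome;
    }

    /** Synchronous style: join() blocks and reports failures as an unchecked CompletionException. */
    static String describeBlocking(CompletableFuture<Long> operation) {
        try {
            return "written at " + operation.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }
}
```

The practical difference between the two blocking calls is that get() throws the checked InterruptedException and ExecutionException, while join() throws the unchecked CompletionException, which makes join() easier to use inside lambdas.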
eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
EventData.newBuilder()
.type("bar")
.data(new byte[]{1, 2, 3, 4, 5})
.metadata(new byte[]{6, 7, 8, 9, 0})
.build(),
EventData.newBuilder()
.eventId(UUID.randomUUID())
.type("baz")
.data("dummy content")
.build(),
EventData.newBuilder()
.type("qux")
.jsonData("{ a : 1 }")
.build()))
.thenAccept(r -> System.out.println(r.logPosition));
try (Transaction t = eventstore.startTransaction("foo", ExpectedVersion.any()).get()) {
t.write(asList(EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build()));
t.write(asList(EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build()));
t.commit();
} catch (Exception e) {
e.printStackTrace();
}
eventstore.startTransaction("foo", ExpectedVersion.any()).thenAccept(t -> {
t.write(asList(EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build()));
t.write(asList(EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build()));
t.rollback();
});
eventstore.readEvent("foo", 1, false).thenAccept(e ->
System.out.format("id: '%s'; type: '%s'; data: '%s'",
e.event.originalEvent().eventId,
e.event.originalEvent().eventType,
new String(e.event.originalEvent().data)));
eventstore.readStreamEventsForward("foo", 10, 5, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
i.originalEvent().eventNumber,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.readStreamEventsBackward("foo", 10, 5, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
i.originalEvent().eventNumber,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.readAllEventsForward(Position.START, 10, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
i.originalPosition,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.readAllEventsBackward(Position.END, 10, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
i.originalPosition,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToStream("foo", false,
new VolatileSubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(Subscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToStream("foo", false, (s, e) ->
System.out.println(e.originalEvent().eventType)
);
volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToAll(false,
new VolatileSubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(Subscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToAll(false, (s, e) ->
System.out.println(e.originalEvent().eventType)
);
volatileSubscription.get().close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 3,
new CatchUpSubscriptionListener() {
@Override
public void onLiveProcessingStarted(CatchUpSubscription subscription) {
System.out.println("Live processing started!");
}
@Override
public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 3, (s, e) ->
System.out.println(e.originalEvent().eventType)
);
catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToAllFrom(Position.START,
new CatchUpSubscriptionListener() {
@Override
public void onLiveProcessingStarted(CatchUpSubscription subscription) {
System.out.println("Live processing started!");
}
@Override
public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToAllFrom(Position.of(1, 1), (s, e) ->
System.out.println(e.originalEvent().eventType)
);
catchupSubscription.close();
CompletableFuture<PersistentSubscription> persistentSubscription = eventstore.subscribeToPersistent("foo", "group",
new PersistentSubscriptionListener() {
@Override
public void onEvent(PersistentSubscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(PersistentSubscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
persistentSubscription.get().close();
CompletableFuture<PersistentSubscription> persistentSubscription = eventstore.subscribeToPersistent("foo", "group", (s, e) ->
System.out.println(e.originalEvent().eventType)
);
persistentSubscription.get().stop(Duration.ofSeconds(3));
eventstore.createPersistentSubscription("foo", "group", PersistentSubscriptionSettings.newBuilder()
.resolveLinkTos(false)
.historyBufferSize(20)
.liveBufferSize(10)
.minCheckPointCount(10)
.maxCheckPointCount(1000)
.checkPointAfter(Duration.ofSeconds(2))
.maxRetryCount(500)
.maxSubscriberCount(5)
.messageTimeout(Duration.ofSeconds(30))
.readBatchSize(500)
.startFromCurrent()
.timingStatistics(false)
.namedConsumerStrategy(SystemConsumerStrategy.ROUND_ROBIN)
.build()
).thenAccept(r -> System.out.println(r.status));
eventstore.createPersistentSubscription("bar", "group").thenAccept(r -> System.out.println(r.status));
eventstore.updatePersistentSubscription("foo", "group", PersistentSubscriptionSettings.newBuilder()
.maxRetryCount(200)
.readBatchSize(100)
.build()
).thenAccept(r -> System.out.println(r.status));
eventstore.deletePersistentSubscription("bar", "group").thenAccept(r -> System.out.println(r.status));
eventstore.deleteStream("bar", ExpectedVersion.any()).thenAccept(r -> System.out.println(r.logPosition));
eventstore.setStreamMetadata("foo", ExpectedVersion.any(), StreamMetadata.newBuilder()
.aclReadRoles(asList("eric", "kyle", "stan", "kenny"))
.cacheControl(Duration.ofMinutes(10))
.maxAge(Duration.ofDays(1))
.customProperty("baz", "dummy text")
.customProperty("bar", 2)
.customProperty("quux", 3.4)
.customProperty("quuux", true)
.build()
).thenAccept(r -> System.out.println(r.logPosition));
eventstore.setStreamMetadata("foo", ExpectedVersion.any(), StreamMetadata.empty())
.thenAccept(r -> System.out.println(r.logPosition));
eventstore.getStreamMetadata("foo").thenAccept(r ->
System.out.format("deleted: %s, version: %s, stream: %s\nmetadata: %s\n",
r.isStreamDeleted,
r.metastreamVersion,
r.stream,
r.streamMetadata.toJson()));
eventstore.getStreamMetadataAsRawBytes("foo").thenAccept(r ->
System.out.format("deleted: %s, version: %s, stream: %s\nmetadata-bytes: %s\n",
r.isStreamDeleted,
r.metastreamVersion,
r.stream,
r.streamMetadata));
StreamAcl userStreamAcl = StreamAcl.newBuilder()
.readRoles(asList("eric", "kyle", "stan", "kenny"))
.writeRoles(asList("butters"))
.deleteRoles(asList("$admins"))
.metaReadRoles(asList("victoria", "mackey"))
.metaWriteRoles(asList("randy"))
.build();
StreamAcl systemStreamAcl = StreamAcl.newBuilder()
.readRoles(asList("$admins"))
.writeRoles(asList("$all"))
.deleteRoles(asList("$admins"))
.metaWriteRoles(asList("$all"))
.build();
eventstore.setSystemSettings(SystemSettings.newBuilder()
.userStreamAcl(userStreamAcl)
.systemStreamAcl(systemStreamAcl)
.build()
).thenAccept(r -> System.out.println(r.logPosition));