EventStore Java Client

This is an EventStore driver for Java that uses Netty for network communication and GSON for serializing and deserializing objects to and from JSON (e.g. stream metadata and cluster information DTOs). The client logic follows the original client for the .NET platform.

Requirements

  • Java 8
  • EventStore Server >= 3.2.0 (tested with 3.3.1 - 3.8.1)

Maven Dependency

<dependency>
    <groupId>com.github.msemys</groupId>
    <artifactId>esjc</artifactId>
    <version>1.5.0</version>
</dependency>

Usage

Creating a Client Instance

There are two ways to create a new client instance. The examples below show how to create a default client with single-node and cluster-node configurations in both ways.

  • creates a client using the builder class
EventStore eventstore = EventStoreBuilder.newBuilder()
    .singleNodeAddress("127.0.0.1", 1113)
    .userCredentials("admin", "changeit")
    .build();
EventStore eventstore = EventStoreBuilder.newBuilder()
    .clusterNodeDiscoveryFromGossipSeeds(asList(
        new InetSocketAddress("127.0.0.1", 2113),
        new InetSocketAddress("127.0.0.1", 2213),
        new InetSocketAddress("127.0.0.1", 2313)))
    .userCredentials("admin", "changeit")
    .build();
  • creates a client by calling the constructor and passing a settings instance
EventStore eventstore = new EventStore(Settings.newBuilder()
    .nodeSettings(StaticNodeSettings.newBuilder()
        .address("127.0.0.1", 1113)
        .build())
    .userCredentials("admin", "changeit")
    .build());
EventStore eventstore = new EventStore(Settings.newBuilder()
    .nodeSettings(ClusterNodeSettings.forGossipSeedDiscoverer()
        .gossipSeedEndpoints(asList(
            new InetSocketAddress("127.0.0.1", 2113),
            new InetSocketAddress("127.0.0.1", 2213),
            new InetSocketAddress("127.0.0.1", 2313)))
        .build())
    .userCredentials("admin", "changeit")
    .build());

The driver uses a full-duplex communication channel to the server. It is recommended to create only one instance per application.
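One common way to follow the single-instance recommendation is the initialization-on-demand holder idiom. The sketch below is only an illustration: `FakeClient` is a hypothetical stand-in for the real client (so the example runs without the driver), and `SharedClientSketch` is an invented name. In an application, the holder field would presumably be initialized with one of the builder calls shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SharedClientSketch {

    // Hypothetical stand-in for the real client; counts how many instances were built.
    static final class FakeClient {
        static final AtomicInteger CREATED = new AtomicInteger();

        FakeClient() {
            CREATED.incrementAndGet();
        }
    }

    // The JVM initializes Holder lazily, exactly once, and in a thread-safe way.
    private static final class Holder {
        static final FakeClient INSTANCE = new FakeClient();
    }

    // Every caller, on any thread, receives the same instance.
    static FakeClient client() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(SharedClientSketch::client);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("instances created: " + FakeClient.CREATED.get());
    }
}
```

The holder idiom avoids explicit locking while still deferring construction until the first call to `client()`.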

SSL

To use a secure channel between the client and server, first enable SSL on the server side by providing a secure TCP port and a server certificate.

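  • create a private key file and a self-signed certificate (for testing purposes); with -x509, openssl req writes a certificate rather than a certificate request, despite the .csr file name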
openssl req \
  -x509 -sha256 -nodes -days 365 -subj "/CN=test.com" \
  -newkey rsa:2048 -keyout domain.pem -out domain.csr
  • export the private key and the self-signed certificate to a PKCS#12 archive
openssl pkcs12 -export -inkey domain.pem -in domain.csr -out domain.p12
  • start the server with an encrypted TCP connection
./run-node.sh --ext-secure-tcp-port 1119 --certificate-file domain.p12

Now we are ready to connect to a single node or a cluster over the secure channel. On the client side we can either verify the server certificate (checking its CN and expiration date) or accept any server certificate without verification.

// creates a client with secure connection to server whose certificate Common Name (CN) matches 'test.com'
EventStore eventstore = EventStoreBuilder.newBuilder()
    .singleNodeAddress("127.0.0.1", 1119)
    .useSslConnection("test.com")
    .userCredentials("admin", "changeit")
    .build();

// creates a client with secure connection to server without certificate verification
EventStore eventstore = EventStoreBuilder.newBuilder()
    .singleNodeAddress("127.0.0.1", 1119)
    .useSslConnection()
    .userCredentials("admin", "changeit")
    .build();
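To make the verification option more concrete, here is a rough sketch of what a "CN and expiration date" check involves. It is not the driver's implementation: `CertificateCheckSketch`, `commonName` and `isAcceptable` are invented names, and the subject name and validity dates are passed in as plain values so the example runs without a real certificate.

```java
import java.time.Instant;
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

class CertificateCheckSketch {

    // Extracts the CN value from an RFC 2253 subject name such as "CN=test.com,O=Example".
    static String commonName(String subjectDn) {
        try {
            for (Rdn rdn : new LdapName(subjectDn).getRdns()) {
                if ("CN".equalsIgnoreCase(rdn.getType())) {
                    return String.valueOf(rdn.getValue());
                }
            }
        } catch (InvalidNameException e) {
            // An unparsable subject is treated as having no CN.
        }
        return null;
    }

    // Accepts the certificate only if the CN matches and 'now' lies inside the validity window.
    static boolean isAcceptable(String subjectDn, Instant notBefore, Instant notAfter,
                                String expectedCn, Instant now) {
        boolean nameMatches = expectedCn.equalsIgnoreCase(commonName(subjectDn));
        boolean withinValidity = !now.isBefore(notBefore) && !now.isAfter(notAfter);
        return nameMatches && withinValidity;
    }

    public static void main(String[] args) {
        Instant issued = Instant.parse("2024-01-01T00:00:00Z");
        Instant expires = issued.plusSeconds(365L * 24 * 3600);
        Instant now = issued.plusSeconds(3600);

        System.out.println(isAcceptable("CN=test.com", issued, expires, "test.com", now));
        System.out.println(isAcceptable("CN=other.com", issued, expires, "test.com", now));
        System.out.println(isAcceptable("CN=test.com", issued, expires, "test.com", expires.plusSeconds(1)));
    }
}
```

With a real `java.security.cert.X509Certificate`, the same inputs would come from `getSubjectX500Principal().getName()`, `getNotBefore()` and `getNotAfter()`.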

API Examples

All operations are handled fully asynchronously and return CompletableFuture<T>. To handle the result asynchronously, use the whenComplete((result, throwable) -> { ... }) or thenAccept(result -> { ... }) methods on the returned future. To handle the result synchronously, call get() or join() on the future.

// handles result asynchronously
eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
    EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build(),
    EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build())
).thenAccept(r -> System.out.println(r.logPosition));

// handles result synchronously
eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
    EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build(),
    EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build())
).thenAccept(r -> System.out.println(r.logPosition)).get();
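The handling styles above differ mainly in how failures reach the caller. The following sketch uses only the JDK to show this. `fakeAppend` is a hypothetical stand-in for a client call, and the log position and error message are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class FutureHandlingSketch {

    // Hypothetical stand-in for a client call; completes with a made-up log position.
    static CompletableFuture<Long> fakeAppend(boolean fail) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("wrong expected version"));
        } else {
            future.complete(42L);
        }
        return future;
    }

    // Asynchronous handling: whenComplete receives either the result or the failure.
    // The future here is already completed, so the callback runs before this method returns.
    static String describeAsync(boolean fail) {
        StringBuilder out = new StringBuilder();
        fakeAppend(fail).whenComplete((position, throwable) -> {
            if (throwable != null) {
                out.append("failed: ").append(throwable.getMessage());
            } else {
                out.append("position: ").append(position);
            }
        });
        return out.toString();
    }

    // Synchronous handling with get(): failures arrive as a checked ExecutionException.
    static String describeWithGet(boolean fail) {
        try {
            return "position: " + fakeAppend(fail).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Synchronous handling with join(): failures arrive as an unchecked CompletionException.
    static String describeWithJoin(boolean fail) {
        try {
            return "position: " + fakeAppend(fail).join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeAsync(false));
        System.out.println(describeAsync(true));
        System.out.println(describeWithGet(true));
        System.out.println(describeWithJoin(true));
    }
}
```

Note that thenAccept alone never sees a failure; pairing it with exceptionally(...) or using whenComplete keeps errors from being silently dropped.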

Writing events

eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
    EventData.newBuilder()
        .type("bar")
        .data(new byte[]{1, 2, 3, 4, 5})
        .metadata(new byte[]{6, 7, 8, 9, 0})
        .build(),
    EventData.newBuilder()
        .eventId(UUID.randomUUID())
        .type("baz")
        .data("dummy content")
        .build(),
    EventData.newBuilder()
        .type("qux")
        .jsonData("{ a : 1 }")
        .build()))
    .thenAccept(r -> System.out.println(r.logPosition));

Transactional writes

try (Transaction t = eventstore.startTransaction("foo", ExpectedVersion.any()).get()) {
    t.write(asList(EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build()));
    t.write(asList(EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build()));
    t.commit();
} catch (Exception e) {
    e.printStackTrace();
}
eventstore.startTransaction("foo", ExpectedVersion.any()).thenAccept(t -> {
    t.write(asList(EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build()));
    t.write(asList(EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build()));
    t.rollback();
});

Reading a single event

eventstore.readEvent("foo", 1, false).thenAccept(e ->
    System.out.format("id: '%s'; type: '%s'; data: '%s'",
        e.event.originalEvent().eventId,
        e.event.originalEvent().eventType,
        new String(e.event.originalEvent().data)));

Reading stream events forwards

eventstore.readStreamEventsForward("foo", 10, 5, false).thenAccept(e ->
    e.events.forEach(i -> System.out.format("#%d  id: '%s'; type: '%s'; data: '%s'\n",
        i.originalEvent().eventNumber,
        i.originalEvent().eventId,
        i.originalEvent().eventType,
        new String(i.originalEvent().data))));

Reading stream events backwards

eventstore.readStreamEventsBackward("foo", 10, 5, false).thenAccept(e ->
    e.events.forEach(i -> System.out.format("#%d  id: '%s'; type: '%s'; data: '%s'\n",
        i.originalEvent().eventNumber,
        i.originalEvent().eventId,
        i.originalEvent().eventType,
        new String(i.originalEvent().data))));

Reading all events forwards

eventstore.readAllEventsForward(Position.START, 10, false).thenAccept(e ->
    e.events.forEach(i -> System.out.format("@%s  id: '%s'; type: '%s'; data: '%s'\n",
        i.originalPosition,
        i.originalEvent().eventId,
        i.originalEvent().eventType,
        new String(i.originalEvent().data))));

Reading all events backwards

eventstore.readAllEventsBackward(Position.END, 10, false).thenAccept(e ->
    e.events.forEach(i -> System.out.format("@%s  id: '%s'; type: '%s'; data: '%s'\n",
        i.originalPosition,
        i.originalEvent().eventId,
        i.originalEvent().eventType,
        new String(i.originalEvent().data))));

Subscribes to stream (volatile subscription)

CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToStream("foo", false,
    new VolatileSubscriptionListener() {
        @Override
        public void onEvent(Subscription subscription, ResolvedEvent event) {
            System.out.println(event.originalEvent().eventType);
        }

        @Override
        public void onClose(Subscription subscription, SubscriptionDropReason reason, Exception exception) {
            System.out.println("Subscription closed: " + reason);
        }
    });

volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToStream("foo", false, (s, e) ->
    System.out.println(e.originalEvent().eventType)
);

volatileSubscription.get().close();

Subscribes to ALL stream (volatile subscription)

CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToAll(false,
    new VolatileSubscriptionListener() {
        @Override
        public void onEvent(Subscription subscription, ResolvedEvent event) {
            System.out.println(event.originalEvent().eventType);
        }

        @Override
        public void onClose(Subscription subscription, SubscriptionDropReason reason, Exception exception) {
            System.out.println("Subscription closed: " + reason);
        }
    });

volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToAll(false, (s, e) -> 
    System.out.println(e.originalEvent().eventType)
);

volatileSubscription.get().close();

Subscribes to stream from event number (catch-up subscription)

CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 3,
    new CatchUpSubscriptionListener() {
        @Override
        public void onLiveProcessingStarted(CatchUpSubscription subscription) {
            System.out.println("Live processing started!");
        }

        @Override
        public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
            System.out.println(event.originalEvent().eventType);
        }

        @Override
        public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
            System.out.println("Subscription closed: " + reason);
        }
    });

catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 3, (s, e) ->
    System.out.println(e.originalEvent().eventType)
);

catchupSubscription.close();

Subscribes to ALL stream from event position (catch-up subscription)

CatchUpSubscription catchupSubscription = eventstore.subscribeToAllFrom(Position.START,
    new CatchUpSubscriptionListener() {
        @Override
        public void onLiveProcessingStarted(CatchUpSubscription subscription) {
            System.out.println("Live processing started!");
        }

        @Override
        public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
            System.out.println(event.originalEvent().eventType);
        }

        @Override
        public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
            System.out.println("Subscription closed: " + reason);
        }
    });

catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToAllFrom(Position.of(1, 1), (s, e) ->
    System.out.println(e.originalEvent().eventType)
);

catchupSubscription.close();

Subscribes to persistent subscription

CompletableFuture<PersistentSubscription> persistentSubscription = eventstore.subscribeToPersistent("foo", "group", 
    new PersistentSubscriptionListener() {
        @Override
        public void onEvent(PersistentSubscription subscription, ResolvedEvent event) {
            System.out.println(event.originalEvent().eventType);
        }
    
        @Override
        public void onClose(PersistentSubscription subscription, SubscriptionDropReason reason, Exception exception) {
            System.out.println("Subscription closed: " + reason);
        }
    });

persistentSubscription.get().close();
CompletableFuture<PersistentSubscription> persistentSubscription = eventstore.subscribeToPersistent("foo", "group", (s, e) ->
    System.out.println(e.originalEvent().eventType)
);

persistentSubscription.get().stop(Duration.ofSeconds(3));

Creates persistent subscription

eventstore.createPersistentSubscription("foo", "group", PersistentSubscriptionSettings.newBuilder()
    .resolveLinkTos(false)
    .historyBufferSize(20)
    .liveBufferSize(10)
    .minCheckPointCount(10)
    .maxCheckPointCount(1000)
    .checkPointAfter(Duration.ofSeconds(2))
    .maxRetryCount(500)
    .maxSubscriberCount(5)
    .messageTimeout(Duration.ofSeconds(30))
    .readBatchSize(500)
    .startFromCurrent()
    .timingStatistics(false)
    .namedConsumerStrategy(SystemConsumerStrategy.ROUND_ROBIN)
    .build()
).thenAccept(r -> System.out.println(r.status));
eventstore.createPersistentSubscription("bar", "group").thenAccept(r -> System.out.println(r.status));

Updates persistent subscription

eventstore.updatePersistentSubscription("foo", "group", PersistentSubscriptionSettings.newBuilder()
    .maxRetryCount(200)
    .readBatchSize(100)
    .build()
).thenAccept(r -> System.out.println(r.status));

Deletes persistent subscription

eventstore.deletePersistentSubscription("bar", "group").thenAccept(r -> System.out.println(r.status));

Deletes stream

eventstore.deleteStream("bar", ExpectedVersion.any()).thenAccept(r -> System.out.println(r.logPosition));

Sets stream metadata

eventstore.setStreamMetadata("foo", ExpectedVersion.any(), StreamMetadata.newBuilder()
    .aclReadRoles(asList("eric", "kyle", "stan", "kenny"))
    .cacheControl(Duration.ofMinutes(10))
    .maxAge(Duration.ofDays(1))
    .customProperty("baz", "dummy text")
    .customProperty("bar", 2)
    .customProperty("quux", 3.4)
    .customProperty("quuux", true)
    .build()
).thenAccept(r -> System.out.println(r.logPosition));
eventstore.setStreamMetadata("foo", ExpectedVersion.any(), StreamMetadata.empty())
    .thenAccept(r -> System.out.println(r.logPosition));

Gets stream metadata

eventstore.getStreamMetadata("foo").thenAccept(r ->
    System.out.format("deleted: %s, version: %s, stream: %s\nmetadata: %s\n",
        r.isStreamDeleted,
        r.metastreamVersion,
        r.stream,
        r.streamMetadata.toJson()));
eventstore.getStreamMetadataAsRawBytes("foo").thenAccept(r ->
    System.out.format("deleted: %s, version: %s, stream: %s\nmetadata-bytes: %s\n",
        r.isStreamDeleted,
        r.metastreamVersion,
        r.stream,
        r.streamMetadata));

Sets system settings

StreamAcl userStreamAcl = StreamAcl.newBuilder()
    .readRoles(asList("eric", "kyle", "stan", "kenny"))
    .writeRoles(asList("butters"))
    .deleteRoles(asList("$admins"))
    .metaReadRoles(asList("victoria", "mackey"))
    .metaWriteRoles(asList("randy"))
    .build();

StreamAcl systemStreamAcl = StreamAcl.newBuilder()
    .readRoles(asList("$admins"))
    .writeRoles(asList("$all"))
    .deleteRoles(asList("$admins"))
    .metaWriteRoles(asList("$all"))
    .build();

eventstore.setSystemSettings(SystemSettings.newBuilder()
    .userStreamAcl(userStreamAcl)
    .systemStreamAcl(systemStreamAcl)
    .build()
).thenAccept(r -> System.out.println(r.logPosition));