diff --git a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java index 8b8e324f3..90fb810b9 100644 --- a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java +++ b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java @@ -327,14 +327,24 @@ private void processSingleIngest(Ingest request, Consumer consumer) { if (log.isDebugEnabled()) { log.debug("Processing input ingest {}", TextFormat.shortDebugString(request)); } - Metrics.INGESTS.increment(); - try { - if (!writeRequest(request, consumer)) { - Metrics.INVALID_REQUEST.increment(); + if (request.getTransactionId().isEmpty()) { + Metrics.INGESTS.increment(); + try { + if (!writeRequest(request, consumer)) { + Metrics.INVALID_REQUEST.increment(); + } + } catch (Exception err) { + log.error("Error processing user request {}", TextFormat.shortDebugString(request), err); + consumer.accept(status(request.getUuid(), 500, err.getMessage())); + } + } else { + @Nullable StreamElement streamElement = validateAndConvertToStreamElement(request, consumer); + if (streamElement != null) { + transactionContext + .get(request.getTransactionId()) + .addOutputs(Collections.singletonList(streamElement)); + consumer.accept(ok(request.getUuid())); } - } catch (Exception err) { - log.error("Error processing user request {}", TextFormat.shortDebugString(request), err); - consumer.accept(status(request.getUuid(), 500, err.getMessage())); } } diff --git a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/RetrieveService.java b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/RetrieveService.java index a54c00ae0..e838ad35d 100644 --- a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/RetrieveService.java +++ b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/RetrieveService.java @@ -103,7 +103,7 @@ public void begin( Rpc.BeginTransactionRequest request, StreamObserver responseObserver) { - String transactionId = transactionContext.create(request.getTranscationId()); + String transactionId = transactionContext.create(request.getTransactionId()); responseObserver.onNext( Rpc.BeginTransactionResponse.newBuilder().setTransactionId(transactionId).build()); responseObserver.onCompleted(); diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java index 053df181c..42403511e 100644 --- a/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java +++ b/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java @@ -31,12 +31,14 @@ import cz.o2.proxima.direct.core.transaction.ServerTransactionManager; import cz.o2.proxima.direct.server.IngestService; import cz.o2.proxima.direct.server.RetrieveService; +import cz.o2.proxima.direct.server.rpc.proto.service.Rpc; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.BeginTransactionRequest; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.BeginTransactionResponse; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.GetRequest; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.GetResponse; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.Ingest; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.IngestBulk; +import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.Status; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.StatusBulk; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.TransactionCommitRequest; import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.TransactionCommitResponse; @@ -51,6 +53,8 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -168,6 +172,53 @@ public void testTransactionReadWrite() throws InterruptedException { assertEquals(3, getResponse.getValue().size()); } + @Test(timeout = 10000) + public void testTransactionReadWriteSingle() throws InterruptedException { + BeginTransactionResponse response = begin(); + String transactionId = response.getTransactionId(); + assertFalse(transactionId.isEmpty()); + long stamp = System.currentTimeMillis(); + replicatedLatch = new CountDownLatch(2); + Rpc.Status status = + ingest( + transactionId, + StreamElement.upsert( + gateway, + gatewayUsers, + UUID.randomUUID().toString(), + "gw1", + gatewayUsers.toAttributePrefix() + "usr1", + stamp, + new byte[] {1, 2, 3})); + + assertEquals(200, status.getStatus()); + + // written, but not yet committed + GetResponse getResponse = + get( + GetRequest.newBuilder() + .setEntity("gateway") + .setKey("gw1") + .setAttribute("user.usr1") + .build()); + assertEquals(404, getResponse.getStatus()); + + TransactionCommitResponse commitResponse = commit(transactionId); + assertEquals(TransactionCommitResponse.Status.COMMITTED, commitResponse.getStatus()); + replicatedLatch.await(); + + // verify we can read the results + getResponse = + get( + GetRequest.newBuilder() + .setEntity("gateway") + .setKey("gw1") + .setAttribute("user.usr1") + .build()); + assertEquals(200, getResponse.getStatus()); + assertEquals(3, getResponse.getValue().size()); + } + @Test(timeout = 20000) public void testTransactionCommitRejected() throws InterruptedException { // we need two simultaneous transactions @@ -336,12 +387,36 @@ private BeginTransactionResponse begin(@Nullable String transactionId) { ListStreamObserver beginObserver = intoList(); retrieve.begin( BeginTransactionRequest.newBuilder() - .setTranscationId(MoreObjects.firstNonNull(transactionId, "")) + .setTransactionId(MoreObjects.firstNonNull(transactionId, "")) .build(), beginObserver); return Iterables.getOnlyElement(beginObserver.getOutputs()); } + private Rpc.Status ingest(String transactionId, StreamElement update) + throws InterruptedException { + + BlockingQueue status = new ArrayBlockingQueue<>(1); + StreamObserver res = + new StreamObserver<>() { + @Override + public void onNext(Rpc.Status s) { + ExceptionUtils.unchecked(() -> status.put(s)); + } + + @Override + public void onError(Throwable throwable) { + log.error("Error", throwable); + throw new IllegalStateException(throwable); + } + + @Override + public void onCompleted() {} + }; + ingest.ingest(asIngest(transactionId, update), res); + return status.take(); + } + private StatusBulk ingestBulk(String transactionId, StreamElement... updates) { return ingestBulk(transactionId, Arrays.asList(updates)); } @@ -353,23 +428,25 @@ private StatusBulk ingestBulk(String transactionId, List updates) IngestBulk.newBuilder() .addAllIngest( updates.stream() - .map( - el -> - Ingest.newBuilder() - .setUuid(el.getUuid()) - .setTransactionId(transactionId) - .setEntity(el.getEntityDescriptor().getName()) - .setKey(el.getKey()) - .setAttribute(el.getAttribute()) - .setStamp(el.getStamp()) - .setValue(ByteString.copyFrom(el.getValue())) - .build()) + .map(el -> asIngest(transactionId, el)) .collect(Collectors.toList())) .build()); ingestBulk.onCompleted(); return Iterables.getOnlyElement(ingestObserver.getOutputs()); } + private static Ingest asIngest(String transactionId, StreamElement el) { + return Ingest.newBuilder() + .setUuid(el.getUuid()) + .setTransactionId(transactionId) + .setEntity(el.getEntityDescriptor().getName()) + .setKey(el.getKey()) + .setAttribute(el.getAttribute()) + .setStamp(el.getStamp()) + .setValue(ByteString.copyFrom(el.getValue())) + .build(); + } + private static ListStreamObserver intoList() { return new ListStreamObserver<>(); } diff --git a/direct/transaction-manager/src/test/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserverTest.java b/direct/transaction-manager/src/test/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserverTest.java index 107abce4e..95bb43df4 100644 --- a/direct/transaction-manager/src/test/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserverTest.java +++ b/direct/transaction-manager/src/test/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserverTest.java @@ -241,6 +241,7 @@ public void testCreateTransactionCommitWithConflictInOutputs() throws Interrupte Collections.singletonList( userGateways.upsert( t2OpenResponse.getSeqId(), "user", "1", t1OpenResponse.getStamp(), new byte[] {})); + // cannot overwrite higher sequentialId with lower one clientManager.commit(t2, t2Outputs).thenAccept(responseQueue::add); Response response = responseQueue.take(); assertEquals(Response.Flags.COMMITTED, response.getFlags()); diff --git a/rpc/src/main/proto/rpc.proto b/rpc/src/main/proto/rpc.proto index 7fc050349..e62c09c84 100644 --- a/rpc/src/main/proto/rpc.proto +++ b/rpc/src/main/proto/rpc.proto @@ -331,7 +331,7 @@ message BeginTransactionRequest { repeated KeyAttribute attributesInvolved = 1; /** An optional transaction ID. When omitted a new transaction ID will be generated. */ - string transcationId = 2; + string transactionId = 2; }