From 0454cdb45a067b3e78ba4bab92c5b5184197480c Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 11 Sep 2023 14:23:50 +0200 Subject: [PATCH] [proxima-direct-core] Allow CachedView write-through --- .../proxima/direct/core/view/CachedView.java | 7 +++++++ .../core/view/LocalCachedPartitionedView.java | 5 +++++ .../direct/core/storage/InMemStorage.java | 18 ++++++++++-------- .../view/LocalCachedPartitionedViewTest.java | 1 + 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/CachedView.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/CachedView.java index 6c5d116be..49854f45b 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/CachedView.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/CachedView.java @@ -20,6 +20,7 @@ import cz.o2.proxima.core.storage.Partition; import cz.o2.proxima.core.storage.StreamElement; import cz.o2.proxima.core.util.Pair; +import cz.o2.proxima.direct.core.CommitCallback; import cz.o2.proxima.direct.core.OnlineAttributeWriter; import cz.o2.proxima.direct.core.commitlog.CommitLogReader; import cz.o2.proxima.direct.core.commitlog.ObserveHandle; @@ -129,6 +130,12 @@ default Collection getPartitions() { /** Retrieve underlying {@link CommitLogReader}. */ CommitLogReader getUnderlyingReader(); + /** + * Retrieve underlying {@link OnlineAttributeWriter}. Note that using this write might not update + * the cache. In most cases, use {@link #write(StreamElement, CommitCallback)} directly. + */ + OnlineAttributeWriter getUnderlyingWriter(); + /** Retrieve a running handle (if present). */ Optional getRunningHandle(); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java index 196adfb51..1585ae680 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java @@ -427,6 +427,11 @@ public CommitLogReader getUnderlyingReader() { return reader; } + @Override + public OnlineAttributeWriter getUnderlyingWriter() { + return writer; + } + @Override public Optional getRunningHandle() { return Optional.ofNullable(handle.get()); diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java index 98ccbef97..865245483 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java @@ -243,11 +243,11 @@ private Writer( @Override public void write(StreamElement data, CommitCallback statusCallback) { - int dolarSign = data.getAttributeDescriptor().toAttributePrefix().indexOf('$'); + int dollarSign = data.getAttributeDescriptor().toAttributePrefix().indexOf('$'); String requiredPrefix = - dolarSign < 0 + dollarSign < 0 ? data.getAttributeDescriptor().toAttributePrefix() - : data.getAttributeDescriptor().toAttributePrefix().substring(dolarSign + 1); + : data.getAttributeDescriptor().toAttributePrefix().substring(dollarSign + 1); Preconditions.checkArgument( data.getAttribute().startsWith(requiredPrefix) || data.getAttribute().startsWith(data.getAttributeDescriptor().toAttributePrefix()), @@ -431,15 +431,16 @@ private ObserveHandle doObserve( CommitLogObserver observer, @Nullable String name) { + final int id = createConsumerId(stopAtCurrent); log.debug( - "Observing {} as {} from offset {} with position {} and stopAtCurrent {} using observer {}", + "Observing {} as {} from offset {} with position {} and stopAtCurrent {} using observer {} as id {}", getUri(), name, offsets, position, stopAtCurrent, - observer); - final int id = createConsumerId(stopAtCurrent); + observer, + id); observer.onRepartition( asRepartitionContext( offsets.stream().map(Offset::getPartition).collect(Collectors.toList()))); @@ -452,8 +453,8 @@ private ObserveHandle doObserve( return createHandle(id, observer, offsetSupplier, killSwitch, observeFuture); } - private int createConsumerId(boolean stopAtCurrent) { - if (!stopAtCurrent) { + private int createConsumerId(boolean isBatch) { + if (!isBatch) { try (Locker ignore = holder().lockRead()) { final NavigableMap uriObservers = getObservers(getUri()); final int id = uriObservers.isEmpty() ? 0 : uriObservers.lastKey() + 1; @@ -485,6 +486,7 @@ private ObserveHandle createHandle( @Override public void close() { + log.debug("Closing consumer {}", consumerId); getObservers(getUri()).remove(consumerId); killSwitch.set(true); observeFuture.get().cancel(true); diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java index adaa8234e..92b3d7cf2 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java @@ -97,6 +97,7 @@ public void testWriteSimple() { assertTrue(view.get("key", armed, now).isPresent()); assertEquals(reader, view.getUnderlyingReader()); + assertEquals(writer, view.getUnderlyingWriter()); } @SuppressWarnings({"unchecked", "rawtypes"})