From 52e3ee6c436095cbe1026582591c144f49d199a8 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Fri, 8 Sep 2023 15:49:25 +0200 Subject: [PATCH] [proxima-direct-core] allow partitioned InMemStorage to create CachedView --- .../core/DirectAttributeFamilyDescriptor.java | 5 +- .../direct/core/storage/InMemStorage.java | 14 ++-- .../direct/core/storage/InMemStorageTest.java | 82 +++++++++++++------ 3 files changed, 68 insertions(+), 33 deletions(-) diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyDescriptor.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyDescriptor.java index 05980711d..6f141ccd8 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyDescriptor.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyDescriptor.java @@ -24,6 +24,7 @@ import cz.o2.proxima.direct.core.commitlog.CommitLogReader; import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader; import cz.o2.proxima.direct.core.view.CachedView; +import cz.o2.proxima.internal.com.google.common.base.Preconditions; import java.io.Serializable; import java.util.List; import java.util.Objects; @@ -228,7 +229,9 @@ public Optional getCachedView() { private CachedView cachedView() { if (cachedView == null) { - cachedView = Objects.requireNonNull(cachedViewFactory).apply(repo()); + Preconditions.checkArgument( + cachedViewFactory != null, "Family %s cannot create cached view.", getDesc().getName()); + cachedView = cachedViewFactory.apply(repo()); } return cachedView; } diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java index e2395d4da..98ccbef97 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java @@ -352,6 +352,7 @@ public ObserveHandle observe(String name, Position position, CommitLogObserver o private ObserveHandle observe( String name, Position position, boolean stopAtCurrent, CommitLogObserver observer) { + return observe( name, position, @@ -439,7 +440,9 @@ private ObserveHandle doObserve( stopAtCurrent, observer); final int id = createConsumerId(stopAtCurrent); - observer.onRepartition(asRepartitionContext(getPartitions())); + observer.onRepartition( + asRepartitionContext( + offsets.stream().map(Offset::getPartition).collect(Collectors.toList()))); AtomicReference> observeFuture = new AtomicReference<>(); AtomicBoolean killSwitch = new AtomicBoolean(); Supplier> offsetSupplier = @@ -1229,17 +1232,16 @@ public DataAccessor createAccessor( if (numPartitions > 1) { randomAccessReaderFactory = null; batchLogReaderFactory = null; - cachedViewFactory = null; } else { final ReaderFactory readerFactory = new Reader(entity, uri, op.getContext().getExecutorFactory()).asFactory(); randomAccessReaderFactory = readerFactory; batchLogReaderFactory = readerFactory; - cachedViewFactory = - new LocalCachedPartitionedView( - entity, commitLogReaderFactory.apply(opRepo), writerFactory.apply(opRepo)) - .asFactory(); } + cachedViewFactory = + new LocalCachedPartitionedView( + entity, commitLogReaderFactory.apply(opRepo), writerFactory.apply(opRepo)) + .asFactory(); return new DataAccessor() { diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java index b4bddb36d..ed0d70e50 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java @@ -54,6 +54,7 @@ import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader; import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader.Listing; import cz.o2.proxima.direct.core.storage.InMemStorage.ConsumedOffset; +import cz.o2.proxima.direct.core.view.CachedView; import cz.o2.proxima.internal.com.google.common.base.Preconditions; import cz.o2.proxima.internal.com.google.common.collect.Iterables; import cz.o2.proxima.typesafe.config.ConfigFactory; @@ -66,6 +67,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -642,29 +644,14 @@ public void testObserveMultiplePartitions() throws InterruptedException { final ConcurrentMap partitionHistogram = new ConcurrentHashMap<>(); final CountDownLatch elementsReceived = new CountDownLatch(numElements); // Start observer. - final ObserveHandle observeHandle = + final ObserveHandle observeHandle1 = reader.observePartitions( - reader.getPartitions(), - new CommitLogObserver() { - - @Override - public void onRepartition(OnRepartitionContext context) { - assertEquals(numPartitions, context.partitions().size()); - } - - @Override - public boolean onNext(StreamElement ingest, OnNextContext context) { - partitionHistogram.merge(context.getPartition(), 1L, Long::sum); - context.confirm(); - elementsReceived.countDown(); - return elementsReceived.getCount() > 0; - } - - @Override - public boolean onError(Throwable error) { - throw new RuntimeException(error); - } - }); + reader.getPartitions().subList(0, numPartitions - 1), + createObserver(numPartitions - 1, partitionHistogram, elementsReceived)); + final ObserveHandle observeHandle2 = + reader.observePartitions( + reader.getPartitions().subList(numPartitions - 1, numPartitions), + createObserver(1, partitionHistogram, elementsReceived)); // Write data. for (int i = 0; i < numElements; i++) { writer @@ -684,7 +671,33 @@ public boolean onError(Throwable error) { elementsReceived.await(); assertEquals(3, partitionHistogram.size()); - assertEquals(3, observeHandle.getCurrentOffsets().size()); + assertEquals(2, observeHandle1.getCurrentOffsets().size()); + } + + private static CommitLogObserver createObserver( + int expectedPartitions, + ConcurrentMap partitionHistogram, + CountDownLatch elementsReceived) { + return new CommitLogObserver() { + + @Override + public void onRepartition(OnRepartitionContext context) { + assertEquals(expectedPartitions, context.partitions().size()); + } + + @Override + public boolean onNext(StreamElement ingest, OnNextContext context) { + partitionHistogram.merge(context.getPartition(), 1L, Long::sum); + context.confirm(); + elementsReceived.countDown(); + return elementsReceived.getCount() > 0; + } + + @Override + public boolean onError(Throwable error) { + throw new RuntimeException(error); + } + }; } @Test @@ -709,7 +722,7 @@ public void testObserveSinglePartitionOutOfMultiplePartitions() throws Interrupt @Override public void onRepartition(OnRepartitionContext context) { - assertEquals(numPartitions, context.partitions().size()); + assertEquals(1, context.partitions().size()); } @Override @@ -783,7 +796,24 @@ public void testCachedViewWithMultiplePartitions() { final DataAccessor accessor = storage.createAccessor( direct, createFamilyDescriptor(URI.create("inmem:///test"), numPartitions)); - assertFalse(accessor.getCachedView(direct.getContext()).isPresent()); + Optional maybeView = accessor.getCachedView(direct.getContext()); + assertTrue(maybeView.isPresent()); + CachedView view = maybeView.get(); + assertEquals(3, view.getPartitions().size()); + view.assign(view.getPartitions()); + StreamElement element = + StreamElement.upsert( + entity, + data, + UUID.randomUUID().toString(), + "key", + data.getName(), + System.currentTimeMillis(), + new byte[0]); + view.write(element, (succ, exc) -> assertTrue(succ)); + Optional> written = view.get("key", data); + assertTrue(written.isPresent()); + assertEquals("key", written.get().getKey()); } @Test(timeout = 10000) @@ -914,7 +944,7 @@ private AttributeFamilyDescriptor createFamilyDescriptor(URI storageUri) { private AttributeFamilyDescriptor createFamilyDescriptor(URI storageUri, int numPartitions) { final Map config = new HashMap<>(); if (numPartitions > 1) { - config.put(InMemStorage.NUM_PARTITIONS, 3); + config.put(InMemStorage.NUM_PARTITIONS, numPartitions); } return AttributeFamilyDescriptor.newBuilder() .setName("test")