Skip to content

Commit

Permalink
Merge pull request #833: Allow inmem cached view for multiple partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored Sep 8, 2023
2 parents 07f049a + 52e3ee6 commit 84b75f8
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.core.view.CachedView;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -228,7 +229,9 @@ public Optional<CachedView> getCachedView() {

private CachedView cachedView() {
if (cachedView == null) {
cachedView = Objects.requireNonNull(cachedViewFactory).apply(repo());
Preconditions.checkArgument(
cachedViewFactory != null, "Family %s cannot create cached view.", getDesc().getName());
cachedView = cachedViewFactory.apply(repo());
}
return cachedView;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ public ObserveHandle observe(String name, Position position, CommitLogObserver o

private ObserveHandle observe(
String name, Position position, boolean stopAtCurrent, CommitLogObserver observer) {

return observe(
name,
position,
Expand Down Expand Up @@ -439,7 +440,9 @@ private ObserveHandle doObserve(
stopAtCurrent,
observer);
final int id = createConsumerId(stopAtCurrent);
observer.onRepartition(asRepartitionContext(getPartitions()));
observer.onRepartition(
asRepartitionContext(
offsets.stream().map(Offset::getPartition).collect(Collectors.toList())));
AtomicReference<Future<?>> observeFuture = new AtomicReference<>();
AtomicBoolean killSwitch = new AtomicBoolean();
Supplier<List<Offset>> offsetSupplier =
Expand Down Expand Up @@ -1229,17 +1232,16 @@ public DataAccessor createAccessor(
if (numPartitions > 1) {
randomAccessReaderFactory = null;
batchLogReaderFactory = null;
cachedViewFactory = null;
} else {
final ReaderFactory readerFactory =
new Reader(entity, uri, op.getContext().getExecutorFactory()).asFactory();
randomAccessReaderFactory = readerFactory;
batchLogReaderFactory = readerFactory;
cachedViewFactory =
new LocalCachedPartitionedView(
entity, commitLogReaderFactory.apply(opRepo), writerFactory.apply(opRepo))
.asFactory();
}
cachedViewFactory =
new LocalCachedPartitionedView(
entity, commitLogReaderFactory.apply(opRepo), writerFactory.apply(opRepo))
.asFactory();

return new DataAccessor() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader.Listing;
import cz.o2.proxima.direct.core.storage.InMemStorage.ConsumedOffset;
import cz.o2.proxima.direct.core.view.CachedView;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.collect.Iterables;
import cz.o2.proxima.typesafe.config.ConfigFactory;
Expand All @@ -66,6 +67,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -642,29 +644,14 @@ public void testObserveMultiplePartitions() throws InterruptedException {
final ConcurrentMap<Partition, Long> partitionHistogram = new ConcurrentHashMap<>();
final CountDownLatch elementsReceived = new CountDownLatch(numElements);
// Start observer.
final ObserveHandle observeHandle =
final ObserveHandle observeHandle1 =
reader.observePartitions(
reader.getPartitions(),
new CommitLogObserver() {

@Override
public void onRepartition(OnRepartitionContext context) {
assertEquals(numPartitions, context.partitions().size());
}

@Override
public boolean onNext(StreamElement ingest, OnNextContext context) {
partitionHistogram.merge(context.getPartition(), 1L, Long::sum);
context.confirm();
elementsReceived.countDown();
return elementsReceived.getCount() > 0;
}

@Override
public boolean onError(Throwable error) {
throw new RuntimeException(error);
}
});
reader.getPartitions().subList(0, numPartitions - 1),
createObserver(numPartitions - 1, partitionHistogram, elementsReceived));
final ObserveHandle observeHandle2 =
reader.observePartitions(
reader.getPartitions().subList(numPartitions - 1, numPartitions),
createObserver(1, partitionHistogram, elementsReceived));
// Write data.
for (int i = 0; i < numElements; i++) {
writer
Expand All @@ -684,7 +671,33 @@ public boolean onError(Throwable error) {
elementsReceived.await();

assertEquals(3, partitionHistogram.size());
assertEquals(3, observeHandle.getCurrentOffsets().size());
assertEquals(2, observeHandle1.getCurrentOffsets().size());
}

private static CommitLogObserver createObserver(
int expectedPartitions,
ConcurrentMap<Partition, Long> partitionHistogram,
CountDownLatch elementsReceived) {
return new CommitLogObserver() {

@Override
public void onRepartition(OnRepartitionContext context) {
assertEquals(expectedPartitions, context.partitions().size());
}

@Override
public boolean onNext(StreamElement ingest, OnNextContext context) {
partitionHistogram.merge(context.getPartition(), 1L, Long::sum);
context.confirm();
elementsReceived.countDown();
return elementsReceived.getCount() > 0;
}

@Override
public boolean onError(Throwable error) {
throw new RuntimeException(error);
}
};
}

@Test
Expand All @@ -709,7 +722,7 @@ public void testObserveSinglePartitionOutOfMultiplePartitions() throws Interrupt

@Override
public void onRepartition(OnRepartitionContext context) {
assertEquals(numPartitions, context.partitions().size());
assertEquals(1, context.partitions().size());
}

@Override
Expand Down Expand Up @@ -783,7 +796,24 @@ public void testCachedViewWithMultiplePartitions() {
final DataAccessor accessor =
storage.createAccessor(
direct, createFamilyDescriptor(URI.create("inmem:///test"), numPartitions));
assertFalse(accessor.getCachedView(direct.getContext()).isPresent());
Optional<CachedView> maybeView = accessor.getCachedView(direct.getContext());
assertTrue(maybeView.isPresent());
CachedView view = maybeView.get();
assertEquals(3, view.getPartitions().size());
view.assign(view.getPartitions());
StreamElement element =
StreamElement.upsert(
entity,
data,
UUID.randomUUID().toString(),
"key",
data.getName(),
System.currentTimeMillis(),
new byte[0]);
view.write(element, (succ, exc) -> assertTrue(succ));
Optional<? extends KeyValue<?>> written = view.get("key", data);
assertTrue(written.isPresent());
assertEquals("key", written.get().getKey());
}

@Test(timeout = 10000)
Expand Down Expand Up @@ -914,7 +944,7 @@ private AttributeFamilyDescriptor createFamilyDescriptor(URI storageUri) {
private AttributeFamilyDescriptor createFamilyDescriptor(URI storageUri, int numPartitions) {
final Map<String, Object> config = new HashMap<>();
if (numPartitions > 1) {
config.put(InMemStorage.NUM_PARTITIONS, 3);
config.put(InMemStorage.NUM_PARTITIONS, numPartitions);
}
return AttributeFamilyDescriptor.newBuilder()
.setName("test")
Expand Down

0 comments on commit 84b75f8

Please sign in to comment.