Skip to content

Commit

Permalink
[proxima-direct-core] Allow CachedView write-through
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Sep 11, 2023
1 parent 84b75f8 commit 0454cdb
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.util.Pair;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
Expand Down Expand Up @@ -129,6 +130,12 @@ default Collection<Partition> getPartitions() {
/** Retrieve underlying {@link CommitLogReader}. */
CommitLogReader getUnderlyingReader();

/**
* Retrieve underlying {@link OnlineAttributeWriter}. Note that using this write might not update
* the cache. In most cases, use {@link #write(StreamElement, CommitCallback)} directly.
*/
OnlineAttributeWriter getUnderlyingWriter();

/** Retrieve a running handle (if present). */
Optional<ObserveHandle> getRunningHandle();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ public CommitLogReader getUnderlyingReader() {
return reader;
}

@Override
public OnlineAttributeWriter getUnderlyingWriter() {
return writer;
}

@Override
public Optional<ObserveHandle> getRunningHandle() {
return Optional.ofNullable(handle.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ private Writer(

@Override
public void write(StreamElement data, CommitCallback statusCallback) {
int dolarSign = data.getAttributeDescriptor().toAttributePrefix().indexOf('$');
int dollarSign = data.getAttributeDescriptor().toAttributePrefix().indexOf('$');
String requiredPrefix =
dolarSign < 0
dollarSign < 0
? data.getAttributeDescriptor().toAttributePrefix()
: data.getAttributeDescriptor().toAttributePrefix().substring(dolarSign + 1);
: data.getAttributeDescriptor().toAttributePrefix().substring(dollarSign + 1);
Preconditions.checkArgument(
data.getAttribute().startsWith(requiredPrefix)
|| data.getAttribute().startsWith(data.getAttributeDescriptor().toAttributePrefix()),
Expand Down Expand Up @@ -431,15 +431,16 @@ private ObserveHandle doObserve(
CommitLogObserver observer,
@Nullable String name) {

final int id = createConsumerId(stopAtCurrent);
log.debug(
"Observing {} as {} from offset {} with position {} and stopAtCurrent {} using observer {}",
"Observing {} as {} from offset {} with position {} and stopAtCurrent {} using observer {} as id {}",
getUri(),
name,
offsets,
position,
stopAtCurrent,
observer);
final int id = createConsumerId(stopAtCurrent);
observer,
id);
observer.onRepartition(
asRepartitionContext(
offsets.stream().map(Offset::getPartition).collect(Collectors.toList())));
Expand All @@ -452,8 +453,8 @@ private ObserveHandle doObserve(
return createHandle(id, observer, offsetSupplier, killSwitch, observeFuture);
}

private int createConsumerId(boolean stopAtCurrent) {
if (!stopAtCurrent) {
private int createConsumerId(boolean isBatch) {
if (!isBatch) {
try (Locker ignore = holder().lockRead()) {
final NavigableMap<Integer, InMemIngestWriter> uriObservers = getObservers(getUri());
final int id = uriObservers.isEmpty() ? 0 : uriObservers.lastKey() + 1;
Expand Down Expand Up @@ -485,6 +486,7 @@ private ObserveHandle createHandle(

@Override
public void close() {
log.debug("Closing consumer {}", consumerId);
getObservers(getUri()).remove(consumerId);
killSwitch.set(true);
observeFuture.get().cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void testWriteSimple() {
assertTrue(view.get("key", armed, now).isPresent());

assertEquals(reader, view.getUnderlyingReader());
assertEquals(writer, view.getUnderlyingWriter());
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down

0 comments on commit 0454cdb

Please sign in to comment.