Skip to content

Commit

Permalink
Merge pull request #849: [proxima-direct-core] #318 filter target att…
Browse files Browse the repository at this point in the history
…ributes in test replication controller
  • Loading branch information
je-ik authored Oct 31, 2023
2 parents 4124f1a + be5d3a1 commit 05ef3bd
Showing 1 changed file with 57 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
package cz.o2.proxima.core.util;

import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.functional.TriFunction;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.storage.StorageType;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.BulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver.OnNextContext;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
import java.net.URI;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -68,43 +72,71 @@ public static void runAttributeReplicas(
.flatMap(DirectAttributeFamilyDescriptor::getCommitLogReader));
final ObserveHandle handle;
if (writer instanceof OnlineAttributeWriter) {
final OnlineAttributeWriter onlineWriter = writer.online();
handle =
primaryCommitLogReader.observe(
af.getDesc().getName(),
(CommitLogObserver)
(ingest, context) -> {
log.debug("Replicating input {} to {}", ingest, writer);
onlineWriter.write(
ingest,
(succ, exc) -> {
context.commit(succ, exc);
onReplicated.accept(ingest);
});
return true;
});
newReplicationCommitLogObserver(attributes, writer.online(), onReplicated));
} else {
final BulkAttributeWriter bulkWriter = writer.bulk();
handle =
primaryCommitLogReader.observe(
af.getDesc().getName(),
(CommitLogObserver)
(ingest, context) -> {
log.debug("Replicating input {} to {}", ingest, writer);
bulkWriter.write(
ingest,
context.getWatermark(),
(succ, exc) -> {
context.commit(succ, exc);
onReplicated.accept(ingest);
});
return true;
});
newReplicationCommitLogObserver(attributes, writer.bulk(), onReplicated));
}
ExceptionUtils.unchecked(handle::waitUntilReady);
log.info("Started attribute replica {}", af.getDesc().getName());
});
}

static CommitLogObserver newReplicationCommitLogObserver(
List<AttributeDescriptor<?>> attributes,
OnlineAttributeWriter writer,
Consumer<StreamElement> onReplicated) {

return newReplicationObserver(
(ingest, context, commit) -> {
writer.write(ingest, commit);
return null;
},
writer.getUri(),
attributes,
onReplicated);
}

static CommitLogObserver newReplicationCommitLogObserver(
List<AttributeDescriptor<?>> attributes,
BulkAttributeWriter writer,
Consumer<StreamElement> onReplicated) {

return newReplicationObserver(
(ingest, context, commit) -> {
writer.write(ingest, context.getWatermark(), commit);
return null;
},
writer.getUri(),
attributes,
onReplicated);
}

private static CommitLogObserver newReplicationObserver(
TriFunction<StreamElement, OnNextContext, CommitCallback, Void> write,
URI uri,
List<AttributeDescriptor<?>> attributes,
Consumer<StreamElement> onReplicated) {

return (ingest, context) -> {
if (attributes.contains(ingest.getAttributeDescriptor())) {
write.apply(
ingest,
context,
(succ, exc) -> {
log.debug("Replicated input {} to {}", ingest, uri);
context.commit(succ, exc);
onReplicated.accept(ingest);
});
}
return true;
};
}

private ReplicationRunner() {}
}

0 comments on commit 05ef3bd

Please sign in to comment.