diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/util/ReplicationRunner.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/util/ReplicationRunner.java index da160be40..4e9b6b39b 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/util/ReplicationRunner.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/util/ReplicationRunner.java @@ -16,17 +16,21 @@ package cz.o2.proxima.core.util; import cz.o2.proxima.core.functional.Consumer; +import cz.o2.proxima.core.functional.TriFunction; import cz.o2.proxima.core.repository.AttributeDescriptor; import cz.o2.proxima.core.storage.StorageType; import cz.o2.proxima.core.storage.StreamElement; import cz.o2.proxima.direct.core.AttributeWriterBase; import cz.o2.proxima.direct.core.BulkAttributeWriter; +import cz.o2.proxima.direct.core.CommitCallback; import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor; import cz.o2.proxima.direct.core.DirectDataOperator; import cz.o2.proxima.direct.core.OnlineAttributeWriter; import cz.o2.proxima.direct.core.commitlog.CommitLogObserver; +import cz.o2.proxima.direct.core.commitlog.CommitLogObserver.OnNextContext; import cz.o2.proxima.direct.core.commitlog.CommitLogReader; import cz.o2.proxima.direct.core.commitlog.ObserveHandle; +import java.net.URI; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -68,43 +72,71 @@ public static void runAttributeReplicas( .flatMap(DirectAttributeFamilyDescriptor::getCommitLogReader)); final ObserveHandle handle; if (writer instanceof OnlineAttributeWriter) { - final OnlineAttributeWriter onlineWriter = writer.online(); handle = primaryCommitLogReader.observe( af.getDesc().getName(), - (CommitLogObserver) - (ingest, context) -> { - log.debug("Replicating input {} to {}", ingest, writer); - onlineWriter.write( - ingest, - (succ, exc) -> { - context.commit(succ, exc); - onReplicated.accept(ingest); - }); - return true; - }); + newReplicationCommitLogObserver(attributes, writer.online(), onReplicated)); } else { - final BulkAttributeWriter bulkWriter = writer.bulk(); handle = primaryCommitLogReader.observe( af.getDesc().getName(), - (CommitLogObserver) - (ingest, context) -> { - log.debug("Replicating input {} to {}", ingest, writer); - bulkWriter.write( - ingest, - context.getWatermark(), - (succ, exc) -> { - context.commit(succ, exc); - onReplicated.accept(ingest); - }); - return true; - }); + newReplicationCommitLogObserver(attributes, writer.bulk(), onReplicated)); } ExceptionUtils.unchecked(handle::waitUntilReady); log.info("Started attribute replica {}", af.getDesc().getName()); }); } + static CommitLogObserver newReplicationCommitLogObserver( + List> attributes, + OnlineAttributeWriter writer, + Consumer onReplicated) { + + return newReplicationObserver( + (ingest, context, commit) -> { + writer.write(ingest, commit); + return null; + }, + writer.getUri(), + attributes, + onReplicated); + } + + static CommitLogObserver newReplicationCommitLogObserver( + List> attributes, + BulkAttributeWriter writer, + Consumer onReplicated) { + + return newReplicationObserver( + (ingest, context, commit) -> { + writer.write(ingest, context.getWatermark(), commit); + return null; + }, + writer.getUri(), + attributes, + onReplicated); + } + + private static CommitLogObserver newReplicationObserver( + TriFunction write, + URI uri, + List> attributes, + Consumer onReplicated) { + + return (ingest, context) -> { + if (attributes.contains(ingest.getAttributeDescriptor())) { + write.apply( + ingest, + context, + (succ, exc) -> { + log.debug("Replicated input {} to {}", ingest, uri); + context.commit(succ, exc); + onReplicated.accept(ingest); + }); + } + return true; + }; + } + private ReplicationRunner() {} }