diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/EpochManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/EpochManager.java index 4fd8eb8e674..28ef1e07a7f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/EpochManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/EpochManager.java @@ -15,6 +15,7 @@ */ public class EpochManager { protected static final Logger LOG = LoggerFactory.getLogger(EpochManager.class); + enum EpochStatus { /** * The latter non-record input has not arrived. So arriving records will be collected into this epoch. @@ -51,18 +52,18 @@ public Epoch(long startId, int recordCount, RunnableWithException callback) { this.status = EpochStatus.Open; } - public boolean completeOneRecord() throws Exception { - ongoingRecordCount--; + public boolean tryTriggerCallback() throws Exception { if (ongoingRecordCount == 0 && this.status == EpochStatus.Closed) { - LOG.trace("Epoch {} is finished", this); + LOG.debug("Epoch {} is finished", this); callback.run(); return true; } return false; } - public void close() { + public void close() throws Exception { this.status = EpochStatus.Closed; + LOG.debug("Close Epoch {}", this); } public String toString() { @@ -76,31 +77,42 @@ public String toString() { public EpochManager() { this.inputCount = 0; this.outputQueue = new LinkedList<>(); - this.outputQueue.add(new Epoch(0, 0, () -> {})); + this.outputQueue.add(new Epoch(0, 0, () -> { + LOG.info("Empty callback {}", this); + })); } public long onRecord() { - LOG.trace("onRecord: {}", this); Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1); + LOG.trace("onRecord: {} {}", this, lastEpoch); lastEpoch.ongoingRecordCount++; return inputCount++; } - public long onNonRecord(RunnableWithException callback) { - LOG.trace("onNonRecord: {}", this); + public long onNonRecord(RunnableWithException callback) throws Exception { Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1); + lastEpoch.callback = callback; lastEpoch.close(); + if (outputQueue.size() == 1) { + if (lastEpoch.tryTriggerCallback()) { + outputQueue.remove(0); + } + } + LOG.debug("onNonRecord: {} {}", this, lastEpoch); Epoch epoch = new Epoch(inputCount, 0, callback); outputQueue.add(epoch); return inputCount++; } public void completeOneRecord(long recordId) throws Exception { - LOG.trace("completeOneRecord: recordId={}, {}", recordId, this); Tuple2 target = findEpoch(recordId); Epoch epoch = target.f0; - if (epoch.completeOneRecord() && target.f1 == 0) { - outputQueue.remove(0); + LOG.trace("completeOneRecord: recordId={}, {}, {}", recordId, this, target); + epoch.ongoingRecordCount--; + if (target.f1 == 0) { + if (epoch.tryTriggerCallback()) { + outputQueue.remove(0); + } } }