Skip to content

Commit

Permalink
Only trigger Epoch's callback when epoch is at the head of the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored and 游剑 committed Feb 19, 2024
1 parent d731a57 commit 11e3823
Showing 1 changed file with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
public class EpochManager {
protected static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);

enum EpochStatus {
/**
* The latter non-record input has not arrived. So arriving records will be collected into this epoch.
Expand Down Expand Up @@ -51,18 +52,18 @@ public Epoch(long startId, int recordCount, RunnableWithException callback) {
this.status = EpochStatus.Open;
}

public boolean completeOneRecord() throws Exception {
ongoingRecordCount--;
public boolean tryTriggerCallback() throws Exception {
if (ongoingRecordCount == 0 && this.status == EpochStatus.Closed) {
LOG.trace("Epoch {} is finished", this);
LOG.debug("Epoch {} is finished", this);
callback.run();
return true;
}
return false;
}

public void close() {
public void close() throws Exception {
this.status = EpochStatus.Closed;
LOG.debug("Close Epoch {}", this);
}

public String toString() {
Expand All @@ -76,31 +77,42 @@ public String toString() {
public EpochManager() {
this.inputCount = 0;
this.outputQueue = new LinkedList<>();
this.outputQueue.add(new Epoch(0, 0, () -> {}));
this.outputQueue.add(new Epoch(0, 0, () -> {
LOG.info("Empty callback {}", this);
}));
}

public long onRecord() {
LOG.trace("onRecord: {}", this);
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
LOG.trace("onRecord: {} {}", this, lastEpoch);
lastEpoch.ongoingRecordCount++;
return inputCount++;
}

public long onNonRecord(RunnableWithException callback) {
LOG.trace("onNonRecord: {}", this);
public long onNonRecord(RunnableWithException callback) throws Exception {
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
lastEpoch.callback = callback;
lastEpoch.close();
if (outputQueue.size() == 1) {
if (lastEpoch.tryTriggerCallback()) {
outputQueue.remove(0);
}
}
LOG.debug("onNonRecord: {} {}", this, lastEpoch);
Epoch epoch = new Epoch(inputCount, 0, callback);
outputQueue.add(epoch);
return inputCount++;
}

public void completeOneRecord(long recordId) throws Exception {
LOG.trace("completeOneRecord: recordId={}, {}", recordId, this);
Tuple2<Epoch, Integer> target = findEpoch(recordId);
Epoch epoch = target.f0;
if (epoch.completeOneRecord() && target.f1 == 0) {
outputQueue.remove(0);
LOG.trace("completeOneRecord: recordId={}, {}, {}", recordId, this, target);
epoch.ongoingRecordCount--;
if (target.f1 == 0) {
if (epoch.tryTriggerCallback()) {
outputQueue.remove(0);
}
}
}

Expand Down

0 comments on commit 11e3823

Please sign in to comment.