Skip to content

Commit

Permalink
fix: atomic refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Sep 25, 2024
1 parent 8b498bb commit 1b95cb9
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions ingester-protocol/src/main/java/io/greptime/RouterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class RouterClient implements Lifecycle<RouterOptions>, Health, Display {

private static final SharedScheduledPool REFRESHER_POOL = Util.getSharedScheduledPool("route_cache_refresher", 1);

private final AtomicLong refreshSequencer = new AtomicLong(0);

private ScheduledExecutorService refresher;
private RouterOptions opts;
private RpcClient rpcClient;
Expand All @@ -67,35 +69,36 @@ public boolean init(RouterOptions opts) {

long refreshPeriod = this.opts.getRefreshPeriodSeconds();
if (refreshPeriod > 0) {
AtomicLong order = new AtomicLong(0);
this.refresher = REFRESHER_POOL.getObject();
this.refresher.scheduleWithFixedDelay(
() -> {
long thisOrder = order.incrementAndGet();
long thisSequence = this.refreshSequencer.incrementAndGet();
checkHealth().whenComplete((r, t) -> {
if (t != null) {
LOG.warn("Failed to check health", t);
return;
}

// I don't want to worry about the overflow issue with long anymore,
// because assuming one increment per second, it will take 292 years
// to overflow. I think that's sufficient.
if (thisOrder < order.get()) {
LOG.warn("Skip outdated health check result, order: {}", thisOrder);
return;
}
synchronized (this.refreshSequencer) {
// I don't want to worry about the overflow issue with long anymore,
// because assuming one increment per second, it will take 292 years
// to overflow. I think that's sufficient.
if (thisSequence < this.refreshSequencer.get()) {
LOG.warn("Skip outdated health check result, sequence: {}", thisSequence);
return;
}

List<Endpoint> activities = new ArrayList<>();
List<Endpoint> inactivities = new ArrayList<>();
for (Map.Entry<Endpoint, Boolean> entry : r.entrySet()) {
if (entry.getValue()) {
activities.add(entry.getKey());
} else {
inactivities.add(entry.getKey());
List<Endpoint> activities = new ArrayList<>();
List<Endpoint> inactivities = new ArrayList<>();
for (Map.Entry<Endpoint, Boolean> entry : r.entrySet()) {
if (entry.getValue()) {
activities.add(entry.getKey());
} else {
inactivities.add(entry.getKey());
}
}
this.router.onRefresh(activities, inactivities);
}
this.router.onRefresh(activities, inactivities);
});
},
Util.randomInitialDelay(180),
Expand Down

0 comments on commit 1b95cb9

Please sign in to comment.