Skip to content

Commit

Permalink
[proxima-direct-core] fix LocalCachedPartitionedView restart after error
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Dec 16, 2024
1 parent aecb6e6 commit 0dce0e8
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,17 @@ public boolean onNext(StreamElement element, OnNextContext context) {

@Override
public boolean onError(Throwable error) {
log.error("Error in caching data. Restarting consumption", error);
assign(partitions);
log.error("Error in caching data. Restarting consumption.", error);
assign(partitions, updateCallback, ttl);
return false;
}

@Override
public void onIdle(OnIdleContext context) {
if (ttl != null) {
lastCleanup = maybeDoCleanup(lastCleanup, ttlMs);
}
}
};

synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ public void testWriteSimpleWithCallbackCalledOnce() throws InterruptedException

@Test
public void testWriteOnCacheError() {
AtomicInteger errors = new AtomicInteger();
view.assign(
singlePartition(),
(elem, old) -> {
throw new IllegalStateException("Fail");
if (errors.incrementAndGet() < 2) {
throw new IllegalStateException("Fail");
}
});
writer.write(update("key", armed, now), (succ, exc) -> {});
assertTrue(view.get("key", armed, now).isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ void ensureSession(Session session) {
}

static PreparedStatement prepare(Session session, String statement) {
log.debug("Trying to prepare statement {}", statement);
PreparedStatement ret = session.prepare(statement);
log.info("Prepared statement {} as {}", statement, ret);
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,47 +288,51 @@ private void startHouseKeeping() {
() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
manager.houseKeeping();
long now = currentTimeMillis();
long cleanupInterval = manager.getCfg().getCleanupInterval();
long cleanup = now - cleanupInterval;
int cleaned;
try (var l = Locker.of(lock.writeLock())) {
List<Map.Entry<KeyWithAttribute, SeqIdWithTombstone>> toCleanUp =
lastUpdateSeqId.entries().stream()
.filter(e -> e.getValue().getTimestamp() < cleanup)
.collect(Collectors.toList());
cleaned = toCleanUp.size();
toCleanUp.forEach(e -> lastUpdateSeqId.remove(e.getKey(), e.getValue()));
}
// release and re-acquire lock to enable progress of any waiting threads
try (var l = Locker.of(lock.writeLock())) {
Iterator<Map<KeyWithAttribute, SeqIdWithTombstone>> it =
updatesToWildcard.values().iterator();
while (it.hasNext()) {
Map<KeyWithAttribute, SeqIdWithTombstone> value = it.next();
Iterators.removeIf(
value.values().iterator(), e -> e.getTimestamp() < cleanup);
if (value.isEmpty()) {
it.remove();
}
}
}
long duration = currentTimeMillis() - now;
log.info("Finished housekeeping in {} ms, removed {} records", duration, cleaned);
metrics.getWritesCleaned().increment(cleaned);
if (duration < cleanupInterval) {
sleep(cleanupInterval - duration);
}
doHouseKeeping();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (Throwable err) {
log.error("Error in housekeeping thread", err);
}
}
log.info("Terminated housekeeping thread by request.");
});
}

private void doHouseKeeping() throws InterruptedException {
manager.houseKeeping();
long now = currentTimeMillis();
long cleanupInterval = manager.getCfg().getCleanupInterval();
long cleanup = now - cleanupInterval;
int cleaned;
try (var l = Locker.of(lock.writeLock())) {
List<Map.Entry<KeyWithAttribute, SeqIdWithTombstone>> toCleanUp =
lastUpdateSeqId.entries().stream()
.filter(e -> e.getValue().getTimestamp() < cleanup)
.collect(Collectors.toList());
cleaned = toCleanUp.size();
toCleanUp.forEach(e -> lastUpdateSeqId.remove(e.getKey(), e.getValue()));
}
// release and re-acquire lock to enable progress of any waiting threads
try (var l = Locker.of(lock.writeLock())) {
Iterator<Map<KeyWithAttribute, SeqIdWithTombstone>> it =
updatesToWildcard.values().iterator();
while (it.hasNext()) {
Map<KeyWithAttribute, SeqIdWithTombstone> value = it.next();
Iterators.removeIf(value.values().iterator(), e -> e.getTimestamp() < cleanup);
if (value.isEmpty()) {
it.remove();
}
}
}
long duration = currentTimeMillis() - now;
log.info("Finished housekeeping in {} ms, removed {} records", duration, cleaned);
metrics.getWritesCleaned().increment(cleaned);
if (duration < cleanupInterval) {
sleep(cleanupInterval - duration);
}
}

@VisibleForTesting
void sleep(long sleepMs) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(sleepMs);
Expand Down

0 comments on commit 0dce0e8

Please sign in to comment.