diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java index 3450f3973..5a97134c6 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java @@ -188,10 +188,17 @@ public boolean onNext(StreamElement element, OnNextContext context) { @Override public boolean onError(Throwable error) { - log.error("Error in caching data. Restarting consumption", error); - assign(partitions); + log.error("Error in caching data. Restarting consumption.", error); + assign(partitions, updateCallback, ttl); return false; } + + @Override + public void onIdle(OnIdleContext context) { + if (ttl != null) { + lastCleanup = maybeDoCleanup(lastCleanup, ttlMs); + } + } }; synchronized (this) { diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java index 919db2e12..3d21f3e37 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java @@ -132,10 +132,13 @@ public void testWriteSimpleWithCallbackCalledOnce() throws InterruptedException @Test public void testWriteOnCacheError() { + AtomicInteger errors = new AtomicInteger(); view.assign( singlePartition(), (elem, old) -> { - throw new IllegalStateException("Fail"); + if (errors.incrementAndGet() < 2) { + throw new IllegalStateException("Fail"); + } }); writer.write(update("key", armed, now), (succ, exc) -> {}); assertTrue(view.get("key", armed, now).isPresent()); diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java index 2afb97cb2..132467373 100644 --- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java +++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java @@ -347,6 +347,7 @@ void ensureSession(Session session) { } static PreparedStatement prepare(Session session, String statement) { + log.debug("Trying to prepare statement {}", statement); PreparedStatement ret = session.prepare(statement); log.info("Prepared statement {} as {}", statement, ret); return ret; diff --git a/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java b/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java index df98caa23..cba14f9d4 100644 --- a/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java +++ b/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java @@ -288,47 +288,51 @@ private void startHouseKeeping() { () -> { while (!Thread.currentThread().isInterrupted()) { try { - manager.houseKeeping(); - long now = currentTimeMillis(); - long cleanupInterval = manager.getCfg().getCleanupInterval(); - long cleanup = now - cleanupInterval; - int cleaned; - try (var l = Locker.of(lock.writeLock())) { - List> toCleanUp = - lastUpdateSeqId.entries().stream() - .filter(e -> e.getValue().getTimestamp() < cleanup) - .collect(Collectors.toList()); - cleaned = toCleanUp.size(); - toCleanUp.forEach(e -> lastUpdateSeqId.remove(e.getKey(), e.getValue())); - } - // release and re-acquire lock to enable progress of any waiting threads - try (var l = Locker.of(lock.writeLock())) { - Iterator> it = - updatesToWildcard.values().iterator(); - while (it.hasNext()) { - Map value = it.next(); - Iterators.removeIf( - value.values().iterator(), e -> e.getTimestamp() < cleanup); - if (value.isEmpty()) { - it.remove(); - } - } - } - long duration = currentTimeMillis() - now; - log.info("Finished housekeeping in {} ms, removed {} records", duration, cleaned); - metrics.getWritesCleaned().increment(cleaned); - if (duration < cleanupInterval) { - sleep(cleanupInterval - duration); - } + doHouseKeeping(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (Throwable err) { log.error("Error in housekeeping thread", err); } } + log.info("Terminated housekeeping thread by request."); }); } + private void doHouseKeeping() throws InterruptedException { + manager.houseKeeping(); + long now = currentTimeMillis(); + long cleanupInterval = manager.getCfg().getCleanupInterval(); + long cleanup = now - cleanupInterval; + int cleaned; + try (var l = Locker.of(lock.writeLock())) { + List> toCleanUp = + lastUpdateSeqId.entries().stream() + .filter(e -> e.getValue().getTimestamp() < cleanup) + .collect(Collectors.toList()); + cleaned = toCleanUp.size(); + toCleanUp.forEach(e -> lastUpdateSeqId.remove(e.getKey(), e.getValue())); + } + // release and re-acquire lock to enable progress of any waiting threads + try (var l = Locker.of(lock.writeLock())) { + Iterator> it = + updatesToWildcard.values().iterator(); + while (it.hasNext()) { + Map value = it.next(); + Iterators.removeIf(value.values().iterator(), e -> e.getTimestamp() < cleanup); + if (value.isEmpty()) { + it.remove(); + } + } + } + long duration = currentTimeMillis() - now; + log.info("Finished housekeeping in {} ms, removed {} records", duration, cleaned); + metrics.getWritesCleaned().increment(cleaned); + if (duration < cleanupInterval) { + sleep(cleanupInterval - duration); + } + } + @VisibleForTesting void sleep(long sleepMs) throws InterruptedException { TimeUnit.MILLISECONDS.sleep(sleepMs);