diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java
index 3450f3973..5a97134c6 100644
--- a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java
+++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java
@@ -188,10 +188,17 @@ public boolean onNext(StreamElement element, OnNextContext context) {
 
           @Override
           public boolean onError(Throwable error) {
-            log.error("Error in caching data. Restarting consumption", error);
-            assign(partitions);
+            log.error("Error in caching data. Restarting consumption.", error);
+            assign(partitions, updateCallback, ttl);
             return false;
           }
+
+          @Override
+          public void onIdle(OnIdleContext context) {
+            if (ttl != null) {
+              lastCleanup = maybeDoCleanup(lastCleanup, ttlMs);
+            }
+          }
         };
 
     synchronized (this) {
diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java
index 919db2e12..3d21f3e37 100644
--- a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java
+++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedViewTest.java
@@ -132,10 +132,13 @@ public void testWriteSimpleWithCallbackCalledOnce() throws InterruptedException
 
   @Test
   public void testWriteOnCacheError() {
+    AtomicInteger errors = new AtomicInteger();
     view.assign(
         singlePartition(),
         (elem, old) -> {
-          throw new IllegalStateException("Fail");
+          if (errors.incrementAndGet() < 2) {
+            throw new IllegalStateException("Fail");
+          }
         });
     writer.write(update("key", armed, now), (succ, exc) -> {});
     assertTrue(view.get("key", armed, now).isPresent());
diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java
index 2afb97cb2..132467373 100644
--- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java
+++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java
@@ -347,6 +347,7 @@ void ensureSession(Session session) {
   }
 
   static PreparedStatement prepare(Session session, String statement) {
+    log.debug("Trying to prepare statement {}", statement);
     PreparedStatement ret = session.prepare(statement);
     log.info("Prepared statement {} as {}", statement, ret);
     return ret;
diff --git a/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java b/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java
index df98caa23..cba14f9d4 100644
--- a/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java
+++ b/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java
@@ -288,47 +288,51 @@ private void startHouseKeeping() {
             () -> {
               while (!Thread.currentThread().isInterrupted()) {
                 try {
-                  manager.houseKeeping();
-                  long now = currentTimeMillis();
-                  long cleanupInterval = manager.getCfg().getCleanupInterval();
-                  long cleanup = now - cleanupInterval;
-                  int cleaned;
-                  try (var l = Locker.of(lock.writeLock())) {
-                    List<Map.Entry<KeyWithAttribute, SeqIdWithTombstone>> toCleanUp =
-                        lastUpdateSeqId.entries().stream()
-                            .filter(e -> e.getValue().getTimestamp() < cleanup)
-                            .collect(Collectors.toList());
-                    cleaned = toCleanUp.size();
-                    toCleanUp.forEach(e -> lastUpdateSeqId.remove(e.getKey(), e.getValue()));
-                  }
-                  // release and re-acquire lock to enable progress of any waiting threads
-                  try (var l = Locker.of(lock.writeLock())) {
-                    Iterator<Map<KeyWithAttribute, SeqIdWithTombstone>> it =
-                        updatesToWildcard.values().iterator();
-                    while (it.hasNext()) {
-                      Map<KeyWithAttribute, SeqIdWithTombstone> value = it.next();
-                      Iterators.removeIf(
-                          value.values().iterator(), e -> e.getTimestamp() < cleanup);
-                      if (value.isEmpty()) {
-                        it.remove();
-                      }
-                    }
-                  }
-                  long duration = currentTimeMillis() - now;
-                  log.info("Finished housekeeping in {} ms, removed {} records", duration, cleaned);
-                  metrics.getWritesCleaned().increment(cleaned);
-                  if (duration < cleanupInterval) {
-                    sleep(cleanupInterval - duration);
-                  }
+                  doHouseKeeping();
                 } catch (InterruptedException ex) {
                   Thread.currentThread().interrupt();
                 } catch (Throwable err) {
                   log.error("Error in housekeeping thread", err);
                 }
               }
+              log.info("Terminated housekeeping thread by request.");
             });
   }
 
+  private void doHouseKeeping() throws InterruptedException {
+    manager.houseKeeping();
+    long now = currentTimeMillis();
+    long cleanupInterval = manager.getCfg().getCleanupInterval();
+    long cleanup = now - cleanupInterval;
+    int cleaned;
+    try (var l = Locker.of(lock.writeLock())) {
+      List<Map.Entry<KeyWithAttribute, SeqIdWithTombstone>> toCleanUp =
+          lastUpdateSeqId.entries().stream()
+              .filter(e -> e.getValue().getTimestamp() < cleanup)
+              .collect(Collectors.toList());
+      cleaned = toCleanUp.size();
+      toCleanUp.forEach(e -> lastUpdateSeqId.remove(e.getKey(), e.getValue()));
+    }
+    // release and re-acquire lock to enable progress of any waiting threads
+    try (var l = Locker.of(lock.writeLock())) {
+      Iterator<Map<KeyWithAttribute, SeqIdWithTombstone>> it =
+          updatesToWildcard.values().iterator();
+      while (it.hasNext()) {
+        Map<KeyWithAttribute, SeqIdWithTombstone> value = it.next();
+        Iterators.removeIf(value.values().iterator(), e -> e.getTimestamp() < cleanup);
+        if (value.isEmpty()) {
+          it.remove();
+        }
+      }
+    }
+    long duration = currentTimeMillis() - now;
+    log.info("Finished housekeeping in {} ms, removed {} records", duration, cleaned);
+    metrics.getWritesCleaned().increment(cleaned);
+    if (duration < cleanupInterval) {
+      sleep(cleanupInterval - duration);
+    }
+  }
+
   @VisibleForTesting
   void sleep(long sleepMs) throws InterruptedException {
     TimeUnit.MILLISECONDS.sleep(sleepMs);