diff --git a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala index eb9086ec1..5179582eb 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala @@ -10,6 +10,7 @@ import java.util.UUID import akka.NotUsed import akka.annotation.InternalApi import akka.event.Logging +import akka.pattern.after import akka.persistence.cassandra.journal.CassandraJournal._ import akka.persistence.cassandra.journal.TimeBucket import akka.persistence.cassandra.formatOffset @@ -17,9 +18,9 @@ import akka.persistence.cassandra.query.TagViewSequenceNumberScanner.Session import akka.stream.Materializer import akka.stream.scaladsl.Source import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row } - -import scala.concurrent.duration.{ Deadline, FiniteDuration } +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.{ ExecutionContext, Future } + import akka.persistence.cassandra.BucketSize import akka.stream.alpakka.cassandra.scaladsl.CassandraSession @@ -64,7 +65,6 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession scanningPeriod: FiniteDuration, whichToKeep: (TagPidSequenceNr, TagPidSequenceNr) => TagPidSequenceNr) : Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = { - val deadline: Deadline = Deadline.now + scanningPeriod def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = { @@ -109,14 +109,13 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession } acc + (pid -> ((newTagPidSequenceNr, newTimestamp))) } - .flatMap { result => - if (deadline.hasTimeLeft()) { - doIt() - } else { - Future.successful(result) - } - } } - doIt() + + if (scanningPeriod > Duration.Zero) { + after(scanningPeriod)(doIt())(materializer.system) + } else { + doIt() + } + } }