Skip to content

Commit

Permalink
make scan less aggressive, #847
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 7, 2020
1 parent 546fab2 commit 4e83083
Showing 1 changed file with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import java.util.UUID
import akka.NotUsed
import akka.annotation.InternalApi
import akka.event.Logging
import akka.pattern.after
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.journal.TimeBucket
import akka.persistence.cassandra.formatOffset
import akka.persistence.cassandra.query.TagViewSequenceNumberScanner.Session
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row }

import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future }

import akka.persistence.cassandra.BucketSize
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession

Expand Down Expand Up @@ -64,7 +65,6 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
scanningPeriod: FiniteDuration,
whichToKeep: (TagPidSequenceNr, TagPidSequenceNr) => TagPidSequenceNr)
: Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
val deadline: Deadline = Deadline.now + scanningPeriod

def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {

Expand Down Expand Up @@ -109,14 +109,13 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
}
acc + (pid -> ((newTagPidSequenceNr, newTimestamp)))
}
.flatMap { result =>
if (deadline.hasTimeLeft()) {
doIt()
} else {
Future.successful(result)
}
}
}
doIt()

if (scanningPeriod > Duration.Zero) {
after(scanningPeriod)(doIt())(materializer.system)
} else {
doIt()
}

}
}

0 comments on commit 4e83083

Please sign in to comment.