Skip to content

Commit

Permalink
can assume order in records
Browse files Browse the repository at this point in the history
  • Loading branch information
leviramsey committed Nov 25, 2024
1 parent 16810fb commit 666478c
Showing 1 changed file with 10 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,22 +403,20 @@ private[projection] class DynamoDBOffsetStore(
storeSequenceNumbers: IndexedSeq[Record] => Future[Done],
canBeConcurrent: Boolean): Future[Done] = {
load(records.map(_.pid)).flatMap { oldState =>
val filteredRecords = {
val filteredRecords =
if (records.size <= 1)
records.filterNot(oldState.isDuplicate)
else {
// use last record for each pid
val grouped = records.groupBy(_.pid)

if (grouped.size == records.size)
records.filterNot(oldState.isDuplicate)
else
grouped.valuesIterator
.map(_.maxBy(_.seqNr))
.filterNot(oldState.isDuplicate)
.toVector
// Can assume (given other projection guarantees) that records for the same pid
// have montonically increasing sequence numbers
records.groupBy(_.pid)
.valuesIterator
.collect {
case recordsByPid if !oldState.isDuplicate(recordsByPid.last) => recordsByPid.last
}
.toVector
}
}

if (filteredRecords.isEmpty) {
FutureDone
} else {
Expand Down

0 comments on commit 666478c

Please sign in to comment.