Skip to content

Commit

Permalink
always increasing timestamp by at least 1 micros
Browse files Browse the repository at this point in the history
* needed for sorting in the by slice GSI
  • Loading branch information
patriknw committed May 28, 2024
1 parent 11d7f5f commit 60aafab
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import akka.annotation.InternalApi
def now(): Instant = {
val n = Instant.now().truncatedTo(ChronoUnit.MICROS)
previousNow.updateAndGet { prev =>
if (prev.isAfter(n)) prev.plus(1, ChronoUnit.MICROS)
// monotonically increasing, at least 1 microsecond more than previous timestamp
if (!n.isAfter(prev)) prev.plus(1, ChronoUnit.MICROS)
else n
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package akka.persistence.dynamodb.journal

import java.time.Instant

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -34,7 +32,6 @@ import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.journal.Tagged
import akka.persistence.query.PersistenceQuery
import akka.persistence.typed.PersistenceId
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
Expand Down Expand Up @@ -104,8 +101,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
}

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = {
val timestamp = InstantFactory.now()
def atomicWrite(atomicWrite: AtomicWrite): Future[Done] = {
val serialized: Try[Seq[SerializedJournalItem]] = Try {
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
Expand Down Expand Up @@ -133,6 +129,9 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
SerializedEventMetadata(id, metaManifest, serializedMeta)
}

// monotonically increasing, at least 1 microsecond more than previous timestamp
val timestamp = InstantFactory.now()

SerializedJournalItem(
pr.persistenceId,
pr.sequenceNr,
Expand All @@ -148,14 +147,14 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e

serialized match {
case Success(writes) =>
journalDao.writeEvents(writes).map(_ => timestamp)(ExecutionContexts.parasitic)
journalDao.writeEvents(writes)
case Failure(exc) =>
Future.failed(exc)
}
}

val persistenceId = messages.head.persistenceId
val writeResult: Future[Instant] =
val writeResult: Future[Done] =
if (messages.size == 1)
atomicWrite(messages.head)
else {
Expand All @@ -165,8 +164,9 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
atomicWrite(batch)
}

// FiXME pubSub not added yet
val writeAndPublishResult: Future[Done] =
publish(messages, writeResult)
writeResult

writesInProgress.put(persistenceId, writeAndPublishResult)
writeAndPublishResult.onComplete { _ =>
Expand All @@ -175,11 +175,6 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic)
}

private def publish(messages: immutable.Seq[AtomicWrite], timestamp: Future[Instant]): Future[Done] = {
// FiXME pubSub not added yet
Future.successful(Done)
}

override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
log.debug("asyncDeleteMessagesTo persistenceId [{}], toSequenceNr [{}]", persistenceId, toSequenceNr)
journalDao.deleteEventsTo(persistenceId, toSequenceNr, resetSequenceNumber = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ class InstantFactorySpec extends AnyWordSpec with TestSuite with Matchers {
InstantFactory.now().getNano % 1000 shouldBe 0
}

"always increasing or equal now" in {
"always increasing now" in {
val now1 = InstantFactory.now()
val now2 = InstantFactory.now()
Thread.sleep(1)
val now3 = InstantFactory.now()
now2.isBefore(now1) shouldBe false
now3.isBefore(now2) shouldBe false
now3.isBefore(now1) shouldBe false
now2.isAfter(now1) shouldBe true
now3.isAfter(now2) shouldBe true
now3.isAfter(now1) shouldBe true
}

"convert to/from micros since epoch" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class PersistTimestampSpec

"Persist timestamp" should {

"be the same for events stored in same transaction" in {
"be increasing for events stored in same transaction" in {
val numberOfEntities = 20
val entityType = nextEntityType()

Expand Down Expand Up @@ -99,14 +99,21 @@ class PersistTimestampSpec

rows.groupBy(_.event).foreach { case (_, rowsByUniqueEvent) =>
withClue(s"pid [${rowsByUniqueEvent.head.pid}]: ") {
rowsByUniqueEvent.map(_.timestamp).toSet shouldBe Set(rowsByUniqueEvent.head.timestamp)
rowsByUniqueEvent.map(_.timestamp).sliding(2).foreach {
case Seq(a, b) => a.isBefore(b) shouldBe true
case _ => ()
}
}
}

val rowOrdering: Ordering[Item] = Ordering.fromLessThan[Item] { (a, b) =>
if (a eq b) false
else if (a.timestamp != b.timestamp) a.timestamp.compareTo(b.timestamp) < 0
else a.seqNr.compareTo(b.seqNr) < 0
else {
if (a.pid == b.pid && a.seqNr != b.seqNr)
throw new IllegalStateException(s"Unexpected same timestamp for several events of [$a.pid]")
a.pid.compareTo(b.pid) < 0
}
}

rows.groupBy(_.pid).foreach { case (_, rowsByPid) =>
Expand Down

0 comments on commit 60aafab

Please sign in to comment.