feat: Store timestamp and prepare GSI #7
Changes from 1 commit
@@ -17,6 +17,7 @@ import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
@@ -56,15 +57,17 @@ import software.amazon.awssdk.services.dynamodb.model.Update

    // it's always the same persistenceId for all events
    val persistenceId = events.head.persistenceId
    val entityType = PersistenceId.extractEntityType(persistenceId)
    val slice = persistenceExt.sliceForPersistenceId(persistenceId)

    def putItemAttributes(item: SerializedJournalItem) = {
      import JournalAttributes._
      val attributes = new JHashMap[String, AttributeValue]
      attributes.put(Pid, AttributeValue.fromS(persistenceId))
      attributes.put(SeqNr, AttributeValue.fromN(item.seqNr.toString))
      attributes.put(Slice, AttributeValue.fromN(slice.toString))
      attributes.put(EntityType, AttributeValue.fromS(item.entityType))
      attributes.put(EntityTypeSlice, AttributeValue.fromS(s"$entityType-$slice"))
Review thread on this change:

- Simpler to have a single attribute for the GSI partition key. Otherwise we could combine multiple attributes into the partition key, if it's useful to keep the slice attribute separate.
- I thought it could only be one attribute for the partition key and one for the sort key? Otherwise it would be better to use a partition key composed of the entity type and slice attributes for the GSI.
- I thought composite keys were a thing. I can take another look.
- Yeah, looks like only single attributes are possible and there's no composite key support.

(A rough sketch of such a single-attribute index definition follows this diff hunk.)
      val timestampMicros = InstantFactory.toEpochMicros(item.writeTimestamp)
      attributes.put(Timestamp, AttributeValue.fromN(timestampMicros.toString))
      attributes.put(EventSerId, AttributeValue.fromN(item.serId.toString))
      attributes.put(EventSerManifest, AttributeValue.fromS(item.serManifest))
      attributes.put(EventPayload, AttributeValue.fromB(SdkBytes.fromByteArray(item.payload.get)))
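To illustrate the conclusion of the thread above (one attribute per index key), here is a minimal sketch of how a by-slice GSI with a combined entity-type-slice partition key and a timestamp sort key could be declared with the AWS SDK v2. The table, index and attribute names below are made up for illustration and are not the plugin's actual schema, which is defined elsewhere via JournalAttributes.

import software.amazon.awssdk.services.dynamodb.model._

// Hypothetical names, only to show the shape of the key schema.
val bySliceIndex = GlobalSecondaryIndex
  .builder()
  .indexName("event_journal_slice_idx")
  .keySchema(
    // single partition key attribute that combines entity type and slice
    KeySchemaElement.builder().attributeName("entity_type_slice").keyType(KeyType.HASH).build(),
    // timestamp in micros since epoch as sort key
    KeySchemaElement.builder().attributeName("ts").keyType(KeyType.RANGE).build())
  .projection(Projection.builder().projectionType(ProjectionType.ALL).build())
  .build()

val createTable = CreateTableRequest
  .builder()
  .tableName("event_journal")
  .attributeDefinitions(
    AttributeDefinition.builder().attributeName("pid").attributeType(ScalarAttributeType.S).build(),
    AttributeDefinition.builder().attributeName("seq_nr").attributeType(ScalarAttributeType.N).build(),
    AttributeDefinition.builder().attributeName("entity_type_slice").attributeType(ScalarAttributeType.S).build(),
    AttributeDefinition.builder().attributeName("ts").attributeType(ScalarAttributeType.N).build())
  .keySchema(
    KeySchemaElement.builder().attributeName("pid").keyType(KeyType.HASH).build(),
    KeySchemaElement.builder().attributeName("seq_nr").keyType(KeyType.RANGE).build())
  .globalSecondaryIndexes(bySliceIndex)
  .billingMode(BillingMode.PAY_PER_REQUEST)
  .build()

As noted later in this review, GSI keys do not need to be unique, so duplicate timestamps from different nodes are acceptable; keeping timestamps strictly increasing per writer aligns the index sort order with the seqNr order.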
@@ -0,0 +1,104 @@
/*
 * Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.dynamodb

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior

object TestActors {
  object Persister {

    import akka.persistence.typed.scaladsl.Effect

    sealed trait Command
    final case class Persist(payload: Any) extends Command
    final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done]) extends Command
    final case class PersistAll(payloads: List[Any]) extends Command
    final case class Ping(replyTo: ActorRef[Done]) extends Command
    final case class GetState(replyTo: ActorRef[String]) extends Command
    final case class GetSeqNr(replyTo: ActorRef[Long]) extends Command
    final case class Stop(replyTo: ActorRef[Done]) extends Command

    def apply(pid: String): Behavior[Command] =
      apply(PersistenceId.ofUniqueId(pid))

    def apply(pid: PersistenceId): Behavior[Command] = {
      apply(pid, "", "", Set.empty)
    }

    def apply(pid: PersistenceId, tags: Set[String]): Behavior[Command] = {
      apply(pid, "", "", tags)
    }

    def apply(
        pid: PersistenceId,
        journalPluginId: String,
        snapshotPluginId: String,
        tags: Set[String]): Behavior[Command] = {
      Behaviors.setup { context =>
        eventSourcedBehavior(pid, context)
          .withJournalPluginId(journalPluginId)
          .withSnapshotPluginId(snapshotPluginId)
          .snapshotWhen { case (_, event, _) =>
            event.toString.contains("snap")
          }
          .withTagger(_ => tags)
      }
    }

    def eventSourcedBehavior(
        pid: PersistenceId,
        context: ActorContext[Command]): EventSourcedBehavior[Command, Any, String] = {
      EventSourcedBehavior[Command, Any, String](
        persistenceId = pid,
        "",
        { (state, command) =>
          command match {
            case command: Persist =>
              context.log.debugN(
                "Persist [{}], pid [{}], seqNr [{}]",
                command.payload,
                pid.id,
                EventSourcedBehavior.lastSequenceNumber(context) + 1)
              Effect.persist(command.payload)
            case command: PersistWithAck =>
              context.log.debugN(
                "Persist [{}], pid [{}], seqNr [{}]",
                command.payload,
                pid.id,
                EventSourcedBehavior.lastSequenceNumber(context) + 1)
              Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done)
            case command: PersistAll =>
              if (context.log.isDebugEnabled)
                context.log.debugN(
                  "PersistAll [{}], pid [{}], seqNr [{}]",
                  command.payloads.mkString(","),
                  pid.id,
                  EventSourcedBehavior.lastSequenceNumber(context) + 1)
              Effect.persist(command.payloads)
            case Ping(replyTo) =>
              replyTo ! Done
              Effect.none
            case GetState(replyTo) =>
              replyTo ! state
              Effect.none
            case GetSeqNr(replyTo) =>
              replyTo ! EventSourcedBehavior.lastSequenceNumber(context)
              Effect.none
            case Stop(replyTo) =>
              replyTo ! Done
              Effect.stop()
          }
        },
        (state, event) => if (state == "") event.toString else s"$state|$event")
    }
  }
}
@@ -0,0 +1,28 @@
/*
 * Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.dynamodb

import java.util.concurrent.atomic.AtomicLong

import akka.persistence.typed.PersistenceId

object TestData {
  private val start = 0L // could be something more unique, like currentTimeMillis
  private val pidCounter = new AtomicLong(start)
  private val entityTypeCounter = new AtomicLong(start)
}

trait TestData {
  import TestData.pidCounter
  import TestData.entityTypeCounter

  def nextPid(): String = s"p-${pidCounter.incrementAndGet()}"

  def nextEntityType(): String = s"TestEntity-${entityTypeCounter.incrementAndGet()}"

  def nextPersistenceId(entityType: String): PersistenceId =
    PersistenceId.of(entityType, s"${pidCounter.incrementAndGet()}")

}
@@ -0,0 +1,45 @@
/*
 * Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.dynamodb.internal

import java.time.temporal.ChronoUnit

import org.scalatest.TestSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class InstantFactorySpec extends AnyWordSpec with TestSuite with Matchers {
  "InstantFactory" should {
    "truncate now to micros" in {
      InstantFactory.now().getNano % 1000 shouldBe 0
    }

    "always increasing or equal now" in {
      val now1 = InstantFactory.now()
      val now2 = InstantFactory.now()
      Thread.sleep(1)
      val now3 = InstantFactory.now()
      now2.isBefore(now1) shouldBe false
      now3.isBefore(now2) shouldBe false
      now3.isBefore(now1) shouldBe false
    }

    "convert to/from micros since epoch" in {
      import InstantFactory._
      val now1 = now()
      fromEpochMicros(toEpochMicros(now1)) shouldBe now1
    }

    "should not overflow micros since epoch" in {
      import InstantFactory._
      (10 to 1000).foreach { years =>
        val future = now().plus(years * 365, ChronoUnit.DAYS)
        toEpochMicros(future) shouldBe <(Long.MaxValue)
        fromEpochMicros(toEpochMicros(future)) shouldBe future
      }
    }
  }

}
Review thread on InstantFactorySpec:

- I'm actually not sure if this is needed, or if Instant.now() is safe enough. I wouldn't trust System.currentTimeMillis, which is used by the Instant system clock, but it's only used for the offset baseline and is always 1024 seconds back in time. The question is whether getNanoTimeAdjustment is monotonically increasing.
- Independent of that, it's now needed. The timestamp now increases by at least 1 micro, which is needed for the sorting of the by-slice GSI: seqNr ordering and timestamp ordering should be the same. The GSI is eventually consistent and may not preserve visibility order, but we handle that in another way. Having items in strict timestamp order will at least make the query return them in the right order once the GSI has been populated.
- 👍🏼 Yes, sort keys will need to be unique.
- Not for a GSI, actually. For ordinary tables the primary key has to be unique, but I'm fairly sure that's not the case for a GSI. We won't have unique timestamps anyway, because different Akka nodes can create the same timestamp.
- Right, makes sense that index primary keys don't need to be unique. Could probably also have a composite sort key of timestamp and seqNr, if that's cleaner than needing the increasing timestamps.
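For reference, a minimal sketch of the kind of monotonic, microsecond-truncated clock being discussed in this thread. This is only an illustration of the idea under discussion, not the plugin's actual InstantFactory: the latest returned instant is kept in an AtomicReference and bumped by at least one microsecond when the system clock has not advanced, and the epoch-micros conversions mirror what the spec above exercises.

import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicReference

// Hypothetical name; sketch of a strictly increasing, micros-resolution clock.
object MonotonicInstantFactory {
  private val previous = new AtomicReference(Instant.EPOCH)

  // Current time truncated to microseconds, strictly greater than the
  // previously returned value by at least 1 micro.
  def now(): Instant = {
    val candidate = Instant.now().truncatedTo(ChronoUnit.MICROS)
    previous.updateAndGet { prev =>
      if (candidate.isAfter(prev)) candidate
      else prev.plus(1, ChronoUnit.MICROS)
    }
  }

  def toEpochMicros(instant: Instant): Long =
    instant.getEpochSecond * 1000000L + instant.getNano / 1000

  def fromEpochMicros(micros: Long): Instant = {
    val seconds = Math.floorDiv(micros, 1000000L)
    val microsPart = Math.floorMod(micros, 1000000L)
    Instant.ofEpochSecond(seconds, microsPart * 1000)
  }
}

Note that this only makes timestamps strictly increasing within a single JVM; as pointed out above, different Akka nodes can still produce equal timestamps, which the GSI tolerates because index keys don't need to be unique.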