feat: Store timestamp and prepare GSI #7

Merged · 2 commits · May 29, 2024
@@ -6,18 +6,35 @@ package akka.persistence.dynamodb.internal

import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicReference

import akka.annotation.InternalApi

/**
* INTERNAL API
*/
@InternalApi private[akka] object InstantFactory {
private val previousNow = new AtomicReference(Instant.EPOCH)

/**
-   * Current time truncated to microseconds.
+   * Current time truncated to microseconds. Within this JVM it's guaranteed to be equal to or greater than previous
+   * invocation of `now`.
    */
-  def now(): Instant =
-    Instant.now().truncatedTo(ChronoUnit.MICROS)
+  def now(): Instant = {
+    val n = Instant.now().truncatedTo(ChronoUnit.MICROS)
+    previousNow.updateAndGet { prev =>
+      if (prev.isAfter(n)) prev.plus(1, ChronoUnit.MICROS)
+      else n
+    }
Member Author:

I'm actually not sure if this is needed or if `Instant.now()` is safe enough. I wouldn't trust `System.currentTimeMillis`, which is used by the `Instant` system clock, but it's only used for the offset baseline and it's always 1024 seconds back in time. The question is whether `getNanoTimeAdjustment` is monotonically increasing.

Member Author:

Independent of that, it's now needed. The timestamp now increases by at least 1 microsecond, which is needed for the sorting of the by-slice GSI. seqNr ordering and timestamp ordering should be the same.

The GSI is eventually consistent and may not preserve visibility order, but we handle that in another way. Having it in strict timestamp order will at least make the query return the items in the right order once the GSI has been populated.

Contributor:

👍🏼 Yes, sort keys will need to be unique.

Member Author (@patriknw, May 29, 2024):

Not for a GSI, actually; for ordinary tables the primary key has to be unique, but I'm sure I read that's not the case for a GSI. We won't have unique timestamps anyway, because different Akka nodes can create the same timestamp.

Contributor:

Right, makes sense that index primary keys don't need to be unique. Could probably also have a composite sort key of timestamp and seq nr, if that's cleaner than needing the increasing timestamps.
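The composite sort key idea mentioned here could be sketched as follows. This is hypothetical — `CompositeSortKey` and the zero-padding widths are illustrative and not part of this PR; the idea is that zero-padding both numbers to a fixed width makes lexicographic string order equal to `(timestamp, seqNr)` order.

```scala
object CompositeSortKey {
  // Hypothetical alternative to strictly increasing timestamps:
  // combine the timestamp (epoch micros) and seqNr into one zero-padded
  // string attribute, so lexicographic order equals (timestamp, seqNr) order.
  // 20 and 19 digits are enough for any non-negative Long.
  def timestampSortKey(timestampMicros: Long, seqNr: Long): String =
    f"$timestampMicros%020d-$seqNr%019d"
}
```

The trade-off would be that range queries by timestamp alone still work (the seqNr suffix only breaks ties), at the cost of storing the sort key as a string rather than a number.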

}

def toEpochMicros(instant: Instant): Long =
instant.getEpochSecond * 1_000_000 + (instant.getNano / 1000)

def fromEpochMicros(micros: Long): Instant = {
val epochSeconds = micros / 1_000_000
val nanos = (micros % 1_000_000) * 1000
Instant.ofEpochSecond(epochSeconds, nanos)
}

}
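For reference, a standalone round-trip of the micros conversion above (a self-contained copy of the two converters, for illustration only — the real ones live in `InstantFactory`):

```scala
import java.time.Instant

object EpochMicrosExample extends App {
  // Same logic as InstantFactory.toEpochMicros / fromEpochMicros above.
  def toEpochMicros(instant: Instant): Long =
    instant.getEpochSecond * 1_000_000 + (instant.getNano / 1000)

  def fromEpochMicros(micros: Long): Instant = {
    val epochSeconds = micros / 1_000_000
    val nanos = (micros % 1_000_000) * 1000
    Instant.ofEpochSecond(epochSeconds, nanos)
  }

  // An instant with exactly micro precision survives the round trip unchanged.
  val t = Instant.parse("2024-05-29T00:00:00.123456Z")
  assert(fromEpochMicros(toEpochMicros(t)) == t)
}
```

Note the round trip is only lossless because `now()` truncates to microseconds first; an instant with nanosecond precision would lose its sub-micro digits.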
@@ -17,6 +17,7 @@ import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
@@ -56,15 +57,17 @@ import software.amazon.awssdk.services.dynamodb.model.Update

// it's always the same persistenceId for all events
val persistenceId = events.head.persistenceId
val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)

def putItemAttributes(item: SerializedJournalItem) = {
import JournalAttributes._
val attributes = new JHashMap[String, AttributeValue]
attributes.put(Pid, AttributeValue.fromS(persistenceId))
attributes.put(SeqNr, AttributeValue.fromN(item.seqNr.toString))
-    attributes.put(Slice, AttributeValue.fromN(slice.toString))
-    attributes.put(EntityType, AttributeValue.fromS(item.entityType))
+    attributes.put(EntityTypeSlice, AttributeValue.fromS(s"$entityType-$slice"))
Contributor:

Simpler to have a single attribute for the GSI partition key. Otherwise we could combine multiple attributes into the partition key, if it's useful to keep the slice attribute separate.

Member Author:

I thought it could only be one attribute for the partition key and one attribute for the sort key? Otherwise it would be better to use a partition key composed of the entity type and slice attributes for the GSI.

Contributor:

I thought composite keys were a thing. I can take another look.

Contributor:

Yeah, looks like only single attributes are possible and there's no composite key support.
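Given that conclusion, a query against the by-slice GSI with the single combined partition-key attribute would look roughly like this. A sketch only: the index name (`<table>_slice_idx`) and attribute names mirror this PR, but the helper and its parameters are illustrative, and `timestamp` must go through an expression attribute name because it is a DynamoDB reserved word.

```scala
import java.util.{ Map => JMap }

import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, QueryRequest }

object SliceQueryExample {
  // Build a Query over the by-slice GSI: one entity type + slice partition,
  // events from a given timestamp (epoch micros) onwards, ascending.
  def bySliceRequest(table: String, entityType: String, slice: Int, fromTimestampMicros: Long): QueryRequest =
    QueryRequest
      .builder()
      .tableName(table)
      .indexName(table + "_slice_idx")
      .keyConditionExpression("entity_type_slice = :ets AND #ts >= :from")
      // "timestamp" is on DynamoDB's reserved-word list, hence the alias
      .expressionAttributeNames(JMap.of("#ts", "timestamp"))
      .expressionAttributeValues(
        JMap.of(
          ":ets", AttributeValue.fromS(s"$entityType-$slice"),
          ":from", AttributeValue.fromN(fromTimestampMicros.toString)))
      .scanIndexForward(true) // ascending timestamp order
      .build()
}
```

Because the GSI is eventually consistent, results from such a query can lag the base table even though the sort order itself is correct once items appear in the index.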

val timestampMicros = InstantFactory.toEpochMicros(item.writeTimestamp)
attributes.put(Timestamp, AttributeValue.fromN(timestampMicros.toString))
attributes.put(EventSerId, AttributeValue.fromN(item.serId.toString))
attributes.put(EventSerManifest, AttributeValue.fromS(item.serManifest))
attributes.put(EventPayload, AttributeValue.fromB(SdkBytes.fromByteArray(item.payload.get)))
@@ -4,15 +4,12 @@

package akka.persistence.dynamodb.internal

-import java.time.Instant

import scala.jdk.CollectionConverters._

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.dynamodb.DynamoDBSettings
-import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
@@ -60,11 +57,9 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
}

 SerializedJournalItem(
-  slice = item.get(Slice).n().toInt,
-  entityType = item.get(EntityType).s(),
   persistenceId = item.get(Pid).s(),
   seqNr = item.get(SeqNr).n().toLong,
-  writeTimestamp = Instant.EPOCH,
+  writeTimestamp = InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong),
payload = Some(item.get(EventPayload).b().asByteArray()),
serId = item.get(EventSerId).n().toInt,
serManifest = item.get(EventSerManifest).s(),
@@ -9,8 +9,6 @@ import java.time.Instant
import akka.annotation.InternalApi

 final case class SerializedJournalItem(
-  slice: Int,
-  entityType: String,
persistenceId: String,
seqNr: Long,
writeTimestamp: Instant,
@@ -30,9 +28,9 @@ final case class SerializedEventMetadata(serId: Int, serManifest: String, payloa
// FIXME should attribute names be shorter?
val Pid = "pid"
val SeqNr = "seq_nr"
-  val Slice = "slice"
-  // redundant to store entity type, but needed for the bySlices GSI
-  val EntityType = "entity_type"
+  // needed for the bySlices GSI
+  val EntityTypeSlice = "entity_type_slice"
val Timestamp = "timestamp"
val EventSerId = "event_ser_id"
val EventSerManifest = "event_ser_manifest"
val EventPayload = "event_payload"
@@ -115,9 +115,6 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
(other.asInstanceOf[AnyRef], Set.empty[String])
}

-    val entityType = PersistenceId.extractEntityType(pr.persistenceId)
-    val slice = persistenceExt.sliceForPersistenceId(pr.persistenceId)

val serializedEvent = event match {
case s: SerializedEvent => s // already serialized
case _ =>
@@ -137,8 +134,6 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
}

 SerializedJournalItem(
-  slice,
-  entityType,
pr.persistenceId,
pr.sequenceNr,
timestamp,
104 changes: 104 additions & 0 deletions core/src/test/scala/akka/persistence/dynamodb/TestActors.scala
@@ -0,0 +1,104 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior

object TestActors {
object Persister {

import akka.persistence.typed.scaladsl.Effect

sealed trait Command
final case class Persist(payload: Any) extends Command
final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done]) extends Command
final case class PersistAll(payloads: List[Any]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
final case class GetState(replyTo: ActorRef[String]) extends Command
final case class GetSeqNr(replyTo: ActorRef[Long]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command

def apply(pid: String): Behavior[Command] =
apply(PersistenceId.ofUniqueId(pid))

def apply(pid: PersistenceId): Behavior[Command] = {
apply(pid, "", "", Set.empty)
}

def apply(pid: PersistenceId, tags: Set[String]): Behavior[Command] = {
apply(pid, "", "", tags)
}

def apply(
pid: PersistenceId,
journalPluginId: String,
snapshotPluginId: String,
tags: Set[String]): Behavior[Command] = {
Behaviors.setup { context =>
eventSourcedBehavior(pid, context)
.withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId)
.snapshotWhen { case (_, event, _) =>
event.toString.contains("snap")
}
.withTagger(_ => tags)
}
}

def eventSourcedBehavior(
pid: PersistenceId,
context: ActorContext[Command]): EventSourcedBehavior[Command, Any, String] = {
EventSourcedBehavior[Command, Any, String](
persistenceId = pid,
"",
{ (state, command) =>
command match {
case command: Persist =>
context.log.debugN(
"Persist [{}], pid [{}], seqNr [{}]",
command.payload,
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payload)
case command: PersistWithAck =>
context.log.debugN(
"Persist [{}], pid [{}], seqNr [{}]",
command.payload,
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done)
case command: PersistAll =>
if (context.log.isDebugEnabled)
context.log.debugN(
"PersistAll [{}], pid [{}], seqNr [{}]",
command.payloads.mkString(","),
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payloads)
case Ping(replyTo) =>
replyTo ! Done
Effect.none
case GetState(replyTo) =>
replyTo ! state
Effect.none
case GetSeqNr(replyTo) =>
replyTo ! EventSourcedBehavior.lastSequenceNumber(context)
Effect.none
case Stop(replyTo) =>
replyTo ! Done
Effect.stop()
}
},
(state, event) => if (state == "") event.toString else s"$state|$event")
}
}
}
28 changes: 28 additions & 0 deletions core/src/test/scala/akka/persistence/dynamodb/TestData.scala
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb

import java.util.concurrent.atomic.AtomicLong

import akka.persistence.typed.PersistenceId

object TestData {
private val start = 0L // could be something more unique, like currentTimeMillis
private val pidCounter = new AtomicLong(start)
private val entityTypeCounter = new AtomicLong(start)
}

trait TestData {
import TestData.pidCounter
import TestData.entityTypeCounter

def nextPid(): String = s"p-${pidCounter.incrementAndGet()}"

def nextEntityType(): String = s"TestEntity-${entityTypeCounter.incrementAndGet()}"

def nextPersistenceId(entityType: String): PersistenceId =
PersistenceId.of(entityType, s"${pidCounter.incrementAndGet()}")

}
@@ -4,7 +4,6 @@

package akka.persistence.dynamodb

-import java.net.URI
import java.util.concurrent.CompletionException

import scala.concurrent.Await
@@ -21,17 +20,16 @@ import akka.actor.typed.ActorSystem
import akka.persistence.Persistence
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
-import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
-import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
 import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest
 import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest
 import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex
 import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement
 import software.amazon.awssdk.services.dynamodb.model.KeyType
+import software.amazon.awssdk.services.dynamodb.model.Projection
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType
 import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType
@@ -47,7 +45,7 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>

lazy val persistenceExt: Persistence = Persistence(typedSystem)

-  private lazy val client = ClientProvider(typedSystem).clientFor(testConfigPath + ".client")
+  lazy val client: DynamoDbAsyncClient = ClientProvider(typedSystem).clientFor(testConfigPath + ".client")

override protected def beforeAll(): Unit = {
try {
@@ -67,6 +65,18 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
client.describeTable(DescribeTableRequest.builder().tableName(settings.journalTable).build()).asScala

def create(): Future[Done] = {
val sliceIndex = GlobalSecondaryIndex
.builder()
.indexName(settings.journalTable + "_slice_idx")
.keySchema(
KeySchemaElement.builder().attributeName(EntityTypeSlice).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(Timestamp).keyType(KeyType.RANGE).build())
.projection(
Projection.builder().projectionType(ProjectionType.ALL).build()
) // FIXME we could skip a few attributes
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build())
.build()

val req = CreateTableRequest
.builder()
.tableName(settings.journalTable)
@@ -75,8 +85,11 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
KeySchemaElement.builder().attributeName(SeqNr).keyType(KeyType.RANGE).build())
.attributeDefinitions(
         AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build(),
-        AttributeDefinition.builder().attributeName(SeqNr).attributeType(ScalarAttributeType.N).build())
+        AttributeDefinition.builder().attributeName(SeqNr).attributeType(ScalarAttributeType.N).build(),
+        AttributeDefinition.builder().attributeName(EntityTypeSlice).attributeType(ScalarAttributeType.S).build(),
+        AttributeDefinition.builder().attributeName(Timestamp).attributeType(ScalarAttributeType.N).build())
       .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build())
+      .globalSecondaryIndexes(sliceIndex)
.build()

client.createTable(req).asScala.map(_ => Done)(typedSystem.executionContext)
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.internal

import java.time.temporal.ChronoUnit

import org.scalatest.TestSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class InstantFactorySpec extends AnyWordSpec with TestSuite with Matchers {
"InstantFactory" should {
"truncate now to micros" in {
InstantFactory.now().getNano % 1000 shouldBe 0
}

"always increasing or equal now" in {
val now1 = InstantFactory.now()
val now2 = InstantFactory.now()
Thread.sleep(1)
val now3 = InstantFactory.now()
now2.isBefore(now1) shouldBe false
now3.isBefore(now2) shouldBe false
now3.isBefore(now1) shouldBe false
}

"convert to/from micros since epoch" in {
import InstantFactory._
val now1 = now()
fromEpochMicros(toEpochMicros(now1)) shouldBe now1
}

"should not overflow micros since epoch" in {
import InstantFactory._
(10 to 1000).foreach { years =>
val future = now().plus(years * 365, ChronoUnit.DAYS)
toEpochMicros(future) shouldBe <(Long.MaxValue)
fromEpochMicros(toEpochMicros(future)) shouldBe future
}
}
}

}
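The per-JVM monotonic guarantee that this spec checks rests on the CAS loop in `updateAndGet`. A self-contained sketch (a mirror of `InstantFactory.now`, not the actual class) that exercises it from multiple threads:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicReference

object MonotonicNowExample extends App {
  private val previousNow = new AtomicReference(Instant.EPOCH)

  // Mirror of InstantFactory.now(): truncate to micros, never go backwards.
  // If the wall clock jumps back, advance the previous value by 1 micro instead.
  def now(): Instant = {
    val n = Instant.now().truncatedTo(ChronoUnit.MICROS)
    previousNow.updateAndGet { prev =>
      if (prev.isAfter(n)) prev.plus(1, ChronoUnit.MICROS)
      else n
    }
  }

  // Hammer from several threads; each thread's observations must never decrease,
  // because the shared reference only moves forward.
  val threads = (1 to 4).map { _ =>
    new Thread(() => {
      var prev = Instant.EPOCH
      (1 to 10000).foreach { _ =>
        val t = now()
        assert(!t.isBefore(prev), s"clock went backwards: $prev -> $t")
        prev = t
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
}
```

Note the guarantee is only within one JVM; as discussed above, different Akka nodes can still produce equal timestamps, which is why the GSI sort key does not need to be unique.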