Skip to content

Commit

Permalink
sample: Shopping cart sample from Akka guide (#14)
Browse files Browse the repository at this point in the history
* feat: CreateTables utility

* wrong location of logback-test.xml

* sample: Shopping cart sample from Akka guide

* remove wrong grpc-api dependency

* incompatibility: NoSuchFieldError PickFirstLeafLoadBalancer HEALTH_CONSUMER_LISTENER_ARG_KEY
* should be transitive from akka-grpc

* remove obsolete fixme

* update docker compose in readme

* updated snapshot

* r2dbc reference

Co-authored-by: Peter Vlugter <[email protected]>

* snapshot plugin

* region and credentials environment variables

* kubernetes for analytics-service

* atLeastOnce

---------

Co-authored-by: Peter Vlugter <[email protected]>
  • Loading branch information
patriknw and pvlugter authored Jun 10, 2024
1 parent 26727bb commit 972093b
Show file tree
Hide file tree
Showing 68 changed files with 2,908 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,29 @@

package akka.persistence.dynamodb

import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.jdk.DurationConverters._
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._

import akka.annotation.InternalStableApi
import akka.actor.typed.ActorSystem
import com.typesafe.config.Config

/**
* INTERNAL API
*/
@InternalStableApi
object DynamoDBSettings {

/**
* Scala API: Load configuration from `akka.persistence.dynamodb`.
*/
def apply(system: ActorSystem[_]): DynamoDBSettings =
apply(system.settings.config.getConfig("akka.persistence.dynamodb"))

/**
* Java API: Load configuration from `akka.persistence.dynamodb`.
*/
def create(system: ActorSystem[_]): DynamoDBSettings =
apply(system)

/**
* Scala API: From custom configuration corresponding to `akka.persistence.dynamodb`.
*/
def apply(config: Config): DynamoDBSettings = {
val journalTable: String = config.getString("journal.table")

Expand All @@ -28,12 +37,14 @@ object DynamoDBSettings {
new DynamoDBSettings(journalTable, snapshotTable, querySettings)
}

/**
* Java API: From custom configuration corresponding to `akka.persistence.dynamodb`.
*/
def create(config: Config): DynamoDBSettings =
apply(config)

}

/**
* INTERNAL API
*/
@InternalStableApi
final class DynamoDBSettings private (
val journalTable: String,
val snapshotTable: String,
Expand All @@ -45,10 +56,6 @@ final class DynamoDBSettings private (
s"DynamoDBSettings($journalTable, $querySettings)"
}

/**
* INTERNAL API
*/
@InternalStableApi
final class QuerySettings(config: Config) {
val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala
val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").toScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
.limit(settings.querySettings.bufferSize)
.build()

// FIXME for backtracking we don't need all attributes, can be filtered with builder.attributesToGet

val publisher = client.queryPaginator(req)

Source.fromPublisher(publisher).mapConcat { response =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import akka.persistence.AtomicWrite
import akka.persistence.Persistence
import akka.persistence.PersistentRepr
import akka.persistence.SerializedEvent
import akka.persistence.dynamodb.ClientProvider
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.dynamodb.internal.JournalDao
import akka.persistence.dynamodb.internal.SerializedEventMetadata
import akka.persistence.dynamodb.internal.SerializedJournalItem
import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
import akka.persistence.dynamodb.util.ClientProvider
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.journal.Tagged
import akka.persistence.query.PersistenceQuery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.persistence.FilteredPayload
import akka.persistence.Persistence
import akka.persistence.dynamodb.ClientProvider
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.dynamodb.internal.BySliceQuery
import akka.persistence.dynamodb.internal.EnvelopeOrigin
import akka.persistence.dynamodb.internal.QueryDao
import akka.persistence.dynamodb.internal.SerializedJournalItem
import akka.persistence.dynamodb.internal.TimestampOffsetBySlice
import akka.persistence.dynamodb.util.ClientProvider
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import akka.annotation.InternalApi
import akka.persistence.SelectedSnapshot
import akka.persistence.SnapshotMetadata
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.dynamodb.ClientProvider
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.dynamodb.internal.QueryDao
import akka.persistence.dynamodb.internal.SerializedJournalItem
import akka.persistence.dynamodb.internal.SerializedSnapshotItem
import akka.persistence.dynamodb.internal.SerializedSnapshotMetadata
import akka.persistence.dynamodb.internal.SnapshotDao
import akka.persistence.dynamodb.util.ClientProvider
import akka.persistence.snapshot.SnapshotStore
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb
package akka.persistence.dynamodb.util

import java.net.URI
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -15,6 +15,7 @@ import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.persistence.dynamodb.ClientSettings
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.util.javadsl

import java.util.concurrent.CompletionStage

import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.dynamodb.util.scaladsl
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

object CreateTables {
def createJournalTable(
system: ActorSystem[_],
settings: DynamoDBSettings,
client: DynamoDbAsyncClient,
deleteIfExists: Boolean): CompletionStage[Done] =
scaladsl.CreateTables.createJournalTable(system, settings, client, deleteIfExists).asJava

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.util.scaladsl

import java.util.concurrent.CompletionException

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.FutureConverters._
import scala.util.Failure
import scala.util.Success

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.dynamodb.DynamoDBSettings
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement
import software.amazon.awssdk.services.dynamodb.model.KeyType
import software.amazon.awssdk.services.dynamodb.model.Projection
import software.amazon.awssdk.services.dynamodb.model.ProjectionType
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType

object CreateTables {

def createJournalTable(
system: ActorSystem[_],
settings: DynamoDBSettings,
client: DynamoDbAsyncClient,
deleteIfExists: Boolean): Future[Done] = {
import akka.persistence.dynamodb.internal.JournalAttributes._
implicit val ec: ExecutionContext = system.executionContext

val existingTable =
client.describeTable(DescribeTableRequest.builder().tableName(settings.journalTable).build()).asScala

def create(): Future[Done] = {
val sliceIndex = GlobalSecondaryIndex
.builder()
.indexName(settings.journalBySliceGsi)
.keySchema(
KeySchemaElement.builder().attributeName(EntityTypeSlice).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(Timestamp).keyType(KeyType.RANGE).build())
.projection(
// FIXME we could skip a few attributes
Projection.builder().projectionType(ProjectionType.ALL).build())
// FIXME config
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build())
.build()

val req = CreateTableRequest
.builder()
.tableName(settings.journalTable)
.keySchema(
KeySchemaElement.builder().attributeName(Pid).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(SeqNr).keyType(KeyType.RANGE).build())
.attributeDefinitions(
AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build(),
AttributeDefinition.builder().attributeName(SeqNr).attributeType(ScalarAttributeType.N).build(),
AttributeDefinition.builder().attributeName(EntityTypeSlice).attributeType(ScalarAttributeType.S).build(),
AttributeDefinition.builder().attributeName(Timestamp).attributeType(ScalarAttributeType.N).build())
// FIXME config
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build())
.globalSecondaryIndexes(sliceIndex)
.build()

client.createTable(req).asScala.map(_ => Done)
}

def delete(): Future[Done] = {
val req = DeleteTableRequest.builder().tableName(settings.journalTable).build()
client.deleteTable(req).asScala.map(_ => Done)
}

existingTable.transformWith {
case Success(_) =>
if (deleteIfExists) delete().flatMap(_ => create())
else Future.successful(Done)
case Failure(_: ResourceNotFoundException) => create()
case Failure(exception: CompletionException) =>
exception.getCause match {
case _: ResourceNotFoundException => create()
case failure => Future.failed[Done](failure)
}
case Failure(exc) =>
Future.failed[Done](exc)
}
}

def createSnapshotsTable(
system: ActorSystem[_],
settings: DynamoDBSettings,
client: DynamoDbAsyncClient,
deleteIfExists: Boolean): Future[Done] = {
import akka.persistence.dynamodb.internal.SnapshotAttributes._

implicit val ec: ExecutionContext = system.executionContext

val existingTable =
client.describeTable(DescribeTableRequest.builder().tableName(settings.snapshotTable).build()).asScala

def create(): Future[Done] = {
val request = CreateTableRequest
.builder()
.tableName(settings.snapshotTable)
.keySchema(KeySchemaElement.builder().attributeName(Pid).keyType(KeyType.HASH).build())
.attributeDefinitions(
AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build())
// FIXME config
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build())
.build()

client.createTable(request).asScala.map(_ => Done)
}

def delete(): Future[Done] = {
val req = DeleteTableRequest.builder().tableName(settings.snapshotTable).build()
client.deleteTable(req).asScala.map(_ => Done)
}

existingTable.transformWith {
case Success(_) =>
if (deleteIfExists) delete().flatMap(_ => create())
else Future.successful(Done)
case Failure(_: ResourceNotFoundException) => create()
case Failure(exception: CompletionException) =>
exception.getCause match {
case _: ResourceNotFoundException => create()
case failure => Future.failed[Done](failure)
}
case Failure(exception) => Future.failed[Done](exception)
}
}

}
Loading

0 comments on commit 972093b

Please sign in to comment.