Skip to content

Commit

Permalink
feat: delete events (#4)
Browse files Browse the repository at this point in the history
* follows the same structure as the R2DBC plugin, which also groups
  deletes into batches with a maximum number of events per statement
* when using TransactWriteItems, each delete must target an individual primary key
* a delete marker retains the highest sequence number when all events have been deleted
* write and read metadata
* use consistent reads
  • Loading branch information
patriknw authored May 28, 2024
1 parent de20b25 commit 56c2aa0
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.Delete
import software.amazon.awssdk.services.dynamodb.model.Put
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest
import software.amazon.awssdk.services.dynamodb.model.QueryRequest
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
import software.amazon.awssdk.services.dynamodb.model.Update

/**
* INTERNAL API
Expand Down Expand Up @@ -62,20 +64,28 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
attributes.put(Pid, AttributeValue.fromS(persistenceId))
attributes.put(SeqNr, AttributeValue.fromN(item.seqNr.toString))
attributes.put(Slice, AttributeValue.fromN(slice.toString))
attributes.put(EntityType, AttributeValue.fromS(item.entityType))
attributes.put(EventSerId, AttributeValue.fromN(item.serId.toString))
attributes.put(EventSerManifest, AttributeValue.fromS(item.serManifest))
attributes.put(EventPayload, AttributeValue.fromB(SdkBytes.fromByteArray(item.payload.get)))
attributes.put(Writer, AttributeValue.fromS(item.writerUuid))

item.metadata.foreach { meta =>
attributes.put(MetaSerId, AttributeValue.fromN(meta.serId.toString))
attributes.put(MetaSerManifest, AttributeValue.fromS(meta.serManifest))
attributes.put(MetaPayload, AttributeValue.fromB(SdkBytes.fromByteArray(meta.payload)))
}

attributes
}

val totalEvents = events.size
if (totalEvents == 1) {
val req = PutItemRequest
.builder()
.tableName(settings.journalTable)
.item(putItemAttributes(events.head))
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.tableName(settings.journalTable)
.build()
val result = client.putItem(req).asScala

Expand All @@ -94,7 +104,7 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
events.map { item =>
TransactWriteItem
.builder()
.put(Put.builder().item(putItemAttributes(item)).tableName(settings.journalTable).build())
.put(Put.builder().tableName(settings.journalTable).item(putItemAttributes(item)).build())
.build()
}.asJava

Expand All @@ -106,8 +116,6 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest

val result = client.transactWriteItems(req).asScala

result.failed.foreach { exc => println(exc) }

if (log.isDebugEnabled()) {
result.foreach { response =>
log.debug(
Expand All @@ -129,6 +137,7 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest

val request = QueryRequest.builder
.tableName(settings.journalTable)
.consistentRead(true)
.keyConditionExpression(s"$Pid = :pid")
.expressionAttributeValues(attributeValues)
.scanIndexForward(false) // get last item (highest sequence nr)
Expand All @@ -147,4 +156,117 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
result
}

/**
 * Looks up the lowest stored sequence number for the given persistence id.
 *
 * Issues a strongly consistent Query over the partition, ascending by the
 * sort key (sequence number) and limited to a single item, so the first
 * returned item carries the lowest sequence nr. Returns 0 when no events
 * exist for the persistence id.
 */
def readLowestSequenceNr(persistenceId: String): Future[Long] = {
  import JournalAttributes._

  val request =
    QueryRequest.builder
      .tableName(settings.journalTable)
      .consistentRead(true)
      .keyConditionExpression(s"$Pid = :pid")
      .expressionAttributeValues(Map(":pid" -> AttributeValue.fromS(persistenceId)).asJava)
      .scanIndexForward(true) // ascending order: first item has the lowest sequence nr
      .limit(1)
      .build()

  val lowestSeqNr =
    client.query(request).asScala.map { response =>
      // empty partition (no events) maps to sequence nr 0
      response.items().asScala.headOption.map(_.get(SeqNr).n().toLong).getOrElse(0L)
    }

  if (log.isDebugEnabled)
    lowestSeqNr.foreach(n => log.debug("Lowest sequence nr for persistenceId [{}]: [{}]", persistenceId, n))

  lowestSeqNr
}

/**
 * Deletes events up to and including `toSequenceNr` for the given persistence id.
 *
 * Deletes are performed in transactional batches of at most 100 items
 * (the TransactWriteItems limit). When `resetSequenceNumber` is false the item
 * with the highest deleted sequence number is not removed but turned into a
 * tombstone (payload/metadata attributes removed, Deleted flag set) so the
 * highest sequence number can still be recovered after all events are deleted.
 */
def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = {
import JournalAttributes._

// composite primary key (partition key pid, sort key seq_nr) for one event item
def pk(pid: String, seqNr: Long): JHashMap[String, AttributeValue] = {
val m = new JHashMap[String, AttributeValue]
m.put(Pid, AttributeValue.fromS(pid))
m.put(SeqNr, AttributeValue.fromN(seqNr.toString))
m
}

// deletes the inclusive range [from, to] with a single TransactWriteItems request;
// on the last batch (when the sequence number is kept) the item at `to` is updated
// into a delete marker instead of being deleted
def deleteBatch(from: Long, to: Long, lastBatch: Boolean): Future[Unit] = {
val result = {
// exclude the item at `to` from plain deletes when it becomes the delete marker
val toSeqNr = if (lastBatch && !resetSequenceNumber) to - 1 else to
val deleteItems =
(from to toSeqNr).map { seqNr =>
TransactWriteItem
.builder()
.delete(Delete.builder().tableName(settings.journalTable).key(pk(persistenceId, seqNr)).build())
.build()
}

val writeItems =
if (lastBatch && !resetSequenceNumber) {
// update last item instead of deleting, keeping it as a tombstone to keep track of latest seqNr even
// though all events have been deleted
val deleteMarker =
TransactWriteItem
.builder()
.update(Update
.builder()
.tableName(settings.journalTable)
.key(pk(persistenceId, to))
// strip payload, writer and metadata attributes; only the key and Deleted flag remain
.updateExpression(
s"SET $Deleted = :del REMOVE $EventPayload, $EventSerId, $EventSerManifest, $Writer, $MetaPayload, $MetaSerId, $MetaSerManifest")
.expressionAttributeValues(Map(":del" -> AttributeValue.fromBool(true)).asJava)
.build())
.build()
deleteItems :+ deleteMarker
} else
deleteItems

val req = TransactWriteItemsRequest
.builder()
.transactItems(writeItems.asJava)
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

client.transactWriteItems(req).asScala
}

if (log.isDebugEnabled()) {
result.foreach { response =>
log.debug(
"Deleted events from [{}] to [{}] for persistenceId [{}], consumed [{}] WCU",
from,
to,
persistenceId,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
}
}
result.map(_ => ())(ExecutionContexts.parasitic)
}

// TransactWriteItems has a limit of 100
val batchSize = 100

// recursively deletes [from, maxTo] in chunks of batchSize, marking the final chunk
// so deleteBatch can install the tombstone when the sequence number is kept
def deleteInBatches(from: Long, maxTo: Long): Future[Unit] = {
if (from + batchSize > maxTo) {
deleteBatch(from, maxTo, lastBatch = true)
} else {
val to = from + batchSize - 1
deleteBatch(from, to, lastBatch = false).flatMap(_ => deleteInBatches(to + 1, maxTo))
}
}

// resolve the actual range to delete: lowest existing seqNr up to the requested
// seqNr, or up to the current highest when deleting everything (Long.MaxValue)
val lowestSequenceNrForDelete = readLowestSequenceNr(persistenceId)
val highestSeqNrForDelete =
if (toSequenceNr == Long.MaxValue) readHighestSequenceNr(persistenceId)
else Future.successful(toSequenceNr)

for {
fromSeqNr <- lowestSequenceNrForDelete
toSeqNr <- highestSeqNrForDelete
_ <- deleteInBatches(fromSeqNr, toSeqNr)
} yield ()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
Expand Down Expand Up @@ -41,27 +42,35 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
import JournalAttributes._
val req = QueryRequest.builder
.tableName(settings.journalTable)
.consistentRead(true)
.keyConditionExpression(s"$Pid = :pid AND $SeqNr BETWEEN :from AND :to")
.filterExpression(s"attribute_not_exists($Deleted)")
.expressionAttributeValues(expressionAttributeValues)
.build()

val publisher = client.queryPaginator(req)

Source.fromPublisher(publisher).mapConcat { response =>
response.items().iterator().asScala.map { item =>
// FIXME read all attributes
val metadata = Option(item.get(MetaPayload)).map { metaPayload =>
SerializedEventMetadata(
serId = item.get(MetaSerId).n().toInt,
serManifest = item.get(MetaSerManifest).s(),
payload = metaPayload.b().asByteArray())
}

SerializedJournalItem(
slice = item.get(Slice).n().toInt,
entityType = "",
entityType = item.get(EntityType).s(),
persistenceId = item.get(Pid).s(),
seqNr = item.get(SeqNr).n().toLong,
writeTimestamp = Instant.EPOCH,
payload = Some(item.get(EventPayload).b().asByteArray()),
serId = item.get(EventSerId).n().toInt,
serManifest = item.get(EventSerManifest).s(),
writerUuid = item.get(Writer).s(),
tags = Set.empty,
metadata = None)
tags = Set.empty, // FIXME
metadata = metadata)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@ final case class SerializedEventMetadata(serId: Int, serManifest: String, payloa
// partition key: the persistence id
val Pid = "pid"
// sort key: the event sequence number
val SeqNr = "seq_nr"
// slice number for the persistence id
val Slice = "slice"
// redundant to store entity type, but needed for the bySlices GSI
val EntityType = "entity_type"
// serializer id of the event payload
val EventSerId = "event_ser_id"
// serializer manifest of the event payload
val EventSerManifest = "event_ser_manifest"
// serialized event payload bytes
val EventPayload = "event_payload"
// writer uuid of the persistent actor that wrote the event
val Writer = "writer"
// optional event metadata: serializer id, manifest and payload bytes
val MetaSerId = "meta_ser_id"
val MetaSerManifest = "meta_ser_manifest"
val MetaPayload = "meta_payload"
// delete-marker flag set on the tombstone item that retains the highest seqNr
val Deleted = "del"
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e

/**
 * Deletes all events up to and including `toSequenceNr` for the given persistence id.
 *
 * Delegates to the journal DAO with `resetSequenceNumber = false`, so the highest
 * deleted sequence number is retained via a delete marker rather than being reset.
 *
 * Note: the scraped diff left both the removed placeholder lines
 * (`//FIXME …` / `Future.successful(())`) and the replacement call in place;
 * this is the cleaned post-commit method body.
 */
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
  log.debug("asyncDeleteMessagesTo persistenceId [{}], toSequenceNr [{}]", persistenceId, toSequenceNr)
  journalDao.deleteEventsTo(persistenceId, toSequenceNr, resetSequenceNumber = false)
}

override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
Expand Down

0 comments on commit 56c2aa0

Please sign in to comment.