Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add extra logging for by-slice queries #114

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import scala.jdk.FutureConverters._
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import akka.stream.Attributes
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.QueryRequest
import software.amazon.awssdk.services.dynamodb.model.QueryResponse

/**
* INTERNAL API
Expand All @@ -48,6 +51,8 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
s"$bySliceWithMetaProjectionExpression, $EventPayload"
}

private val logging = Logging(system.classicSystem, this.getClass.getName)

def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
Expand Down Expand Up @@ -206,16 +211,37 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest

val publisher = client.queryPaginator(req)

def getTimestamp(item: JMap[String, AttributeValue]): Instant =
InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong)

val logName = s"[$entityType] itemsBySlice [$slice] [${if (backtracking) "backtracking" else "query"}]"

def logQueryResponse: QueryResponse => String = response => {
if (response.hasItems && !response.items.isEmpty) {
val items = response.items
val count = items.size
val first = getTimestamp(items.get(0))
val last = getTimestamp(items.get(items.size - 1))
val scanned = response.scannedCount
val hasMore = response.hasLastEvaluatedKey && !response.lastEvaluatedKey.isEmpty
s"query response page with [$count] events between [$first - $last] (scanned [$scanned], has more [$hasMore])"
} else "empty query response page"
}

Source
.fromPublisher(publisher)
// note that this is not logging each item, only the QueryResponse
.log(logName, logQueryResponse)(logging)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we propagate the logPrefix to the dao so that we try to have the same format, or at least similar prefix format here?

s"[$entityType] eventsBySlice [$slice]: "

Is there some added value of using stream log or shall we just add ordinary logging inside the mapConcat?
For example, stream fail will be logged at error level if we don't adjust the levels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the log prefix to be the same pattern. Yes, using the stream logging intentional here, so we can see the completions in particular. We could adjust failures to be debug as well, if we think error would be a problem.

.withAttributes(Attributes
.logLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.WarningLevel))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting the levels, warning instead of error

.mapConcat(_.items.iterator.asScala)
.take(settings.querySettings.bufferSize)
.map { item =>
if (backtracking) {
SerializedJournalItem(
persistenceId = item.get(Pid).s(),
seqNr = item.get(SeqNr).n().toLong,
writeTimestamp = InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong),
writeTimestamp = getTimestamp(item),
readTimestamp = InstantFactory.now(),
payload = None, // lazy loaded for backtracking
serId = item.get(EventSerId).n().toInt,
Expand Down