Skip to content

Commit

Permalink
chore: More information in query log prefix (#112)
Browse files Browse the repository at this point in the history
* and some more debug logging
  • Loading branch information
patriknw authored Dec 4, 2024
1 parent 75a273e commit 8827aca
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,9 @@ import org.slf4j.Logger

if (state.queryCount != 0 && log.isDebugEnabled())
log.debug(
"{} next query [{}] from slice [{}], between time [{} - {}]. Found [{}] items in previous query.",
"{} next query [{}], between time [{} - {}]. Found [{}] items in previous query.",
logPrefix,
state.queryCount,
slice,
state.latest.timestamp,
toTimestamp,
state.itemCount)
Expand All @@ -168,10 +167,9 @@ import org.slf4j.Logger
} else {
if (log.isDebugEnabled)
log.debug(
"{} query [{}] from slice [{}] completed. Found [{}] items in previous query.",
"{} query [{}] completed. Found [{}] items in previous query.",
logPrefix,
state.queryCount,
slice,
state.itemCount)

state -> None
Expand All @@ -180,12 +178,7 @@ import org.slf4j.Logger

val currentTimestamp = InstantFactory.now()
if (log.isDebugEnabled())
log.debug(
"{} query slice [{}], from time [{}] until now [{}].",
logPrefix,
slice,
initialOffset.timestamp,
currentTimestamp)
log.debug("{} query from time [{}] until now [{}].", logPrefix, initialOffset.timestamp, currentTimestamp)

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
Expand All @@ -205,7 +198,7 @@ import org.slf4j.Logger
val initialOffset = toTimestampOffset(offset)

if (log.isDebugEnabled())
log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)
log.debug("{} starting query from time [{}].", logPrefix, initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
if (EnvelopeOrigin.isHeartbeatEvent(envelope)) state
Expand Down Expand Up @@ -256,12 +249,7 @@ import org.slf4j.Logger

if (log.isDebugEnabled)
delay.foreach { d =>
log.debug(
"{} query [{}] from slice [{}] delay next [{}] ms.",
logPrefix,
state.queryCount,
slice,
d.toMillis)
log.debug("{} query [{}] delay next [{}] ms.", logPrefix, state.queryCount, d.toMillis)
}

delay
Expand Down Expand Up @@ -413,7 +401,10 @@ import org.slf4j.Logger
val timestamp = state.startTimestamp.plus(
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))

createHeartbeat(timestamp)
val h = createHeartbeat(timestamp)
if (h.isDefined)
log.debug("{} heartbeat timestamp [{}]", logPrefix, timestamp)
h
} else None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
toSequenceNr: Long,
includeDeleted: Boolean): Source[SerializedJournalItem, NotUsed] = {

log.debug("[{}] eventsByPersistenceId from seqNr [{}] to [{}]", persistenceId, fromSequenceNr, toSequenceNr)

queryDao.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, includeDeleted)
}

Expand All @@ -244,7 +246,11 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
val bySliceQueries = (minSlice to maxSlice).map { slice =>
bySlice[Event](entityType, slice)
.currentBySlice("currentEventsBySlices", entityType, slice, sliceStartOffset(slice, offset))
.currentBySlice(
s"[$entityType] currentEventsBySlice [$slice]: ",
entityType,
slice,
sliceStartOffset(slice, offset))
}
require(bySliceQueries.nonEmpty, s"maxSlice [$maxSlice] must be >= minSlice [$minSlice]")

Expand Down Expand Up @@ -286,7 +292,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg

val bySliceQueries = (minSlice to maxSlice).map { slice =>
bySlice[Event](entityType, slice).liveBySlice(
"eventsBySlices",
s"[$entityType] eventsBySlice [$slice]: ",
entityType,
slice,
sliceStartOffset(slice, offset))
Expand Down Expand Up @@ -333,7 +339,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
val timestampOffset = TimestampOffset.toTimestampOffset(sliceStartOffset(slice, offset))

val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, slice, transformSnapshot)
.currentBySlice("currentSnapshotsBySlice", entityType, slice, timestampOffset)
.currentBySlice(s"[$entityType] currentSnapshotsBySlice [$slice]: ", entityType, slice, timestampOffset)

Source.fromGraph(
new StartingFromSnapshotStage[Event](
Expand All @@ -357,7 +363,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
snapshotOffsets.size)

bySlice[Event](entityType, slice).currentBySlice(
"currentEventsBySlice",
s"[$entityType] currentEventsBySlice [$slice]: ",
entityType,
slice,
initOffset,
Expand Down Expand Up @@ -395,7 +401,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
val timestampOffset = TimestampOffset.toTimestampOffset(sliceStartOffset(slice, offset))

val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, slice, transformSnapshot)
.currentBySlice("snapshotsBySlice", entityType, slice, timestampOffset)
.currentBySlice(s"[$entityType] snapshotsBySlice [$slice]: ", entityType, slice, timestampOffset)

Source.fromGraph(
new StartingFromSnapshotStage[Event](
Expand All @@ -419,7 +425,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
snapshotOffsets.size)

bySlice[Event](entityType, slice).liveBySlice(
"eventsBySlice",
s"[$entityType] eventsBySlice [$slice]: ",
entityType,
slice,
initOffset,
Expand Down Expand Up @@ -624,11 +630,18 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg

// EventTimestampQuery
override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
queryDao.timestampOfEvent(persistenceId, sequenceNr)
val result = queryDao.timestampOfEvent(persistenceId, sequenceNr)
if (log.isDebugEnabled) {
result.foreach { t =>
log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t)
}
}
result
}

//LoadEventQuery
override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] = {
log.debug("[{}] loadEnvelope seqNr [{}]", persistenceId, sequenceNr)
queryDao
.loadEvent(persistenceId, sequenceNr, includePayload = true)
.map {
Expand Down

0 comments on commit 8827aca

Please sign in to comment.