diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 68234947fc..4c22521c26 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -85,37 +85,46 @@ proc new*( proc handleMessage*( self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage ) {.async.} = + let msgHash = computeMessageHash(pubsubTopic, msg) + let msgHashHex = msgHash.to0xHex() + + trace "handling message", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + msgTimestamp = msg.timestamp + self.validator(msg).isOkOr: waku_archive_errors.inc(labelValues = [error]) trace "invalid message", - msg_hash = computeMessageHash(pubsubTopic, msg).to0xHex(), + msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, error = error return - let msgHash = computeMessageHash(pubsubTopic, msg) let insertStartTime = getTime().toUnixFloat() (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) trace "failed to insert message", - msg_hash = msgHash.to0xHex(), + msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, error = error return + let insertDuration = getTime().toUnixFloat() - insertStartTime + waku_archive_insert_duration_seconds.observe(insertDuration) + trace "message archived", - msg_hash = msgHash.to0xHex(), + msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, - timestamp = msg.timestamp - - let insertDuration = getTime().toUnixFloat() - insertStartTime - waku_archive_insert_duration_seconds.observe(insertDuration) + timestamp = msg.timestamp, + insertDuration = insertDuration proc syncMessageIngress*( self: WakuArchive, @@ -123,27 +132,35 @@ proc syncMessageIngress*( pubsubTopic: PubsubTopic, msg: WakuMessage, ): Future[Result[void, string]] {.async.} = - let insertStartTime = getTime().toUnixFloat() + let msgHashHex = msgHash.to0xHex() + trace "handling message in syncMessageIngress", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp + + let insertStartTime = getTime().toUnixFloat() (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) - trace "failed to insert message", - msg_hash = msgHash.toHex(), + trace "failed to insert message in in syncMessageIngress", + msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, error = $error return err(error) - trace "message archived", - msg_hash = msgHash.to0xHex(), - pubsubTopic = pubsubTopic, - contentTopic = msg.contentTopic, - timestamp = msg.timestamp - let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) + trace "message archived in syncMessageIngress", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + insertDuration = insertDuration + return ok() proc validateTimeRange( diff --git a/waku/waku_archive_legacy/archive.nim b/waku/waku_archive_legacy/archive.nim index 0abcc3068b..5e08aa5f84 100644 --- a/waku/waku_archive_legacy/archive.nim +++ b/waku/waku_archive_legacy/archive.nim @@ -79,10 +79,6 @@ proc new*( proc handleMessage*( self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage ) {.async.} = - self.validator(msg).isOkOr: - waku_legacy_archive_errors.inc(labelValues = [error]) - return - let msgDigest = computeDigest(msg) msgDigestHex = msgDigest.data.to0xHex() @@ -99,26 +95,40 @@ proc handleMessage*( pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, msgTimestamp = msg.timestamp, - usedTimestamp = msgTimestamp, digest = msgDigestHex + self.validator(msg).isOkOr: + waku_legacy_archive_errors.inc(labelValues = [error]) + trace "invalid message", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + error = error + return + let insertStartTime = getTime().toUnixFloat() (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr: waku_legacy_archive_errors.inc(labelValues = [insertFailure]) - error "failed to insert message", error = error + error "failed to insert message", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + error = error return + let insertDuration = getTime().toUnixFloat() - insertStartTime + waku_legacy_archive_insert_duration_seconds.observe(insertDuration) + debug "message archived", msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, msgTimestamp = msg.timestamp, - usedTimestamp = msgTimestamp, - digest = msgDigestHex - - let insertDuration = getTime().toUnixFloat() - insertStartTime - waku_legacy_archive_insert_duration_seconds.observe(insertDuration) + digest = msgDigestHex, + insertDuration = insertDuration proc findMessages*( self: WakuArchive, query: ArchiveQuery