Skip to content

Commit

Permalink
chore: simple PR to enhance postgres and retention policy logs (#2884)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Jul 15, 2024
1 parent 241fb8c commit 71ee42d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 8 deletions.
6 changes: 2 additions & 4 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ proc handleMessage*(
return

let msgHash = computeMessageHash(pubsubTopic, msg)

let insertStartTime = getTime().toUnixFloat()

(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
Expand All @@ -99,7 +98,7 @@ proc handleMessage*(
timestamp = msg.timestamp,
error = error

notice "message archived",
trace "message archived",
hash_hash = msgHash.to0xHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
Expand Down Expand Up @@ -186,11 +185,10 @@ proc findMessages*(
)

proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
debug "executing message retention policy"

let policy = self.retentionPolicy.get()

while true:
debug "executing message retention policy"
(await policy.execute(self.driver)).isOkOr:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error = error
Expand Down
46 changes: 42 additions & 4 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ proc getPartitionsList(
## Retrieves the seq of partition table names.
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]

debug "beginning getPartitionsList"
var partitions: seq[string]
proc rowCallback(pqResult: ptr PGresult) =
for iRow in 0 ..< pqResult.pqNtuples():
Expand Down Expand Up @@ -404,6 +405,7 @@ proc getMessagesArbitraryQuery(
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.

debug "beginning getMessagesArbitraryQuery"
var query = SelectClause
var statements: seq[string]
var args: seq[string]
Expand Down Expand Up @@ -484,7 +486,9 @@ proc getMessageHashesArbitraryQuery(
.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.

debug "beginning of getMessagesV2ArbitraryQuery"
var query = """SELECT messageHash FROM messages"""

var statements: seq[string]
var args: seq[string]

Expand Down Expand Up @@ -565,6 +569,7 @@ proc getMessagesPreparedStmt(

var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]

debug "beginning of getMessagesPreparedStmt"
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)

Expand Down Expand Up @@ -653,6 +658,7 @@ proc getMessageHashesPreparedStmt(

var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]

debug "beginning of getMessagesV2PreparedStmt"
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)

Expand Down Expand Up @@ -742,6 +748,8 @@ method getMessages*(
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"

let hexHashes = hashes.mapIt(toHex(it))

if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
Expand Down Expand Up @@ -787,6 +795,8 @@ proc getStr(
): Future[ArchiveDriverResult[string]] {.async.} =
# Performs a query that is expected to return a single string

debug "beginning of getStr"

var ret: string
proc rowCallback(pqResult: ptr PGresult) =
if pqResult.pqnfields() != 1:
Expand All @@ -809,6 +819,7 @@ proc getInt(
): Future[ArchiveDriverResult[int64]] {.async.} =
# Performs a query that is expected to return a single numeric value (int64)

debug "beginning of getInt"
var retInt = 0'i64
let str = (await s.getStr(query)).valueOr:
return err("could not get str in getInt: " & $error)
Expand All @@ -826,6 +837,8 @@ proc getInt(
method getDatabaseSize*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
debug "beginning of getDatabaseSize"

let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
return err("error in getDatabaseSize: " & error)

Expand All @@ -835,6 +848,8 @@ method getDatabaseSize*(
method getMessagesCount*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
debug "beginning of getMessagesCount"

let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
if intRes.isErr():
return err("error in getMessagesCount: " & intRes.error)
Expand All @@ -847,6 +862,8 @@ method getOldestMessageTimestamp*(
## In some cases it could happen that we have
## empty partitions which are older than the current stored rows.
## In those cases we want to consider those older partitions as the oldest considered timestamp.
debug "beginning of getOldestMessageTimestamp"

let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition: " & $error)

Expand All @@ -862,7 +879,9 @@ method getOldestMessageTimestamp*(
method getNewestMessageTimestamp*(
s: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
debug "beginning of getNewestMessageTimestamp"
let intRes = await s.getInt("SELECT MAX(timestamp) FROM messages")

if intRes.isErr():
return err("error in getNewestMessageTimestamp: " & intRes.error)

Expand All @@ -871,6 +890,8 @@ method getNewestMessageTimestamp*(
method deleteOldestMessagesNotWithinLimit*(
s: PostgresDriver, limit: int
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of deleteOldestMessagesNotWithinLimit"

let execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages WHERE messageHash NOT IN
(
Expand All @@ -881,9 +902,12 @@ method deleteOldestMessagesNotWithinLimit*(
if execRes.isErr():
return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)

debug "end of deleteOldestMessagesNotWithinLimit"
return ok()

method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of postgres close"

## Cancel the partition factory loop
s.futLoopPartitionFactory.cancelSoon()

Expand Down Expand Up @@ -929,6 +953,9 @@ proc acquireDatabaseLock*(
## approach is using the "performWriteQueryWithLock" proc. However, we can't use
## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
## scripts.

debug "beginning of acquireDatabaseLock", lockId

let locked = (
await s.getStr(
fmt"""
Expand All @@ -947,6 +974,7 @@ proc releaseDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
debug "beginning of releaseDatabaseLock", lockId
let unlocked = (
await s.getStr(
fmt"""
Expand Down Expand Up @@ -976,9 +1004,7 @@ const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
proc performWriteQueryWithLock*(
self: PostgresDriver, queryToProtect: string
): Future[ArchiveDriverResult[void]] {.async.} =
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it.
## The purpose of this proc is to protect write queries that might be performed simultaneously
## to the same database, from different store nodes.
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
debug "performWriteQueryWithLock", queryToProtect
let query =
fmt"""
Expand Down Expand Up @@ -1042,6 +1068,7 @@ proc addPartition(
## Creates a partition table that will store the messages that fall in the range
## `startTime` <= timestamp < `startTime + duration`.
## `startTime` is measured in seconds since epoch
debug "beginning of addPartition"

let beginning = startTime
let `end` = partitions_manager.calcEndPartitionTime(startTime)
Expand Down Expand Up @@ -1139,7 +1166,7 @@ proc loopPartitionFactory(
debug "starting loopPartitionFactory"

while true:
trace "Check if we need to create a new partition"
trace "Check if a new partition is needed"

## Let's make the 'partition_manager' aware of the current partitions
(await self.refreshPartitionsInfo()).isOkOr:
Expand Down Expand Up @@ -1184,6 +1211,7 @@ proc getTableSize*(
): Future[ArchiveDriverResult[string]] {.async.} =
## Returns a human-readable representation of the size for the requested table.
## tableName - table of interest.
debug "beginning of getTableSize"

let tableSize = (
await self.getStr(
Expand All @@ -1200,6 +1228,8 @@ proc getTableSize*(
proc removePartition(
self: PostgresDriver, partitionName: string
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of removePartition", partitionName

var partSize = ""
let partSizeRes = await self.getTableSize(partitionName)
if partSizeRes.isOk():
Expand Down Expand Up @@ -1229,6 +1259,7 @@ proc removePartitionsOlderThan(
## Removes old partitions that don't contain the specified timestamp

let tsInSec = Timestamp(float(tsInNanoSec) / 1_000_000_000)
debug "beginning of removePartitionsOlderThan", tsInSec

var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition in removePartitionOlderThan: " & $error)
Expand All @@ -1249,6 +1280,7 @@ proc removeOldestPartition(
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
): Future[ArchiveDriverResult[void]] {.async.} =
## Indirectly called from the retention policy
debug "beginning of removeOldestPartition"

let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not remove oldest partition: " & $error)
Expand All @@ -1271,6 +1303,8 @@ proc containsAnyPartition*(self: PostgresDriver): bool =
method decreaseDatabaseSize*(
driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of decreaseDatabaseSize"

var dbSize = (await driver.getDatabaseSize()).valueOr:
return err("decreaseDatabaseSize failed to get database size: " & $error)

Expand Down Expand Up @@ -1337,6 +1371,8 @@ method existsTable*(
proc getCurrentVersion*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
debug "beginning of getCurrentVersion"

let existsVersionTable = (await s.existsTable("version")).valueOr:
return err("error in getCurrentVersion-existsTable: " & $error)

Expand All @@ -1353,6 +1389,8 @@ method deleteMessagesOlderThanTimestamp*(
): Future[ArchiveDriverResult[void]] {.async.} =
## First of all, let's remove the older partitions so that we can reduce
## the database size.
debug "beginning of deleteMessagesOlderThanTimestamp"

(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
return err("error while removing older partitions: " & $error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ proc new*(T: type CapacityRetentionPolicy, capacity = DefaultCapacity): T =
method execute*(
p: CapacityRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
debug "beginning executing message retention policy - capacity"

let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

Expand All @@ -62,4 +64,6 @@ method execute*(
(await driver.deleteOldestMessagesNotWithinLimit(limit = p.capacity + p.deleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

debug "end executing message retention policy - capacity"

return ok()
3 changes: 3 additions & 0 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ proc new*(T: type SizeRetentionPolicy, size = DefaultRetentionSize): T =
method execute*(
p: SizeRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
debug "beginning of executing message retention policy - size"

(await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr:
return err("decreaseDatabaseSize failed: " & $error)

debug "end of executing message retention policy - size"
return ok()
2 changes: 2 additions & 0 deletions waku/waku_archive/retention_policy/retention_policy_time.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ method execute*(
p: TimeRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
debug "beginning of executing message retention policy - time"

let omtRes = await driver.getOldestMessageTimestamp()
if omtRes.isErr():
Expand All @@ -34,4 +35,5 @@ method execute*(
if res.isErr():
return err("failed to delete oldest messages: " & res.error)

debug "end of executing message retention policy - time"
return ok()

0 comments on commit 71ee42d

Please sign in to comment.