Skip to content

Commit

Permalink
chore: per limit split of PostgreSQL queries (#3008)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Sep 4, 2024
1 parent 8baf627 commit e1e05af
Showing 1 changed file with 57 additions and 24 deletions.
81 changes: 57 additions & 24 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.push raises: [].}

import
std/[nre, options, sequtils, strutils, strformat, times],
std/[nre, options, sequtils, strutils, strformat, times, sugar],
stew/[byteutils, arrayops],
results,
chronos,
Expand Down Expand Up @@ -128,7 +128,9 @@ const SelectCursorByHashDef =
"""SELECT timestamp FROM messages
WHERE messageHash = $1"""

const DefaultMaxNumConns = 50
const
  DefaultMaxNumConns = 50 # default size of the Postgres connection pool — TODO confirm against `new`
  MaxHashesPerQuery = 100 # hard cap on message hashes sent in a single SQL query; larger requests are split into batches of this size

proc new*(
T: type PostgresDriver,
Expand Down Expand Up @@ -815,38 +817,35 @@ proc getMessagesByMessageHashes(
debug "end of getMessagesByMessageHashes"
return ok(rows)

method getMessages*(
s: PostgresDriver,
includeData = true,
contentTopics = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
proc getMessagesWithinLimits(
self: PostgresDriver,
includeData: bool,
contentTopics: seq[ContentTopic],
pubsubTopic: Option[PubsubTopic],
cursor: Option[ArchiveCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
maxPageSize: uint,
ascendingOrder: bool,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"

const MAX_ALLOWED_HASHES = 100
if hashes.len > MAX_ALLOWED_HASHES:
return err(fmt"can not attend queries with more than {MAX_ALLOWED_HASHES} hashes")
if hashes.len > MaxHashesPerQuery:
return err(fmt"can not attend queries with more than {MaxHashesPerQuery} hashes")

let hexHashes = hashes.mapIt(toHex(it))

if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return await s.getMessagesByMessageHashes(
return await self.getMessagesByMessageHashes(
"'" & hexHashes.join("','") & "'", maxPageSize, requestId
)

if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.
if includeData:
return await s.getMessagesPreparedStmt(
return await self.getMessagesPreparedStmt(
contentTopics.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
Expand All @@ -858,7 +857,7 @@ method getMessages*(
requestId,
)
else:
return await s.getMessageHashesPreparedStmt(
return await self.getMessageHashesPreparedStmt(
contentTopics.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
Expand All @@ -872,16 +871,50 @@ method getMessages*(
else:
if includeData:
## We will run atypical query. In this case we don't use prepared statemets
return await s.getMessagesArbitraryQuery(
return await self.getMessagesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder, requestId,
)
else:
return await s.getMessageHashesArbitraryQuery(
return await self.getMessageHashesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder, requestId,
)

method getMessages*(
    s: PostgresDriver,
    includeData = true,
    contentTopics = newSeq[ContentTopic](0),
    pubsubTopic = none(PubsubTopic),
    cursor = none(ArchiveCursor),
    startTime = none(Timestamp),
    endTime = none(Timestamp),
    hashes = newSeq[WakuMessageHash](0),
    maxPageSize = DefaultPageSize,
    ascendingOrder = true,
    requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Retrieves archived rows matching the given filters.
  ## Requests carrying more than `MaxHashesPerQuery` hashes are split into
  ## batches of at most `MaxHashesPerQuery` hashes each, one sub-query per
  ## batch, and the sub-results are concatenated in batch order.
  ## Returns an error if any sub-query fails.
  debug "beginning of getMessages"

  if hashes.len == 0:
    ## No hash filter: issue the query exactly once. This branch must not be
    ## folded into the loop below — an empty hash batch would otherwise run
    ## an unfiltered query.
    let rows =
      ?await s.getMessagesWithinLimits(
        includeData, contentTopics, pubsubTopic, cursor, startTime, endTime,
        hashes, maxPageSize, ascendingOrder, requestId,
      )

    debug "end of getMessages"
    return ok(rows)

  let rows = collect(newSeq):
    # Iterate batch start offsets; `hashes.len - 1` (not `hashes.len`) as the
    # inclusive `countup` bound, so an exact multiple of MaxHashesPerQuery
    # does not produce a trailing EMPTY batch. An empty batch would drop the
    # hash filter in getMessagesWithinLimits and return unrelated rows.
    for batchStart in countup(0, hashes.len - 1, MaxHashesPerQuery):
      let batchEnd = min(batchStart + MaxHashesPerQuery, hashes.len)

      let hashesBatch = hashes[batchStart ..< batchEnd]

      let subRows =
        ?await s.getMessagesWithinLimits(
          includeData, contentTopics, pubsubTopic, cursor, startTime, endTime,
          hashesBatch, maxPageSize, ascendingOrder, requestId,
        )

      for row in subRows:
        row

  debug "end of getMessages"

  return ok(rows)

proc getStr(
s: PostgresDriver, query: string
): Future[ArchiveDriverResult[string]] {.async.} =
Expand Down

0 comments on commit e1e05af

Please sign in to comment.