chore: delivery monitor for store v3 reliability protocol (#2977)
- Use the observer/observable pattern to inform the delivery_monitor about subscription state
- send_monitor becomes a publish observer of lightpush and relay
- delivery_monitor adds more protection against possible crashes, plus better logs
- create a separate proc in the store client for the delivery monitor
Ivansete-status authored Aug 27, 2024
1 parent c3cb06a commit 0f68274
Showing 17 changed files with 679 additions and 12 deletions.
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS NotDeliveredMessages(
messageHash BLOB PRIMARY KEY,
timestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
meta BLOB,
version INTEGER NOT NULL
);
9 changes: 9 additions & 0 deletions waku/factory/external_config.nim
@@ -472,6 +472,15 @@ type WakuNodeConf* = object
name: "lightpushnode"
.}: string

## Reliability config
reliabilityEnabled* {.
desc:
"""Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests.
with the drawback of consuming some more bandwitdh.""",
defaultValue: false,
name: "reliability"
.}: bool

## REST HTTP config
rest* {.
desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest"
25 changes: 24 additions & 1 deletion waku/factory/waku.nim
@@ -20,6 +20,7 @@ import
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/delivery_monitor/delivery_monitor,
../waku_api/message_cache,
../waku_api/rest/server,
../waku_archive,
@@ -51,6 +52,8 @@ type Waku* = object

node*: WakuNode

deliveryMonitor: DeliveryMonitor

restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef

@@ -147,13 +150,29 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error)

let node = nodeRes.get()

var deliveryMonitor: DeliveryMonitor
if conf.reliabilityEnabled:
if conf.storenode == "":
return err("A storenode should be set when reliability mode is on")

let deliveryMonitorRes = DeliveryMonitor.new(
node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
node.wakuFilterClient,
)
if deliveryMonitorRes.isErr():
return err("could not create delivery monitor: " & $deliveryMonitorRes.error)
deliveryMonitor = deliveryMonitorRes.get()

var waku = Waku(
version: git_version,
conf: confCopy,
rng: rng,
key: confCopy.nodekey.get(),
node: nodeRes.get(),
node: node,
dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(),
deliveryMonitor: deliveryMonitor,
)

ok(waku)
@@ -237,6 +256,10 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
(await waku.wakuDiscV5.start()).isOkOr:
return err("failed to start waku discovery v5: " & $error)

## Reliability
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()

return ok()

# Waku shutdown
17 changes: 17 additions & 0 deletions waku/node/delivery_monitor/delivery_callback.nim
@@ -0,0 +1,17 @@
import ../../waku_core

type DeliveryDirection* {.pure.} = enum
PUBLISHING
RECEIVING

type DeliverySuccess* {.pure.} = enum
SUCCESSFUL
UNSUCCESSFUL

type DeliveryFeedbackCallback* = proc(
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].}
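
For illustration, a callback conforming to this signature might look as follows; the body is hypothetical (a real API client would forward the feedback to its application layer), and the import assumes a module living alongside delivery_callback.nim:

import ../../waku_core, ./delivery_callback

let myFeedbackCb: DeliveryFeedbackCallback = proc(
    success: DeliverySuccess,
    dir: DeliveryDirection,
    comment: string,
    msgHash: WakuMessageHash,
    msg: WakuMessage,
) {.gcsafe, raises: [].} =
  # Surface failed publishes so the app can decide whether to resend
  if success == DeliverySuccess.UNSUCCESSFUL and dir == DeliveryDirection.PUBLISHING:
    echo "message not delivered: ", comment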
43 changes: 43 additions & 0 deletions waku/node/delivery_monitor/delivery_monitor.nim
@@ -0,0 +1,43 @@
## This module helps to ensure the correct transmission and reception of messages

import results
import chronos
import
./recv_monitor,
./send_monitor,
./delivery_callback,
../../waku_core,
../../waku_store/client,
../../waku_relay/protocol,
../../waku_lightpush/client,
../../waku_filter_v2/client

type DeliveryMonitor* = ref object
sendMonitor: SendMonitor
recvMonitor: RecvMonitor

proc new*(
T: type DeliveryMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightPushClient,
wakuFilterClient: WakuFilterClient,
): Result[T, string] =
## storeClient is needed to give store visibility to DeliveryMonitor
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish
let sendMonitor = ?SendMonitor.new(storeClient, wakuRelay, wakuLightpushClient)
let recvMonitor = RecvMonitor.new(storeClient, wakuFilterClient)
return ok(DeliveryMonitor(sendMonitor: sendMonitor, recvMonitor: recvMonitor))

proc startDeliveryMonitor*(self: DeliveryMonitor) =
self.sendMonitor.startSendMonitor()
self.recvMonitor.startRecvMonitor()

proc stopDeliveryMonitor*(self: DeliveryMonitor) {.async.} =
self.sendMonitor.stopSendMonitor()
await self.recvMonitor.stopRecvMonitor()

proc setDeliveryCallback*(self: DeliveryMonitor, deliveryCb: DeliveryFeedbackCallback) =
## The deliveryCb is a proc defined by the api client so that it can get delivery feedback
self.sendMonitor.setDeliveryCallback(deliveryCb)
self.recvMonitor.setDeliveryCallback(deliveryCb)
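
Putting the pieces together, the intended wiring mirrors what waku.nim does above; a minimal sketch assuming the protocol clients already exist, and reusing the hypothetical myFeedbackCb from the earlier sketch:

let monitor = DeliveryMonitor.new(
  storeClient, wakuRelay, wakuLightpushClient, wakuFilterClient
).valueOr:
  raiseAssert "could not create delivery monitor: " & error
monitor.setDeliveryCallback(myFeedbackCb) # optional delivery feedback hook
monitor.startDeliveryMonitor()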
26 changes: 26 additions & 0 deletions waku/node/delivery_monitor/not_delivered_storage/migrations.nim
@@ -0,0 +1,26 @@
{.push raises: [].}

import std/[tables, strutils, os], results, chronicles
import ../../../common/databases/db_sqlite, ../../../common/databases/common

logScope:
topics = "waku node delivery_monitor"

const TargetSchemaVersion* = 1
# increase this when there is an update in the database schema

template projectRoot(): string =
currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."

const PeerStoreMigrationPath: string = projectRoot / "migrations" / "sent_msgs"

proc migrate*(db: SqliteDatabase): DatabaseResult[void] =
debug "starting peer store's sqlite database migration for sent messages"

let migrationRes =
migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)

debug "finished peer store's sqlite database migration for sent messages"
ok()
@@ -0,0 +1,38 @@
## This module keeps track of the sent/published messages that are considered
## not to have been properly delivered.
##
## The archiving of such messages happens in a local sqlite database.
##
## In this very first approach, we consider that a message has been sent properly if it
## has been received by any store node.
##

import results
import
../../../common/databases/db_sqlite,
../../../waku_core/message/message,
../../../node/delivery_monitor/not_delivered_storage/migrations

const NotDeliveredMessagesDbUrl = "not-delivered-messages.db"

type NotDeliveredStorage* = ref object
database: SqliteDatabase

type TrackedWakuMessage = object
msg: WakuMessage
numTrials: uint
## for statistics purposes. Counts the number of times the node has tried to publish it

proc new*(T: type NotDeliveredStorage): Result[T, string] =
let db = ?SqliteDatabase.new(NotDeliveredMessagesDbUrl)

?migrate(db)

return ok(NotDeliveredStorage(database: db))

proc archiveMessage*(
self: NotDeliveredStorage, msg: WakuMessage
): Result[void, string] =
## Archives a waku message so that we can keep track of it
## even when the app restarts
return ok()
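
archiveMessage is a stub at this point. For a sense of the eventual insert against the NotDeliveredMessages table defined in the migration above, here is a minimal sketch using Nim's standard db_connector/db_sqlite purely for illustration (the repo's SqliteDatabase wrapper has its own API; the column names come from the migration, everything else is an assumption):

import db_connector/db_sqlite

proc archiveSketch(
    db: DbConn, msgHashHex: string, timestamp: int64, contentTopic, pubsubTopic: string
) =
  # payload and meta are nullable BLOBs, omitted for brevity; the hash is
  # hex-encoded here for simplicity, a real wrapper would bind raw BLOBs
  db.exec(
    sql"""INSERT OR REPLACE INTO NotDeliveredMessages
          (messageHash, timestamp, contentTopic, pubsubTopic, version)
          VALUES (?, ?, ?, ?, 1)""",
    msgHashHex, $timestamp, contentTopic, pubsubTopic,
  )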
9 changes: 9 additions & 0 deletions waku/node/delivery_monitor/publish_observer.nim
@@ -0,0 +1,9 @@
import chronicles
import ../../waku_core/message/message

type PublishObserver* = ref object of RootObj

method onMessagePublished*(
self: PublishObserver, pubsubTopic: string, message: WakuMessage
) {.base, gcsafe, raises: [].} =
error "onMessagePublished not implemented"
196 changes: 196 additions & 0 deletions waku/node/delivery_monitor/recv_monitor.nim
@@ -0,0 +1,196 @@
## This module is in charge of the messages that this node expects to receive and is
## backed by store-v3 requests to gain an additional degree of certainty
##

import std/[tables, sequtils, sets, options]
import chronos, chronicles, libp2p/utility
import
../../waku_core,
./delivery_callback,
./subscriptions_observer,
../../waku_store/[client, common],
../../waku_filter_v2/client,
../../waku_core/topics

const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries

const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages

const PruneOldMsgsPeriod = chronos.minutes(1)

const DelayExtra* = chronos.seconds(5)
## Additional margin so that consecutive missing-message query windows overlap

type TupleHashAndMsg = tuple[hash: WakuMessageHash, msg: WakuMessage]

type RecvMessage = object
msgHash: WakuMessageHash
rxTime: Timestamp
## timestamp of the rx message. We will not keep the rx messages forever

type RecvMonitor* = ref object of SubscriptionObserver
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Content topics of interest, keyed by pubsub topic. These are the topics that
## get periodically verified against a store node for missing messages

storeClient: WakuStoreClient
deliveryCb: DeliveryFeedbackCallback

recentReceivedMsgs: seq[RecvMessage]

msgCheckerHandler: Future[void] ## handle to stop the msgChecker async task
msgPrunerHandler: Future[void] ## handle to stop the task that prunes too-old messages

startTimeToCheck: Timestamp
endTimeToCheck: Timestamp

proc getMissingMsgsFromStore(
self: RecvMonitor, msgHashes: seq[WakuMessageHash]
): Future[Result[seq[TupleHashAndMsg], string]] {.async.} =
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
StoreQueryRequest(includeData: true, messageHashes: msgHashes)
)
).valueOr:
return err("getMissingMsgsFromStore: " & $error)

let otherwiseMsg = WakuMessage()
## message to be returned if the Option message is none
return ok(
storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otherwiseMsg)))
)

proc performDeliveryFeedback(
self: RecvMonitor,
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].} =
## This proc brings delivery feedback to the API client.
## It requires a 'deliveryCb' to be registered beforehand.
if self.deliveryCb.isNil():
error "deliveryCb is nil in performDeliveryFeedback",
success, dir, comment, msg_hash = shortLog(msgHash)
return

debug "recv monitor performDeliveryFeedback",
success, dir, comment, msg_hash = shortLog(msgHash)
self.deliveryCb(success, dir, comment, msgHash, msg)

proc msgChecker(self: RecvMonitor) {.async.} =
## Continuously checks if a message has been received
while true:
await sleepAsync(StoreCheckPeriod)

self.endTimeToCheck = getNowInNanosecondTime()

var msgHashesInStore = newSeq[WakuMessageHash](0)
for pubsubTopic, cTopics in self.topicsInterest.pairs:
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(pubsubTopic)),
contentTopics: cTopics,
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
)
)
).valueOr:
error "msgChecker failed to get remote msgHashes",
pubsubTopic, cTopics, error = $error
continue

msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))

## compare the msgHashes seen from the store vs the ones received directly
let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
let missedHashes: seq[WakuMessageHash] =
msgHashesInStore.filterIt(not rxMsgHashes.contains(it))

## Now retrieve the missed WakuMessages
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
if missingMsgsRet.isOk():
## Give feedback so that the api client can perform any action with the missed messages
for msgTuple in missingMsgsRet.get():
self.performDeliveryFeedback(
DeliverySuccess.UNSUCCESSFUL, RECEIVING, "Missed message", msgTuple.hash,
msgTuple.msg,
)
else:
error "failed to retrieve missing messages: ", error = $missingMsgsRet.error

## update next check times
self.startTimeToCheck = self.endTimeToCheck

method onSubscribe(
self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
debug "onSubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
contentTopicsOfInterest[].add(contentTopics)
do:
self.topicsInterest[pubsubTopic] = contentTopics

method onUnsubscribe(
self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
debug "onUnsubscribe", pubsubTopic, contentTopics

self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
let remainingCTopics =
contentTopicsOfInterest[].filterIt(not contentTopics.contains(it))
contentTopicsOfInterest[] = remainingCTopics

if remainingCTopics.len == 0:
self.topicsInterest.del(pubsubTopic)
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics

proc new*(
T: type RecvMonitor,
storeClient: WakuStoreClient,
wakuFilterClient: WakuFilterClient,
): T =
## The storeClient will help to acquire any possible missed messages

let now = getNowInNanosecondTime()
var recvMonitor = RecvMonitor(storeClient: storeClient, startTimeToCheck: now)

if not wakuFilterClient.isNil():
wakuFilterClient.addSubscrObserver(recvMonitor)

let filterPushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
## Captures all the messages received through filter

let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
recvMonitor.recentReceivedMsgs.add(rxMsg)

wakuFilterClient.registerPushHandler(filterPushHandler)

return recvMonitor

proc loopPruneOldMessages(self: RecvMonitor) {.async.} =
while true:
let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos
self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime)
await sleepAsync(PruneOldMsgsPeriod)

proc startRecvMonitor*(self: RecvMonitor) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()

proc stopRecvMonitor*(self: RecvMonitor) {.async.} =
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
if not self.msgPrunerHandler.isNil():
await self.msgPrunerHandler.cancelAndWait()

proc setDeliveryCallback*(self: RecvMonitor, deliveryCb: DeliveryFeedbackCallback) =
self.deliveryCb = deliveryCb
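
One design note on msgChecker: msgHashesInStore.filterIt(not rxMsgHashes.contains(it)) is linear in both sequences. With many tracked messages, a HashSet gives constant-time membership tests; an equivalent set-based formulation (not part of this commit) would be:

import std/sets

let rxHashes = self.recentReceivedMsgs.mapIt(it.msgHash).toHashSet()
let missedHashes = msgHashesInStore.filterIt(it notin rxHashes)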
212 changes: 212 additions & 0 deletions waku/node/delivery_monitor/send_monitor.nim
@@ -0,0 +1,212 @@
## This module reinforces the publish operation with regular store-v3 requests.
##

import std/[tables, sets, sequtils]
import chronos, chronicles, libp2p/utility
import
./delivery_callback,
./publish_observer,
../../waku_core,
./not_delivered_storage/not_delivered_storage,
../../waku_store/[client, common],
../../waku_archive/archive,
../../waku_relay/protocol,
../../waku_lightpush/client

const MaxTimeInCache* = chronos.minutes(1)
## Messages older than this time are dropped from the publish cache and negative
## feedback is given when that happens

const SendCheckInterval* = chronos.seconds(3)
## Interval at which we check that messages have been properly received by a store node

const MaxMessagesToCheckAtOnce = 100
## Max number of messages to check if they were properly archived by a store node

const ArchiveTime = chronos.seconds(3)
## Estimation of the time we wait until we start confirming that a message has been properly
## received and archived by a store node

type DeliveryInfo = object
pubsubTopic: string
msg: WakuMessage

type SendMonitor* = ref object of PublishObserver
publishedMessages: Table[WakuMessageHash, DeliveryInfo]
## Cache that contains the delivery info per message hash.
## This is needed to verify that the published messages were properly delivered

msgStoredCheckerHandle: Future[void] ## handle to stop the async task

notDeliveredStorage: NotDeliveredStorage
## NOTE: not fully used yet; persistence may end up being tackled by higher abstraction layers

storeClient: WakuStoreClient
deliveryCb: DeliveryFeedbackCallback

wakuRelay: protocol.WakuRelay
wakuLightpushClient: WakuLightPushClient

proc new*(
T: type SendMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightPushClient,
): Result[T, string] =
if wakuRelay.isNil() and wakuLightpushClient.isNil():
return err(
"Could not create SendMonitor. wakuRelay or wakuLightpushClient should be set"
)

let notDeliveredStorage = ?NotDeliveredStorage.new()

let sendMonitor = SendMonitor(
notDeliveredStorage: notDeliveredStorage,
storeClient: storeClient,
wakuRelay: wakuRelay,
wakuLightpushClient: wakuLightPushClient,
)

if not wakuRelay.isNil():
wakuRelay.addPublishObserver(sendMonitor)

if not wakuLightpushClient.isNil():
wakuLightpushClient.addPublishObserver(sendMonitor)

return ok(sendMonitor)

proc performFeedbackAndCleanup(
self: SendMonitor,
msgsToDiscard: Table[WakuMessageHash, DeliveryInfo],
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
) =
## This proc brings delivery feedback to the API client.
## It requires a 'deliveryCb' to be registered beforehand.
if self.deliveryCb.isNil():
error "deliveryCb is nil in performFeedbackAndCleanup",
success, dir, comment, hashes = toSeq(msgsToDiscard.keys).mapIt(shortLog(it))
return

for hash, deliveryInfo in msgsToDiscard:
debug "send monitor performFeedbackAndCleanup",
success, dir, comment, msg_hash = shortLog(hash)

self.deliveryCb(success, dir, comment, hash, deliveryInfo.msg)
self.publishedMessages.del(hash)

proc checkMsgsInStore(
self: SendMonitor, msgsToValidate: Table[WakuMessageHash, DeliveryInfo]
): Future[
Result[
tuple[
publishedCorrectly: Table[WakuMessageHash, DeliveryInfo],
notYetPublished: Table[WakuMessageHash, DeliveryInfo],
],
void,
]
] {.async.} =
let hashesToValidate = toSeq(msgsToValidate.keys)

let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
StoreQueryRequest(includeData: false, messageHashes: hashesToValidate)
)
).valueOr:
error "checkMsgsInStore failed to get remote msgHashes",
hashes = hashesToValidate.mapIt(shortLog(it)), error = $error
return err()

let publishedHashes = storeResp.messages.mapIt(it.messageHash)

var notYetPublished: Table[WakuMessageHash, DeliveryInfo]
var publishedCorrectly: Table[WakuMessageHash, DeliveryInfo]

for msgHash, deliveryInfo in msgsToValidate.pairs:
if publishedHashes.contains(msgHash):
publishedCorrectly[msgHash] = deliveryInfo
self.publishedMessages.del(msgHash) ## we will no longer track that message
else:
notYetPublished[msgHash] = deliveryInfo

return ok((publishedCorrectly: publishedCorrectly, notYetPublished: notYetPublished))

proc processMessages(self: SendMonitor) {.async.} =
var msgsToValidate: Table[WakuMessageHash, DeliveryInfo]
var msgsToDiscard: Table[WakuMessageHash, DeliveryInfo]

let now = getNowInNanosecondTime()
let timeToCheckThreshold = now - ArchiveTime.nanos
let maxLifeTime = now - MaxTimeInCache.nanos

for hash, deliveryInfo in self.publishedMessages.pairs:
if deliveryInfo.msg.timestamp < maxLifeTime:
## message is too old
msgsToDiscard[hash] = deliveryInfo

if deliveryInfo.msg.timestamp < timeToCheckThreshold:
msgsToValidate[hash] = deliveryInfo

## Discard the messages that are too old
self.performFeedbackAndCleanup(
msgsToDiscard, DeliverySuccess.UNSUCCESSFUL, DeliveryDirection.PUBLISHING,
"Could not publish messages. Please try again.",
)

let (publishedCorrectly, notYetPublished) = (
await self.checkMsgsInStore(msgsToValidate)
).valueOr:
return ## the error log is printed in checkMsgsInStore

## Give positive feedback for the correctly published messages
self.performFeedbackAndCleanup(
publishedCorrectly, DeliverySuccess.SUCCESSFUL, DeliveryDirection.PUBLISHING,
"messages published correctly",
)

## Try to publish again
for msgHash, deliveryInfo in notYetPublished.pairs:
let pubsubTopic = deliveryInfo.pubsubTopic
let msg = deliveryInfo.msg
if not self.wakuRelay.isNil():
debug "trying to publish again with wakuRelay", msgHash, pubsubTopic
let ret = await self.wakuRelay.publish(pubsubTopic, msg)
if ret == 0:
error "could not publish with wakuRelay.publish", msgHash, pubsubTopic
continue

if not self.wakuLightpushClient.isNil():
debug "trying to publish again with wakuLightpushClient", msgHash, pubsubTopic
(await self.wakuLightpushClient.publishToAny(pubsubTopic, msg)).isOkOr:
error "could not publish with publishToAny", error = $error
continue

proc checkIfMessagesStored(self: SendMonitor) {.async.} =
## Continuously monitors that the sent messages have been received by a store node
while true:
await self.processMessages()
await sleepAsync(SendCheckInterval)

method onMessagePublished(
self: SendMonitor, pubsubTopic: string, msg: WakuMessage
) {.gcsafe, raises: [].} =
## Implementation of the PublishObserver interface.
##
## When publishing a message either through relay or lightpush, we want to add some extra effort
## to make sure it is received by at least one store node. Hence, keep track of those published messages.

debug "onMessagePublished"
let msgHash = computeMessageHash(pubSubTopic, msg)

if not self.publishedMessages.hasKey(msgHash):
self.publishedMessages[msgHash] = DeliveryInfo(pubsubTopic: pubsubTopic, msg: msg)

proc startSendMonitor*(self: SendMonitor) =
self.msgStoredCheckerHandle = self.checkIfMessagesStored()

proc stopSendMonitor*(self: SendMonitor) =
self.msgStoredCheckerHandle.cancel()

proc setDeliveryCallback*(self: SendMonitor, deliveryCb: DeliveryFeedbackCallback) =
self.deliveryCb = deliveryCb
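
The constants at the top imply a bounded retry budget per message, worth making explicit:

# Rough worked example from the constants above: a message published at t0
# becomes eligible for store checks at t0 + ArchiveTime (3 s) and is dropped
# with negative feedback at t0 + MaxTimeInCache (60 s). With
# SendCheckInterval = 3 s, that leaves on the order of (60 - 3) / 3 ≈ 19
# store checks / republish attempts per message.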
13 changes: 13 additions & 0 deletions waku/node/delivery_monitor/subscriptions_observer.nim
@@ -0,0 +1,13 @@
import chronicles

type SubscriptionObserver* = ref object of RootObj

method onSubscribe*(
self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string]
) {.base, gcsafe, raises: [].} =
error "onSubscribe not implemented"

method onUnsubscribe*(
self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string]
) {.base, gcsafe, raises: [].} =
error "onUnsubscribe not implemented"
26 changes: 23 additions & 3 deletions waku/waku_filter_v2/client.nim
@@ -4,7 +4,13 @@

import std/options, chronicles, chronos, libp2p/protocols/protocol, bearssl/rand
import
../node/peer_manager, ../waku_core, ./common, ./protocol_metrics, ./rpc_codec, ./rpc
../node/peer_manager,
../node/delivery_monitor/subscriptions_observer,
../waku_core,
./common,
./protocol_metrics,
./rpc_codec,
./rpc

logScope:
topics = "waku filter client"
@@ -13,12 +19,16 @@ type WakuFilterClient* = ref object of LPProtocol
rng: ref HmacDrbgContext
peerManager: PeerManager
pushHandlers: seq[FilterPushHandler]
subscrObservers: seq[SubscriptionObserver]

func generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)

proc addSubscrObserver*(wfc: WakuFilterClient, obs: SubscriptionObserver) =
wfc.subscrObservers.add(obs)

proc sendSubscribeRequest(
wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
@@ -113,7 +123,12 @@ proc subscribe*(
requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
)

return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)

for obs in wfc.subscrObservers:
obs.onSubscribe(pubSubTopic, contentTopicSeq)

return ok()

proc unsubscribe*(
wfc: WakuFilterClient,
@@ -132,7 +147,12 @@ proc unsubscribe*(
requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
)

return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)

for obs in wfc.subscrObservers:
obs.onUnsubscribe(pubSubTopic, contentTopicSeq)

return ok()

proc unsubscribeAll*(
wfc: WakuFilterClient, servicePeer: RemotePeerInfo
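
Note the control-flow change in subscribe/unsubscribe: ?await ... (the results ? operator) early-returns on error, so subscription observers are notified only after a successful request. It is roughly equivalent to:

let res = await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
if res.isErr():
  return err(res.error) # observers are NOT notified on failure
for obs in wfc.subscrObservers:
  obs.onSubscribe(pubsubTopic, contentTopicSeq)
return ok()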
29 changes: 28 additions & 1 deletion waku/waku_lightpush/client.nim
@@ -3,6 +3,7 @@
import std/options, results, chronicles, chronos, metrics, bearssl/rand
import
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
../waku_core,
./common,
@@ -16,12 +17,16 @@ logScope:
type WakuLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
publishObservers: seq[PublishObserver]

proc new*(
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
): T =
WakuLightPushClient(peerManager: peerManager, rng: rng)

proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)

proc sendPushRequest(
wl: WakuLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
@@ -67,4 +72,26 @@ proc publish*(
peer: PeerId | RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
return await wl.sendPushRequest(pushRequest, peer)
?await wl.sendPushRequest(pushRequest, peer)

for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)

return ok()

proc publishToAny*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead get one from the peer manager

let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")

let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)

for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)

return ok()
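
A hypothetical call site for the new publishToAny, letting the peer manager pick any lightpush-capable service peer (surrounding imports elided):

proc publishSketch(
    client: WakuLightPushClient, pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async.} =
  (await client.publishToAny(pubsubTopic, msg)).isOkOr:
    echo "lightpush publish failed: ", error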
14 changes: 13 additions & 1 deletion waku/waku_relay/protocol.nim
@@ -18,7 +18,7 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
libp2p/switch
import ../waku_core, ./message_id
import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer

logScope:
topics = "waku relay"
@@ -129,6 +129,7 @@ type
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
# a map of validators to error messages to return when validation fails
validatorInserted: Table[PubsubTopic, bool]
publishObservers: seq[PublishObserver]

proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
@@ -297,7 +298,14 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))

proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) =
## Registers an observer that is notified when the API client performs a publish
## operation. Initially aimed at adding an extra layer of delivery reliability
## backed by store
w.publishObservers.add(obs)

proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
## Observes when a message is sent/received from the GossipSub PoV
procCall GossipSub(w).addObserver(observer)

method start*(w: WakuRelay) {.async, base.} =
@@ -440,4 +448,8 @@ proc publish*(

let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)

if relayedPeerCount > 0:
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)

return relayedPeerCount
19 changes: 19 additions & 0 deletions waku/waku_store/client.nim
@@ -59,3 +59,22 @@ proc query*(
return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))

return await self.sendStoreRequest(request, connection)

proc queryToAny*(
self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId)
): Future[StoreQueryResult] {.async.} =
## This proc is similar to the query one but in this case
## we don't specify a particular peer and instead get one from the peer manager

if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))

let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
waku_store_errors.inc(labelValues = [dialFailure])

return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))

return await self.sendStoreRequest(request, connection)
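
This queryToAny is the "separate proc in store client" from the commit message; both send_monitor and recv_monitor rely on it. A hedged usage sketch checking whether a single hash has been archived (store client and chronos imports assumed from the surrounding module):

import std/sequtils

proc isStoredSketch(
    client: WakuStoreClient, hash: WakuMessageHash
): Future[bool] {.async.} =
  # Metadata-only query against whichever store-service peer is available
  let resp = (await client.queryToAny(
    StoreQueryRequest(includeData: false, messageHashes: @[hash])
  )).valueOr:
    return false
  return resp.messages.anyIt(it.messageHash == hash)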
3 changes: 0 additions & 3 deletions waku/waku_store/protocol.nim
@@ -25,9 +25,6 @@ import
logScope:
topics = "waku store"

const MaxMessageTimestampVariance* = getNanoSecondTime(20)
# 20 seconds maximum allowable sender timestamp "drift"

type StoreQueryRequestHandler* =
proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.}

3 changes: 0 additions & 3 deletions waku/waku_store_legacy/protocol.nim
Original file line number Diff line number Diff line change
@@ -26,9 +26,6 @@ import
logScope:
topics = "waku legacy store"

const MaxMessageTimestampVariance* = getNanoSecondTime(20)
# 20 seconds maximum allowable sender timestamp "drift"

type HistoryQueryHandler* =
proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}
