Skip to content

Commit

Permalink
move mount store before relay and rln relay
Browse files Browse the repository at this point in the history
This is needed to make a quick creation of the messages
table, before the event rln sync kicks off. With that,
we avoid having errors from postgres-exporter (nwaku-compose)
complaining about non-existing messages table.
  • Loading branch information
Ivansete-status committed Jan 27, 2025
1 parent 7f64dc0 commit a964053
Showing 1 changed file with 93 additions and 93 deletions.
186 changes: 93 additions & 93 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,99 @@ proc setupProtocols(
node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)

# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

if conf.store:
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)

let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

## For now we always mount the future archive driver but if the legacy one is mounted,
## then the legacy will be in charge of performing the archiving.
## Regarding storage, the only diff between the current/future archive driver and the legacy
## one, is that the legacy stores an extra field: the id (message digest.)

## TODO: remove this "migrate" variable once legacy store is removed
## It is now necessary because sqlite's legacy store has an extra field: storedAt
## This breaks compatibility between store's and legacy store's schemas in sqlite
## So for now, we need to make sure that when legacy store is enabled and we use sqlite
## that we migrate our db according to legacy store's schema to have the extra field

let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
if engineRes.isErr():
return err("error getting db engine in setupProtocols: " & engineRes.error)

let engine = engineRes.get()

let migrate =
if engine == "sqlite" and conf.legacyStore:
false
else:
conf.storeMessageDbMigration

let archiveDriverRes = waitFor driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

if conf.legacyStore:
# Store legacy setup
try:
await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

# Store setup
try:
await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNode.error)

mountLegacyStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(
storeNode.value, legacy_common.WakuLegacyStoreCodec
)
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

if conf.store and conf.storeResume:
node.setupStoreResume()

# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)

if conf.numShardsInNetwork == 0:
Expand Down Expand Up @@ -243,12 +335,6 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

if conf.rlnRelay:
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
Expand All @@ -269,92 +355,6 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store:
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)

let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

## For now we always mount the future archive driver but if the legacy one is mounted,
## then the legacy will be in charge of performing the archiving.
## Regarding storage, the only diff between the current/future archive driver and the legacy
## one, is that the legacy stores an extra field: the id (message digest.)

## TODO: remove this "migrate" variable once legacy store is removed
## It is now necessary because sqlite's legacy store has an extra field: storedAt
## This breaks compatibility between store's and legacy store's schemas in sqlite
## So for now, we need to make sure that when legacy store is enabled and we use sqlite
## that we migrate our db according to legacy store's schema to have the extra field

let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
if engineRes.isErr():
return err("error getting db engine in setupProtocols: " & engineRes.error)

let engine = engineRes.get()

let migrate =
if engine == "sqlite" and conf.legacyStore:
false
else:
conf.storeMessageDbMigration

let archiveDriverRes = waitFor driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

if conf.legacyStore:
# Store legacy setup
try:
await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

# Store setup
try:
await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNode.error)

mountLegacyStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(
storeNode.value, legacy_common.WakuLegacyStoreCodec
)
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

if conf.store and conf.storeResume:
node.setupStoreResume()

# NOTE Must be mounted after relay
if conf.lightpush:
try:
Expand Down

0 comments on commit a964053

Please sign in to comment.