Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move mount store before relay and rln relay #3257

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 93 additions & 93 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,99 @@ proc setupProtocols(
node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)

# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

if conf.store:
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)

let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

## For now we always mount the future archive driver but if the legacy one is mounted,
## then the legacy will be in charge of performing the archiving.
## Regarding storage, the only diff between the current/future archive driver and the legacy
## one, is that the legacy stores an extra field: the id (message digest.)

## TODO: remove this "migrate" variable once legacy store is removed
## It is now necessary because sqlite's legacy store has an extra field: storedAt
## This breaks compatibility between store's and legacy store's schemas in sqlite
## So for now, we need to make sure that when legacy store is enabled and we use sqlite
## that we migrate our db according to legacy store's schema to have the extra field

let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
if engineRes.isErr():
return err("error getting db engine in setupProtocols: " & engineRes.error)

let engine = engineRes.get()

let migrate =
if engine == "sqlite" and conf.legacyStore:
false
else:
conf.storeMessageDbMigration

let archiveDriverRes = waitFor driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

if conf.legacyStore:
# Store legacy setup
try:
await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

# Store setup
try:
await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNode.error)

mountLegacyStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(
storeNode.value, legacy_common.WakuLegacyStoreCodec
)
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

if conf.store and conf.storeResume:
node.setupStoreResume()

# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)

if conf.numShardsInNetwork == 0:
Expand Down Expand Up @@ -243,12 +335,6 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

if conf.rlnRelay:
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
Expand All @@ -269,92 +355,6 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store:
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)

let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

## For now we always mount the future archive driver but if the legacy one is mounted,
## then the legacy will be in charge of performing the archiving.
## Regarding storage, the only diff between the current/future archive driver and the legacy
## one, is that the legacy stores an extra field: the id (message digest.)

## TODO: remove this "migrate" variable once legacy store is removed
## It is now necessary because sqlite's legacy store has an extra field: storedAt
## This breaks compatibility between store's and legacy store's schemas in sqlite
## So for now, we need to make sure that when legacy store is enabled and we use sqlite
## that we migrate our db according to legacy store's schema to have the extra field

let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
if engineRes.isErr():
return err("error getting db engine in setupProtocols: " & engineRes.error)

let engine = engineRes.get()

let migrate =
if engine == "sqlite" and conf.legacyStore:
false
else:
conf.storeMessageDbMigration

let archiveDriverRes = waitFor driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

if conf.legacyStore:
# Store legacy setup
try:
await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

# Store setup
try:
await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNode.error)

mountLegacyStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(
storeNode.value, legacy_common.WakuLegacyStoreCodec
)
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

if conf.store and conf.storeResume:
node.setupStoreResume()

# NOTE Must be mounted after relay
if conf.lightpush:
try:
Expand Down
Loading