diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 852ec460dd..625f1a77b2 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -167,7 +167,99 @@ proc setupProtocols(
   node.mountMetadata(conf.clusterId).isOkOr:
     return err("failed to mount waku metadata protocol: " & error)
 
-  # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
+  var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
+    ## Action to be taken when an internal error occurs during the node run.
+    ## e.g. the connection with the database is lost and not recovered.
+    error "Unrecoverable error occurred", error = msg
+    quit(QuitFailure)
+
+  if conf.store:
+    if conf.legacyStore:
+      let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
+        conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
+        conf.storeMaxNumDbConnections, onFatalErrorAction,
+      )
+      if archiveDriverRes.isErr():
+        return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
+
+      let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
+      if mountArcRes.isErr():
+        return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)
+
+    ## For now we always mount the future archive driver but if the legacy one is mounted,
+    ## then the legacy will be in charge of performing the archiving.
+    ## Regarding storage, the only diff between the current/future archive driver and the legacy
+    ## one, is that the legacy stores an extra field: the id (message digest.)
+
+    ## TODO: remove this "migrate" variable once legacy store is removed
+    ## It is now necessary because sqlite's legacy store has an extra field: storedAt
+    ## This breaks compatibility between store's and legacy store's schemas in sqlite
+    ## So for now, we need to make sure that when legacy store is enabled and we use sqlite
+    ## that we migrate our db according to legacy store's schema to have the extra field
+
+    let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
+    if engineRes.isErr():
+      return err("error getting db engine in setupProtocols: " & engineRes.error)
+
+    let engine = engineRes.get()
+
+    let migrate =
+      if engine == "sqlite" and conf.legacyStore:
+        false
+      else:
+        conf.storeMessageDbMigration
+
+    let archiveDriverRes = waitFor driver.ArchiveDriver.new(
+      conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
+      conf.storeMaxNumDbConnections, onFatalErrorAction,
+    )
+    if archiveDriverRes.isErr():
+      return err("failed to setup archive driver: " & archiveDriverRes.error)
+
+    let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
+    if retPolicyRes.isErr():
+      return err("failed to create retention policy: " & retPolicyRes.error)
+
+    let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
+    if mountArcRes.isErr():
+      return err("failed to mount waku archive protocol: " & mountArcRes.error)
+
+    if conf.legacyStore:
+      # Store legacy setup
+      try:
+        await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
+      except CatchableError:
+        return
+          err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())
+
+    # Store setup
+    try:
+      await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
+    except CatchableError:
+      return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
+
+  mountStoreClient(node)
+  if conf.storenode != "":
+    let storeNode = parsePeerInfo(conf.storenode)
+    if storeNode.isOk():
+      node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
+    else:
+      return err("failed to set node waku store peer: " & storeNode.error)
+
+  mountLegacyStoreClient(node)
+  if conf.storenode != "":
+    let storeNode = parsePeerInfo(conf.storenode)
+    if storeNode.isOk():
+      node.peerManager.addServicePeer(
+        storeNode.value, legacy_common.WakuLegacyStoreCodec
+      )
+    else:
+      return err("failed to set node waku legacy store peer: " & storeNode.error)
+
+  if conf.store and conf.storeResume:
+    node.setupStoreResume()
+
+  # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
   let numShardsInNetwork = getNumShardsInNetwork(conf)
 
   if conf.numShardsInNetwork == 0:
@@ -243,12 +335,6 @@ proc setupProtocols(
     except CatchableError:
       return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
 
-  var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
-    ## Action to be taken when an internal error occurs during the node run.
-    ## e.g. the connection with the database is lost and not recovered.
-    error "Unrecoverable error occurred", error = msg
-    quit(QuitFailure)
-
   if conf.rlnRelay:
     let rlnConf = WakuRlnConfig(
       rlnRelayDynamic: conf.rlnRelayDynamic,
@@ -269,92 +355,6 @@ proc setupProtocols(
     except CatchableError:
      return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
 
-  if conf.store:
-    if conf.legacyStore:
-      let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
-        conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
-        conf.storeMaxNumDbConnections, onFatalErrorAction,
-      )
-      if archiveDriverRes.isErr():
-        return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
-
-      let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
-      if mountArcRes.isErr():
-        return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)
-
-    ## For now we always mount the future archive driver but if the legacy one is mounted,
-    ## then the legacy will be in charge of performing the archiving.
-    ## Regarding storage, the only diff between the current/future archive driver and the legacy
-    ## one, is that the legacy stores an extra field: the id (message digest.)
-
-    ## TODO: remove this "migrate" variable once legacy store is removed
-    ## It is now necessary because sqlite's legacy store has an extra field: storedAt
-    ## This breaks compatibility between store's and legacy store's schemas in sqlite
-    ## So for now, we need to make sure that when legacy store is enabled and we use sqlite
-    ## that we migrate our db according to legacy store's schema to have the extra field
-
-    let engineRes = dburl.getDbEngine(conf.storeMessageDbUrl)
-    if engineRes.isErr():
-      return err("error getting db engine in setupProtocols: " & engineRes.error)
-
-    let engine = engineRes.get()
-
-    let migrate =
-      if engine == "sqlite" and conf.legacyStore:
-        false
-      else:
-        conf.storeMessageDbMigration
-
-    let archiveDriverRes = waitFor driver.ArchiveDriver.new(
-      conf.storeMessageDbUrl, conf.storeMessageDbVacuum, migrate,
-      conf.storeMaxNumDbConnections, onFatalErrorAction,
-    )
-    if archiveDriverRes.isErr():
-      return err("failed to setup archive driver: " & archiveDriverRes.error)
-
-    let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
-    if retPolicyRes.isErr():
-      return err("failed to create retention policy: " & retPolicyRes.error)
-
-    let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
-    if mountArcRes.isErr():
-      return err("failed to mount waku archive protocol: " & mountArcRes.error)
-
-    if conf.legacyStore:
-      # Store legacy setup
-      try:
-        await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
-      except CatchableError:
-        return
-          err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())
-
-    # Store setup
-    try:
-      await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
-    except CatchableError:
-      return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
-
-  mountStoreClient(node)
-  if conf.storenode != "":
-    let storeNode = parsePeerInfo(conf.storenode)
-    if storeNode.isOk():
-      node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
-    else:
-      return err("failed to set node waku store peer: " & storeNode.error)
-
-  mountLegacyStoreClient(node)
-  if conf.storenode != "":
-    let storeNode = parsePeerInfo(conf.storenode)
-    if storeNode.isOk():
-      node.peerManager.addServicePeer(
-        storeNode.value, legacy_common.WakuLegacyStoreCodec
-      )
-    else:
-      return err("failed to set node waku legacy store peer: " & storeNode.error)
-
-  if conf.store and conf.storeResume:
-    node.setupStoreResume()
-
   # NOTE Must be mounted after relay
   if conf.lightpush:
     try:
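
Not part of the patch: a minimal sketch of the migration decision that the TODO comment in the moved block describes, isolated as a standalone helper for illustration. The proc name shouldMigrate and its parameters are hypothetical; in the patch the same logic is inlined in setupProtocols using the engine string returned by dburl.getDbEngine(conf.storeMessageDbUrl).

  # Hypothetical helper, for illustration only (not in node_factory.nim).
  # When the legacy store runs on sqlite, the legacy driver owns the schema
  # (it carries extra fields such as storedAt), so the new driver must skip
  # its own migrations; otherwise the configured migration flag applies.
  proc shouldMigrate(engine: string, legacyStore, dbMigration: bool): bool =
    result =
      if engine == "sqlite" and legacyStore:
        false
      else:
        dbMigration

  assert not shouldMigrate("sqlite", true, true)  # legacy sqlite schema wins
  assert shouldMigrate("postgres", true, true)    # non-sqlite: migrate as configured
  assert not shouldMigrate("sqlite", false, false)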