Skip to content

Commit

Permalink
fix(runtime): fix change stream concurrency reconnect (labring#1480)
Browse files Browse the repository at this point in the history
* fix: fix change stream concurrency reconnect

* fix: format style

* fix(runtime): unbind mongo change stream event

* chore

---------

Co-authored-by: 0fatal <[email protected]>
  • Loading branch information
skyoct and 0fatal authored Aug 24, 2023
1 parent d7594af commit 2ef4873
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 26 deletions.
4 changes: 4 additions & 0 deletions runtimes/nodejs/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,8 @@ export default class Config {
static get LOG_SERVER_TOKEN(): string {
return process.env.LOG_SERVER_TOKEN || ''
}

static get CHANGE_STREAM_RECONNECT_INTERVAL(): number {
return (process.env.CHANGE_STREAM_RECONNECT_INTERVAL || 3000) as number
}
}
29 changes: 17 additions & 12 deletions runtimes/nodejs/src/support/db-change-stream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import Config from '../config'
import { CONFIG_COLLECTION } from '../constants'
import { DatabaseAgent } from '../db'

import { logger } from './logger'

export class DatabaseChangeStream {
static async initialize() {
this.watchConf()
DatabaseChangeStream.watchConf()
}

/**
Expand All @@ -15,21 +16,25 @@ export class DatabaseChangeStream {
*/
static async watchConf() {
logger.info('Listening for changes in conf collection...')
this.updateEnvironments()
DatabaseChangeStream.updateEnvironments()

const stream = DatabaseAgent.db.collection(CONFIG_COLLECTION).watch()

stream.on('change', async (_change) => {
this.updateEnvironments()
})
const changeEvent = async (_change) => {
DatabaseChangeStream.updateEnvironments()
}

stream.on('change', changeEvent)

// stream.on('close', () => {
// logger.info('Conf collection change stream closed.')
// setTimeout(() => {
// logger.info('Reconnecting conf collection change stream...')
// DatabaseChangeStream.watchConf()
// }, 3000)
// })
stream.once('close', () => {
stream.off('change', changeEvent)
logger.error('Conf collection change stream closed.')

setTimeout(() => {
logger.info('Reconnecting conf collection change stream...')
DatabaseChangeStream.watchConf()
}, Config.CHANGE_STREAM_RECONNECT_INTERVAL)
})
}

private static async updateEnvironments() {
Expand Down
44 changes: 30 additions & 14 deletions runtimes/nodejs/src/support/function-engine/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { FunctionRequire } from './require'
import { logger } from '../logger'
import assert from 'assert'
import { InitHook } from '../init-hook'
import Config from '../../config'

export class FunctionCache {
private static cache: Map<string, ICloudFunctionData> = new Map()
Expand All @@ -20,12 +21,11 @@ export class FunctionCache {
FunctionCache.cache.set(func.name, func)
}

this.streamChange()
FunctionCache.streamChange()
logger.info('Function cache initialized.')

// invoke init function
InitHook.invoke()

}

/**
Expand All @@ -40,7 +40,11 @@ export class FunctionCache {
`require cloud function failed: function ${moduleName} not found`,
)
const funcRequire = new FunctionRequire(this.requireFunc, fromModules)
const module = funcRequire.load(func.name, func.source.compiled, fromModules)
const module = funcRequire.load(
func.name,
func.source.compiled,
fromModules,
)
return module
}

Expand All @@ -54,7 +58,8 @@ export class FunctionCache {
const stream = DatabaseAgent.db
.collection(CLOUD_FUNCTION_COLLECTION)
.watch()
stream.on('change', async (change) => {

const changeEvent = async (change) => {
if (change.operationType === 'insert') {
const func = await DatabaseAgent.db
.collection<ICloudFunctionData>(CLOUD_FUNCTION_COLLECTION)
Expand All @@ -70,15 +75,19 @@ export class FunctionCache {
}
}
}
})
}

stream.on('change', changeEvent)

// stream.on('close', () => {
// logger.error('Cloud function change stream closed')
// setTimeout(() => {
// logger.info('Reconnecting cloud function change stream...')
// FunctionCache.streamChange()
// }, 3000)
// })
stream.once('close', () => {
logger.error('Cloud function change stream closed...')
stream.off('change', changeEvent)

setTimeout(() => {
logger.info('Reconnecting cloud function change stream......')
FunctionCache.streamChange()
}, Config.CHANGE_STREAM_RECONNECT_INTERVAL)
})
}

/**
Expand All @@ -87,7 +96,10 @@ export class FunctionCache {
* @param module the module id. ex. `path`, `lodash`
* @returns
*/
static requireFunc: RequireFuncType = (module: string, fromModules?: string[]): any => {
static requireFunc: RequireFuncType = (
module: string,
fromModules?: string[],
): any => {
if (module === '@/cloud-sdk') {
return require('@lafjs/cloud')
}
Expand All @@ -97,7 +109,11 @@ export class FunctionCache {
// check circular dependency
const index = fromModules?.indexOf(cloudModule)
if (index !== -1) {
throw new Error(`Circular dependency detected: ${fromModules.slice(index).join(' -> ')} -> ${cloudModule}`)
throw new Error(
`Circular dependency detected: ${fromModules
.slice(index)
.join(' -> ')} -> ${cloudModule}`,
)
}

return FunctionCache.requireCloudFunction(cloudModule, fromModules)
Expand Down

0 comments on commit 2ef4873

Please sign in to comment.