Skip to content

Commit

Permalink
feat(csi/643): add fx-notify forwarding (#542)
Browse files Browse the repository at this point in the history
* feat(csi/643): add fx-notify forwarding

* audit

* int tests

* tests

* command
  • Loading branch information
kleyow authored Sep 18, 2024
1 parent 36b6c4e commit 9e75007
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 236 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ jobs:
curl localhost:3000/health && \
# run integration tests
npm run test:xint | tee ./test/results/test-int.log
npm run test:integration | tee ./test/results/test-int.log
environment:
ENDPOINT_URL: http://localhost:4545/notification
- store_artifacts:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ services:

central-ledger:
# image: mojaloop/central-ledger:latest
image: mojaloop/central-ledger:v17.8.0.551-snapshot.16
image: mojaloop/central-ledger:v17.8.0.643-snapshot.0
container_name: ml_central-ledger
command: sh -c "/opt/app/wait4/wait4.js central-ledger && npm run migrate && node src/api/index.js"
links:
Expand Down
381 changes: 168 additions & 213 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.7.3",
"@mojaloop/central-services-shared": "18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/event-sdk": "14.1.1",
"@mojaloop/sdk-standard-components": "18.4.0",
"@mojaloop/sdk-standard-components": "18.4.1",
"@now-ims/hapi-now-auth": "2.1.0",
"axios": "1.7.7",
"blipp": "4.0.2",
Expand Down Expand Up @@ -126,21 +126,21 @@
"jsdoc": "4.0.3",
"leaked-handles": "^5.2.0",
"license-checker": "25.0.1",
"nodemon": "3.1.4",
"nodemon": "3.1.5",
"npm-audit-resolver": "3.0.0-RC.0",
"npm-check-updates": "17.1.1",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"replace": "^1.2.2",
"rewire": "7.0.0",
"sinon": "18.0.0",
"standard": "17.1.0",
"sinon": "19.0.2",
"standard": "17.1.2",
"standard-version": "^9.5.0",
"supertest": "7.0.0",
"tap-spec": "^5.0.0",
"tap-xunit": "2.4.1",
"tape": "5.8.1",
"tape": "5.9.0",
"tapes": "4.1.0",
"uuid4": "2.0.3"
},
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/notification/dto.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const FX_ACTIONS = [
Action.FX_FULFIL_DUPLICATE,
Action.FX_ABORT_DUPLICATE,
Action.FX_TIMEOUT_RESERVED,
Action.FX_TIMEOUT_RECEIVED
Action.FX_TIMEOUT_RECEIVED,
Action.FX_NOTIFY
]

const getCallbackPayload = (content) => {
Expand Down
33 changes: 33 additions & 0 deletions src/handlers/notification/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,39 @@ const processMessage = async (msg, span) => {
return true
}

if ([Action.FX_NOTIFY].includes(action)) {
if (!isSuccess) {
throw ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR,
'FX_NOTIFY action must be successful'
)
}

const { url: callbackURLTo } = await getEndpointFn(destination, REQUEST_TYPE.PATCH, true)
const endpointTemplate = getEndpointTemplate(REQUEST_TYPE.PATCH)
headers = createCallbackHeaders({ headers: content.headers, httpMethod: PATCH, endpointTemplate })
logger.debug(`Notification::processMessage - Callback.sendRequest({ ${callbackURLTo}, ${PATCH}, ${JSON.stringify(content.headers)}, ${payload}, ${id}, ${source}, ${destination} ${hubNameRegex} })`)
let response = { status: 'unknown' }
const histTimerEndSendRequest = Metrics.getHistogram(
'notification_event_delivery',
'notification_event_delivery - metric for sending notification requests to FSPs',
['success', 'from', 'to', 'dest', 'action', 'status']
).startTimer()

try {
response = await Callback.sendRequest({ url: callbackURLTo, headers, source, destination, method: PATCH, payload, responseType, span, protocolVersions, hubNameRegex })
} catch (err) {
logger.error(err)
histTimerEndSendRequest({ success: false, from: source, dest: destination, action, status: response.status })
histTimerEnd({ success: false, action })
throw err
}
histTimerEndSendRequest({ success: true, from: source, dest: destination, action, status: response.status })
histTimerEnd({ success: true, action })

return true
}

Logger.warn(`Unknown action received from kafka: ${action}`)
histTimerEnd({ success: false, action: 'unknown' })
return false
Expand Down
146 changes: 132 additions & 14 deletions test/integration/handlers/notification/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ const GeneralTopicTemplate = Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_T

const timeoutAttempts = 10
const callbackWaitSeconds = 2
const retryDelay = process?.env?.test_INT_RETRY_DELAY || 2
const retryCount = process?.env?.test_INT_RETRY_COUNT || 40
const retryOpts = {
retries: retryCount,
minTimeout: retryDelay,
maxTimeout: retryDelay
}
const wrapWithRetriesConf = {
remainingRetries: retryOpts?.retries || 10, // default 10
timeout: retryOpts?.maxTimeout || 2 // default 2
}

const getNotificationUrl = process.env.ENDPOINT_URL
const hubNameRegex = HeaderValidation.getHubNameRegex(Config.HUB_NAME)
Expand Down Expand Up @@ -151,9 +162,7 @@ Test('Notification Handler', notificationHandlerTest => {
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.PREPARE,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)
await new Promise(resolve => setTimeout(resolve, 5000)) // wait for RESERVED
const response = await testNotification(messageProtocol, 'post', transferId, kafkaConfig, topicConfig, undefined, undefined, 'dfsp2')
await new Promise(resolve => setTimeout(resolve, 5000)) // wait for RESERVED_FORWARDED

await db.connect({
client: centralLedgerConfig.DATABASE.DIALECT,
connection: {
Expand All @@ -164,14 +173,44 @@ Test('Notification Handler', notificationHandlerTest => {
database: centralLedgerConfig.DATABASE.SCHEMA
}
})

// wait for RESERVED
try {
const stateChange = await db.from('transferStateChange').findOne({ transferId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Transfer state changed to RESERVED_FORWARDED')
} finally {
await db.disconnect()
await wrapWithRetries(async () => {
const stateChange = await db
.from('transferStateChange')
.findOne({ transferId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) {
throw new Error('Transfer state not changed to RESERVED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED, 'Transfer state changed to RESERVED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

const response = await testNotification(messageProtocol, 'post', transferId, kafkaConfig, topicConfig, undefined, undefined, 'dfsp2')
test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to Payee')
// wait for RESERVED_FORWARDED
try {
await wrapWithRetries(async () => {
const stateChange = await db
.from('transferStateChange')
.findOne({ transferId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED_FORWARDED) {
throw new Error('Transfer state not changed to RESERVED_FORWARDED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Transfer state changed to RESERVED_FORWARDED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

await db.disconnect()
test.end()
})

Expand Down Expand Up @@ -241,9 +280,7 @@ Test('Notification Handler', notificationHandlerTest => {
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.PREPARE,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)
await new Promise(resolve => setTimeout(resolve, 10000)) // wait for RESERVED
const response = await testNotification(messageProtocol, 'post', commitRequestId, kafkaConfig, topicConfig, undefined, undefined, 'fxp1')
await new Promise(resolve => setTimeout(resolve, 5000)) // wait for RESERVED_FORWARDED

await db.connect({
client: centralLedgerConfig.DATABASE.DIALECT,
connection: {
Expand All @@ -254,13 +291,44 @@ Test('Notification Handler', notificationHandlerTest => {
database: centralLedgerConfig.DATABASE.SCHEMA
}
})

// wait for RESERVED
try {
const stateChange = await db.from('fxTransferStateChange').findOne({ commitRequestId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Fx Transfer state changed to RESERVED_FORWARDED')
} finally {
await db.disconnect()
await wrapWithRetries(async () => {
const stateChange = await db
.from('fxTransferStateChange')
.findOne({ commitRequestId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) {
throw new Error('Transfer state not changed to RESERVED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED, 'Transfer state changed to RESERVED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

const response = await testNotification(messageProtocol, 'post', commitRequestId, kafkaConfig, topicConfig, undefined, undefined, 'fxp1')
test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to FXP')

// wait for RESERVED_FORWARDED
try {
await wrapWithRetries(async () => {
const stateChange = await db
.from('fxTransferStateChange')
.findOne({ commitRequestId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED_FORWARDED) {
throw new Error('Transfer state not changed to RESERVED_FORWARDED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Transfer state changed to RESERVED_FORWARDED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}
await db.disconnect()
test.end()
})

Expand Down Expand Up @@ -1412,8 +1480,58 @@ Test('Notification Handler', notificationHandlerTest => {
test.end()
})

notificationTest.test('consume a FX_NOTIFY message and send PATCH callback to fxp', async test => {
const commitRequestId = Uuid()
const messageProtocol = Fixtures.createMessageProtocol(
EventTypes.NOTIFICATION,
Action.FX_NOTIFY,
{
commitRequestId,
fulfilment: 'uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvze1',
completedTimestamp: '2021-05-24T08:38:08.699-04:00'
},
'HUB',
'fxp1'
)
const { kafkaConfig, topicConfig } = Fixtures.createProducerConfig(
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.FULFIL,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)

const response = await testNotification(messageProtocol, 'patch', commitRequestId, kafkaConfig, topicConfig)

test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to FXP')
test.end()
})

notificationTest.test('consume a FX_NOTIFY message and send PATCH callback to proxied fxp', async test => {
await proxy.addDfspIdToProxyMapping('nonExistentFxp', 'proxyFsp') // simulate proxy mapping
const commitRequestId = Uuid()
const messageProtocol = Fixtures.createMessageProtocol(
EventTypes.NOTIFICATION,
Action.FX_NOTIFY,
{
commitRequestId,
fulfilment: 'uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvze1',
completedTimestamp: '2021-05-24T08:38:08.699-04:00'
},
'HUB',
'nonExistentFxp'
)
const { kafkaConfig, topicConfig } = Fixtures.createProducerConfig(
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.FULFIL,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)

const response = await testNotification(messageProtocol, 'patch', commitRequestId, kafkaConfig, topicConfig, undefined, undefined, 'proxyFsp')

test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to FXP')
test.end()
})

notificationTest.test('tear down', async test => {
await proxy.disconnect()
await db.disconnect()
try {
await Kafka.Producer.disconnect()
} catch (err) { /* ignore error */ }
Expand Down
13 changes: 13 additions & 0 deletions test/integration/server/transfers/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,5 +240,18 @@ module.exports = [{
failAction: 'error'
}
}
},
{
method: 'PATCH',
path: '/proxyFsp/fxTransfers/{transferId}',
handler: Handler.receiveNotificationPatch,
options: {
id: 'proxyFsp-patch',
tags,
description: 'receive patch notification for proxyFsp',
payload: {
failAction: 'error'
}
}
}
]

0 comments on commit 9e75007

Please sign in to comment.