Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: call rules during fx quote #361

Open
wants to merge 14 commits into
base: minor/iso
Choose a base branch
from
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ services:

# To use with proxyCache.type === 'redis-cluster'
redis-node-0:
container_name: redis-node-0
<<: *REDIS_NODE
environment:
<<: *REDIS_ENVS
Expand All @@ -140,34 +141,39 @@ services:
- redis-node-4
- redis-node-5
redis-node-1:
container_name: redis-node-1
<<: *REDIS_NODE
environment:
<<: *REDIS_ENVS
REDIS_PORT_NUMBER: 6380
ports:
- "16380:16380"
redis-node-2:
container_name: redis-node-2
<<: *REDIS_NODE
environment:
<<: *REDIS_ENVS
REDIS_PORT_NUMBER: 6381
ports:
- "16381:16381"
redis-node-3:
container_name: redis-node-3
<<: *REDIS_NODE
environment:
<<: *REDIS_ENVS
REDIS_PORT_NUMBER: 6382
ports:
- "16382:16382"
redis-node-4:
container_name: redis-node-4
<<: *REDIS_NODE
environment:
<<: *REDIS_ENVS
REDIS_PORT_NUMBER: 6383
ports:
- "16383:16383"
redis-node-5:
container_name: redis-node-5
<<: *REDIS_NODE
environment:
<<: *REDIS_ENVS
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/QuotingHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class QuotingHandler {

try {
span = await this.createSpan(requestData)
await model.handleFxQuoteRequest(headers, payload, span, originalPayload)
await model.handleFxQuoteRequest(headers, payload, span, originalPayload, this.cache)
this.logger.debug('handlePostFxQuotes is done')
} catch (err) {
this.logger.error('error in handlePostFxQuotes:', err)
Expand Down
80 changes: 80 additions & 0 deletions src/model/executeRules.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const ErrorHandler = require('@mojaloop/central-services-error-handling')

const rules = require('../../config/rules')
const RulesEngine = require('./rules.js')

module.exports.executeRules = async function executeRules (headers, quoteRequest, originalPayload, payer, payee, operation) {
if (rules.length === 0) {
return await this.handleRuleEvents([], headers, quoteRequest, originalPayload)
}

const facts = {
operation,
payer,
payee,
payload: quoteRequest,
headers
}

const { events } = await RulesEngine.run(rules, facts)

this.writeLog(`Rules engine returned events ${JSON.stringify(events)}`)

return await this.handleRuleEvents(events, headers, quoteRequest, originalPayload)
}

module.exports.handleRuleEvents = async function handleRuleEvents (events, headers, payload, originalPayload) {
const quoteRequest = originalPayload || payload
// todo: pass only originalPayload (added this logic only for passing tests)

// At the time of writing, all events cause the "normal" flow of execution to be interrupted.
// So we'll return false when there have been no events whatsoever.
if (events.length === 0) {
return { terminate: false, quoteRequest, headers }
}

const { INVALID_QUOTE_REQUEST, INTERCEPT_QUOTE } = RulesEngine.events

const unhandledEvents = events.filter(ev => !(ev.type in RulesEngine.events))

if (unhandledEvents.length > 0) {
// The rules configuration contains events not handled in the code
// TODO: validate supplied rules at startup and fail if any invalid rules are discovered.
throw new Error('Unhandled event returned by rules engine')
}

const invalidQuoteRequestEvents = events.filter(ev => ev.type === INVALID_QUOTE_REQUEST)
if (invalidQuoteRequestEvents.length > 0) {
// Use the first event, ignore the others for now. This is ergonomically worse for someone
// developing against this service, as they can't see all reasons their quote was invalid at
// once. But is a valid solution in the short-term.
const { FSPIOPError: code, message } = invalidQuoteRequestEvents[0].params
// Will throw an internal server error if property doesn't exist
throw ErrorHandler.CreateFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes[code],
message, null, headers['fspiop-source'])
}

const interceptQuoteEvents = events.filter(ev => ev.type === INTERCEPT_QUOTE)
if (interceptQuoteEvents.length > 1) {
// TODO: handle priority. Can we stream events?
throw new Error('Multiple intercept quote events received')
}
if (interceptQuoteEvents.length > 0) {
// send the quote request to the recipient in the event
const result = {
terminate: false,
quoteRequest,
headers: {
...headers,
'fspiop-destination': interceptQuoteEvents[0].params.rerouteToFsp
}
}
// if additionalHeaders are present then add the additional non-standard headers (e.g. used by forex)
// Note these headers are not part of the mojaloop specification
if (interceptQuoteEvents[0].params.additionalHeaders) {
result.headers = { ...result.headers, ...interceptQuoteEvents[0].params.additionalHeaders }
result.additionalHeaders = interceptQuoteEvents[0].params.additionalHeaders
}
return result
}
}
31 changes: 29 additions & 2 deletions src/model/fxQuotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const LOCAL_ENUM = require('../lib/enum')
const dto = require('../lib/dto')
const { logger } = require('../lib')
const { httpRequest } = require('../lib/http')
const { getStackOrInspect, generateRequestHeadersForJWS, generateRequestHeaders, getParticipantEndpoint, calculateRequestHash } = require('../lib/util')
const { getStackOrInspect, generateRequestHeadersForJWS, generateRequestHeaders, getParticipantEndpoint, calculateRequestHash, fetchParticipantInfo } = require('../lib/util')
const { RESOURCES, ERROR_MESSAGES } = require('../constants')
const { executeRules, handleRuleEvents } = require('./executeRules')

axios.defaults.headers.common = {}

Expand All @@ -49,6 +50,10 @@ class FxQuotesModel {
})
}

executeRules = executeRules
handleRuleEvents = handleRuleEvents
_fetchParticipantInfo = fetchParticipantInfo

/**
* Validates the fxQuote request object
*
Expand Down Expand Up @@ -161,7 +166,8 @@ class FxQuotesModel {
*
* @returns {undefined}
*/
async handleFxQuoteRequest (headers, fxQuoteRequest, span, originalPayload) {
async handleFxQuoteRequest (headers, fxQuoteRequest, span, originalPayload = fxQuoteRequest, cache) {
// todo: remove default value for originalPayload (added just for passing tests)
const histTimer = Metrics.getHistogram(
'model_fxquote',
'handleFxQuoteRequest - Metrics for fx quote model',
Expand Down Expand Up @@ -227,6 +233,17 @@ class FxQuotesModel {
await txn.commit()
}

const { payer, payee } = await this._fetchParticipantInfo(fspiopSource, fspiopDestination, cache)
this.writeLog(`Got payer ${payer} and payee ${payee}`)

// Run the rules engine. If the user does not want to run the rules engine, they need only to
// supply a rules file containing an empty array.
const handledRuleEvents = await this.executeRules(headers, fxQuoteRequest, originalPayload, payer, payee, 'fxQuoteRequest')

if (handledRuleEvents.terminate) {
return
}

await this.forwardFxQuoteRequest(headers, fxQuoteRequest.conversionRequestId, originalPayload, childSpan)
histTimer({ success: true, queryName: 'handleFxQuoteRequest' })
} catch (err) {
Expand Down Expand Up @@ -815,6 +832,16 @@ class FxQuotesModel {
opts.headers['fspiop-signature'] = jwsSigner.getSignature(opts)
}
}

/**
* Writes a formatted message to the console
*
* @returns {undefined}
*/
// eslint-disable-next-line no-unused-vars
writeLog (message) {
Logger.isDebugEnabled && Logger.debug(`(${this.requestId}) [quotesmodel]: ${message}`)
}
}

module.exports = FxQuotesModel
85 changes: 5 additions & 80 deletions src/model/quotes.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ const util = require('../lib/util')
const { logger } = require('../lib')
const { httpRequest } = require('../lib/http')
const { RESOURCES } = require('../constants')
const rules = require('../../config/rules.json')
const RulesEngine = require('./rules.js')
const { executeRules, handleRuleEvents } = require('./executeRules')

axios.defaults.headers.common = {}

Expand All @@ -73,84 +72,11 @@ class QuotesModel {
})
}

async executeRules (headers, quoteRequest, payer, payee) {
if (rules.length === 0) {
return []
}

const facts = {
payer,
payee,
payload: quoteRequest,
headers
}

const { events } = await RulesEngine.run(rules, facts)
this.log.debug('Rules engine returned events:', { events })

return events
executeRules () {
return executeRules.apply(this, arguments)
}

async handleRuleEvents (events, headers, originalPayload) {
// At the time of writing, all events cause the "normal" flow of execution to be interrupted.
// So we'll return false when there have been no events whatsoever.
if (events.length === 0) {
return {
terminate: false,
quoteRequest: originalPayload,
headers
}
}

const { INVALID_QUOTE_REQUEST, INTERCEPT_QUOTE } = RulesEngine.events

const unhandledEvents = events.filter(ev => !(ev.type in RulesEngine.events))

if (unhandledEvents.length > 0) {
// The rules configuration contains events not handled in the code
// TODO: validate supplied rules at startup and fail if any invalid rules are discovered.
throw new Error('Unhandled event returned by rules engine')
}

const invalidQuoteRequestEvents = events.filter(ev => ev.type === INVALID_QUOTE_REQUEST)
if (invalidQuoteRequestEvents.length > 0) {
// Use the first event, ignore the others for now. This is ergonomically worse for someone
// developing against this service, as they can't see all reasons their quote was invalid at
// once. But is a valid solution in the short-term.
const { FSPIOPError: code, message } = invalidQuoteRequestEvents[0].params
// Will throw an internal server error if property doesn't exist
throw ErrorHandler.CreateFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes[code],
message,
null,
headers['fspiop-source']
)
}

const interceptQuoteEvents = events.filter(ev => ev.type === INTERCEPT_QUOTE)
if (interceptQuoteEvents.length > 1) {
// TODO: handle priority. Can we stream events?
throw new Error('Multiple intercept quote events received')
}
if (interceptQuoteEvents.length > 0) {
// send the quote request to the recipient in the event
const result = {
terminate: false,
quoteRequest: originalPayload,
headers: {
...headers,
'fspiop-destination': interceptQuoteEvents[0].params.rerouteToFsp
}
}
// if additionalHeaders are present then add the additional non-standard headers (e.g. used by forex)
// Note these headers are not part of the mojaloop specification
if (interceptQuoteEvents[0].params.additionalHeaders) {
result.headers = { ...result.headers, ...interceptQuoteEvents[0].params.additionalHeaders }
result.additionalHeaders = interceptQuoteEvents[0].params.additionalHeaders
}
return result
}
}
handleRuleEvents = handleRuleEvents

/**
* Validates the quote request object
Expand Down Expand Up @@ -284,9 +210,8 @@ class QuotesModel {

// Run the rules engine. If the user does not want to run the rules engine, they need only to
// supply a rules file containing an empty array.
const events = await this.executeRules(headers, quoteRequest, payer, payee)
handledRuleEvents = await this.executeRules(headers, quoteRequest, originalPayload, payer, payee, 'quoteRequest')

handledRuleEvents = await this.handleRuleEvents(events, headers, originalPayload)
if (handledRuleEvents.terminate) {
return
}
Expand Down
3 changes: 3 additions & 0 deletions src/model/rules.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const createEngine = () => {
}
}

engine.addOperator('truthy', (factValue, ruleValue) => {
return !!factValue === ruleValue
})
engine.addOperator('notDeepEqual', (factValue, ruleValue) => {
return !deepEqual(factValue, ruleValue)
})
Expand Down
7 changes: 4 additions & 3 deletions test/integration/fxQuotes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ describe('POST /fxQuotes request tests --> ', () => {
expect(isOk).toBe(true)

response = await getResponseWithRetry()
expect(response.data.history.length).toBe(1)
expect(response.data.history.length).toBe(3) // count 2 extra calls to redbank and pinkbank

// assert that the request was received by the proxy
const request = response.data.history[0]
Expand All @@ -143,7 +143,7 @@ describe('POST /fxQuotes request tests --> ', () => {
counterPartyFsp: payload.conversionTerms.counterPartyFsp,
sourceAmount: payload.conversionTerms.sourceAmount.amount,
sourceCurrency: payload.conversionTerms.sourceAmount.currency,
targetAmount: payload.conversionTerms.targetAmount.amount,
targetAmount: 0,
targetCurrency: payload.conversionTerms.targetAmount.currency,
extensions: expect.anything(),
expirationDate: expect.anything(),
Expand Down Expand Up @@ -194,6 +194,7 @@ describe('POST /fxQuotes request tests --> ', () => {
expect(isOk).toBe(true)

response = await getResponseWithRetry()
if (response.data.history.length !== 1) console.log(response.data.history)
expect(response.data.history.length).toBe(1)

// assert that the callback was received by the payer dfsp
Expand Down Expand Up @@ -266,7 +267,7 @@ describe('POST /fxQuotes request tests --> ', () => {
expect(isOk).toBe(true)

response = await getResponseWithRetry()
expect(response.data.history.length).toBe(1)
expect(response.data.history.length).toBe(2) // count 1 extra call to greenbank

// assert that the request was received by the payee dfsp
const request = response.data.history[0]
Expand Down
3 changes: 3 additions & 0 deletions test/unit/model/fxQuotes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ describe('FxQuotesModel Tests -->', () => {
describe('handleFxQuoteRequest', () => {
test('should handle fx quote request', async () => {
fxQuotesModel = new FxQuotesModel({ db, requestId, proxyClient, log })
fxQuotesModel._fetchParticipantInfo = jest.fn(() => ({ payer: 'payer', payee: 'payee' }))
jest.spyOn(fxQuotesModel, 'forwardFxQuoteRequest').mockResolvedValue()
jest.spyOn(fxQuotesModel, 'validateFxQuoteRequest')

Expand Down Expand Up @@ -176,6 +177,7 @@ describe('FxQuotesModel Tests -->', () => {

test('should handle fx quote request in persistent mode', async () => {
fxQuotesModel = new FxQuotesModel({ db, requestId, proxyClient, log })
fxQuotesModel._fetchParticipantInfo = jest.fn(() => ({ payer: 'payer', payee: 'payee' }))
fxQuotesModel.envConfig.simpleRoutingMode = false

jest.spyOn(fxQuotesModel, 'checkDuplicateFxQuoteRequest').mockResolvedValue({
Expand Down Expand Up @@ -216,6 +218,7 @@ describe('FxQuotesModel Tests -->', () => {

test('should handle error thrown', async () => {
fxQuotesModel = new FxQuotesModel({ db, requestId, proxyClient, log })
fxQuotesModel._fetchParticipantInfo = jest.fn(() => ({ payer: 'payer', payee: 'payee' }))
jest.spyOn(fxQuotesModel, 'forwardFxQuoteRequest').mockRejectedValue(new Error('Forward Error'))
jest.spyOn(fxQuotesModel, 'validateFxQuoteRequest')
jest.spyOn(fxQuotesModel, 'handleException').mockResolvedValue()
Expand Down
Loading