Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/#3026): apply back-pressure to outgoing /bulkTransaction requests from SDK-Scheme-Adapter to Switch #426

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ services:
- redis
- ml-testing-toolkit
- kafka
command: yarn nx run modules-api-svc:start
command: yarn workspace @mojaloop/sdk-scheme-adapter-api-svc run start:debug
# Useful for debugging
# command: sleep infinity
volumes:
- ./docker/wait4:/tmp/wait4
- ./secrets:/opt/app/secrets
Expand Down
11 changes: 10 additions & 1 deletion modules/api-svc/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

const fs = require('fs');
require('dotenv').config();
const { from } = require('env-var');
const { from, accessors } = require('env-var');
const yaml = require('js-yaml');

function getFileContent (path) {
Expand Down Expand Up @@ -49,11 +49,17 @@ function parseResourceVersions (resourceString) {
return getVersionFromConfig(noSpResources);
}

function praseIntOrNull (numAsString) {
if (numAsString) return accessors.asIntPositive(numAsString);
return null;
}

const env = from(process.env, {
asFileContent: (path) => getFileContent(path),
asFileListContent: (pathList) => pathList.split(',').map((path) => getFileContent(path)),
asYamlConfig: (path) => yaml.load(getFileContent(path)),
asResourceVersions: (resourceString) => parseResourceVersions(resourceString),
asIntOrNull: (num) => praseIntOrNull(num)
});

module.exports = {
Expand Down Expand Up @@ -96,6 +102,8 @@ module.exports = {
groupId: env.get('BACKEND_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_api_svc_backend_group').asString(),
clientId: env.get('BACKEND_EVENT_CONSUMER_CLIENT_ID').default('backend_consumer_client_id').asString(),
topics: env.get('BACKEND_EVENT_CONSUMER_TOPICS').default('topic-sdk-outbound-domain-events').asArray(),
consumeMessageNum: env.get('BACKEND_EVENT_CONSUMER_MESSAGE_NUM').asIntOrNull(),

},
domainEventProducer:{
brokerList: env.get('BACKEND_EVENT_PRODUCER_BROKER_LIST').default('localhost:9092').asString(),
Expand All @@ -110,6 +118,7 @@ module.exports = {
groupId: env.get('FSPIOP_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_api_svc_fspiop_group').asString(),
clientId: env.get('FSPIOP_EVENT_CONSUMER_CLIENT_ID').default('fspiop_consumer_client_id').asString(),
topics: env.get('FSPIOP_EVENT_CONSUMER_TOPICS').default('topic-sdk-outbound-domain-events').asArray(),
consumeMessageNum: env.get('FSPIOP_EVENT_CONSUMER_MESSAGE_NUM').asIntOrNull(),
},
domainEventProducer:{
brokerList: env.get('FSPIOP_EVENT_PRODUCER_BROKER_LIST').default('localhost:9092').asString(),
Expand Down
2 changes: 1 addition & 1 deletion modules/api-svc/src/lib/model/Async2SyncModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ function generate({
const { requests, config } = this.handlersContext;
logger.push({ args }).log('onRequestAction - arguments');

return deferredJob(cache, channelNameMethod(args))
return deferredJob(cache, channelNameMethod(args)) // TODO: timeout is not be configured here, and thus uses the default!
.init(async (channel) => {
const res = await requestActionMethod(requests, args);
logger.push({ res, channel, args }).log('RequestAction call sent to peer, listening on response');
Expand Down
11 changes: 7 additions & 4 deletions modules/api-svc/test/config/integration.env
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ MGMT_API_WS_PORT=4005
# when running the scheme-adapter as a mojaloop connector component within Payment Manager for Mojaloop.
PM4ML_ENABLED=false

BACKEND_EVENT_CONSUMER_BROKER_LIST=kafka:9092
BACKEND_EVENT_PRODUCER_BROKER_LIST=kafka:9092
FSPIOP_EVENT_CONSUMER_BROKER_LIST=kafka:9092
FSPIOP_EVENT_PRODUCER_BROKER_LIST=kafka:9092
BACKEND_EVENT_CONSUMER_BROKER_LIST=kafka:9093
BACKEND_EVENT_PRODUCER_BROKER_LIST=kafka:9093
FSPIOP_EVENT_CONSUMER_BROKER_LIST=kafka:9093
FSPIOP_EVENT_PRODUCER_BROKER_LIST=kafka:9093

BACKEND_EVENT_CONSUMER_MESSAGE_NUM=1
FSPIOP_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"groupId": "command_events_consumer_group",
"clientId": "command_events_consumer_client_id",
"topics": ["topic-sdk-outbound-command-events"],
"messageMaxBytes": 200000000
"messageMaxBytes": 200000000,
"consumeMessageNum": 1
},
"DOMAIN_EVENT_PRODUCER": {
"brokerList": "localhost:9092",
Expand Down
2 changes: 2 additions & 0 deletions modules/outbound-command-event-handler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
"scripts": {
"start": "yarn run service",
"start:debug": "yarn run service:debug",
"start:debug-brk": "yarn run service:debug-brk",
"start:dev": "ts-node src/application/index.ts",
"service": "node dist/application/index.js",
"service:debug": "node --inspect=0.0.0.0:9229 dist/application/index.js",
"service:debug-brk": "node --inspect-brk=0.0.0.0:9229 dist/application/index.js",
"build": "tsc && yarn run copy-files",
"watch": "tsc -w",
"dev": "nodemon",
Expand Down
7 changes: 7 additions & 0 deletions modules/outbound-command-event-handler/src/shared/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ const config = Convict({
default: 200000000,
env: 'COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES',
},
consumeMessageNum: {
doc: 'consumeMessageNum',
nullable: true,
default: null,
format: Number,
env: 'COMMAND_EVENT_CONSUMER_MESSAGE_NUM',
},
},
DOMAIN_EVENT_PRODUCER: {
brokerList: {
Expand Down
3 changes: 2 additions & 1 deletion modules/outbound-domain-event-handler/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"groupId": "domain_events_consumer_group",
"clientId": "domain_events_consumer_client_id",
"topics": ["topic-sdk-outbound-domain-events"],
"messageMaxBytes": 200000000
"messageMaxBytes": 200000000,
"consumeMessageNum": 1
},
"COMMAND_EVENT_PRODUCER": {
"brokerList": "localhost:9092",
Expand Down
7 changes: 7 additions & 0 deletions modules/outbound-domain-event-handler/src/shared/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ const config = Convict({
default: 200000000,
env: 'DOMAIN_EVENT_CONSUMER_MESSAGE_MAX_BYTES',
},
consumeMessageNum: {
doc: 'consumeMessageNum',
nullable: true,
default: null,
format: Number,
env: 'DOMAIN_EVENT_CONSUMER_MESSAGE_NUM',
},
},
COMMAND_EVENT_PRODUCER: {
brokerList: {
Expand Down
2 changes: 1 addition & 1 deletion modules/private-shared-lib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"@mojaloop/central-services-shared": "^17.3.1",
"@mojaloop/logging-bc-public-types-lib": "^0.1.13",
"@mojaloop/platform-shared-lib-messaging-types-lib": "^0.2.18",
"@mojaloop/platform-shared-lib-nodejs-kafka-client-lib": "^0.2.15",
"@mojaloop/platform-shared-lib-nodejs-kafka-client-lib": "0.3.0-snapshot.6",
"ajv": "^8.11.2",
"redis": "^4.5.0",
"uuid": "^9.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ export class KafkaCommandEventConsumer extends KafkaEventConsumer {
outputType: MLKafkaRawConsumerOutputType.Json,
messageMaxBytes: consumerOptions.messageMaxBytes || 200000000,
};

// This ensures that there is back-pressure when consuming messages in order. This can > 1. Comment this out for flow mode is preferred.
if(consumerOptions?.consumeMessageNum) mlConsumerOptions.consumeMessageNum = consumerOptions.consumeMessageNum;

super(mlConsumerOptions, consumerOptions.topics, superHandlerFn, logger);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ export class KafkaDomainEventConsumer extends KafkaEventConsumer {
outputType: MLKafkaRawConsumerOutputType.Json,
messageMaxBytes: consumerOptions.messageMaxBytes || 200000000,
};

// This ensures that there is back-pressure when consuming messages in order. This can > 1. Comment this out for flow mode is preferred.
if(consumerOptions?.consumeMessageNum) mlConsumerOptions.consumeMessageNum = consumerOptions.consumeMessageNum;

super(mlConsumerOptions, consumerOptions.topics, superHandlerFn, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

'use strict';

import Util from 'util';
import { MLKafkaRawConsumer, MLKafkaRawConsumerOptions } from '@mojaloop/platform-shared-lib-nodejs-kafka-client-lib';
import { IMessage } from '@module-types';
import { IEventConsumer } from '../types';
Expand All @@ -49,6 +50,8 @@ export class KafkaEventConsumer implements IEventConsumer {
this._logger = logger;
this._kafkaTopics = kafkaTopics;
this._handler = handlerFn;
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
this._logger.isDebugEnabled() && this._logger.debug(`consumerOptions = ${Util.inspect(consumerOptions)}`);
this._kafkaConsumer = new MLKafkaRawConsumer(consumerOptions, this._logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ export type IKafkaEventConsumerOptions = {
clientId: string,
topics: string[],
messageMaxBytes?: number,
consumeMessageNum?: number,
};
7 changes: 6 additions & 1 deletion test/func/config/sdk-ttksim1/api-svc.env
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,9 @@ FSPIOP_EVENT_PRODUCER_TOPIC=ttksim1-topic-sdk-outbound-domain-events

# Maximum payload limits
FSPIOP_API_SERVER_MAX_REQUEST_BYTES=209715200
BACKEND_API_SERVER_MAX_REQUEST_BYTES=209715200
BACKEND_API_SERVER_MAX_REQUEST_BYTES=209715200

# Number of messages to consume at a time
# comment these out for flow mode
BACKEND_EVENT_CONSUMER_MESSAGE_NUM=1
FSPIOP_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ DOMAIN_EVENT_PRODUCER_TOPIC=ttksim1-topic-sdk-outbound-domain-events

DOMAIN_EVENT_PRODUCER_COMPRESSION_CODEC=lz4
DOMAIN_EVENT_PRODUCER_MESSAGE_MAX_BYTES=200000000
COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
# Number of messages to consume at a time
# comment these out for flow mode
# COMMAND_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ COMMAND_EVENT_PRODUCER_TOPIC=ttksim1-topic-sdk-outbound-command-events
COMMAND_EVENT_PRODUCER_COMPRESSION_CODEC=lz4
COMMAND_EVENT_PRODUCER_MESSAGE_MAX_BYTES=200000000
DOMAIN_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
# Number of messages to consume at a time
# comment these out for flow mode
# DOMAIN_EVENT_CONSUMER_MESSAGE_NUM=1
5 changes: 5 additions & 0 deletions test/func/config/sdk-ttksim2/api-svc.env
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,8 @@ FSPIOP_EVENT_PRODUCER_TOPIC=ttksim2-topic-sdk-outbound-domain-events
# Maximum payload limits
FSPIOP_API_SERVER_MAX_REQUEST_BYTES=209715200
BACKEND_API_SERVER_MAX_REQUEST_BYTES=209715200

# Number of messages to consume at a time
# comment these out for flow mode
# BACKEND_EVENT_CONSUMER_MESSAGE_NUM=1
# FSPIOP_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ DOMAIN_EVENT_PRODUCER_TOPIC=ttksim2-topic-sdk-outbound-domain-events

DOMAIN_EVENT_PRODUCER_COMPRESSION_CODEC=lz4
DOMAIN_EVENT_PRODUCER_MESSAGE_MAX_BYTES=200000000
COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
# Number of messages to consume at a time
# comment these out for flow mode
# COMMAND_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ COMMAND_EVENT_PRODUCER_TOPIC=ttksim2-topic-sdk-outbound-command-events

COMMAND_EVENT_PRODUCER_COMPRESSION_CODEC=lz4
COMMAND_EVENT_PRODUCER_MESSAGE_MAX_BYTES=200000000
DOMAIN_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
DOMAIN_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
# Number of messages to consume at a time
# comment these out for flow mode
# DOMAIN_EVENT_CONSUMER_MESSAGE_NUM=1
5 changes: 5 additions & 0 deletions test/func/config/sdk-ttksim3/api-svc.env
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,8 @@ FSPIOP_EVENT_PRODUCER_TOPIC=ttksim3-topic-sdk-outbound-domain-events
# Maximum payload limits
FSPIOP_API_SERVER_MAX_REQUEST_BYTES=209715200
BACKEND_API_SERVER_MAX_REQUEST_BYTES=209715200

# Number of messages to consume at a time
# comment these out for flow mode
# BACKEND_EVENT_CONSUMER_MESSAGE_NUM=1
# FSPIOP_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ DOMAIN_EVENT_PRODUCER_TOPIC=ttksim3-topic-sdk-outbound-domain-events

DOMAIN_EVENT_PRODUCER_COMPRESSION_CODEC=lz4
DOMAIN_EVENT_PRODUCER_MESSAGE_MAX_BYTES=200000000
COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
COMMAND_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
# Number of messages to consume at a time
# comment these out for flow mode
# COMMAND_EVENT_CONSUMER_MESSAGE_NUM=1
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ COMMAND_EVENT_PRODUCER_TOPIC=ttksim3-topic-sdk-outbound-command-events

COMMAND_EVENT_PRODUCER_COMPRESSION_CODEC=lz4
COMMAND_EVENT_PRODUCER_MESSAGE_MAX_BYTES=200000000
DOMAIN_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
DOMAIN_EVENT_CONSUMER_MESSAGE_MAX_BYTES=200000000
# Number of messages to consume at a time
# comment these out for flow mode
# DOMAIN_EVENT_CONSUMER_MESSAGE_NUM=1
4 changes: 2 additions & 2 deletions test/func/config/ttk-ttksim1/spec_files/user_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"JWS_SIGN_PUT_PARTIES": false,
"CLIENT_MUTUAL_TLS_ENABLED": false,
"ADVANCED_FEATURES_ENABLED": true,
"CALLBACK_TIMEOUT": 60000,
"DEFAULT_REQUEST_TIMEOUT": 60000,
"CALLBACK_TIMEOUT": 200000,
"DEFAULT_REQUEST_TIMEOUT": 200000,
"SCRIPT_TIMEOUT": 60000,
"LOG_SERVER_UI_URL": "http://url-here",
"UI_CONFIGURATION": {
Expand Down
12 changes: 10 additions & 2 deletions test/func/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ services:
networks:
- mojaloop-net
env_file: ./config/sdk-ttksim1/api-svc.env
command: yarn workspace @mojaloop/sdk-scheme-adapter-api-svc run start:debug
command: yarn workspace @mojaloop/sdk-scheme-adapter-api-svc run start
## Useful for debugging
# command: yarn workspace @mojaloop/sdk-scheme-adapter-api-svc run start:debug
ports:
- "14000:4000"
- "14001:4001"
Expand All @@ -41,7 +43,11 @@ services:
file: common-sdk.yml
service: outbound-command-event-handler
env_file: ./config/sdk-ttksim1/outbound-command-event-handler.env
# command: yarn workspace @mojaloop/sdk-scheme-adapter-outbound-command-event-handler run start
## Useful for debugging
command: yarn workspace @mojaloop/sdk-scheme-adapter-outbound-command-event-handler run start:debug
## Useful for debugging, with breakpoint for debugger to connect
# command: yarn workspace @mojaloop/sdk-scheme-adapter-outbound-command-event-handler run start:debug-brk
ports:
- "18000:8000"
- "19222:9229"
Expand All @@ -58,7 +64,9 @@ services:
file: common-sdk.yml
service: outbound-domain-event-handler
env_file: ./config/sdk-ttksim1/outbound-domain-event-handler.env
command: yarn workspace @mojaloop/sdk-scheme-adapter-outbound-domain-event-handler run start:debug
command: yarn workspace @mojaloop/sdk-scheme-adapter-outbound-domain-event-handler run start
## Useful for debugging
# command: yarn workspace @mojaloop/sdk-scheme-adapter-outbound-domain-event-handler run start:debug
ports:
- "18001:8000"
- "19223:9229"
Expand Down
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2185,14 +2185,14 @@ __metadata:
languageName: node
linkType: hard

"@mojaloop/platform-shared-lib-nodejs-kafka-client-lib@npm:^0.2.15":
version: 0.2.15
resolution: "@mojaloop/platform-shared-lib-nodejs-kafka-client-lib@npm:0.2.15"
"@mojaloop/platform-shared-lib-nodejs-kafka-client-lib@npm:0.3.0-snapshot.6":
version: 0.3.0-snapshot.6
resolution: "@mojaloop/platform-shared-lib-nodejs-kafka-client-lib@npm:0.3.0-snapshot.6"
dependencies:
"@mojaloop/logging-bc-public-types-lib": ^0.1.11
"@mojaloop/platform-shared-lib-messaging-types-lib": ~0.2.0
node-rdkafka: ~2.13.0
checksum: abc30b51d57585bcd2fd116de08d6412be2d3b2db0fe5ad7d92d2ef632da7460bcb1d811c82c52fe01d948932d6e9f73d2f1e4fc11eb3f054e1d37e5df1210a1
checksum: 89b317c4a11db3811043f3a3a40cd5bdc517a52b1542436a6e636d4b0aa61784fcc283e2cd7abaeeca35baa9ea6e3544d119ba2b2be59c68af591ca23a535150
languageName: node
linkType: hard

Expand Down Expand Up @@ -2349,7 +2349,7 @@ __metadata:
"@mojaloop/central-services-shared": ^17.3.1
"@mojaloop/logging-bc-public-types-lib": ^0.1.13
"@mojaloop/platform-shared-lib-messaging-types-lib": ^0.2.18
"@mojaloop/platform-shared-lib-nodejs-kafka-client-lib": ^0.2.15
"@mojaloop/platform-shared-lib-nodejs-kafka-client-lib": 0.3.0-snapshot.6
"@types/node": ^18.11.9
ajv: ^8.11.2
eslint: ^8.27.0
Expand Down