Skip to content

Commit

Permalink
Initial integration bus implementation
Browse files Browse the repository at this point in the history
Refs: #1733
PR-URL: #1736
  • Loading branch information
tshemsedinov committed May 28, 2022
1 parent 25abc44 commit 56019f9
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Fix shutdown while initialization
- Worker-based multitenancy implementation
- Initial integration bus implementation (new place `application/bus`)

## [2.6.10][] - 2022-05-09

Expand Down
4 changes: 4 additions & 0 deletions lib/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const metavm = require('metavm');
const metawatch = require('metawatch');
const { Interfaces } = require('./interfaces.js');
const { Modules } = require('./modules.js');
const { Services } = require('./services.js');
const { Resources } = require('./resources.js');
const { Schemas } = require('./schemas.js');
const { Scheduler } = require('./scheduler.js');
Expand Down Expand Up @@ -49,6 +50,7 @@ class Application extends events.EventEmitter {
this.api = new Interfaces('api', this);
this.lib = new Modules('lib', this);
this.db = new Modules('db', this);
this.bus = new Services('bus', this);
this.domain = new Modules('domain', this);
this.scheduler = new Scheduler(this);

Expand Down Expand Up @@ -78,6 +80,7 @@ class Application extends events.EventEmitter {
(async () => {
await this.lib.load();
await this.db.load();
await this.bus.load();
await this.domain.load();
})(),
]);
Expand Down Expand Up @@ -130,6 +133,7 @@ class Application extends events.EventEmitter {
sandbox.api = {};
sandbox.lib = this.lib.tree;
sandbox.db = this.db.tree;
sandbox.bus = this.bus.tree;
sandbox.domain = this.domain.tree;
this.sandbox = metavm.createContext(sandbox);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const fsp = require('fs').promises;

class Cache {
constructor(place, application) {
this.place = place;
this.path = application.absolute(place);
this.application = application;
}
Expand All @@ -14,7 +15,7 @@ class Cache {
try {
const files = await fsp.readdir(targetPath, { withFileTypes: true });
for (const file of files) {
if (file.name.startsWith('.')) continue;
if (file.name.startsWith('.') && !file.name.endsWith('.js')) continue;
const filePath = path.join(targetPath, file.name);
if (file.isDirectory()) await this.load(filePath);
else await this.change(filePath);
Expand Down
6 changes: 5 additions & 1 deletion lib/modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class Modules extends Cache {
}, timeout);
}

preprocess(iface) {
return iface;
}

set(relPath, iface) {
const names = parsePath(relPath);
let level = this.tree;
Expand Down Expand Up @@ -66,7 +70,7 @@ class Modules extends Cache {
try {
const { exports } = await metavm.readScript(filePath, options);
const relPath = filePath.substring(this.path.length + 1);
this.set(relPath, exports);
this.set(relPath, this.preprocess(exports));
} catch (err) {
if (err.code !== 'ENOENT') {
this.application.console.error(err.stack);
Expand Down
79 changes: 79 additions & 0 deletions lib/services.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';

const http = require('http');
const https = require('https');
const metautil = require('metautil');
const { Schema } = require('metaschema');
const { Modules } = require('./modules.js');

const request = (url, { method, body }) =>
new Promise((resolve, reject) => {
const proto = url.startsWith('https') ? https : http;
const headers = { 'Content-Type': 'application/json' };
const req = proto.request(url, { method, headers }, (res) => {
const code = res.statusCode;
if (code !== 200) {
const dest = `for ${method} ${url}`;
return reject(new Error(`HTTP status code ${code} ${dest}`));
}
metautil.receiveBody(res).then((data) => {
const json = data.toString();
try {
const object = JSON.parse(json);
resolve(object);
} catch (error) {
return reject(error);
}
}, reject);
});
req.on('error', reject);
if (body) req.write(body);
req.end();
});

const serialize = (fields, args) => {
if (!fields) return '';
const data = {};
for (const par of fields) {
data[par] = args[par];
}
return JSON.stringify(data);
};

class Services extends Modules {
preprocess(iface) {
const { application } = this;
const namespaces = application.schemas ? [application.schemas.model] : [];
const { parameters, returns } = iface;
const validation = {
parameters: parameters ? Schema.from(parameters, namespaces) : null,
returns: returns ? Schema.from(returns, namespaces) : null,
};
const method = async (args) => {
const { parameters, returns } = validation;
if (parameters) {
const { valid, errors } = parameters.check(args);
const problems = errors.join('; ');
if (!valid) return new Error('Invalid parameters type: ' + problems);
}
const service = method.parent['.service'];
const verb = iface.method.get ? 'get' : 'post';
const target = [service.url, iface.method[verb]];
if (iface.method.path) {
target.push(...iface.method.path.map((arg) => args[arg]));
}
const body = serialize(iface.method.body, args);
const url = target.join('/');
const result = await request(url, { method: verb.toUpperCase(), body });
if (returns) {
const { valid, errors } = returns.check(result);
const problems = errors.join('; ');
if (!valid) return new Error('Invalid result type: ' + problems);
}
return result;
};
return Object.assign(method, iface);
}
}

module.exports = { Services };

0 comments on commit 56019f9

Please sign in to comment.