From 56019f9db399e49ae3f5e360be14261e1f70f52a Mon Sep 17 00:00:00 2001 From: Timur Shemsedinov Date: Tue, 10 May 2022 04:59:08 +0300 Subject: [PATCH] Initial integration bus implementation Refs: https://github.com/metarhia/impress/issues/1733 PR-URL: https://github.com/metarhia/impress/pull/1736 --- CHANGELOG.md | 1 + lib/application.js | 4 +++ lib/cache.js | 3 +- lib/modules.js | 6 +++- lib/services.js | 79 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 lib/services.js diff --git a/CHANGELOG.md b/CHANGELOG.md index c9d859e9..f189572d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Fix shutdown while initialization - Worker-based multitenancy implementation +- Initial integration bus implementation (new place `application/bus`) ## [2.6.10][] - 2022-05-09 diff --git a/lib/application.js b/lib/application.js index ab78faeb..c5c01ade 100644 --- a/lib/application.js +++ b/lib/application.js @@ -11,6 +11,7 @@ const metavm = require('metavm'); const metawatch = require('metawatch'); const { Interfaces } = require('./interfaces.js'); const { Modules } = require('./modules.js'); +const { Services } = require('./services.js'); const { Resources } = require('./resources.js'); const { Schemas } = require('./schemas.js'); const { Scheduler } = require('./scheduler.js'); @@ -49,6 +50,7 @@ class Application extends events.EventEmitter { this.api = new Interfaces('api', this); this.lib = new Modules('lib', this); this.db = new Modules('db', this); + this.bus = new Services('bus', this); this.domain = new Modules('domain', this); this.scheduler = new Scheduler(this); @@ -78,6 +80,7 @@ class Application extends events.EventEmitter { (async () => { await this.lib.load(); await this.db.load(); + await this.bus.load(); await this.domain.load(); })(), ]); @@ -130,6 +133,7 @@ class Application extends events.EventEmitter { sandbox.api = {}; sandbox.lib = this.lib.tree; sandbox.db = this.db.tree; + sandbox.bus = this.bus.tree; sandbox.domain = this.domain.tree; this.sandbox = metavm.createContext(sandbox); } diff --git a/lib/cache.js b/lib/cache.js index 0d499faa..cd8720ef 100644 --- a/lib/cache.js +++ b/lib/cache.js @@ -5,6 +5,7 @@ const fsp = require('fs').promises; class Cache { constructor(place, application) { + this.place = place; this.path = application.absolute(place); this.application = application; } @@ -14,7 +15,7 @@ class Cache { try { const files = await fsp.readdir(targetPath, { withFileTypes: true }); for (const file of files) { - if (file.name.startsWith('.')) continue; + if (file.name.startsWith('.') && !file.name.endsWith('.js')) continue; const filePath = path.join(targetPath, file.name); if (file.isDirectory()) await this.load(filePath); else await this.change(filePath); diff --git a/lib/modules.js b/lib/modules.js index 94b59bef..4272d0dc 100644 --- a/lib/modules.js +++ b/lib/modules.js @@ -25,6 +25,10 @@ class Modules extends Cache { }, timeout); } + preprocess(iface) { + return iface; + } + set(relPath, iface) { const names = parsePath(relPath); let level = this.tree; @@ -66,7 +70,7 @@ class Modules extends Cache { try { const { exports } = await metavm.readScript(filePath, options); const relPath = filePath.substring(this.path.length + 1); - this.set(relPath, exports); + this.set(relPath, this.preprocess(exports)); } catch (err) { if (err.code !== 'ENOENT') { this.application.console.error(err.stack); diff --git a/lib/services.js b/lib/services.js new file mode 100644 index 00000000..b08410d1 --- /dev/null +++ b/lib/services.js @@ -0,0 +1,79 @@ +'use strict'; + +const http = require('http'); +const https = require('https'); +const metautil = require('metautil'); +const { Schema } = require('metaschema'); +const { Modules } = require('./modules.js'); + +const request = (url, { method, body }) => + new Promise((resolve, reject) => { + const proto = url.startsWith('https') ? https : http; + const headers = { 'Content-Type': 'application/json' }; + const req = proto.request(url, { method, headers }, (res) => { + const code = res.statusCode; + if (code !== 200) { + const dest = `for ${method} ${url}`; + return reject(new Error(`HTTP status code ${code} ${dest}`)); + } + metautil.receiveBody(res).then((data) => { + const json = data.toString(); + try { + const object = JSON.parse(json); + resolve(object); + } catch (error) { + return reject(error); + } + }, reject); + }); + req.on('error', reject); + if (body) req.write(body); + req.end(); + }); + +const serialize = (fields, args) => { + if (!fields) return ''; + const data = {}; + for (const par of fields) { + data[par] = args[par]; + } + return JSON.stringify(data); +}; + +class Services extends Modules { + preprocess(iface) { + const { application } = this; + const namespaces = application.schemas ? [application.schemas.model] : []; + const { parameters, returns } = iface; + const validation = { + parameters: parameters ? Schema.from(parameters, namespaces) : null, + returns: returns ? Schema.from(returns, namespaces) : null, + }; + const method = async (args) => { + const { parameters, returns } = validation; + if (parameters) { + const { valid, errors } = parameters.check(args); + const problems = errors.join('; '); + if (!valid) return new Error('Invalid parameters type: ' + problems); + } + const service = method.parent['.service']; + const verb = iface.method.get ? 'get' : 'post'; + const target = [service.url, iface.method[verb]]; + if (iface.method.path) { + target.push(...iface.method.path.map((arg) => args[arg])); + } + const body = serialize(iface.method.body, args); + const url = target.join('/'); + const result = await request(url, { method: verb.toUpperCase(), body }); + if (returns) { + const { valid, errors } = returns.check(result); + const problems = errors.join('; '); + if (!valid) return new Error('Invalid result type: ' + problems); + } + return result; + }; + return Object.assign(method, iface); + } +} + +module.exports = { Services };