diff --git a/packages/core/src/client.js b/packages/core/src/client.js deleted file mode 100644 index 40783d48..00000000 --- a/packages/core/src/client.js +++ /dev/null @@ -1,109 +0,0 @@ -import { PassThrough } from 'readable-stream'; -import http from 'http'; -import debug from 'debug'; -import Parameter from './parameter'; -import settings from './settings'; - -export const agent = new http.Agent({ - maxSockets: 0, - keepAlive: false, - timeout: 0, -}); - -export const parseAddress = (commands, environment) => (srvr) => { - if (typeof srvr !== 'string') { - return null; - } - const hostWithPort = srvr.match(/^\[?([^\]]+)\]?:(\d+)$/); - const serverOptions = { - hostname: srvr, - port: settings.port, - path: '/', - method: 'POST', - headers: { - 'Transfer-Encoding': 'chunked', - 'Content-Type': 'application/json', - }, - agent, - }; - commands - .filter(Boolean) - .forEach((command, index) => { - serverOptions.headers[`X-Command-${index}`] = Parameter.pack(command); - }); - Object.keys(environment) - .filter((keyEnv) => environment[keyEnv]) - .forEach((keyEnv, index) => { - serverOptions.headers[`X-Environment-${index}`] = Parameter.pack({ k: keyEnv, v: environment[keyEnv] }); - }); - if (hostWithPort) { - return { - ...serverOptions, - hostname: hostWithPort[1], - port: Number(hostWithPort[2]), - }; - } - return serverOptions; -}; - -export const ensureArray = (a) => (Array.isArray(a) ? a : [a]); - -export const inspectServers = (servers, commands, environment, ns) => ensureArray(servers) - .filter(Boolean) - .filter((elem, pos, arr) => arr.indexOf(elem) === pos) - .map(parseAddress(commands, environment)) - .map((s) => Array(ns || settings.concurrency).fill(s)) // multiple each line - .reduce((a, b) => a.concat(b), []); // flatten all - -export const connectServer = (ezs) => (serverOptions, index) => { - const { hostname, port } = serverOptions; - let connected = false; - serverOptions.headers = { - ...ezs.encodingMode(), - ...serverOptions.headers, - }; - const input = new PassThrough(ezs.objectMode()); - const output = new PassThrough(ezs.objectMode()); - const handle = http.request(serverOptions, (res) => { - connected = true; - debug('ezs')(`http://${hostname}:${port} send code ${res.statusCode}`); - if (res.statusCode === 200) { - res - .pipe(ezs.uncompress(res.headers)) - .pipe(ezs('unpack')) - .pipe(ezs('ungroup')) - .pipe(output); - return 1; - } - if (res.statusCode === 500) { - const errmsg = Parameter.decode(res.headers['x-error']); - output.write(new Error(`Server sent: ${errmsg}`)); - output.end(); - return 2; - } - output.write(new Error( - `http://${hostname}:${port} at item #${index} return ${res.statusCode}`, - )); - return 3; - }); - handle.on('error', (e) => { - handle.abort(); - if (!connected) { - output.write(new Error( - `http://${hostname || '?'}:${port || '?'} at item #${index} return ${e.message}`, - )); - return output.end(); - } - debug('ezs')(`http://${hostname}:${port} was stopped properly following ${e}`); - return 4; - }); - handle.setNoDelay(false); - - input - .pipe(ezs('group')) - .pipe(ezs('pack')) - .pipe(ezs.compress(ezs.encodingMode())) - .pipe(handle); - const duplex = [input, output]; - return duplex; -}; diff --git a/packages/core/src/server/index.js b/packages/core/src/server/index.js index 259afdd3..94300584 100644 --- a/packages/core/src/server/index.js +++ b/packages/core/src/server/index.js @@ -6,7 +6,6 @@ import controlServer from 'http-shutdown'; import { parse } from 'url'; import debug from 'debug'; import knownPipeline from './knownPipeline'; -import unknownPipeline from './unknownPipeline'; import serverInformation from './serverInformation'; import serverControl from './serverControl'; import errorHandler from './errorHandler'; @@ -58,7 +57,6 @@ function createServer(ezs, serverPort, serverPath, workerId) { app.use(metrics(ezs)); app.use(serverInformation(ezs)); app.use(serverControl(ezs)); - app.use(unknownPipeline(ezs)); app.use(knownPipeline(ezs)); app.use((request, response, next) => { if (request.catched === false) { diff --git a/packages/core/src/server/unknownPipeline.js b/packages/core/src/server/unknownPipeline.js deleted file mode 100644 index 6b00fe02..00000000 --- a/packages/core/src/server/unknownPipeline.js +++ /dev/null @@ -1,54 +0,0 @@ -import debug from 'debug'; -import sizeof from 'object-sizeof'; -import Parameter from '../parameter'; -import errorHandler from './errorHandler'; - -const unknownPipeline = ezs => (request, response, next) => { - - if (request.catched || !request.methodMatch(['POST']) || request.pathName !== '/') { - return next(); - } - request.catched = true; - debug('ezs')(`Create middleware 'unknownPipeline' for ${request.method} ${request.pathName}`); - - const { headers } = request; - response.setHeader('Content-Encoding', headers['content-encoding'] || 'identity'); - const commands = Object.keys(headers) - .filter(headerKey => (headerKey.indexOf('x-command') === 0)) - .map(headerKey => parseInt(headerKey.replace('x-command-', ''), 10)) - .sort((x, y) => x - y) - .map(commandIndex => Parameter.unscramble(headers[`x-command-${commandIndex}`])); - const environment = Object.keys(headers) - .filter(headerKey => (headerKey.indexOf('x-environment') === 0)) - .map(headerKey => headerKey.replace('x-environment-', '')) - .map((environmentKey) => { - const { k, v } = Parameter.unpack(headers[`x-environment-${environmentKey}`]); - return { - [k]: v, - }; - }) - .reduce((prev, cur) => Object.assign(prev, cur), {}); - debug('ezs')( - `PID ${process.pid} will execute ${commands.length} commands with ${sizeof(environment)} of global parameters`, - ); - request - .pipe(ezs.uncompress(headers)) - .pipe(ezs('unpack')) - .pipe(ezs('ungroup')) - .pipe(ezs('delegate', { commands }, environment)) - .pipe(ezs.catch(errorHandler(request, response))) - .pipe(ezs((input, output) => { - if (!response.headersSent) { - response.writeHead(200); - } - return output.send(input); - })) - .pipe(ezs('group')) - .pipe(ezs('pack')) - .pipe(ezs.compress(headers)) - .pipe(response); - request.resume(); - response.once('close', next); -}; - -export default unknownPipeline; diff --git a/packages/core/src/statements/dispatch.js b/packages/core/src/statements/dispatch.js deleted file mode 100644 index 25aa6eb8..00000000 --- a/packages/core/src/statements/dispatch.js +++ /dev/null @@ -1,68 +0,0 @@ -import merge from 'merge2'; -import debug from 'debug'; -import { inspectServers, connectServer } from '../client'; - -/** - * Dispatch processing to an external pipeline on one or more servers. - * - * @name dispatch - * @param {String} [file] the external pipeline is described in a file - * @param {String} [script] the external pipeline is described in a string of characters - * @param {String} [commands] the external pipeline is described in a object - * @param {String} [command] the external pipeline is described in a URL-like command - * @returns {Object} - */ -export default function dispatch(data, feed) { - const { ezs } = this; - if (this.isFirst()) { - this.lastIndex = 0; - const file = this.getParam('file'); - const fileContent = ezs.loadScript(file); - const script = this.getParam('script', fileContent); - const cmd1 = ezs.compileScript(script).get(); - const command = this.getParam('command'); - const cmd2 = [].concat(command).map(ezs.parseCommand).filter(Boolean); - const commands = this.getParam('commands', cmd1.concat(cmd2)); - const environment = this.getEnv(); - const servers = inspectServers( - this.getParam('server', []), - commands, - environment, - ); - - if ( - !servers - || servers.length === 0 - || !commands - || commands.length === 0 - ) { - return feed.stop(new Error('Invalid parmeter for [dispatch]')); - } - debug('ezs')(`[dispatch] connect to #${servers.length} servers.`); - const handles = servers.map(connectServer(ezs)); - this.ins = handles.map((h) => h[0]); - this.outs = handles.map((h) => h[1]); - const funnel = merge(this.outs, ezs.objectMode()) - .on('queueDrain', () => { - funnel.destroy(); - }) - .on('error', (e) => feed.write(e)) - .on('data', (d) => feed.write(d)); - this.whenFinish = new Promise((resolve) => { - funnel.on('close', resolve); - }); - } - if (this.isLast()) { - this.whenFinish.then(() => feed.close()).catch((e) => feed.stop(e)); - this.ins.forEach((handle) => handle.end()); - } else { - if (this.lastIndex >= this.ins.length) { - this.lastIndex = 0; - } - const check = ezs.writeTo(this.ins[this.lastIndex], data, () => feed.end()); - if (!check) { - this.lastIndex += 1; - } - } - return 1; -} diff --git a/packages/core/src/statements/index.js b/packages/core/src/statements/index.js index 12cb036a..4472a1b2 100644 --- a/packages/core/src/statements/index.js +++ b/packages/core/src/statements/index.js @@ -14,7 +14,6 @@ import shuffle from './shuffle'; import env from './env'; import group from './group'; import ungroup from './ungroup'; -import dispatch from './dispatch'; import parallel from './parallel'; import spawn from './spawn'; import delegate from './delegate'; @@ -55,7 +54,6 @@ export default { env, group, ungroup, - dispatch, parallel, tracer, spawn, diff --git a/packages/core/test/unKnownPipeline.js b/packages/core/test/unKnownPipeline.js deleted file mode 100644 index 9ff7bd88..00000000 --- a/packages/core/test/unKnownPipeline.js +++ /dev/null @@ -1,632 +0,0 @@ -import assert from 'assert'; -import from from 'from'; -import { Readable } from 'stream'; -import ezs from '../src'; -import { parseAddress } from '../src/client'; -import JSONezs from '../src/json'; - -ezs.use(require('./locals')); - -ezs.addPath(__dirname); - -ezs.settings.servePath = __dirname; -ezs.settings.cacheEnable = true; -ezs.settings.tracerEnable = true; -ezs.settings.metricsEnable = false; - -class Upto extends Readable { - constructor(m) { - super({ objectMode: true }); - this.i = 0; - this.m = m; - } - - _read() { - this.i += 1; - if (this.i >= this.m) { - this.push(null); - } else { - this.push(this.i); - } - } -} -describe('dispatch through server(s)', () => { - const server1 = ezs.createServer(31976, false); - const server2 = ezs.createServer(30001, false); - const server3 = ezs.createServer(30002, false); - const server4 = ezs.createServer(30003, false); - - afterAll(() => { - server1.close(); - server2.close(); - server3.close(); - server4.close(); - }); - - describe('simple statements, one server', () => { - const script = ` - [use] - plugin = packages/core/test/locals - - [increment] - step = 3 - - [decrement] - step = 2 - `; - const commands = [ - { - name: 'increment', - args: { - step: 3, - }, - }, - { - name: 'decrement', - args: { - step: 2, - }, - }, - ]; - const server = '127.0.0.1'; - - it('with object', (done) => { - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { commands, server })) - // .pipe(ezs('debug')) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - - it('with script', (done) => { - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { - server, - script, - }, { toto: 1, titi: 'truc' })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - - it('with file', (done) => { - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { - server, - file: './script.ini', - })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - - it('with script', (done) => { - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { - server, - script, - })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - - it('with commands', (done) => { - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { - server, - commands, - })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - }); - - it('simple statements, N servers', (done) => { - let res = 0; - const commands = [ - { - name: 'increment', - args: { - step: 3, - }, - }, - { - name: 'decrement', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - '127.0.0.1', - ]; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { commands, server })) - .pipe(ezs.catch()) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - - it('simple statements, one server but with different parameter', (done) => { - let res = 0; - const commands = [ - { - name: 'increment', - args: { - step: 3, - }, - }, - { - name: 'decrement', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.1', - ]; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { commands, server })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - - it('with commands using args contains UTF8 parameter', (done) => { - const res = []; - const commands = [ - { - name: 'replace', - args: { - path: 'id', - value: 'Les Châtiments', - }, - }, - ]; - const server = [ - '127.0.0.1', - ]; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { commands, server })) - .on('data', (chunk) => { - res.push(chunk); - }) - .on('end', () => { - assert.strictEqual(res[0].id, 'Les Châtiments'); - assert.strictEqual(res[1].id, 'Les Châtiments'); - assert.strictEqual(res[2].id, 'Les Châtiments'); - done(); - }); - }); - - it('with commands using global parameter', (done) => { - let res = 0; - const commands = [ - { - name: 'stepper', - args: { - step: 4, - }, - }, - ]; - const server = [ - '127.0.0.1', - ]; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { commands, server })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 81); - done(); - }); - }); - - it('with buggy statements', (done) => { - const commands = [ - { - name: 'increment', - args: { - step: 2, - }, - }, - { - name: 'boum', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.1', - ]; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { commands, server })) - .pipe(ezs.catch()) - .on('error', (error) => { - assert.ok(error instanceof Error); - done(); - }); - }); - it.skip('with unknowed server', (done) => { // because of github actions - const commands = [ - { - name: 'increment', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.0', - ]; - const ten = new Upto(10); - let semaphore = true; - ten - .pipe(ezs('dispatch', { commands, server })) - .pipe(ezs.catch()) - .on('error', (error) => { - assert(error instanceof Error); - if (semaphore) { - semaphore = false; - done(); - } - }); - }, 16000); - - it('with an unknowed statement', (done) => { - const commands = [ - { - name: 'increment', - args: { - step: 2, - }, - }, - { - name: 'turlututu', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.1', - ]; - const ten = new Upto(10); - let semaphore = true; - ten - .pipe(ezs('dispatch', { commands, server })) - .pipe(ezs.catch()) - .on('error', (error) => { - assert(error instanceof Error); - if (semaphore) { - semaphore = false; - ten.destroy(); - done(); - } - }); - }); - it('with commands in distributed pipeline', (done) => { - const commands = [ - { - name: 'increment', - args: { - step: 3, - }, - }, - { - name: 'decrement', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.1:30001', - '127.0.0.1:30002', - '127.0.0.1:30003', - ]; - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('dispatch', { server, commands })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 54); - done(); - }); - }); - it('with commands in distributed pipeline #Bis', (done) => { - const commands = [ - { - name: 'increment', - args: { - step: 3, - }, - }, - { - name: 'decrement', - args: { - step: 2, - }, - }, - ]; - const server = [ - '127.0.0.1:30001', - '127.0.0.1:30002', - '127.0.0.1:30003', - ]; - let res = 0; - const ten = new Upto(10); - ten - .pipe(ezs('shift')) - .pipe(ezs('dispatch', { server, commands })) - .on('data', (chunk) => { - res += chunk; - }) - .on('end', () => { - assert.strictEqual(res, 2); - done(); - }); - }); - - it('with a lot of commands in distributed pipeline', (done) => { - const commands = [ - { - name: 'replace', - args: { - path: 'a', - value: 1, - }, - }, - ]; - const server = [ - '127.0.0.1:30001', - '127.0.0.1:30002', - '127.0.0.1:30003', - ]; - let res = 0; - const ten = new Upto(5001); - ten - .pipe(ezs('replace', { path: 'a', value: 'à remplacer' })) - .pipe(ezs('dispatch', { server, commands })) // ~ 9 seconds - .on('data', (chunk) => { - res += chunk.a; - }) - .on('end', () => { - assert.strictEqual(res, 5000); - done(); - }); - }, 200000); - - it('with a lot of delayed commands in distributed pipeline', (done) => { - const script = ` - [use] - plugin = packages/core/test/locals - - [beat] - - `; - const server = [ - '127.0.0.1:30001', - '127.0.0.1:30002', - '127.0.0.1:30003', - ]; - let res = 0; - const ten = new Upto(5001); - ten - .pipe(ezs('dispatch', { script, server })) - .on('data', (chunk) => { - res += chunk.beat; - }) - .on('end', () => { - assert.strictEqual(res, 5000); - done(); - }); - }, 100000); - - it('with a same commands', (done) => { - const script = ` - [use] - plugin = packages/core/test/locals - - [increment] - step = 1 - - `; - const commandsOBJ1 = ezs.parseString(script); - const commandsSTR1 = JSONezs.stringify(commandsOBJ1); - const commandsOBJ2 = JSONezs.parse(commandsSTR1); - const commandsSTR2 = JSONezs.stringify(commandsOBJ2); - // assert.strictEqual(commandsOBJ1[0].args, commandsOBJ2[0].args); - assert.strictEqual(commandsSTR1, commandsSTR2); - done(); - }); - - it('with stuck/unstuck simple pipeline', (done) => { - const script = ` - - [replace] - path = a - value = 7 - - [assign] - path = b - value = 6 - - [assign] - path = c - value = env('k') - - [env] - path = l - value = get('b') - - [assign] - path = d - value = env('l') - - - [transit] - `; - const server = [ - '127.0.0.1', - ]; - const env = { - k: 5, - }; - const res = []; - from([ - { a: 1, b: 9 }, - { a: 2, b: 9 }, - { a: 3, b: 9 }, - { a: 4, b: 9 }, - { a: 5, b: 9 }, - ]) - .pipe(ezs('dispatch', { script, server }, env)) - .on('data', (chunk) => { - assert(typeof chunk === 'object'); - res.push(chunk); - }) - .on('end', () => { - assert.equal(5, res.length); - assert.equal(7, res[0].a); - assert.equal(6, res[0].b); - assert.equal(5, res[0].c); - assert.equal(7, res[1].a); - assert.equal(6, res[1].b); - assert.equal(5, res[1].c); - assert.equal(7, res[2].a); - assert.equal(6, res[2].b); - assert.equal(5, res[2].c); - done(); - }); - }); - - it('an array of array in a pipeline', (done) => { - const script = ` - [transit] - `; - const server = [ - '127.0.0.1', - ]; - const res = []; - from([ - [1, 1, 1, 1], - [2, 2, 2, 2], - [3, 3, 3, 3], - [4, 4, 4, 4], - [5, 5, 5, 5], - ]) - .pipe(ezs('dispatch', { script, server })) - .on('data', (chunk) => { - assert(Array.isArray(chunk)); - res.push(chunk); - }) - .on('end', () => { - const resSorted = res.sort((a, b) => a[0] - b[0]); - assert.equal(5, resSorted.length); - assert.equal(4, resSorted[0].length); - assert.equal(4, resSorted[1].length); - assert.equal(4, resSorted[2].length); - assert.equal(4, resSorted[3].length); - assert.equal(4, resSorted[4].length); - assert.equal(1, resSorted[0][0]); - assert.equal(1, resSorted[0][1]); - assert.equal(1, resSorted[0][2]); - assert.equal(1, resSorted[0][3]); - assert.equal(2, resSorted[1][0]); - assert.equal(2, resSorted[1][1]); - assert.equal(2, resSorted[1][2]); - assert.equal(2, resSorted[1][3]); - assert.equal(5, resSorted[4][0]); - assert.equal(5, resSorted[4][1]); - assert.equal(5, resSorted[4][2]); - assert.equal(5, resSorted[4][3]); - done(); - }); - }); - - it('with wrong parameter', (done) => { - const server = [ - '127.0.0.1', - ]; - const commands = []; - from([0, 0, 0]) - .pipe(ezs('dispatch', { commands, server })) - .pipe(ezs.catch()) - .on('error', (error) => { - assert.ok(error instanceof Error); - done(); - }); - }); - - /**/ -}); - -describe('parseAddress', () => { - it('return null with invalid type', (done) => { - assert.equal(parseAddress({}, {})({}), null); - done(); - }); -});