diff --git a/packages/core/src/fusible.js b/packages/core/src/fusible.js new file mode 100644 index 00000000..39748722 --- /dev/null +++ b/packages/core/src/fusible.js @@ -0,0 +1,61 @@ +import { access, constants, writeFile, unlink } from 'fs'; +import { resolve, normalize } from 'path'; +import { tmpdir } from 'os'; +import generate from 'nanoid/async/generate'; +import nolookalikes from 'nanoid-dictionary/nolookalikes'; + +import { checksum } from './statements/identify'; + +const location = tmpdir(); +const extension = '.sid'; + +export const createFusible = async () => { + const fusible = await generate(nolookalikes, 16); + return fusible; +}; + +export const checkFusible = (fusible) => new Promise((next) => { + if (!fusible) { + return next(false); + } + const fusibleFile = resolve(normalize(location), fusible + extension); + return access(fusibleFile, constants.R_OK, (err) => { + if (err) { + return next(false); + } + return next(true); + }); +}); + + +export const enableFusible = (fusible) => new Promise((next, cancel) => { + const fusibleFile = resolve(normalize(location), fusible + extension); + checkFusible(fusible).then((check) => { + if (!check) { + const fileContent = checksum(fusible); + writeFile(fusibleFile, fileContent, (err) => { + if (err) { + return cancel(err); + } + return next(true); + }); + } + return next(true); + }); +}); + +export const disableFusible = (fusible) => new Promise((next, cancel) => { + const fusibleFile = resolve(normalize(location), fusible + extension); + checkFusible(fusible).then((check) => { + if (check) { + unlink(fusibleFile, (err) => { + if (err) { + return cancel(err); + } + return next(true); + }); + } + }); + return true; +}); + diff --git a/packages/core/src/server/index.js b/packages/core/src/server/index.js index ea66e763..259afdd3 100644 --- a/packages/core/src/server/index.js +++ b/packages/core/src/server/index.js @@ -8,6 +8,7 @@ import debug from 'debug'; import knownPipeline from './knownPipeline'; import unknownPipeline from './unknownPipeline'; import serverInformation from './serverInformation'; +import serverControl from './serverControl'; import errorHandler from './errorHandler'; import settings from '../settings'; import { RX_FILENAME } from '../constants'; @@ -18,6 +19,11 @@ import { httpRequestDurationMicroseconds, aggregatorRegistry, } from './metrics'; +import { + createFusible, + enableFusible, + disableFusible +} from '../fusible'; function isPipeline() { const f = this.pathName.match(RX_FILENAME); @@ -32,7 +38,8 @@ const signals = ['SIGINT', 'SIGTERM']; function createServer(ezs, serverPort, serverPath, workerId) { const app = connect(); - app.use((request, response, next) => { + app.use( async (request, response, next) => { + const stopTimer = httpRequestDurationMicroseconds.startTimer(); request.workerId = workerId; request.catched = false; request.serverPath = serverPath; @@ -40,12 +47,17 @@ function createServer(ezs, serverPort, serverPath, workerId) { request.pathName = request.urlParsed.pathname; request.methodMatch = methodMatch; request.isPipeline = isPipeline; - const stopTimer = httpRequestDurationMicroseconds.startTimer(); - eos(response, () => stopTimer()); + request.fusible = await createFusible(); + await enableFusible(request.fusible); + eos(response, async () => { + stopTimer(); + await disableFusible(request.fusible); + }); next(); }); app.use(metrics(ezs)); app.use(serverInformation(ezs)); + app.use(serverControl(ezs)); app.use(unknownPipeline(ezs)); app.use(knownPipeline(ezs)); app.use((request, response, next) => { @@ -63,13 +75,10 @@ function createServer(ezs, serverPort, serverPath, workerId) { server.setTimeout(0); server.listen(serverPort); server.addListener('connection', (socket) => { - const uniqId = `${Date.now()}-${Math.floor(Math.random() * 1e6)}`; - debug('ezs')('New connection', uniqId); httpConnectionTotal.inc(); httpConnectionOpen.inc(); socket.on('close', () => { httpConnectionOpen.dec(); - debug('ezs')('Connection closed', uniqId); }); }); signals.forEach((signal) => process.on(signal, () => { diff --git a/packages/core/src/server/knownPipeline.js b/packages/core/src/server/knownPipeline.js index ba3478e5..8195be31 100644 --- a/packages/core/src/server/knownPipeline.js +++ b/packages/core/src/server/knownPipeline.js @@ -8,9 +8,9 @@ import _ from 'lodash'; import { metricsHandle } from './metrics'; import errorHandler from './errorHandler'; import { isFile } from '../file'; +import breaker from '../statements/breaker'; import settings from '../settings'; - const dispositionFrom = ({ extension }) => (extension ? `attachment; filename="dump.${extension}"` : 'inline'); const encodingFrom = (headers) => (headers @@ -33,7 +33,7 @@ const knownPipeline = (ezs) => (request, response, next) => { request.catched = true; debug('ezs')(`Create middleware 'knownPipeline' for ${request.method} ${request.pathName}`); - const { headers } = request; + const { headers, fusible } = request; const triggerError = errorHandler(request, response); const { query } = request.urlParsed; const files = ezs.memoize(`knownPipeline>${request.pathName}`, @@ -46,7 +46,6 @@ const knownPipeline = (ezs) => (request, response, next) => { triggerError(new Error(`Cannot find ${request.pathName}`), 404); return false; } - debug('ezs')( `PID ${process.pid} will execute ${request.pathName} commands with ${sizeof(query)}B of global parameters`, ); @@ -64,6 +63,8 @@ const knownPipeline = (ezs) => (request, response, next) => { response.setHeader('Content-Encoding', contentEncoding); response.setHeader('Content-Disposition', contentDisposition); response.setHeader('Content-Type', contentType); + response.setHeader('X-Request-ID', fusible); + response.socket.setNoDelay(false); if (request.method !== 'POST') { @@ -98,6 +99,8 @@ const knownPipeline = (ezs) => (request, response, next) => { statements.unshift(ezs('metrics', { bucket: 'input' })); statements.push(ezs('metrics', { bucket: 'output' })); } + statements.unshift(ezs(breaker, { fusible })); + statements.push(ezs(breaker, { fusible })); const rawStream = new PassThrough(); let emptyStream = true; diff --git a/packages/core/src/server/serverControl.js b/packages/core/src/server/serverControl.js new file mode 100644 index 00000000..1e51eddd --- /dev/null +++ b/packages/core/src/server/serverControl.js @@ -0,0 +1,31 @@ +import debug from 'debug'; +import { disableFusible } from '../fusible'; + +const serverInformation = () => (request, response, next) => { + if (!request.methodMatch(['DELETE']) || request.pathName !== '/') { + return next(); + } + request.catched = true; + debug('ezs')(`Create middleware 'serverControl' for ${request.method} ${request.pathName}`); + const input = []; + return request + .on('error', err => next(err)) + .on('data', chunk => { + input.push(chunk); + }) + .on('end', async () => { + try { + const body = Buffer.concat(input).toString(); + const bodyParsed = JSON.parse(body); + await disableFusible(bodyParsed['x-request-id'] || bodyParsed['X-Request-ID']); + response.writeHead(202); + response.end(); + next(); + } + catch (e) { + next(e); + } + }); +}; + +export default serverInformation; diff --git a/packages/core/src/server/serverInformation.js b/packages/core/src/server/serverInformation.js index 7fc7093d..0ea6b01e 100644 --- a/packages/core/src/server/serverInformation.js +++ b/packages/core/src/server/serverInformation.js @@ -25,6 +25,35 @@ const keyOfPathItemObject = [ // https://swagger.io/specification/ 'servers', 'parameters', ]; +const globalSwaggerPaths = { + '/': { + 'delete': { + description: 'Cancel asynchronous requests', + summary: 'A way to cancel too long request.' + }, + requestBody: { + description: '', + content: { + 'application/json': { + schema: { + $ref: '#/components/schemas/serverControl' + } + } + }, + required: true + }, + responses: { + '202': { + description: 'successful operation', + }, + '400': { + description: 'Invalid input value', + } + } + } +}; + + const collectMetadata = async (dirPath, hostName) => { const globalSwagger = { openapi: '3.0.0', @@ -124,6 +153,16 @@ const collectMetadata = async (dirPath, hostName) => { items: { $ref: '#/components/schemas/minimalObject' } + }, + serverControl: { + type: 'object', + properties: { + 'x-request-id': { + description: 'Request identifier sent in the http response header.', + type: 'string', + example: 'qdrfgtyhbvdeftgh' + } + } } }, }, @@ -140,11 +179,12 @@ const collectMetadata = async (dirPath, hostName) => { return globalSwagger; } return globalSwagger; -} +}; + const collectPaths = (ezs, dirPath) => new Promise((resolve) => { dir.files(dirPath, (err, files) => { const filenames = err ? [] : files; - const paths = filenames + const localPaths = filenames .filter((f) => (f.search(/\.(ini|ezs)$/) > 0)) .map((f) => ({ [f.replace(dirPath, '').replace(/\.\w+/, '')]: @@ -166,7 +206,7 @@ const collectPaths = (ezs, dirPath) => new Promise((resolve) => { ...cur, }), {}, ); - resolve(paths); + resolve(_.merge(globalSwaggerPaths, localPaths)); }); }); diff --git a/packages/core/src/statements/breaker.js b/packages/core/src/statements/breaker.js new file mode 100644 index 00000000..8cf1267d --- /dev/null +++ b/packages/core/src/statements/breaker.js @@ -0,0 +1,24 @@ +import debug from 'debug'; +import { checkFusible } from '../fusible'; +/** + * Break the stream if the control file cannot be checked + * + * + * @name delegate + * @param {String} [fusible] file to check + * @returns {Object} + */ +export default async function breaker(data, feed) { + if (this.isFirst()) { + this.fusible = this.getParam('fusible'); + } + if (this.isLast()) { + return feed.close(); + } + const check = await checkFusible(this.fusible); + if (!check) { + debug('ezs')(`Stream break, ${this.fusible} no longer active.`); + return feed.close(data); + } + return feed.send(data); +} diff --git a/packages/core/src/statements/identify.js b/packages/core/src/statements/identify.js index faada7ce..51140964 100644 --- a/packages/core/src/statements/identify.js +++ b/packages/core/src/statements/identify.js @@ -35,6 +35,8 @@ export function ncda(input, alphabet = []) { return alphabet[x] || ''; } +export const checksum = (input) => ncda(input, nolookalikes); + /** * Take `Object`, and compute & add an identifier * @@ -59,8 +61,8 @@ export default async function identify(data, feed) { identifier = await sha(data); } if (identifier) { - const checksum = ncda(identifier, nolookalikes); - _.set(data, path, `${scheme}:/${identifier}${checksum}`); + const digit = checksum(identifier); + _.set(data, path, `${scheme}:/${identifier}${digit}`); } } } diff --git a/packages/core/src/statements/index.js b/packages/core/src/statements/index.js index 378c6640..12cb036a 100644 --- a/packages/core/src/statements/index.js +++ b/packages/core/src/statements/index.js @@ -36,6 +36,7 @@ import combine from './combine'; import expand from './expand'; import overturn from './overturn'; import fork from './fork'; +import breaker from './breaker'; export default { extract, @@ -76,4 +77,5 @@ export default { combine, overturn, fork, + breaker, }; diff --git a/packages/core/test/fusible.js b/packages/core/test/fusible.js new file mode 100644 index 00000000..02d3aa7d --- /dev/null +++ b/packages/core/test/fusible.js @@ -0,0 +1,20 @@ +import { + createFusible, + enableFusible, + checkFusible, + disableFusible +} from '../src/fusible'; + +test('fusible', async () => { + const fusible = await createFusible(); + expect(fusible).toMatch(/.+/); + const isEnable = await enableFusible(fusible); + expect(isEnable).toBeTruthy(); + const isCheckOK = await checkFusible(fusible); + expect(isCheckOK).toBeTruthy(); + const isDisable = await disableFusible(fusible); + expect(isDisable).toBeTruthy(); + const isCheckKO = await checkFusible(fusible); + expect(isCheckKO).not.toBeTruthy(); +}); + diff --git a/packages/core/test/knownPipeline.js b/packages/core/test/knownPipeline.js index 36b1de28..63ddbee1 100644 --- a/packages/core/test/knownPipeline.js +++ b/packages/core/test/knownPipeline.js @@ -373,7 +373,7 @@ describe(' through server(s)', () => { }); res.on('end', () => { assert.equal(output.join(''), 'a'); - assert(check < 5); + assert(check < (input.length / 2)); done(); }); }); diff --git a/packages/core/test/serverControl.js b/packages/core/test/serverControl.js new file mode 100644 index 00000000..77ce31e6 --- /dev/null +++ b/packages/core/test/serverControl.js @@ -0,0 +1,77 @@ +import assert from 'assert'; +import from from 'from'; +import fetch from 'node-fetch'; +import ezs from '../src'; + +ezs.use(require('./locals')); + +ezs.addPath(__dirname); + +ezs.settings.servePath = __dirname; +ezs.settings.cacheEnable = true; +ezs.settings.tracerEnable = false; +ezs.settings.metricsEnable = false; + +describe('through server(s)', () => { + const server = ezs.createServer(33377, __dirname); + afterAll(() => { + server.close(); + }); + + it('cancel slow request', async (done) => { + const stream = from([ + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', + ]); + const response1 = await fetch('http://127.0.0.1:33377/transit6.ini?time=300', { method: 'POST', body: stream }); + const requestID = response1.headers.get('x-request-id'); + const output = []; + response1.body.on('error', (e) => { + done(e); + }); + response1.body.on('data', (chunk) => { + output.push(chunk); + }); + response1.body.on('end', () => { + expect(output.length).toBe(1); + done(); + }); + const response2 = await fetch('http://127.0.0.1:33377/', { + method: 'DELETE', + headers: { + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ 'x-request-id': requestID }), + }); + expect(response2.status).toBe(202); + }); + it('wrong request', async (done) => { + const stream = from([ + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', + ]); + const response1 = await fetch('http://127.0.0.1:33377/transit6.ini?time=300', { method: 'POST', body: stream }); + const requestID = response1.headers.get('x-request-id'); + const output = []; + response1.body.on('error', (e) => { + done(e); + }); + response1.body.on('data', (chunk) => { + output.push(chunk); + }); + response1.body.on('end', () => { + expect(output.length).toBe(1); + done(); + }); + const response2 = await fetch('http://127.0.0.1:33377/', { + method: 'DELETE', + headers: { + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }, + body: {}, + }); + expect(response2.status).toBe(400); + }); + +}); + diff --git a/packages/core/test/transit5.ini b/packages/core/test/transit5.ini new file mode 100644 index 00000000..93628948 --- /dev/null +++ b/packages/core/test/transit5.ini @@ -0,0 +1,8 @@ +[use] +plugin = packages/core/test/locals + +[transit] + +[slowAtTheEnd] +time = 40000 + diff --git a/packages/core/test/transit6.ini b/packages/core/test/transit6.ini new file mode 100644 index 00000000..e16a2125 --- /dev/null +++ b/packages/core/test/transit6.ini @@ -0,0 +1,8 @@ +[use] +plugin = packages/core/test/locals + +[transit] + +[slow] +time = env('time') +