Skip to content

Commit

Permalink
Merge pull request #414 from Inist-CNRS/add-breaker
Browse files Browse the repository at this point in the history
feat: 🎸 add [breaker]
  • Loading branch information
touv authored May 31, 2024
2 parents 6dd4015 + 2205210 commit 364b540
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 15 deletions.
61 changes: 61 additions & 0 deletions packages/core/src/fusible.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { access, constants, writeFile, unlink } from 'fs';
import { resolve, normalize } from 'path';
import { tmpdir } from 'os';
import generate from 'nanoid/async/generate';
import nolookalikes from 'nanoid-dictionary/nolookalikes';

import { checksum } from './statements/identify';

const location = tmpdir();
const extension = '.sid';

export const createFusible = async () => {
const fusible = await generate(nolookalikes, 16);
return fusible;
};

export const checkFusible = (fusible) => new Promise((next) => {
if (!fusible) {
return next(false);
}
const fusibleFile = resolve(normalize(location), fusible + extension);
return access(fusibleFile, constants.R_OK, (err) => {
if (err) {
return next(false);
}
return next(true);
});
});


export const enableFusible = (fusible) => new Promise((next, cancel) => {
const fusibleFile = resolve(normalize(location), fusible + extension);
checkFusible(fusible).then((check) => {
if (!check) {
const fileContent = checksum(fusible);
writeFile(fusibleFile, fileContent, (err) => {
if (err) {
return cancel(err);
}
return next(true);
});
}
return next(true);
});
});

export const disableFusible = (fusible) => new Promise((next, cancel) => {
const fusibleFile = resolve(normalize(location), fusible + extension);
checkFusible(fusible).then((check) => {
if (check) {
unlink(fusibleFile, (err) => {
if (err) {
return cancel(err);
}
return next(true);
});
}
});
return true;
});

21 changes: 15 additions & 6 deletions packages/core/src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import debug from 'debug';
import knownPipeline from './knownPipeline';
import unknownPipeline from './unknownPipeline';
import serverInformation from './serverInformation';
import serverControl from './serverControl';
import errorHandler from './errorHandler';
import settings from '../settings';
import { RX_FILENAME } from '../constants';
Expand All @@ -18,6 +19,11 @@ import {
httpRequestDurationMicroseconds,
aggregatorRegistry,
} from './metrics';
import {
createFusible,
enableFusible,
disableFusible
} from '../fusible';

function isPipeline() {
const f = this.pathName.match(RX_FILENAME);
Expand All @@ -32,20 +38,26 @@ const signals = ['SIGINT', 'SIGTERM'];

function createServer(ezs, serverPort, serverPath, workerId) {
const app = connect();
app.use((request, response, next) => {
app.use( async (request, response, next) => {
const stopTimer = httpRequestDurationMicroseconds.startTimer();
request.workerId = workerId;
request.catched = false;
request.serverPath = serverPath;
request.urlParsed = parse(request.url, true);
request.pathName = request.urlParsed.pathname;
request.methodMatch = methodMatch;
request.isPipeline = isPipeline;
const stopTimer = httpRequestDurationMicroseconds.startTimer();
eos(response, () => stopTimer());
request.fusible = await createFusible();
await enableFusible(request.fusible);
eos(response, async () => {
stopTimer();
await disableFusible(request.fusible);
});
next();
});
app.use(metrics(ezs));
app.use(serverInformation(ezs));
app.use(serverControl(ezs));
app.use(unknownPipeline(ezs));
app.use(knownPipeline(ezs));
app.use((request, response, next) => {
Expand All @@ -63,13 +75,10 @@ function createServer(ezs, serverPort, serverPath, workerId) {
server.setTimeout(0);
server.listen(serverPort);
server.addListener('connection', (socket) => {
const uniqId = `${Date.now()}-${Math.floor(Math.random() * 1e6)}`;
debug('ezs')('New connection', uniqId);
httpConnectionTotal.inc();
httpConnectionOpen.inc();
socket.on('close', () => {
httpConnectionOpen.dec();
debug('ezs')('Connection closed', uniqId);
});
});
signals.forEach((signal) => process.on(signal, () => {
Expand Down
9 changes: 6 additions & 3 deletions packages/core/src/server/knownPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import _ from 'lodash';
import { metricsHandle } from './metrics';
import errorHandler from './errorHandler';
import { isFile } from '../file';
import breaker from '../statements/breaker';
import settings from '../settings';


const dispositionFrom = ({ extension }) => (extension ? `attachment; filename="dump.${extension}"` : 'inline');

const encodingFrom = (headers) => (headers
Expand All @@ -33,7 +33,7 @@ const knownPipeline = (ezs) => (request, response, next) => {
request.catched = true;
debug('ezs')(`Create middleware 'knownPipeline' for ${request.method} ${request.pathName}`);

const { headers } = request;
const { headers, fusible } = request;
const triggerError = errorHandler(request, response);
const { query } = request.urlParsed;
const files = ezs.memoize(`knownPipeline>${request.pathName}`,
Expand All @@ -46,7 +46,6 @@ const knownPipeline = (ezs) => (request, response, next) => {
triggerError(new Error(`Cannot find ${request.pathName}`), 404);
return false;
}

debug('ezs')(
`PID ${process.pid} will execute ${request.pathName} commands with ${sizeof(query)}B of global parameters`,
);
Expand All @@ -64,6 +63,8 @@ const knownPipeline = (ezs) => (request, response, next) => {
response.setHeader('Content-Encoding', contentEncoding);
response.setHeader('Content-Disposition', contentDisposition);
response.setHeader('Content-Type', contentType);
response.setHeader('X-Request-ID', fusible);

response.socket.setNoDelay(false);

if (request.method !== 'POST') {
Expand Down Expand Up @@ -98,6 +99,8 @@ const knownPipeline = (ezs) => (request, response, next) => {
statements.unshift(ezs('metrics', { bucket: 'input' }));
statements.push(ezs('metrics', { bucket: 'output' }));
}
statements.unshift(ezs(breaker, { fusible }));
statements.push(ezs(breaker, { fusible }));

const rawStream = new PassThrough();
let emptyStream = true;
Expand Down
31 changes: 31 additions & 0 deletions packages/core/src/server/serverControl.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import debug from 'debug';
import { disableFusible } from '../fusible';

const serverInformation = () => (request, response, next) => {
if (!request.methodMatch(['DELETE']) || request.pathName !== '/') {
return next();
}
request.catched = true;
debug('ezs')(`Create middleware 'serverControl' for ${request.method} ${request.pathName}`);
const input = [];
return request
.on('error', err => next(err))
.on('data', chunk => {
input.push(chunk);
})
.on('end', async () => {
try {
const body = Buffer.concat(input).toString();
const bodyParsed = JSON.parse(body);
await disableFusible(bodyParsed['x-request-id'] || bodyParsed['X-Request-ID']);
response.writeHead(202);
response.end();
next();
}
catch (e) {
next(e);
}
});
};

export default serverInformation;
46 changes: 43 additions & 3 deletions packages/core/src/server/serverInformation.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,35 @@ const keyOfPathItemObject = [ // https://swagger.io/specification/
'servers',
'parameters',
];
const globalSwaggerPaths = {
'/': {
'delete': {
description: 'Cancel asynchronous requests',
summary: 'A way to cancel too long request.'
},
requestBody: {
description: '',
content: {
'application/json': {
schema: {
$ref: '#/components/schemas/serverControl'
}
}
},
required: true
},
responses: {
'202': {
description: 'successful operation',
},
'400': {
description: 'Invalid input value',
}
}
}
};


const collectMetadata = async (dirPath, hostName) => {
const globalSwagger = {
openapi: '3.0.0',
Expand Down Expand Up @@ -124,6 +153,16 @@ const collectMetadata = async (dirPath, hostName) => {
items: {
$ref: '#/components/schemas/minimalObject'
}
},
serverControl: {
type: 'object',
properties: {
'x-request-id': {
description: 'Request identifier sent in the http response header.',
type: 'string',
example: 'qdrfgtyhbvdeftgh'
}
}
}
},
},
Expand All @@ -140,11 +179,12 @@ const collectMetadata = async (dirPath, hostName) => {
return globalSwagger;
}
return globalSwagger;
}
};

const collectPaths = (ezs, dirPath) => new Promise((resolve) => {
dir.files(dirPath, (err, files) => {
const filenames = err ? [] : files;
const paths = filenames
const localPaths = filenames
.filter((f) => (f.search(/\.(ini|ezs)$/) > 0))
.map((f) => ({
[f.replace(dirPath, '').replace(/\.\w+/, '')]:
Expand All @@ -166,7 +206,7 @@ const collectPaths = (ezs, dirPath) => new Promise((resolve) => {
...cur,
}), {},
);
resolve(paths);
resolve(_.merge(globalSwaggerPaths, localPaths));
});
});

Expand Down
24 changes: 24 additions & 0 deletions packages/core/src/statements/breaker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import debug from 'debug';
import { checkFusible } from '../fusible';
/**
* Break the stream if the control file cannot be checked
*
*
* @name delegate
* @param {String} [fusible] file to check
* @returns {Object}
*/
export default async function breaker(data, feed) {
if (this.isFirst()) {
this.fusible = this.getParam('fusible');
}
if (this.isLast()) {
return feed.close();
}
const check = await checkFusible(this.fusible);
if (!check) {
debug('ezs')(`Stream break, ${this.fusible} no longer active.`);
return feed.close(data);
}
return feed.send(data);
}
6 changes: 4 additions & 2 deletions packages/core/src/statements/identify.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export function ncda(input, alphabet = []) {
return alphabet[x] || '';
}

export const checksum = (input) => ncda(input, nolookalikes);

/**
* Take `Object`, and compute & add an identifier
*
Expand All @@ -59,8 +61,8 @@ export default async function identify(data, feed) {
identifier = await sha(data);
}
if (identifier) {
const checksum = ncda(identifier, nolookalikes);
_.set(data, path, `${scheme}:/${identifier}${checksum}`);
const digit = checksum(identifier);
_.set(data, path, `${scheme}:/${identifier}${digit}`);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/statements/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import combine from './combine';
import expand from './expand';
import overturn from './overturn';
import fork from './fork';
import breaker from './breaker';

export default {
extract,
Expand Down Expand Up @@ -76,4 +77,5 @@ export default {
combine,
overturn,
fork,
breaker,
};
20 changes: 20 additions & 0 deletions packages/core/test/fusible.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {
createFusible,
enableFusible,
checkFusible,
disableFusible
} from '../src/fusible';

test('fusible', async () => {
const fusible = await createFusible();
expect(fusible).toMatch(/.+/);
const isEnable = await enableFusible(fusible);
expect(isEnable).toBeTruthy();
const isCheckOK = await checkFusible(fusible);
expect(isCheckOK).toBeTruthy();
const isDisable = await disableFusible(fusible);
expect(isDisable).toBeTruthy();
const isCheckKO = await checkFusible(fusible);
expect(isCheckKO).not.toBeTruthy();
});

2 changes: 1 addition & 1 deletion packages/core/test/knownPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ describe(' through server(s)', () => {
});
res.on('end', () => {
assert.equal(output.join(''), 'a');
assert(check < 5);
assert(check < (input.length / 2));
done();
});
});
Expand Down
Loading

0 comments on commit 364b540

Please sign in to comment.