Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 🎸 add [breaker] #414

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions packages/core/src/fusible.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { access, constants, writeFile, unlink } from 'fs';
import { resolve, normalize } from 'path';
import { tmpdir } from 'os';
import generate from 'nanoid/async/generate';
import nolookalikes from 'nanoid-dictionary/nolookalikes';

import { checksum } from './statements/identify';

const location = tmpdir();
const extension = '.sid';

export const createFusible = async () => {
const fusible = await generate(nolookalikes, 16);
return fusible;
};

export const checkFusible = (fusible) => new Promise((next) => {
if (!fusible) {
return next(false);
}
const fusibleFile = resolve(normalize(location), fusible + extension);
return access(fusibleFile, constants.R_OK, (err) => {
if (err) {
return next(false);
}
return next(true);
});
});


export const enableFusible = (fusible) => new Promise((next, cancel) => {
const fusibleFile = resolve(normalize(location), fusible + extension);
checkFusible(fusible).then((check) => {
if (!check) {
const fileContent = checksum(fusible);
writeFile(fusibleFile, fileContent, (err) => {
if (err) {
return cancel(err);
}
return next(true);
});
}
return next(true);
});
});

export const disableFusible = (fusible) => new Promise((next, cancel) => {
const fusibleFile = resolve(normalize(location), fusible + extension);
checkFusible(fusible).then((check) => {
if (check) {
unlink(fusibleFile, (err) => {
if (err) {
return cancel(err);
}
return next(true);
});
}
});
return true;
});

21 changes: 15 additions & 6 deletions packages/core/src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import debug from 'debug';
import knownPipeline from './knownPipeline';
import unknownPipeline from './unknownPipeline';
import serverInformation from './serverInformation';
import serverControl from './serverControl';
import errorHandler from './errorHandler';
import settings from '../settings';
import { RX_FILENAME } from '../constants';
Expand All @@ -18,6 +19,11 @@ import {
httpRequestDurationMicroseconds,
aggregatorRegistry,
} from './metrics';
import {
createFusible,
enableFusible,
disableFusible
} from '../fusible';

function isPipeline() {
const f = this.pathName.match(RX_FILENAME);
Expand All @@ -32,20 +38,26 @@ const signals = ['SIGINT', 'SIGTERM'];

function createServer(ezs, serverPort, serverPath, workerId) {
const app = connect();
app.use((request, response, next) => {
app.use( async (request, response, next) => {
const stopTimer = httpRequestDurationMicroseconds.startTimer();
request.workerId = workerId;
request.catched = false;
request.serverPath = serverPath;
request.urlParsed = parse(request.url, true);
request.pathName = request.urlParsed.pathname;
request.methodMatch = methodMatch;
request.isPipeline = isPipeline;
const stopTimer = httpRequestDurationMicroseconds.startTimer();
eos(response, () => stopTimer());
request.fusible = await createFusible();
await enableFusible(request.fusible);
eos(response, async () => {
stopTimer();
await disableFusible(request.fusible);
});
next();
});
app.use(metrics(ezs));
app.use(serverInformation(ezs));
app.use(serverControl(ezs));
app.use(unknownPipeline(ezs));
app.use(knownPipeline(ezs));
app.use((request, response, next) => {
Expand All @@ -63,13 +75,10 @@ function createServer(ezs, serverPort, serverPath, workerId) {
server.setTimeout(0);
server.listen(serverPort);
server.addListener('connection', (socket) => {
const uniqId = `${Date.now()}-${Math.floor(Math.random() * 1e6)}`;
debug('ezs')('New connection', uniqId);
httpConnectionTotal.inc();
httpConnectionOpen.inc();
socket.on('close', () => {
httpConnectionOpen.dec();
debug('ezs')('Connection closed', uniqId);
});
});
signals.forEach((signal) => process.on(signal, () => {
Expand Down
9 changes: 6 additions & 3 deletions packages/core/src/server/knownPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import _ from 'lodash';
import { metricsHandle } from './metrics';
import errorHandler from './errorHandler';
import { isFile } from '../file';
import breaker from '../statements/breaker';
import settings from '../settings';


const dispositionFrom = ({ extension }) => (extension ? `attachment; filename="dump.${extension}"` : 'inline');

const encodingFrom = (headers) => (headers
Expand All @@ -33,7 +33,7 @@ const knownPipeline = (ezs) => (request, response, next) => {
request.catched = true;
debug('ezs')(`Create middleware 'knownPipeline' for ${request.method} ${request.pathName}`);

const { headers } = request;
const { headers, fusible } = request;
const triggerError = errorHandler(request, response);
const { query } = request.urlParsed;
const files = ezs.memoize(`knownPipeline>${request.pathName}`,
Expand All @@ -46,7 +46,6 @@ const knownPipeline = (ezs) => (request, response, next) => {
triggerError(new Error(`Cannot find ${request.pathName}`), 404);
return false;
}

debug('ezs')(
`PID ${process.pid} will execute ${request.pathName} commands with ${sizeof(query)}B of global parameters`,
);
Expand All @@ -64,6 +63,8 @@ const knownPipeline = (ezs) => (request, response, next) => {
response.setHeader('Content-Encoding', contentEncoding);
response.setHeader('Content-Disposition', contentDisposition);
response.setHeader('Content-Type', contentType);
response.setHeader('X-Request-ID', fusible);

response.socket.setNoDelay(false);

if (request.method !== 'POST') {
Expand Down Expand Up @@ -98,6 +99,8 @@ const knownPipeline = (ezs) => (request, response, next) => {
statements.unshift(ezs('metrics', { bucket: 'input' }));
statements.push(ezs('metrics', { bucket: 'output' }));
}
statements.unshift(ezs(breaker, { fusible }));
statements.push(ezs(breaker, { fusible }));

const rawStream = new PassThrough();
let emptyStream = true;
Expand Down
31 changes: 31 additions & 0 deletions packages/core/src/server/serverControl.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import debug from 'debug';
import { disableFusible } from '../fusible';

const serverInformation = () => (request, response, next) => {
if (!request.methodMatch(['DELETE']) || request.pathName !== '/') {
return next();
}
request.catched = true;
debug('ezs')(`Create middleware 'serverControl' for ${request.method} ${request.pathName}`);
const input = [];
return request
.on('error', err => next(err))
.on('data', chunk => {
input.push(chunk);
})
.on('end', async () => {
try {
const body = Buffer.concat(input).toString();
const bodyParsed = JSON.parse(body);
await disableFusible(bodyParsed['x-request-id'] || bodyParsed['X-Request-ID']);
response.writeHead(202);
response.end();
next();
}
catch (e) {
next(e);
}
});
};

export default serverInformation;
46 changes: 43 additions & 3 deletions packages/core/src/server/serverInformation.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,35 @@ const keyOfPathItemObject = [ // https://swagger.io/specification/
'servers',
'parameters',
];
const globalSwaggerPaths = {
'/': {
'delete': {
description: 'Cancel asynchronous requests',
summary: 'A way to cancel too long request.'
},
requestBody: {
description: '',
content: {
'application/json': {
schema: {
$ref: '#/components/schemas/serverControl'
}
}
},
required: true
},
responses: {
'202': {
description: 'successful operation',
},
'400': {
description: 'Invalid input value',
}
}
}
};


const collectMetadata = async (dirPath, hostName) => {
const globalSwagger = {
openapi: '3.0.0',
Expand Down Expand Up @@ -124,6 +153,16 @@ const collectMetadata = async (dirPath, hostName) => {
items: {
$ref: '#/components/schemas/minimalObject'
}
},
serverControl: {
type: 'object',
properties: {
'x-request-id': {
description: 'Request identifier sent in the http response header.',
type: 'string',
example: 'qdrfgtyhbvdeftgh'
}
}
}
},
},
Expand All @@ -140,11 +179,12 @@ const collectMetadata = async (dirPath, hostName) => {
return globalSwagger;
}
return globalSwagger;
}
};

const collectPaths = (ezs, dirPath) => new Promise((resolve) => {
dir.files(dirPath, (err, files) => {
const filenames = err ? [] : files;
const paths = filenames
const localPaths = filenames
.filter((f) => (f.search(/\.(ini|ezs)$/) > 0))
.map((f) => ({
[f.replace(dirPath, '').replace(/\.\w+/, '')]:
Expand All @@ -166,7 +206,7 @@ const collectPaths = (ezs, dirPath) => new Promise((resolve) => {
...cur,
}), {},
);
resolve(paths);
resolve(_.merge(globalSwaggerPaths, localPaths));
});
});

Expand Down
24 changes: 24 additions & 0 deletions packages/core/src/statements/breaker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import debug from 'debug';
import { checkFusible } from '../fusible';
/**
* Break the stream if the control file cannot be checked
*
*
* @name delegate
* @param {String} [fusible] file to check
* @returns {Object}
*/
export default async function breaker(data, feed) {
if (this.isFirst()) {
this.fusible = this.getParam('fusible');
}
if (this.isLast()) {
return feed.close();
}
const check = await checkFusible(this.fusible);
if (!check) {
debug('ezs')(`Stream break, ${this.fusible} no longer active.`);
return feed.close(data);
}
return feed.send(data);
}
6 changes: 4 additions & 2 deletions packages/core/src/statements/identify.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export function ncda(input, alphabet = []) {
return alphabet[x] || '';
}

export const checksum = (input) => ncda(input, nolookalikes);

/**
* Take `Object`, and compute & add an identifier
*
Expand All @@ -59,8 +61,8 @@ export default async function identify(data, feed) {
identifier = await sha(data);
}
if (identifier) {
const checksum = ncda(identifier, nolookalikes);
_.set(data, path, `${scheme}:/${identifier}${checksum}`);
const digit = checksum(identifier);
_.set(data, path, `${scheme}:/${identifier}${digit}`);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/statements/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import combine from './combine';
import expand from './expand';
import overturn from './overturn';
import fork from './fork';
import breaker from './breaker';

export default {
extract,
Expand Down Expand Up @@ -76,4 +77,5 @@ export default {
combine,
overturn,
fork,
breaker,
};
20 changes: 20 additions & 0 deletions packages/core/test/fusible.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {
createFusible,
enableFusible,
checkFusible,
disableFusible
} from '../src/fusible';

test('fusible', async () => {
const fusible = await createFusible();
expect(fusible).toMatch(/.+/);
const isEnable = await enableFusible(fusible);
expect(isEnable).toBeTruthy();
const isCheckOK = await checkFusible(fusible);
expect(isCheckOK).toBeTruthy();
const isDisable = await disableFusible(fusible);
expect(isDisable).toBeTruthy();
const isCheckKO = await checkFusible(fusible);
expect(isCheckKO).not.toBeTruthy();
});

2 changes: 1 addition & 1 deletion packages/core/test/knownPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ describe(' through server(s)', () => {
});
res.on('end', () => {
assert.equal(output.join(''), 'a');
assert(check < 5);
assert(check < (input.length / 2));
done();
});
});
Expand Down
Loading
Loading