Skip to content

Commit

Permalink
Merge pull request #416 from Inist-CNRS/add-forkrequest-id
Browse files Browse the repository at this point in the history
fix: 🐛 async requests should have their own request ID
  • Loading branch information
touv authored Jun 7, 2024
2 parents b4c9df4 + 238a9b4 commit 16671b6
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 6 deletions.
46 changes: 40 additions & 6 deletions packages/core/src/statements/fork.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import debug from 'debug';
import _ from 'lodash';
import { clone, set } from 'lodash';

import breaker from './breaker';
import {
createFusible,
enableFusible,
disableFusible
} from '../fusible';



/**
* fork the current pipeline
Expand All @@ -13,18 +22,25 @@ import _ from 'lodash';
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @param {String} [target=x-request-id] choose the key to set with the forked request identifier
* @returns {Object}
*/
export default function fork(data, feed) {
export default async function fork(data, feed) {
const { ezs } = this;
const standalone = Number([]
.concat(this.getParam('standalone', false))
.filter(Boolean)
.shift());
const target = []
.concat(this.getParam('target', 'x-request-id'))
.filter(Boolean)
.shift();

if (this.isFirst()) {
let output;
try {
this.fusible = await createFusible();
await enableFusible(this.fusible);
this.input = ezs.createStream(ezs.objectMode());
const commands = ezs.createCommands({
file: this.getParam('file'),
Expand All @@ -35,6 +51,8 @@ export default function fork(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
statements.unshift(ezs(breaker, { fusible: this.fusible }));
statements.push(ezs(breaker, { fusible: this.fusible }));
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
output = ezs.createPipeline(this.input, statements, logger);
}
Expand All @@ -44,13 +62,24 @@ export default function fork(data, feed) {
if (standalone) {
output
.on('data', () => true)
.once('end', () => true);
.once('error', async () => {
await disableFusible(this.fusible);
})
.once('end', async () => {
await disableFusible(this.fusible);
});
} else {
this.whenFinish = new Promise((resolve) => output
.pipe(ezs.catch((e) => feed.write(e))) // avoid to break pipeline at each error
.once('error', (e) => feed.stop(e))
.once('error', async (e) => {
await disableFusible(this.fusible);
feed.stop(e);
})
.on('data', () => true)
.once('end', resolve)
.once('end', async () => {
await disableFusible(this.fusible);
resolve();
})
);
}
}
Expand All @@ -64,5 +93,10 @@ export default function fork(data, feed) {
}
return true;
}
return ezs.writeTo(this.input, _.clone(data), () => feed.send(data));
return ezs.writeTo(this.input, clone(data), () => {
if (target) {
set(data, target, this.fusible);
}
feed.send(data);
});
}
62 changes: 62 additions & 0 deletions packages/core/test/fork.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,75 @@ describe('fork)', () => {
.on('error', done)
.on('data', (chunk) => {
assert(typeof chunk === 'object');
assert(typeof chunk['x-request-id'] === 'string');
res += chunk.a;
})
.on('end', () => {
assert.equal(6, res);
done();
});
});
it('#1bis (standalone)', (done) => {
let res = 0;
const script = `
[assign]
path = a
value = 99
`;
from([
{ a: 1, b: 9 },
{ a: 2, b: 9 },
{ a: 1, b: 9 },
{ a: 1, b: 9 },
{ a: 1, b: 9 },
])
.pipe(ezs('fork', {
script,
standalone: true,
target: false,
}))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res += chunk.a;
})
.on('end', () => {
assert.equal(6, res);
done();
});
});
it('#1ter(standalone)', (done) => {
let res = 0;
const script = `
[assign]
path = a
value = 99
`;
from([
1,
2,
3,
4,
5,
])
.pipe(ezs('fork', {
script,
standalone: true,
target: true,
}))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
res += Number(chunk);
})
.on('end', () => {
assert.equal(15, res);
done();
});
});



it('#2', (done) => {
const script = `
Expand Down
23 changes: 23 additions & 0 deletions packages/core/test/serverControl.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,28 @@ describe('through server(s)', () => {
expect(response2.status).toBe(400);
});


it('async request', async (done) => {
try {
const stream = from([
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h',
]);
const response1 = await fetch('http://127.0.0.1:33377/transit7.ini?time=300', { method: 'POST', body: stream });
const requestID = await response1.text();
const response2 = await fetch('http://127.0.0.1:33377/', {
method: 'DELETE',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify({ 'x-request-id': requestID }),
});
expect(response2.status).toBe(202);
done();
} catch(e) {
done(e);
}
});

});

18 changes: 18 additions & 0 deletions packages/core/test/transit7.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[use]
plugin = packages/core/test/locals

[replace]
path = value
value = self()
[fork]
standalone = true

[fork/transit]

[fork/slow]
time = env('time')

[shift]

[exchange]
value = get('x-request-id')

0 comments on commit 16671b6

Please sign in to comment.