-
Notifications
You must be signed in to change notification settings - Fork 0
/
timeout-promise-queue.js
74 lines (64 loc) · 2.01 KB
/
timeout-promise-queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
const EventEmitter = require('events');
function PromiseQueue (threshold) {
let queue = []
let inPlay = 0
return {
/** queue (I/O) functions which return promises.
* @pfunc returns a promise
* If add is called with a @timeout, @pfunc will be called with an
* EventEmitter which will emit a 'timeout' if @timeout is exceeded.
*/
add: function (pfunc, timeout, rejection) {
if (++inPlay > threshold) {
return new Promise((resolve, reject) => {
queue.push(() => {
resolve(makeTimeout())
})
})
} else {
return makeTimeout()
}
function makeTimeout () {
let ret = timeout === undefined
? pfunc()
: new Promise((resolve, reject) => {
let clientCancellation = new EventEmitter();
// Create a timer to send a cancellation to pfunc()'s promise.
let timer = setTimeout(() => {
let r = typeof rejection === 'undefined'
? Error('timeout of ' + timeout + ' exceeded')
: typeof rejection === 'function'
? rejection()
: rejection
clientCancellation.emit('timeout', r);
reject(r)
}, timeout)
// Delete timer after resolution.
resolve(pfunc(clientCancellation).then(result => {
clearTimeout(timer)
return result
}).catch(result => {
clearTimeout(timer)
throw result
}))
})
return ret.then(nextEntry).catch(result => {throw nextEntry(result)})
}
},
/** number of entries in the queue.
*/
size: function () {
return queue.length
}
}
/** After each resolution or rejection, check the queue.
*/
function nextEntry (ret) {
--inPlay
if (queue.length > 0) {
queue.pop()()
}
return ret
}
}
// Public API of this module: the PromiseQueue factory.
module.exports = { PromiseQueue }