|
| 1 | +import { findWith, last, is, isEmpty } from "@asd14/m" |
| 2 | +import isDeepEqual from "fast-deep-equal" |
| 3 | + |
| 4 | +/** |
| 5 | + * Unique, sequential, promise based queue |
| 6 | + * |
| 7 | + * @example |
| 8 | + * const queue = buildQueue() |
| 9 | + * |
| 10 | + * queue.enqueue(Users.login, { |
| 11 | + * args:{ |
| 12 | + * body: {email: "lorem@test.com", password: "secret"} |
| 13 | + * } |
| 14 | + * }) |
| 15 | + * .then(...) |
| 16 | + * .catch(...) |
| 17 | + * |
| 18 | + * @returns {object<enqueue, dequeue>} |
| 19 | + */ |
| 20 | +export const buildQueue = () => { |
| 21 | + const jobsList = [] |
| 22 | + let isProcessing = false |
| 23 | + |
| 24 | + return { |
| 25 | + enqueue({ id, fn, args }) { |
| 26 | + const runningJob = findWith({ |
| 27 | + id, |
| 28 | + args: source => isDeepEqual(source, args), |
| 29 | + })(jobsList) |
| 30 | + |
| 31 | + if (is(runningJob)) { |
| 32 | + return runningJob.fnPromise |
| 33 | + } |
| 34 | + |
| 35 | + let deferredResolve = null |
| 36 | + let deferredReject = null |
| 37 | + const fnResultPromise = new Promise((resolve, reject) => { |
| 38 | + deferredResolve = resolve |
| 39 | + deferredReject = reject |
| 40 | + }) |
| 41 | + |
| 42 | + // add job at begining of queue |
| 43 | + jobsList.unshift({ |
| 44 | + id, |
| 45 | + args, |
| 46 | + fn, |
| 47 | + fnResultPromise, |
| 48 | + onResolve: results => { |
| 49 | + deferredResolve(results) |
| 50 | + }, |
| 51 | + onReject: error => { |
| 52 | + deferredReject(error) |
| 53 | + }, |
| 54 | + }) |
| 55 | + |
| 56 | + // start processing jobs |
| 57 | + this.dequeue() |
| 58 | + |
| 59 | + // return promise that will be resolved after fn is called and resolved |
| 60 | + return fnResultPromise |
| 61 | + }, |
| 62 | + |
| 63 | + dequeue() { |
| 64 | + const shouldStartRunningJobs = !isProcessing && !isEmpty(jobsList) |
| 65 | + |
| 66 | + if (!shouldStartRunningJobs) { |
| 67 | + return undefined |
| 68 | + } |
| 69 | + |
| 70 | + const { fn, args, onResolve, onReject } = last(jobsList) |
| 71 | + |
| 72 | + // no jobs will be started until current one finishes |
| 73 | + isProcessing = true |
| 74 | + |
| 75 | + return Promise.resolve() |
| 76 | + .then(() => { |
| 77 | + // Need to remove ourselves before job.fn resolves. If another |
| 78 | + // action of the same signature runs right after it will return this |
| 79 | + // job because job still exists in queue |
| 80 | + jobsList.pop() |
| 81 | + |
| 82 | + return fn(...args) |
| 83 | + }) |
| 84 | + .then(onResolve) |
| 85 | + .catch(onReject) |
| 86 | + .finally(() => { |
| 87 | + // process next in queue |
| 88 | + isProcessing = false |
| 89 | + this.dequeue() |
| 90 | + }) |
| 91 | + }, |
| 92 | + } |
| 93 | +} |
0 commit comments