
Commit

make queue methods async
howardchung committed Dec 4, 2023
1 parent 9235eea commit a2627f7
Showing 8 changed files with 192 additions and 250 deletions.
13 changes: 8 additions & 5 deletions dev/scratch.mjs
@@ -1,5 +1,8 @@
-import uuid from 'uuid';
-
-const uuidV4 = uuid.v4;
-
-console.log(uuidV4());
+Array.from(new Array(3), (v, i) => i).forEach(async (i) => {
+  while (true) {
+    if (Math.random() < 0.1) {
+      throw new Error('failed');
+    }
+    console.log(i, 'success');
+  }
+});
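
The scratch file above is a minimal reproduction of the failure mode of the new worker pattern: each forEach callback is a fire-and-forget async function, so a throw inside the loop surfaces as an unhandled promise rejection rather than an exception the caller can catch. A minimal sketch, not part of the commit, of how a process might observe that:

// Recent Node versions terminate on unhandled rejections by default;
// this handler logs which worker loop died before exiting explicitly.
process.on('unhandledRejection', (err) => {
  console.error('worker loop died:', err);
  process.exit(1);
});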
129 changes: 49 additions & 80 deletions store/queue.mjs
@@ -1,90 +1,59 @@
 import moment from 'moment';
-import async from 'async';
 import redis from './redis.mjs';
 import db from './db.mjs';
 
-function runQueue(queueName, parallelism, processor) {
-  function processOneJob(cb) {
-    redis.blpop(queueName, '0', (err, job) => {
-      if (err) {
-        throw err;
-      }
+async function runQueue(queueName, parallelism, processor) {
+  Array.from(new Array(parallelism), (v, i) => i).forEach(async (i) => {
+    while (true) {
+      const job = await redis.blpop(queueName, '0');
       const jobData = JSON.parse(job[1]);
-      processor(jobData, (err) => {
-        if (err) {
-          console.error(err);
-        }
-        process.nextTick(cb);
-      });
-    });
-  }
-  for (let i = 0; i < parallelism; i += 1) {
-    async.forever(processOneJob, (err) => {
-      throw err;
-    });
-  }
+      await processor(jobData);
+    }
+  });
 }
-function runReliableQueue(queueName, parallelism, processor) {
-  function processOneJob(cb) {
-    db.transaction((trx) => {
-      trx
-        .raw(
-          `
-      UPDATE queue SET attempts = attempts - 1, next_attempt_time = ?
-      WHERE id = (
-      SELECT id
-      FROM queue
-      WHERE type = ?
-      AND (next_attempt_time IS NULL OR next_attempt_time < now())
-      ORDER BY priority ASC NULLS LAST, id ASC
-      FOR UPDATE SKIP LOCKED
-      LIMIT 1
-      )
-      RETURNING *
-      `,
-          [moment().add(2, 'minute'), queueName]
-        )
-        .asCallback((err, result) => {
-          const job = result && result.rows && result.rows[0];
-          if (err) {
-            throw err;
-          }
-          if (!job) {
-            trx.commit();
-            // console.log('no job available, waiting');
-            return setTimeout(cb, 5000);
-          }
-          return processor(job.data, (err) => {
-            if (err) {
-              // processor encountered an error, just log it and commit the transaction
-              console.error(err);
-            }
-            if (!err || job.attempts <= 0) {
-              // remove the job from the queue if successful or out of attempts
-              trx
-                .raw('DELETE FROM queue WHERE id = ?', [job.id])
-                .asCallback((err) => {
-                  if (err) {
-                    throw err;
-                  }
-                  trx.commit();
-                  process.nextTick(cb);
-                });
-            } else {
-              trx.commit();
-              process.nextTick(cb);
-            }
-          });
-        });
-    }).catch((err) => {
-      throw err;
-    });
-  }
-  for (let i = 0; i < parallelism; i += 1) {
-    async.forever(processOneJob, (err) => {
-      throw err;
-    });
-  }
+
+async function runReliableQueue(queueName, parallelism, processor) {
+  Array.from(new Array(parallelism), (v, i) => i).forEach(async (i) => {
+    while (true) {
+      const trx = await db.transaction();
+      const result = await trx.raw(
+        `
+      UPDATE queue SET attempts = attempts - 1, next_attempt_time = ?
+      WHERE id = (
+      SELECT id
+      FROM queue
+      WHERE type = ?
+      AND (next_attempt_time IS NULL OR next_attempt_time < now())
+      ORDER BY priority ASC NULLS LAST, id ASC
+      FOR UPDATE SKIP LOCKED
+      LIMIT 1
+      )
+      RETURNING *
+      `,
+        [moment().add(2, 'minute'), queueName]
+      );
+      const job = result && result.rows && result.rows[0];
+      if (job) {
+        // Handle possible exception here since we still need to commit the transaction to update attempts
+        let success = false;
+        try {
+          success = await processor(job.data);
+        } catch (e) {
+          // Don't crash the process as we expect some processing failures
+          console.error(e);
+        }
+        if (success || job.attempts <= 0) {
+          // remove the job from the queue if successful or out of attempts
+          await trx.raw('DELETE FROM queue WHERE id = ?', [job.id]);
+        }
+        await trx.commit();
+      } else {
+        await trx.commit();
+        // console.log('no job available, waiting');
+        await new Promise((resolve) => setTimeout(resolve, 5000));
+      }
+    }
+  });
 }
 async function addJob(queueName, job) {
   return await redis.rpush(queueName, job);
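
The rewritten runQueue starts parallelism independent async loops, each blocking on Redis BLPOP. runReliableQueue claims one row per iteration with FOR UPDATE SKIP LOCKED, so concurrent workers never select the same job; attempts are decremented at claim time, and the row is deleted only on success or when attempts run out, which gives at-least-once delivery. A hypothetical usage sketch, assuming runReliableQueue is exported alongside runQueue (the svc/ files below only demonstrate queue.runQueue); parseReplay is an illustrative processor name, not from this repo:

import queue from './store/queue.mjs';

// The processor's return value is the success flag: returning true (or
// running out of attempts) deletes the row; returning false or throwing
// leaves it to be retried after the two-minute next_attempt_time set by
// the claiming UPDATE.
async function parseReplay(jobData) {
  // ...do the work...
  return true;
}

// FOR UPDATE SKIP LOCKED lets all four loops claim rows concurrently
// without ever handing out the same job twice.
queue.runReliableQueue('parseQueue', 4, parseReplay);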
19 changes: 7 additions & 12 deletions svc/counts.mjs
@@ -273,17 +273,12 @@ function updateMatchups(match) {
  */
 async function processCounts(match, cb) {
   console.log('match %s', match.match_id);
-  try {
-    await updateHeroRankings(match);
-    await upsertMatchSample(match);
-    await updateRecords(match);
-    await updateLastPlayed(match);
-    await updateHeroSearch(match);
-    await updateTurbo(match);
-    await updateBenchmarks(match);
-    cb();
-  } catch (e) {
-    cb(e);
-  }
+  await updateHeroRankings(match);
+  await upsertMatchSample(match);
+  await updateRecords(match);
+  await updateLastPlayed(match);
+  await updateHeroSearch(match);
+  await updateTurbo(match);
+  await updateBenchmarks(match);
 }
 queue.runQueue('countsQueue', 1, processCounts);
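
Note the asymmetry this creates: runReliableQueue wraps the processor in try/catch, but runQueue does not, so an error thrown by any of the awaits above now ends that worker's loop as an unhandled rejection (exactly the behavior dev/scratch.mjs demonstrates). A hypothetical defensive wrapper, not part of the commit, that trades crashing for logging:

// logErrors is illustrative, not from this repo.
const logErrors = (processor) => async (job) => {
  try {
    await processor(job);
  } catch (e) {
    console.error(e); // keep one bad match from killing the loop
  }
};
// queue.runQueue('countsQueue', 1, logErrors(processCounts));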
72 changes: 34 additions & 38 deletions svc/fullhistory.mjs
@@ -1,5 +1,6 @@
 // Processes a queue of full history/refresh requests for players
 import urllib from 'url';
+import { promisify } from 'util';
 import config from '../config.js';
 import {
   redisCount,
@@ -45,10 +46,10 @@ async function processMatch(matchId) {
   });
 }
 
-function processFullHistory(job, cb) {
+async function processFullHistory(job) {
   const player = job;
   if (Number(player.account_id) === 0) {
-    return cb();
+    return;
   }
   // if test or only want last 100 (no paging), set short_history
   // const heroArray = job.short_history || config.NODE_ENV === 'test' ? ['0'] : Object.keys(constants.heroes);
@@ -95,42 +96,37 @@ function processFullHistory(job, cb) {
       return getApiMatchPage(player, url, cb);
     });
   };
-  getApiMatchPage(player, container.url, async (err) => {
-    console.log('%s matches found', Object.keys(player.match_ids).length);
-    player.fh_unavailable = Boolean(err);
-    try {
-      if (err) {
-        // non-retryable error while scanning, user had a private account
-        console.log('error: %s', JSON.stringify(err));
-        await updatePlayer(player);
-      } else {
-        // check what matches the player is already associated with
-        const docs = await getPlayerMatchesPromise(player.account_id, {
-          project: ['match_id'],
-        });
-        console.log(
-          '%s matches found, %s already in db, %s to add',
-          Object.keys(player.match_ids).length,
-          docs.length,
-          Object.keys(player.match_ids).length - docs.length
-        );
-        // iterate through db results, delete match_id key if this player has this match already
-        // will re-request and update matches where this player was previously anonymous
-        for (let i = 0; i < docs.length; i += 1) {
-          const matchId = docs[i].match_id;
-          delete player.match_ids[matchId];
-        }
-        // make api_details requests for matches
-        const promiseFuncs = Object.keys(player.match_ids).map(
-          (matchId) => () => processMatch(matchId)
-        );
-        await eachLimit(promiseFuncs, parallelism);
-        await updatePlayer(player);
-      }
-      cb();
-    } catch (e) {
-      cb(err);
-    }
-  });
+  try {
+    await promisify(getApiMatchPage)(player, container.url);
+  } catch (err) {
+    // non-retryable error while scanning, user had a private account
+    console.log('error: %s', JSON.stringify(err));
+    player.fh_unavailable = true;
+    await updatePlayer(player);
+  }
+  console.log('%s matches found', Object.keys(player.match_ids).length);
+  player.fh_unavailable = false;
+  // check what matches the player is already associated with
+  const docs = await getPlayerMatchesPromise(player.account_id, {
+    project: ['match_id'],
+  });
+  console.log(
+    '%s matches found, %s already in db, %s to add',
+    Object.keys(player.match_ids).length,
+    docs.length,
+    Object.keys(player.match_ids).length - docs.length
+  );
+  // iterate through db results, delete match_id key if this player has this match already
+  // will re-request and update matches where this player was previously anonymous
+  for (let i = 0; i < docs.length; i += 1) {
+    const matchId = docs[i].match_id;
+    delete player.match_ids[matchId];
+  }
+  // make api_details requests for matches
+  const promiseFuncs = Object.keys(player.match_ids).map(
+    (matchId) => () => processMatch(matchId)
+  );
+  await eachLimit(promiseFuncs, parallelism);
+  await updatePlayer(player);
 }
 queue.runQueue('fhQueue', 10, processFullHistory);
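
Two things are worth noting in this hunk. promisify adapts getApiMatchPage's (player, url, cb) callback signature so that a failed scan becomes a rejection the try/catch can see. But as committed, the catch block falls through: after marking the player unavailable it continues, resets fh_unavailable to false, and runs the whole success path anyway, where the old else branch stopped. An editorial sketch of the catch block inside processFullHistory with an early return that would preserve the old short-circuit (this return is not in the commit):

try {
  await promisify(getApiMatchPage)(player, container.url);
} catch (err) {
  // non-retryable error while scanning, user had a private account
  console.log('error: %s', JSON.stringify(err));
  player.fh_unavailable = true;
  await updatePlayer(player);
  return; // stop: don't overwrite fh_unavailable or process a partial scan
}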
11 changes: 4 additions & 7 deletions svc/gcdata.mjs
@@ -4,16 +4,13 @@ import getGcData from '../store/getGcData.mjs'
 import queue from '../store/queue.mjs';
 import config from '../config.js';
 import utility from '../util/utility.mjs';
+
 const { getRetrieverArr } = utility;
 const retrieverArr = getRetrieverArr();
-async function processGcData(job, cb) {
+
+async function processGcData(job) {
   job.useGcDataArr = true;
-  try {
-    await getGcData(job);
-    cb();
-  } catch (e) {
-    cb(e);
-  }
+  await getGcData(job);
 }
 queue.runQueue(
   'gcQueue',
49 changes: 22 additions & 27 deletions svc/mmr.mjs
@@ -10,37 +10,32 @@ import {
   getRetrieverArr,
 } from '../util/utility.mjs';
 const retrieverArr = getRetrieverArr();
-async function processMmr(job, cb) {
+async function processMmr(job) {
   const accountId = job.account_id;
   const urls = retrieverArr.map(
     (r) => `http://${r}?key=${config.RETRIEVER_SECRET}&account_id=${accountId}`
   );
-  try {
-    const data = await getDataPromise({ url: urls });
-    redisCount(redis, 'retriever_player');
-    // NOTE: This leads to a massive number of updates on the player table
-    // Only write it sometimes, unless we're in dev mode
-    if (config.NODE_ENV === 'development' || Math.random() < 0.05) {
-      const player = {
-        account_id: job.account_id || null,
-        plus: Boolean(data.is_plus_subscriber),
-      };
-      await insertPlayerPromise(db, player, false);
-    }
-    if (
-      data.solo_competitive_rank ||
-      data.competitive_rank ||
-      data.rank_tier ||
-      data.leaderboard_rank
-    ) {
-      data.account_id = job.account_id || null;
-      data.match_id = job.match_id || null;
-      data.time = new Date();
-      await insertPlayerRating(data);
-    }
-    cb();
-  } catch (e) {
-    cb(e);
+  const data = await getDataPromise({ url: urls });
+  redisCount(redis, 'retriever_player');
+  // NOTE: This leads to a massive number of updates on the player table
+  // Only write it sometimes, unless we're in dev mode
+  if (config.NODE_ENV === 'development' || Math.random() < 0.05) {
+    const player = {
+      account_id: job.account_id || null,
+      plus: Boolean(data.is_plus_subscriber),
+    };
+    await insertPlayerPromise(db, player, false);
+  }
+  if (
+    data.solo_competitive_rank ||
+    data.competitive_rank ||
+    data.rank_tier ||
+    data.leaderboard_rank
+  ) {
+    data.account_id = job.account_id || null;
+    data.match_id = job.match_id || null;
+    data.time = new Date();
+    await insertPlayerRating(data);
+  }
   }
 }
 queue.runQueue(