diff --git a/dev/scratch.mjs b/dev/scratch.mjs index 49d513ead..98697ab57 100644 --- a/dev/scratch.mjs +++ b/dev/scratch.mjs @@ -1,5 +1,8 @@ -import uuid from 'uuid'; - -const uuidV4 = uuid.v4; - -console.log(uuidV4()); +Array.from(new Array(3), (v, i) => i).forEach(async (i) => { + while (true) { + if (Math.random() < 0.1) { + throw new Error('failed'); + } + console.log(i, 'success'); + } +}); diff --git a/store/queue.mjs b/store/queue.mjs index d15c8763b..925b78490 100644 --- a/store/queue.mjs +++ b/store/queue.mjs @@ -1,90 +1,59 @@ import moment from 'moment'; -import async from 'async'; import redis from './redis.mjs'; import db from './db.mjs'; -function runQueue(queueName, parallelism, processor) { - function processOneJob(cb) { - redis.blpop(queueName, '0', (err, job) => { - if (err) { - throw err; - } +async function runQueue(queueName, parallelism, processor) { + Array.from(new Array(parallelism), (v, i) => i).forEach(async (i) => { + while (true) { + const job = await redis.blpop(queueName, '0'); const jobData = JSON.parse(job[1]); - processor(jobData, (err) => { - if (err) { - console.error(err); - } - process.nextTick(cb); - }); - }); - } - for (let i = 0; i < parallelism; i += 1) { - async.forever(processOneJob, (err) => { - throw err; - }); - } + await processor(jobData); + } + }); } -function runReliableQueue(queueName, parallelism, processor) { - function processOneJob(cb) { - db.transaction((trx) => { - trx - .raw( - ` - UPDATE queue SET attempts = attempts - 1, next_attempt_time = ? - WHERE id = ( - SELECT id - FROM queue - WHERE type = ? - AND (next_attempt_time IS NULL OR next_attempt_time < now()) - ORDER BY priority ASC NULLS LAST, id ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - RETURNING * - `, - [moment().add(2, 'minute'), queueName] + +async function runReliableQueue(queueName, parallelism, processor) { + Array.from(new Array(parallelism), (v, i) => i).forEach(async (i) => { + while (true) { + const trx = await db.transaction(); + const result = await trx.raw( + ` + UPDATE queue SET attempts = attempts - 1, next_attempt_time = ? + WHERE id = ( + SELECT id + FROM queue + WHERE type = ? 
+ AND (next_attempt_time IS NULL OR next_attempt_time < now()) + ORDER BY priority ASC NULLS LAST, id ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 ) - .asCallback((err, result) => { - const job = result && result.rows && result.rows[0]; - if (err) { - throw err; - } - if (!job) { - trx.commit(); - // console.log('no job available, waiting'); - return setTimeout(cb, 5000); - } - return processor(job.data, (err) => { - if (err) { - // processor encountered an error, just log it and commit the transaction - console.error(err); - } - if (!err || job.attempts <= 0) { - // remove the job from the queue if successful or out of attempts - trx - .raw('DELETE FROM queue WHERE id = ?', [job.id]) - .asCallback((err) => { - if (err) { - throw err; - } - trx.commit(); - process.nextTick(cb); - }); - } else { - trx.commit(); - process.nextTick(cb); - } - }); - }); - }).catch((err) => { - throw err; - }); - } - for (let i = 0; i < parallelism; i += 1) { - async.forever(processOneJob, (err) => { - throw err; - }); - } + RETURNING * + `, + [moment().add(2, 'minute'), queueName] + ); + const job = result && result.rows && result.rows[0]; + if (job) { + // Handle possible exception here since we still need to commit the transaction to update attempts + let success = false; + try { + success = await processor(job.data); + } catch (e) { + // Don't crash the process as we expect some processing failures + console.error(e); + } + if (success || job.attempts <= 0) { + // remove the job from the queue if successful or out of attempts + await trx.raw('DELETE FROM queue WHERE id = ?', [job.id]); + } + await trx.commit(); + } else { + await trx.commit(); + // console.log('no job available, waiting'); + await new Promise((resolve) => setTimeout(resolve, 5000)); + } + } + }); } async function addJob(queueName, job) { return await redis.rpush(queueName, job); diff --git a/svc/counts.mjs b/svc/counts.mjs index 2c4e4d6d4..c01581e7a 100644 --- a/svc/counts.mjs +++ b/svc/counts.mjs @@ -273,17 +273,12 @@ function updateMatchups(match) { */ async function processCounts(match, cb) { console.log('match %s', match.match_id); - try { - await updateHeroRankings(match); - await upsertMatchSample(match); - await updateRecords(match); - await updateLastPlayed(match); - await updateHeroSearch(match); - await updateTurbo(match); - await updateBenchmarks(match); - cb(); - } catch (e) { - cb(e); - } + await updateHeroRankings(match); + await upsertMatchSample(match); + await updateRecords(match); + await updateLastPlayed(match); + await updateHeroSearch(match); + await updateTurbo(match); + await updateBenchmarks(match); } queue.runQueue('countsQueue', 1, processCounts); diff --git a/svc/fullhistory.mjs b/svc/fullhistory.mjs index ffca6b5f8..e960004fa 100644 --- a/svc/fullhistory.mjs +++ b/svc/fullhistory.mjs @@ -1,5 +1,6 @@ // Processes a queue of full history/refresh requests for players import urllib from 'url'; +import { promisify } from 'util'; import config from '../config.js'; import { redisCount, @@ -45,10 +46,10 @@ async function processMatch(matchId) { }); } -function processFullHistory(job, cb) { +async function processFullHistory(job) { const player = job; if (Number(player.account_id) === 0) { - return cb(); + return; } // if test or only want last 100 (no paging), set short_history // const heroArray = job.short_history || config.NODE_ENV === 'test' ? 
['0'] : Object.keys(constants.heroes); @@ -95,42 +96,38 @@ function processFullHistory(job, cb) { return getApiMatchPage(player, url, cb); }); }; - getApiMatchPage(player, container.url, async (err) => { - console.log('%s matches found', Object.keys(player.match_ids).length); - player.fh_unavailable = Boolean(err); - try { - if (err) { - // non-retryable error while scanning, user had a private account - console.log('error: %s', JSON.stringify(err)); - await updatePlayer(player); - } else { - // check what matches the player is already associated with - const docs = await getPlayerMatchesPromise(player.account_id, { - project: ['match_id'], - }); - console.log( - '%s matches found, %s already in db, %s to add', - Object.keys(player.match_ids).length, - docs.length, - Object.keys(player.match_ids).length - docs.length - ); - // iterate through db results, delete match_id key if this player has this match already - // will re-request and update matches where this player was previously anonymous - for (let i = 0; i < docs.length; i += 1) { - const matchId = docs[i].match_id; - delete player.match_ids[matchId]; - } - // make api_details requests for matches - const promiseFuncs = Object.keys(player.match_ids).map( - (matchId) => () => processMatch(matchId) - ); - await eachLimit(promiseFuncs, parallelism); - await updatePlayer(player); - } - cb(); - } catch (e) { - cb(err); - } + try { + await promisify(getApiMatchPage)(player, container.url); + } catch (err) { + // non-retryable error while scanning, user had a private account + console.log('error: %s', JSON.stringify(err)); + player.fh_unavailable = true; + await updatePlayer(player); + return; + } + console.log('%s matches found', Object.keys(player.match_ids).length); + player.fh_unavailable = false; + // check what matches the player is already associated with + const docs = await getPlayerMatchesPromise(player.account_id, { + project: ['match_id'], }); + console.log( + '%s matches found, %s already in db, %s to add', + Object.keys(player.match_ids).length, + docs.length, + Object.keys(player.match_ids).length - docs.length + ); + // iterate through db results, delete match_id key if this player has this match already + // will re-request and update matches where this player was previously anonymous + for (let i = 0; i < docs.length; i += 1) { + const matchId = docs[i].match_id; + delete player.match_ids[matchId]; + } + // make api_details requests for matches + const promiseFuncs = Object.keys(player.match_ids).map( + (matchId) => () => processMatch(matchId) + ); + await eachLimit(promiseFuncs, parallelism); + await updatePlayer(player); } queue.runQueue('fhQueue', 10, processFullHistory); diff --git a/svc/gcdata.mjs b/svc/gcdata.mjs index ff193b8ec..5b9f2d5c4 100644 --- a/svc/gcdata.mjs +++ b/svc/gcdata.mjs @@ -4,16 +4,13 @@ import getGcData from '../store/getGcData.mjs'; import queue from '../store/queue.mjs'; import config from '../config.js'; import utility from '../util/utility.mjs'; + const { getRetrieverArr } = utility; const retrieverArr = getRetrieverArr(); -async function processGcData(job, cb) { + +async function processGcData(job) { job.useGcDataArr = true; - try { - await getGcData(job); - cb(); - } catch (e) { - cb(e); - } + await getGcData(job); } queue.runQueue( 'gcQueue', diff --git a/svc/mmr.mjs b/svc/mmr.mjs index c129a4536..4b1bd290b 100644 --- a/svc/mmr.mjs +++ b/svc/mmr.mjs @@ -10,37 +10,32 @@ import { getRetrieverArr, } from '../util/utility.mjs'; const retrieverArr = getRetrieverArr(); -async function processMmr(job, cb) {
+async function processMmr(job) { const accountId = job.account_id; const urls = retrieverArr.map( (r) => `http://${r}?key=${config.RETRIEVER_SECRET}&account_id=${accountId}` ); - try { - const data = await getDataPromise({ url: urls }); - redisCount(redis, 'retriever_player'); - // NOTE: This leads to a massive number of updates on the player table - // Only write it sometimes, unless we're in dev mode - if (config.NODE_ENV === 'development' || Math.random() < 0.05) { - const player = { - account_id: job.account_id || null, - plus: Boolean(data.is_plus_subscriber), - }; - await insertPlayerPromise(db, player, false); - } - if ( - data.solo_competitive_rank || - data.competitive_rank || - data.rank_tier || - data.leaderboard_rank - ) { - data.account_id = job.account_id || null; - data.match_id = job.match_id || null; - data.time = new Date(); - await insertPlayerRating(data); - } - cb(); - } catch (e) { - cb(e); + const data = await getDataPromise({ url: urls }); + redisCount(redis, 'retriever_player'); + // NOTE: This leads to a massive number of updates on the player table + // Only write it sometimes, unless we're in dev mode + if (config.NODE_ENV === 'development' || Math.random() < 0.05) { + const player = { + account_id: job.account_id || null, + plus: Boolean(data.is_plus_subscriber), + }; + await insertPlayerPromise(db, player, false); + } + if ( + data.solo_competitive_rank || + data.competitive_rank || + data.rank_tier || + data.leaderboard_rank + ) { + data.account_id = job.account_id || null; + data.match_id = job.match_id || null; + data.time = new Date(); + await insertPlayerRating(data); } } queue.runQueue( diff --git a/svc/parser.mjs b/svc/parser.mjs index 7758dcfd6..50f5e9758 100755 --- a/svc/parser.mjs +++ b/svc/parser.mjs @@ -25,50 +25,42 @@ app.get('/healthz', (req, res) => { res.end('ok'); }); app.listen(PORT || PARSER_PORT); -async function parseProcessor(job, cb) { +async function parseProcessor(job) { const match = job; - try { - const gcdata = await getGcData(match); - let url = buildReplayUrl( - gcdata.match_id, - gcdata.cluster, - gcdata.replay_salt - ); - if (NODE_ENV === 'test') { - url = `https://odota.github.io/testfiles/${match.match_id}_1.dem`; - } - console.log('[PARSER] parsing replay at:', url); - const { stdout } = await execPromise( - `curl --max-time 180 --fail ${url} | ${ - url && url.slice(-3) === 'bz2' ? 'bunzip2' : 'cat' - } | curl -X POST -T - ${PARSER_HOST} | node processors/createParsedDataBlob.mjs ${ - match.match_id - }`, - { shell: true, maxBuffer: 10 * 1024 * 1024 } - ); - const result = { ...JSON.parse(stdout), ...match }; - await insertMatchPromise(result, { - type: 'parsed', - skipParse: true, - }); - // Mark this match parsed - await db.raw( - 'INSERT INTO parsed_matches(match_id) VALUES(?) 
ON CONFLICT DO NOTHING', - [Number(match.match_id)] - ); - // Decide if we want to do scenarios (requires parsed match) - // Only if it originated from scanner to avoid triggering on requests - if ( - match.origin === 'scanner' && - match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT - ) { - await queue.addJob('scenariosQueue', match.match_id); - } - console.log('[PARSER] completed parse of match %s', match.match_id); - cb(null, match.match_id); - } catch (e) { - cb(e); + const gcdata = await getGcData(match); + let url = buildReplayUrl(gcdata.match_id, gcdata.cluster, gcdata.replay_salt); + if (NODE_ENV === 'test') { + url = `https://odota.github.io/testfiles/${match.match_id}_1.dem`; } + console.log('[PARSER] parsing replay at:', url); + const { stdout } = await execPromise( + `curl --max-time 180 --fail ${url} | ${ + url && url.slice(-3) === 'bz2' ? 'bunzip2' : 'cat' + } | curl -X POST -T - ${PARSER_HOST} | node processors/createParsedDataBlob.mjs ${ + match.match_id + }`, + { shell: true, maxBuffer: 10 * 1024 * 1024 } + ); + const result = { ...JSON.parse(stdout), ...match }; + await insertMatchPromise(result, { + type: 'parsed', + skipParse: true, + }); + // Mark this match parsed + await db.raw( + 'INSERT INTO parsed_matches(match_id) VALUES(?) ON CONFLICT DO NOTHING', + [Number(match.match_id)] + ); + // Decide if we want to do scenarios (requires parsed match) + // Only if it originated from scanner to avoid triggering on requests + if ( + match.origin === 'scanner' && + match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT + ) { + await queue.addJob('scenariosQueue', match.match_id); + } + console.log('[PARSER] completed parse of match %s', match.match_id); + return true; } runReliableQueue( 'parse', diff --git a/svc/scenarios.mjs b/svc/scenarios.mjs index 48a598e99..f9079e847 100644 --- a/svc/scenarios.mjs +++ b/svc/scenarios.mjs @@ -6,48 +6,43 @@ import buildMatch from '../store/buildMatch.mjs'; import db from '../store/db.mjs'; import utility from '../util/utility.mjs'; import su from '../util/scenariosUtil.mjs'; -async function processScenarios(matchID, cb) { +async function processScenarios(matchID) { console.log('[SCENARIOS] match: %s', matchID); - try { - // Using buildMatch is unnecessarily expensive here since it also looks up player names etc. - const match = await buildMatch(matchID); - if (!su.validateMatchProperties(match)) { - console.error( - `Skipping scenario checks for match ${matchID}. Invalid match object.` - ); - return cb(); - } - const currentWeek = utility.epochWeek(); - Object.keys(su.scenarioChecks).forEach((table) => { - su.scenarioChecks[table].forEach((scenarioCheck) => { - const rows = scenarioCheck(match); - async.eachSeries(rows, (row, cb) => { - row = Object.assign(row, { - epoch_week: currentWeek, - wins: row.wins ? '1' : '0', - }); - const values = Object.keys(row).map(() => '?'); - const query = util.format( - 'INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET wins = %s.wins + EXCLUDED.wins, games = %s.games + 1', - table, - Object.keys(row).join(','), - values.join(','), - Object.keys(row) - .filter((column) => column !== 'wins') - .join(','), - table, - table - ); - db.raw( - query, - Object.keys(row).map((key) => row[key]) - ).asCallback(cb); + // Using buildMatch is unnecessarily expensive here since it also looks up player names etc. + const match = await buildMatch(matchID); + if (!su.validateMatchProperties(match)) { + console.error( + `Skipping scenario checks for match ${matchID}. 
Invalid match object.` + ); + return; + } + const currentWeek = utility.epochWeek(); + Object.keys(su.scenarioChecks).forEach((table) => { + su.scenarioChecks[table].forEach((scenarioCheck) => { + const rows = scenarioCheck(match); + async.eachSeries(rows, (row, cb) => { + row = Object.assign(row, { + epoch_week: currentWeek, + wins: row.wins ? '1' : '0', }); + const values = Object.keys(row).map(() => '?'); + const query = util.format( + 'INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET wins = %s.wins + EXCLUDED.wins, games = %s.games + 1', + table, + Object.keys(row).join(','), + values.join(','), + Object.keys(row) + .filter((column) => column !== 'wins') + .join(','), + table, + table + ); + db.raw( + query, + Object.keys(row).map((key) => row[key]) + ).asCallback(cb); }); }); - return cb(); - } catch (err) { - return cb(err); - } + }); } queue.runQueue('scenariosQueue', 1, processScenarios);
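
The scenarios processor above is the one spot in this change that still pushes its database writes through async.eachSeries and .asCallback, so processScenarios can resolve before the INSERT ... ON CONFLICT statements have run, and a write error only stops the eachSeries iteration (which has no completion callback here). What follows is a minimal sketch of a fully awaited variant, assuming db.raw returns a promise when no callback is attached (it is already awaited that way elsewhere in this diff); the name processScenariosAwaited and the plain for...of loops are illustrative and not part of the change:

// Hypothetical fully awaited variant of processScenarios (sketch only).
// Relies on the same imports already present in svc/scenarios.mjs: buildMatch, db, util, utility, su.
async function processScenariosAwaited(matchID) {
  console.log('[SCENARIOS] match: %s', matchID);
  // Using buildMatch is unnecessarily expensive here since it also looks up player names etc.
  const match = await buildMatch(matchID);
  if (!su.validateMatchProperties(match)) {
    console.error(`Skipping scenario checks for match ${matchID}. Invalid match object.`);
    return;
  }
  const currentWeek = utility.epochWeek();
  for (const table of Object.keys(su.scenarioChecks)) {
    for (const scenarioCheck of su.scenarioChecks[table]) {
      for (let row of scenarioCheck(match)) {
        // Same row shape as the existing code: tag with the epoch week and a win flag.
        row = { ...row, epoch_week: currentWeek, wins: row.wins ? '1' : '0' };
        const columns = Object.keys(row);
        const query = util.format(
          'INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET wins = %s.wins + EXCLUDED.wins, games = %s.games + 1',
          table,
          columns.join(','),
          columns.map(() => '?').join(','),
          columns.filter((column) => column !== 'wins').join(','),
          table,
          table
        );
        // Awaiting each statement keeps the one-at-a-time ordering of eachSeries and
        // lets a failed upsert reject the job instead of being silently dropped.
        await db.raw(query, columns.map((key) => row[key]));
      }
    }
  }
}

Written this way, runQueue would only pull the next scenariosQueue job after the current match's rows have been written, which matches how the other processors converted in this change behave.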