Skip to content

Commit

Permalink
mostly use await in fullhistory
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 3, 2023
1 parent 88e01e6 commit 67f8466
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 110 deletions.
2 changes: 2 additions & 0 deletions store/queries.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ function getMmrEstimate(accountId, cb) {
.where({ account_id: accountId })
.asCallback(cb);
}
export const getPlayerMatchesPromise = util.promisify(getPlayerMatches);
function getPlayerMatches(accountId, queryObj, cb) {
// Validate accountId
if (!accountId || Number.isNaN(Number(accountId)) || Number(accountId) <= 0) {
Expand Down Expand Up @@ -1632,6 +1633,7 @@ export default {
getMatchRating,
getLeaderboard,
getPlayerMatches,
getPlayerMatchesPromise,
getPlayerRatings,
getPlayerHeroRankings,
getPlayer,
Expand Down
196 changes: 86 additions & 110 deletions svc/fullhistory.mjs
Original file line number Diff line number Diff line change
@@ -1,41 +1,72 @@
// Processes a queue of full history/refresh requests for players
import async from 'async';
import urllib from 'url';
import config from '../config.js';
import { redisCount, getData, generateJob } from '../util/utility.mjs';
import {
redisCount,
getData,
getDataPromise,
generateJob,
eachLimit,
} from '../util/utility.mjs';
import db from '../store/db.mjs';
import redis from '../store/redis.mjs';
import queue from '../store/queue.mjs';
import queries from '../store/queries.mjs';
const { insertMatchPromise } = queries;
import {
insertMatchPromise,
getPlayerMatchesPromise,
} from '../store/queries.mjs';
const apiKeys = config.STEAM_API_KEY.split(',');
// number of api requests to send at once
const parallelism = Math.min(20, apiKeys.length);
const parallelism = Math.min(40, apiKeys.length);

// Mark a player's full-history refresh as finished: persist the completion
// timestamp and the availability flag, then bump the fullhistory counter.
// NOTE(review): assumes player.fh_unavailable was set by the caller — confirm.
async function updatePlayer(player) {
  const { account_id: accountId, fh_unavailable: fhUnavailable } = player;
  await db('players')
    .where({
      account_id: accountId,
    })
    .update({
      full_history_time: new Date(),
      fh_unavailable: fhUnavailable,
    });
  console.log('got full match history for %s', accountId);
  redisCount(redis, 'fullhistory');
}

// Fetch full details for a single match from the API and insert them,
// skipping the parse pipeline (type 'api', skipParse: true).
async function processMatch(matchId) {
  const job = generateJob('api_details', {
    match_id: Number(matchId),
  });
  const response = await getDataPromise(job.url);
  await insertMatchPromise(response.result, {
    type: 'api',
    skipParse: true,
  });
}

function processFullHistory(job, cb) {
function updatePlayer(player, cb) {
// done with this player, update
db('players')
.update({
full_history_time: new Date(),
fh_unavailable: player.fh_unavailable,
})
.where({
account_id: player.account_id,
})
.asCallback((err) => {
if (err) {
return cb(err);
}
console.log('got full match history for %s', player.account_id);
redisCount(redis, 'fullhistory');
return cb(err);
});
const player = job;
if (Number(player.account_id) === 0) {
return cb();
}
function getApiMatchPage(player, url, cb) {
// if test or only want last 100 (no paging), set short_history
// const heroArray = job.short_history || config.NODE_ENV === 'test' ? ['0'] : Object.keys(constants.heroes);
// As of December 2021 filtering by hero ID doesn't work
// const heroArray = ['0'];
const heroId = '0';
// use steamapi via specific player history and specific hero id (up to 500 games per hero)
player.match_ids = {};
// make a request for every possible hero
const container = generateJob('api_history', {
account_id: player.account_id,
hero_id: heroId,
matches_requested: 100,
});
const getApiMatchPage = (player, url, cb) => {
getData(url, (err, body) => {
if (err) {
// non-retryable error, probably the user's account is private
console.log('non-retryable error');
return cb(err);
}
// if !body.result, retry
Expand All @@ -54,7 +85,7 @@ function processFullHistory(job, cb) {
const rem = body.result.results_remaining;
if (rem === 0 || player.short_history) {
// no more pages
return cb(err);
return cb();
}
// paginate through to max 500 games if necessary with start_at_match_id=
const parse = urllib.parse(url, true);
Expand All @@ -63,97 +94,42 @@ function processFullHistory(job, cb) {
url = urllib.format(parse);
return getApiMatchPage(player, url, cb);
});
}
const player = job;
if (Number(player.account_id) === 0) {
return cb();
}
// if test or only want last 100 (no paging), set short_history
// const heroArray = job.short_history || config.NODE_ENV === 'test' ? ['0'] : Object.keys(constants.heroes);
// As of December 2021 filtering by hero ID doesn't work
const heroArray = ['0'];
// use steamapi via specific player history and specific hero id (up to 500 games per hero)
player.match_ids = {};
return async.eachLimit(
heroArray,
parallelism,
(heroId, cb) => {
// make a request for every possible hero
const container = generateJob('api_history', {
account_id: player.account_id,
hero_id: heroId,
matches_requested: 100,
});
getApiMatchPage(player, container.url, (err) => {
console.log('%s matches found', Object.keys(player.match_ids).length);
cb(err);
});
},
(err) => {
player.fh_unavailable = Boolean(err);
};
getApiMatchPage(player, container.url, async (err) => {
console.log('%s matches found', Object.keys(player.match_ids).length);
player.fh_unavailable = Boolean(err);
try {
if (err) {
// non-retryable error while scanning, user had a private account
console.log('error: %s', JSON.stringify(err));
updatePlayer(player, cb);
await updatePlayer(player);
} else {
// check what matches the player is already associated with
queries.getPlayerMatches(
player.account_id,
{
project: ['match_id'],
},
(err, docs) => {
if (err) {
return cb(err);
}
console.log(
'%s matches found, %s already in db, %s to add',
Object.keys(player.match_ids).length,
docs.length,
Object.keys(player.match_ids).length - docs.length
);
// iterate through db results, delete match_id key if this player has this match already
// will re-request and update matches where this player was previously anonymous
for (let i = 0; i < docs.length; i += 1) {
const matchId = docs[i].match_id;
delete player.match_ids[matchId];
}
// iterate through keys, make api_details requests
return async.eachLimit(
Object.keys(player.match_ids),
parallelism,
(matchId, cb) => {
// process api jobs directly with parallelism
const container = generateJob('api_details', {
match_id: Number(matchId),
});
getData(container.url, async (err, body) => {
if (err) {
return cb(err);
}
const match = body.result;
try {
await insertMatchPromise(match, {
type: 'api',
skipParse: true,
});
cb();
} catch (e) {
cb(e);
}
});
},
(err) => {
if (err) {
return cb(err);
}
return updatePlayer(player, cb);
}
);
}
const docs = await getPlayerMatchesPromise(player.account_id, {
project: ['match_id'],
});
console.log(
'%s matches found, %s already in db, %s to add',
Object.keys(player.match_ids).length,
docs.length,
Object.keys(player.match_ids).length - docs.length
);
// iterate through db results, delete match_id key if this player has this match already
// will re-request and update matches where this player was previously anonymous
for (let i = 0; i < docs.length; i += 1) {
const matchId = docs[i].match_id;
delete player.match_ids[matchId];
}
// make api_details requests for matches
const promises = Object.keys(player.match_ids).map((matchId) =>
processMatch(matchId)
);
await eachLimit(promises, parallelism);
await updatePlayer(player);
}
} catch (e) {
cb(err);
}
);
});
}
queue.runQueue('fhQueue', 1, processFullHistory);

0 comments on commit 67f8466

Please sign in to comment.