Skip to content

Commit

Permalink
old parsed match blob archival
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 1, 2023
1 parent b79d069 commit 861aa64
Show file tree
Hide file tree
Showing 10 changed files with 10,378 additions and 6,664 deletions.
5 changes: 4 additions & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ const defaults = {
STRIPE_SECRET: "rk_test_gRqwhv4xqv0a1olp8kk8fZ94", // for stripe payment processing (kept on server)
STRIPE_API_PLAN: "plan_CgLthOgwrDgz2K", // plan id for stripe metering
ES_SEARCH_PERCENT: 0, // % of users to roll out elasticsearch to
TRACKED_ACCOUNT_URL: "", // URL where account IDs of tracked players can be found
MATCH_ARCHIVE_S3_KEY_ID: '', // S3-compatible key ID to archive parsed match blobs
MATCH_ARCHIVE_S3_KEY_SECRET: '', // S3-compatible key secret to archive parsed match blobs
MATCH_ARCHIVE_S3_ENDPOINT: '', // S3-compatible endpoint to archive parsed match blobs
MATCH_ARCHIVE_S3_BUCKET: 'opendota', // name of the S3 bucket to archive parsed match blobs
};
// ensure that process.env has all values in defaults, but prefer the process.env value
Object.keys(defaults).forEach((key) => {
Expand Down
18 changes: 18 additions & 0 deletions dev/archiveTest.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Dev-only smoke test for the parsed match archive (store/archive.js).
// Reads one known match out of Cassandra, round-trips it through the
// S3-compatible archive, and prints the byte lengths so a human can
// eyeball that put/get are symmetric. Requires MATCH_ARCHIVE_S3_* config
// to be set, otherwise archivePut/archiveGet are no-ops and readBack
// will be undefined.
import { archivePut, archiveGet } from "../store/archive.js";
import { getMatchData, getPlayerMatchData } from "../store/queries.js";

// Read some match data
// NOTE(review): 7465883253 is a hard-coded sample match ID — it must
// already exist in the local Cassandra for this script to do anything.
const match = await getMatchData(7465883253);
const players = await getPlayerMatchData(7465883253);
// Same blob shape that svc/cassandraDelete.js#doArchive writes:
// the match row spread at the top level with players embedded under "players".
const blob = Buffer.from(JSON.stringify({...match, players }));

// Archive it
await archivePut(match.match_id.toString(), blob);

// Read it back
const readBack = await archiveGet(match.match_id.toString());

// Lengths should match if the gzip round-trip is lossless.
console.log(blob.length, readBack.length);

// Confirm API returns the same data whether we used the archive or not

16,797 changes: 10,187 additions & 6,610 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"url": "http://github.com/odota/core"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.462.0",
"@elastic/elasticsearch": "^7.17.0",
"async": "^2.6.1",
"body-parser": "^1.19.0",
Expand Down
1 change: 0 additions & 1 deletion routes/spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,6 @@ The OpenDota API offers 50,000 free calls per month and a rate limit of 60 reque
},
route: () => "/explorer",
func: async (req, res) => {
// TODO handle NQL (@nicholashh query language)
const input = req.query.sql;
const client = new Client({
connectionString: config.READONLY_POSTGRES_URL,
Expand Down
65 changes: 65 additions & 0 deletions store/archive.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const config = require("../config");
const { gzipSync, gunzipSync } = require('zlib');
const { S3Client, PutObjectCommand, GetObjectCommand } = require("@aws-sdk/client-s3");

const client = config.MATCH_ARCHIVE_S3_ENDPOINT ? new S3Client({
region: 'us-east-1',
credentials: {
accessKeyId: config.MATCH_ARCHIVE_S3_KEY_ID,
secretAccessKey: config.MATCH_ARCHIVE_S3_KEY_SECRET,
},
endpoint: 'https://' + config.MATCH_ARCHIVE_S3_ENDPOINT,
// Any other S3ClientConfig options can be passed here.
// See: https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/s3/Interface/S3ClientConfig/
}) : null;

/**
 * Drain a readable stream and return its entire contents as one Buffer.
 * Resolves when the stream ends; rejects if the stream emits an error.
 * @param {import('stream').Readable} stream - stream of Buffer chunks
 * @returns {Promise<Buffer>} concatenation of all chunks
 */
async function stream2buffer(stream) {
  const chunks = [];
  // Async iteration over a Readable consumes 'data' chunks and throws
  // on 'error', which this async function surfaces as a rejection.
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

/**
 * Fetch and decompress an archived match blob from S3-compatible storage.
 * Best-effort: callers treat a missing/unreadable blob as "not archived"
 * and fall back to other data sources.
 * @param {string} key - object key (stringified match ID)
 * @returns {Promise<Buffer|undefined>} gunzipped blob, or undefined when
 *   archiving is disabled, the object is absent, or the read/decompress fails
 */
async function archiveGet(key) {
  if (!client) {
    // Archive storage is not configured (no MATCH_ARCHIVE_S3_ENDPOINT).
    return;
  }
  const command = new GetObjectCommand({
    Bucket: config.MATCH_ARCHIVE_S3_BUCKET,
    Key: key,
  });
  try {
    const data = await client.send(command);
    if (!data.Body) {
      return;
    }
    // Body is a stream in SDK v3; buffer it before decompressing.
    const buffer = await stream2buffer(data.Body);
    const result = gunzipSync(buffer);
    console.log('[ARCHIVE] read %s bytes, decompressed %s bytes', buffer.length, result.length);
    return result;
  } catch (e) {
    // Keep the best-effort contract (return undefined), but don't swallow
    // the failure silently — a NoSuchKey here is routine, anything else
    // (auth, network, corrupt gzip) should at least be visible in logs.
    console.error('[ARCHIVE] get failed for key %s: %s', key, e.message);
    return;
  }
}

/**
 * Gzip and upload a match blob to S3-compatible storage.
 * Best-effort, mirroring archiveGet: failures are logged and reported as
 * undefined rather than thrown, so callers (e.g. doArchive in
 * svc/cassandraDelete.js, which gates Cassandra deletion on a truthy
 * result) never mistake a failed upload for a successful archive.
 * @param {string} key - object key (stringified match ID)
 * @param {Buffer} blob - uncompressed JSON blob to archive
 * @returns {Promise<Object|undefined>} S3 PutObject response on success,
 *   or undefined when archiving is disabled or the upload fails
 */
async function archivePut(key, blob) {
  if (!client) {
    // Archive storage is not configured (no MATCH_ARCHIVE_S3_ENDPOINT).
    return;
  }
  try {
    const data = gzipSync(blob);
    const command = new PutObjectCommand({
      Bucket: config.MATCH_ARCHIVE_S3_BUCKET,
      Key: key,
      Body: data,
    });
    const result = await client.send(command);
    console.log('[ARCHIVE] original %s bytes, archived %s bytes', blob.length, data.length);
    return result;
  } catch (e) {
    console.error('[ARCHIVE] put failed for key %s: %s', key, e.message);
    return;
  }
}

module.exports = {
archiveGet,
archivePut,
};
88 changes: 37 additions & 51 deletions store/buildMatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,13 @@ const utility = require("../util/utility");
const cassandra = require("./cassandra");
const redis = require("./redis");
const db = require("./db");
const { archiveGet } = require("./archive");
const { getPlayerMatchData, getMatchData } = require("./queries");

const { computeMatchData } = compute;
const { deserialize, buildReplayUrl, isContributor } = utility;
const { buildReplayUrl, isContributor } = utility;
const getRedisAsync = promisify(redis.get).bind(redis);

async function getMatchData(matchId) {
const result = await cassandra.execute(
"SELECT * FROM matches where match_id = ?",
[Number(matchId)],
{
prepare: true,
fetchSize: 1,
autoPage: true,
}
);
const deserializedResult = result.rows.map((m) => deserialize(m));
return Promise.resolve(deserializedResult[0]);
}

async function getPlayerMatchData(matchId) {
const result = await cassandra.execute(
"SELECT * FROM player_matches where match_id = ?",
[Number(matchId)],
{
prepare: true,
fetchSize: 24,
autoPage: true,
}
);
const deserializedResult = result.rows.map((m) => deserialize(m));
return Promise.all(
deserializedResult.map((r) =>
db
.raw(
`
SELECT personaname, name, last_login
FROM players
LEFT JOIN notable_players
ON players.account_id = notable_players.account_id
WHERE players.account_id = ?
`,
[r.account_id]
)
.then((names) => ({ ...r, ...names.rows[0] }))
)
);
}

async function extendPlayerData(player, match) {
const p = {
...player,
Expand Down Expand Up @@ -136,7 +95,7 @@ async function backfill(matchId) {
},
() => {
// Count for logging
utility.redisCount(redis, "cassandra_repair");
utility.redisCount(redis, "steam_api_backfill");
resolve();
}
);
Expand All @@ -151,18 +110,27 @@ async function getMatch(matchId) {
}
let match = await getMatchData(matchId);
if (!match) {
// if we don't have it, try backfilling it from Steam API and then check again
// check the parsed match archive to see if we have it
const blob = await archiveGet(matchId.toString());
if (blob) {
match = JSON.parse(blob);
utility.redisCount(redis, "match_archive_read");
}
}
if (!match) {
// if we still don't have it, try backfilling it from Steam API and then check again
await backfill(matchId);
match = await getMatchData(matchId);
if (!match) {
// Still don't have it
return Promise.resolve();
}
}
if (!match) {
// Still don't have it
return Promise.resolve();
}
utility.redisCount(redis, "build_match");
let playersMatchData = [];
try {
playersMatchData = await getPlayerMatchData(matchId);
// If we fetched from archive we already have players
playersMatchData = match.players || await getPlayerMatchData(matchId);
if (playersMatchData.length === 0) {
throw new Error("no players found for match");
}
Expand All @@ -173,6 +141,7 @@ async function getMatch(matchId) {
e.message.startsWith("Unexpected") ||
e.message.includes("Attempt to access memory outside buffer bounds")
) {
utility.redisCount(redis, "cassandra_repair");
// Delete corrupted data and backfill
await cassandra.execute(
"DELETE FROM player_matches where match_id = ?",
Expand All @@ -185,6 +154,23 @@ async function getMatch(matchId) {
throw e;
}
}
// Get names, last login for players from DB
playersMatchData = await Promise.all(
playersMatchData.map((r) =>
db
.raw(
`
SELECT personaname, name, last_login
FROM players
LEFT JOIN notable_players
ON players.account_id = notable_players.account_id
WHERE players.account_id = ?
`,
[r.account_id]
)
.then((names) => ({ ...r, ...names.rows[0] }))
)
);
const playersPromise = Promise.all(
playersMatchData.map((p) => extendPlayerData(p, match))
);
Expand Down
6 changes: 6 additions & 0 deletions store/buildStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ module.exports = function buildStatus(db, redis, cb) {
requests_api_key_last_day(cb) {
utility.getRedisCountDay(redis, "request_api_key", cb);
},
// Count of matches backfilled from the Steam API in the last day
// (redis counter "steam_api_backfill", incremented in store/buildMatch.js).
steam_api_backfill_last_day(cb) {
utility.getRedisCountDay(redis, "steam_api_backfill", cb);
},
// Count of match reads served from the parsed match archive in the last
// day (redis counter "match_archive_read", incremented in store/buildMatch.js).
match_archive_read_last_day(cb) {
utility.getRedisCountDay(redis, "match_archive_read", cb);
},
cassandra_repair_last_day(cb) {
utility.getRedisCountDay(redis, "cassandra_repair", cb);
},
Expand Down
33 changes: 32 additions & 1 deletion store/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,8 @@ function insertMatch(match, options, cb) {
}

function upsertMatchCassandra(cb) {
// console.log('[INSERTMATCH] upserting into Cassandra');
// NOTE parsed insert doesn't have original match info so can't archive here
// unless we insert then read it back from cassandra
return cleanRowCassandra(cassandra, "matches", match, (err, match) => {
if (err) {
return cb(err);
Expand Down Expand Up @@ -1645,6 +1646,34 @@ function getMetadata(req, callback) {
);
}

/**
 * Read a single match row from the Cassandra "matches" table.
 * @param {number|string} matchId - match ID (coerced with Number())
 * @returns {Promise<Object|undefined>} deserialized match row, or
 *   undefined when no row exists for this ID
 */
async function getMatchData(matchId) {
  const result = await cassandra.execute(
    "SELECT * FROM matches where match_id = ?",
    [Number(matchId)],
    {
      prepare: true,
      fetchSize: 1,
      autoPage: true,
    }
  );
  const deserializedResult = result.rows.map((m) => deserialize(m));
  // An async function already wraps its return value in a Promise;
  // the previous Promise.resolve() wrapper was redundant.
  return deserializedResult[0];
}

/**
 * Read all player rows for a match from the Cassandra "player_matches" table.
 * @param {number|string} matchId - match ID (coerced with Number())
 * @returns {Promise<Object[]>} deserialized player rows (empty array when
 *   the match has no player rows)
 */
async function getPlayerMatchData(matchId) {
  const result = await cassandra.execute(
    "SELECT * FROM player_matches where match_id = ?",
    [Number(matchId)],
    {
      prepare: true,
      // a match has at most 10 players plus spectators/broadcasters
      fetchSize: 24,
      autoPage: true,
    }
  );
  return result.rows.map((row) => deserialize(row));
}

module.exports = {
upsert,
insertPlayer,
Expand Down Expand Up @@ -1673,4 +1702,6 @@ module.exports = {
getTeamScenarios,
getMetadata,
getMatchRankTier,
getMatchData,
getPlayerMatchData,
};
28 changes: 28 additions & 0 deletions svc/cassandraDelete.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
const crypto = require("crypto");
const cassandra = require("../store/cassandra");
const db = require("../store/db");
const { archivePut } = require("../store/archive");
const { getMatchData, getPlayerMatchData } = require("../store/queries");
const config = require("../config");

function genRandomNumber(byteCount, radix) {
return BigInt(`0x${ crypto.randomBytes(byteCount).toString("hex")}`).toString(
Expand Down Expand Up @@ -69,11 +72,36 @@ async function start() {
)
);
const parsedIds = result.rows.filter(result => result.version != null).map(result => result.match_id);
config.MATCH_ARCHIVE_S3_ENDPOINT && await Promise.all(parsedIds.map(id => doArchive(id)));

// TODO remove insert once backfill complete
await Promise.all(parsedIds.map(id => db.raw("INSERT INTO parsed_matches(match_id) VALUES(?) ON CONFLICT DO NOTHING", [Number(id)])));
} catch (e) {
console.log(e);
}
}
}

/**
 * Archive one parsed match's Cassandra data to S3-compatible storage.
 * Builds a single JSON blob with the match row spread at the top level and
 * player rows embedded under "players" (the shape buildMatch.js expects
 * when reading the archive back), then uploads it via archivePut.
 * Deletion of the source Cassandra rows is intentionally still disabled
 * until the archive path is proven (see TODO below).
 * @param {*} matchId - match ID from the Cassandra scan
 */
async function doArchive(matchId) {
// archive old parsed match blobs to s3 compatible storage
const match = await getMatchData(matchId);
const playerMatches = await getPlayerMatchData(matchId);
const blob = Buffer.from(JSON.stringify({...match, players: playerMatches }));
// archivePut returns a truthy S3 response only on a successful upload
const result = await archivePut(matchId.toString(), blob);
if (result) {
// TODO Delete from Cassandra after archival
// await cassandra.execute("DELETE from matches where match_id = ?", [matchId], {
// prepare: true,
// });
// await cassandra.execute(
// "DELETE from player_matches where match_id = ?",
// [matchId],
// {
// prepare: true,
// }
// );
}
return;
}

start();

0 comments on commit 861aa64

Please sign in to comment.