Skip to content

Commit

Permalink
live reconcile for selected recent matches
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 27, 2024
1 parent d537587 commit 34847a5
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 36 deletions.
12 changes: 9 additions & 3 deletions svc/fullhistory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import db from '../store/db';
import { runQueue } from '../store/queue';
import { getPlayerMatches } from '../util/buildPlayer';
import { insertMatch } from '../util/insert';
import { insertMatch, reconcileMatch } from '../util/insert';
import { ApiMatch } from '../util/pgroup';

async function updatePlayer(player: FullHistoryJob) {
Expand Down Expand Up @@ -141,12 +141,18 @@ async function processFullHistory(job: FullHistoryJob) {
// Log the match IDs we should reconcile, we can query our own DB for data
const playerSlot = match.players.find(p => p.account_id === player.account_id)?.player_slot;
if (playerSlot != null) {
await db.raw('INSERT INTO player_match_history(account_id, match_id, player_slot) VALUES (?, ?, ?) ON CONFLICT DO NOTHING', [player.account_id, match.match_id, playerSlot]);
// Record it
const row = { account_id: player.account_id, match_id: match.match_id, player_slot: playerSlot };
await db.raw('INSERT INTO player_match_history(account_id, match_id, player_slot) VALUES (?, ?, ?) ON CONFLICT DO NOTHING', [row.account_id, row.match_id, row.player_slot]);
await redisCount('pmh_fullhistory');
// Recent match so we probably have data, process it in real time
if (row.match_id > 7500000000) {
await reconcileMatch([row]);
}
}
},
);
// Control number of match details requests to send at once--note this is per worker
// Control number of match details requests in parallel--note this is per worker
await eachLimitPromise(promiseFuncs, 1);
}
if (isMatchDataDisabled != null) {
Expand Down
36 changes: 3 additions & 33 deletions svc/reconcile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,12 @@
*/
import { invokeIntervalAsync } from "../util/utility";
import db from "../store/db";
import { upsertPlayerCaches } from "../util/insert";
import { getPGroup } from "../util/pgroup";
import { getMatchDataFromBlobWithMetadata } from "../util/buildMatch";

type RowType = {account_id: number, match_id: number, player_slot: number};
import { HistoryType, reconcileMatch } from "../util/insert";

async function doReconcile() {
// Fetch rows for a single match (could be multiple players to fill)
const { rows }: { rows: RowType[] } = await db.raw('SELECT * from player_match_history WHERE match_id = (SELECT match_id FROM player_match_history TABLESAMPLE SYSTEM_ROWS(1))');
// optional: Verify each player/match combination doesn't exist in player_caches (or we have parsed data to update)
const [match] = await getMatchDataFromBlobWithMetadata(rows[0].match_id);
if (!match) {
// Note: unless we backfill, we have limited API data for old matches (pre-2019ish)
return;
}
const pgroup = getPGroup(match);
// If reconciling after fullhistory, the pgroup won't contain account_id info. Add it.
rows.forEach(r => {
if (!pgroup[r.player_slot]?.account_id) {
pgroup[r.player_slot].account_id = r.account_id;
}
});
const targetSlots = new Set(rows.map(r => r.player_slot));
// Filter to only players that we want to fill in
match.players = match.players.filter(p => targetSlots.has(p.player_slot));
if (!match.players.length) {
return;
}
// Call upsertPlayerCaches: pgroup will be used to populate account_id and heroes fields (for peers search)
const result = await upsertPlayerCaches(match, undefined, pgroup, 'reconcile');
if (result.every(Boolean)) {
// Delete the rows since we successfully updated
await Promise.all(rows.map(async (row) => {
return db.raw('DELETE FROM player_match_history WHERE account_id = ? AND match_id = ?', [row.account_id, row.match_id]);
}));
}
const { rows }: { rows: HistoryType[] } = await db.raw('SELECT * from player_match_history WHERE match_id = (SELECT match_id FROM player_match_history TABLESAMPLE SYSTEM_ROWS(1))');
await reconcileMatch(rows);
}

invokeIntervalAsync(doReconcile, 500);
36 changes: 36 additions & 0 deletions util/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
} from './queries';
import { ApiMatch, ApiMatchPro, ApiPlayer, getPGroup } from './pgroup';
import { Archive } from '../store/archive';
import { getMatchDataFromBlobWithMetadata } from './buildMatch';
// import scylla from './scylla';

moment.relativeTimeThreshold('ss', 0);
Expand Down Expand Up @@ -220,6 +221,41 @@ export async function upsertPlayerCaches(
);
}

export type HistoryType = {account_id: number, match_id: number, player_slot: number};

export async function reconcileMatch(rows: HistoryType[]) {
// optional: Verify each player/match combination doesn't exist in player_caches (or we have parsed data to update)
const [match] = await getMatchDataFromBlobWithMetadata(rows[0].match_id);
if (!match) {
// Note: unless we backfill, we have limited API data for old matches
// For more recent matches we're more likely to have data
// Maybe we can mark the more recent matches with a flag
// Or queue up recent matches from fullhistory and process them in order so fh requests show updates quicker
return;
}
const pgroup = getPGroup(match);
// If reconciling after fullhistory, the pgroup won't contain account_id info. Add it.
rows.forEach(r => {
if (!pgroup[r.player_slot]?.account_id) {
pgroup[r.player_slot].account_id = r.account_id;
}
});
const targetSlots = new Set(rows.map(r => r.player_slot));
// Filter to only players that we want to fill in
match.players = match.players.filter(p => targetSlots.has(p.player_slot));
if (!match.players.length) {
return;
}
// Call upsertPlayerCaches: pgroup will be used to populate account_id and heroes fields (for peers search)
const result = await upsertPlayerCaches(match, undefined, pgroup, 'reconcile');
if (result.every(Boolean)) {
// Delete the rows since we successfully updated
await Promise.all(rows.map(async (row) => {
return db.raw('DELETE FROM player_match_history WHERE account_id = ? AND match_id = ?', [row.account_id, row.match_id]);
}));
}
}

export type InsertMatchInput = ApiMatch | ApiMatchPro | ParserMatch | GcMatch;

/**
Expand Down

0 comments on commit 34847a5

Please sign in to comment.