From 34847a5ff77eaab38089dea53155ca718bc0a223 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 27 Dec 2024 20:20:09 +0000 Subject: [PATCH] live reconcile for selected recent matches --- svc/fullhistory.ts | 12 +++++++++--- svc/reconcile.ts | 36 +++--------------------------------- util/insert.ts | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 36 deletions(-) diff --git a/svc/fullhistory.ts b/svc/fullhistory.ts index 0dbcd392e..182d6b63b 100644 --- a/svc/fullhistory.ts +++ b/svc/fullhistory.ts @@ -10,7 +10,7 @@ import { import db from '../store/db'; import { runQueue } from '../store/queue'; import { getPlayerMatches } from '../util/buildPlayer'; -import { insertMatch } from '../util/insert'; +import { insertMatch, reconcileMatch } from '../util/insert'; import { ApiMatch } from '../util/pgroup'; async function updatePlayer(player: FullHistoryJob) { @@ -141,12 +141,18 @@ async function processFullHistory(job: FullHistoryJob) { // Log the match IDs we should reconcile, we can query our own DB for data const playerSlot = match.players.find(p => p.account_id === player.account_id)?.player_slot; if (playerSlot != null) { - await db.raw('INSERT INTO player_match_history(account_id, match_id, player_slot) VALUES (?, ?, ?) ON CONFLICT DO NOTHING', [player.account_id, match.match_id, playerSlot]); + // Record it + const row = { account_id: player.account_id, match_id: match.match_id, player_slot: playerSlot }; + await db.raw('INSERT INTO player_match_history(account_id, match_id, player_slot) VALUES (?, ?, ?) ON CONFLICT DO NOTHING', [row.account_id, row.match_id, row.player_slot]); await redisCount('pmh_fullhistory'); + // Recent match so we probably have data, process it in real time + if (row.match_id > 7500000000) { + await reconcileMatch([row]); + } } }, ); - // Control number of match details requests to send at once--note this is per worker + // Control number of match details requests in parallel--note this is per worker await eachLimitPromise(promiseFuncs, 1); } if (isMatchDataDisabled != null) { diff --git a/svc/reconcile.ts b/svc/reconcile.ts index 5ed878093..a9b81077c 100644 --- a/svc/reconcile.ts +++ b/svc/reconcile.ts @@ -6,42 +6,12 @@ */ import { invokeIntervalAsync } from "../util/utility"; import db from "../store/db"; -import { upsertPlayerCaches } from "../util/insert"; -import { getPGroup } from "../util/pgroup"; -import { getMatchDataFromBlobWithMetadata } from "../util/buildMatch"; - -type RowType = {account_id: number, match_id: number, player_slot: number}; +import { HistoryType, reconcileMatch } from "../util/insert"; async function doReconcile() { // Fetch rows for a single match (could be multiple players to fill) - const { rows }: { rows: RowType[] } = await db.raw('SELECT * from player_match_history WHERE match_id = (SELECT match_id FROM player_match_history TABLESAMPLE SYSTEM_ROWS(1))'); - // optional: Verify each player/match combination doesn't exist in player_caches (or we have parsed data to update) - const [match] = await getMatchDataFromBlobWithMetadata(rows[0].match_id); - if (!match) { - // Note: unless we backfill, we have limited API data for old matches (pre-2019ish) - return; - } - const pgroup = getPGroup(match); - // If reconciling after fullhistory, the pgroup won't contain account_id info. Add it. - rows.forEach(r => { - if (!pgroup[r.player_slot]?.account_id) { - pgroup[r.player_slot].account_id = r.account_id; - } - }); - const targetSlots = new Set(rows.map(r => r.player_slot)); - // Filter to only players that we want to fill in - match.players = match.players.filter(p => targetSlots.has(p.player_slot)); - if (!match.players.length) { - return; - } - // Call upsertPlayerCaches: pgroup will be used to populate account_id and heroes fields (for peers search) - const result = await upsertPlayerCaches(match, undefined, pgroup, 'reconcile'); - if (result.every(Boolean)) { - // Delete the rows since we successfully updated - await Promise.all(rows.map(async (row) => { - return db.raw('DELETE FROM player_match_history WHERE account_id = ? AND match_id = ?', [row.account_id, row.match_id]); - })); - } + const { rows }: { rows: HistoryType[] } = await db.raw('SELECT * from player_match_history WHERE match_id = (SELECT match_id FROM player_match_history TABLESAMPLE SYSTEM_ROWS(1))'); + await reconcileMatch(rows); } invokeIntervalAsync(doReconcile, 500); diff --git a/util/insert.ts b/util/insert.ts index ba1e82fe2..ea5b86c4c 100644 --- a/util/insert.ts +++ b/util/insert.ts @@ -28,6 +28,7 @@ import { } from './queries'; import { ApiMatch, ApiMatchPro, ApiPlayer, getPGroup } from './pgroup'; import { Archive } from '../store/archive'; +import { getMatchDataFromBlobWithMetadata } from './buildMatch'; // import scylla from './scylla'; moment.relativeTimeThreshold('ss', 0); @@ -220,6 +221,41 @@ export async function upsertPlayerCaches( ); } +export type HistoryType = {account_id: number, match_id: number, player_slot: number}; + +export async function reconcileMatch(rows: HistoryType[]) { + // optional: Verify each player/match combination doesn't exist in player_caches (or we have parsed data to update) + const [match] = await getMatchDataFromBlobWithMetadata(rows[0].match_id); + if (!match) { + // Note: unless we backfill, we have limited API data for old matches + // For more recent matches we're more likely to have data + // Maybe we can mark the more recent matches with a flag + // Or queue up recent matches from fullhistory and process them in order so fh requests show updates quicker + return; + } + const pgroup = getPGroup(match); + // If reconciling after fullhistory, the pgroup won't contain account_id info. Add it. + rows.forEach(r => { + if (!pgroup[r.player_slot]?.account_id) { + pgroup[r.player_slot].account_id = r.account_id; + } + }); + const targetSlots = new Set(rows.map(r => r.player_slot)); + // Filter to only players that we want to fill in + match.players = match.players.filter(p => targetSlots.has(p.player_slot)); + if (!match.players.length) { + return; + } + // Call upsertPlayerCaches: pgroup will be used to populate account_id and heroes fields (for peers search) + const result = await upsertPlayerCaches(match, undefined, pgroup, 'reconcile'); + if (result.every(Boolean)) { + // Delete the rows since we successfully updated + await Promise.all(rows.map(async (row) => { + return db.raw('DELETE FROM player_match_history WHERE account_id = ? AND match_id = ?', [row.account_id, row.match_id]); + })); + } +} + export type InsertMatchInput = ApiMatch | ApiMatchPro | ParserMatch | GcMatch; /**