From b42687861eac88bbdc8db8d81bb966207d566d54 Mon Sep 17 00:00:00 2001 From: Howard Chung Date: Tue, 7 Jan 2025 08:59:14 +0000 Subject: [PATCH] export fetchers from definitions --- dev/archiveTest.mts | 6 ++---- dev/legacyArchive.ts | 4 +--- fetcher/getApiData.ts | 8 +++++--- fetcher/getArchivedData.ts | 8 ++++---- fetcher/getGcData.ts | 9 +++++---- fetcher/getMeta.ts | 7 ++++--- fetcher/getParsedData.ts | 8 ++++---- fetcher/getPlayerArchive.ts | 11 ++++------- store/archive.ts | 8 +++++++- svc/backfill.ts | 4 +--- svc/gcdata.ts | 6 ++---- svc/parser.ts | 10 +++------- util/archiveUtil.ts | 12 ++++-------- util/buildMatch.ts | 16 +++++----------- util/buildPlayer.ts | 4 +--- util/insert.ts | 4 +--- 16 files changed, 53 insertions(+), 72 deletions(-) diff --git a/dev/archiveTest.mts b/dev/archiveTest.mts index 2c59ea67d..26f38f04b 100644 --- a/dev/archiveTest.mts +++ b/dev/archiveTest.mts @@ -1,13 +1,11 @@ -import { ArchivedFetcher } from '../fetcher/getArchivedData.js'; +import { archivedFetcher } from '../fetcher/getArchivedData.js'; -const { Archive } = await import('../store/archive.js'); -const archivedFetcher = new ArchivedFetcher(); +const { matchArchive } = await import('../store/archive.js'); // Read some match data // const match = await getMatchDataFromBlob(7465883253); // const blob = Buffer.from(JSON.stringify(match)); -const archive = new Archive('match'); // Archive it // const putRes = await archive.archivePut(match.match_id?.toString() ?? '', blob); // console.log(putRes); diff --git a/dev/legacyArchive.ts b/dev/legacyArchive.ts index feef68ab9..d48903a4c 100644 --- a/dev/legacyArchive.ts +++ b/dev/legacyArchive.ts @@ -3,9 +3,7 @@ import cassandra from '../store/cassandra'; import config from '../config'; import db from '../store/db'; import { deserialize, redisCount } from '../util/utility'; -import { Archive } from '../store/archive'; - -const matchArchive = new Archive('match'); +import { matchArchive } from '../store/archive'; function randomBigInt(byteCount: number) { return BigInt(`0x${crypto.randomBytes(byteCount).toString('hex')}`); diff --git a/fetcher/getApiData.ts b/fetcher/getApiData.ts index 3bdc2d8c0..b8f003487 100644 --- a/fetcher/getApiData.ts +++ b/fetcher/getApiData.ts @@ -1,11 +1,10 @@ import { SteamAPIUrls, getSteamAPIData, redisCount } from '../util/utility'; -import { Archive } from '../store/archive'; +import { blobArchive } from '../store/archive'; import cassandra from '../store/cassandra'; import type { ApiMatch } from '../util/types'; import { MatchFetcher } from './base'; import { insertMatch } from '../util/insert'; -const blobArchive = new Archive('blob'); /** * Return API data by reading it without fetching. * @param matchId @@ -89,10 +88,13 @@ async function getOrFetchApiData(matchId: number): Promise<{ }; } -export class ApiFetcher extends MatchFetcher { +class ApiFetcher extends MatchFetcher { readData = readApiData; getOrFetchData = getOrFetchApiData; checkAvailable = () => { throw new Error('not implemented'); }; } + +export const apiFetcher = new ApiFetcher(); + diff --git a/fetcher/getArchivedData.ts b/fetcher/getArchivedData.ts index b2ffa6cbe..5e9dba238 100644 --- a/fetcher/getArchivedData.ts +++ b/fetcher/getArchivedData.ts @@ -1,11 +1,9 @@ import { isDataComplete, redisCount } from '../util/utility'; -import { Archive } from '../store/archive'; +import { matchArchive } from '../store/archive'; import db from '../store/db'; import { MatchFetcher } from './base'; import { getMatchDataFromBlobWithMetadata } from '../util/buildMatch'; -const matchArchive = new Archive('match'); - /** * Consolidates separate match data blobs and stores as a single blob in archive * @param matchId @@ -42,7 +40,7 @@ async function doArchiveMatchFromBlobs(matchId: number) { } } -export class ArchivedFetcher extends MatchFetcher { +class ArchivedFetcher extends MatchFetcher { readData = async (matchId: number): Promise => { try { // Check if the parsed data is archived @@ -79,3 +77,5 @@ export class ArchivedFetcher extends MatchFetcher { ); }; } + +export const archivedFetcher = new ArchivedFetcher(); \ No newline at end of file diff --git a/fetcher/getGcData.ts b/fetcher/getGcData.ts index e62ce0cbc..504f86a73 100644 --- a/fetcher/getGcData.ts +++ b/fetcher/getGcData.ts @@ -7,11 +7,9 @@ import { getRandomRetrieverUrl, redisCount } from '../util/utility'; import axios from 'axios'; import retrieverMatch from '../test/data/retriever_match.json'; import { insertMatch, upsertPlayer } from '../util/insert'; -import { Archive } from '../store/archive'; +import { blobArchive } from '../store/archive'; import { MatchFetcher } from './base'; -const blobArchive = new Archive('blob'); - /** * Return GC data by reading it without fetching. * @param matchId @@ -238,7 +236,7 @@ async function getOrFetchGcDataWithRetry( return { data, error }; } -export class GcdataFetcher extends MatchFetcher { +class GcdataFetcher extends MatchFetcher { readData = readGcData; getOrFetchData = getOrFetchGcData; getOrFetchDataWithRetry = getOrFetchGcDataWithRetry; @@ -246,3 +244,6 @@ export class GcdataFetcher extends MatchFetcher { throw new Error('not implemented'); }; } + +export const gcFetcher = new GcdataFetcher(); + diff --git a/fetcher/getMeta.ts b/fetcher/getMeta.ts index 5b856d7dd..9cf7934b1 100644 --- a/fetcher/getMeta.ts +++ b/fetcher/getMeta.ts @@ -3,7 +3,7 @@ import { exec } from 'child_process'; import { promisify } from 'util'; import { buildReplayUrl, redisCount } from '../util/utility'; import { MatchFetcher } from './base'; -import { GcdataFetcher } from './getGcData'; +import { gcFetcher } from './getGcData'; const execPromise = promisify(exec); // Get a sample meta file @@ -14,7 +14,6 @@ const builder = root.loadSync('./proto/dota_match_metadata.proto', { keepCase: true, }); const CDOTAMatchMetadataFile = builder.lookupType('CDOTAMatchMetadataFile'); -const gcFetcher = new GcdataFetcher(); async function getMeta(matchId: number) { const gcdata = await gcFetcher.readData(matchId, false); @@ -71,7 +70,7 @@ export async function getMetaFromUrl(url: string) { } } -export class MetaFetcher extends MatchFetcher> { +class MetaFetcher extends MatchFetcher> { readData = () => { throw new Error('not implemented'); }; @@ -80,3 +79,5 @@ export class MetaFetcher extends MatchFetcher> { throw new Error('not implemented'); }; } + +export const metaFetcher = new MetaFetcher(); diff --git a/fetcher/getParsedData.ts b/fetcher/getParsedData.ts index 28890395f..ce2397b77 100644 --- a/fetcher/getParsedData.ts +++ b/fetcher/getParsedData.ts @@ -1,14 +1,12 @@ import config from '../config'; import { getRandomParserUrl, redisCount } from '../util/utility'; -import { Archive } from '../store/archive'; +import { blobArchive } from '../store/archive'; import cassandra from '../store/cassandra'; import db from '../store/db'; import { insertMatch } from '../util/insert'; import axios from 'axios'; import { MatchFetcher } from './base'; -const blobArchive = new Archive('blob'); - /** * Return parse data by reading it without fetching. * @param matchId @@ -117,7 +115,7 @@ async function getOrFetchParseData( return { data: null, skipped: false, error }; } -export class ParsedFetcher extends MatchFetcher { +class ParsedFetcher extends MatchFetcher { readData = readParsedData; getOrFetchData = getOrFetchParseData; checkAvailable = async (matchId: number) => { @@ -130,3 +128,5 @@ export class ParsedFetcher extends MatchFetcher { ); }; } + +export const parsedFetcher = new ParsedFetcher(); diff --git a/fetcher/getPlayerArchive.ts b/fetcher/getPlayerArchive.ts index 04268b477..f081d8439 100644 --- a/fetcher/getPlayerArchive.ts +++ b/fetcher/getPlayerArchive.ts @@ -1,13 +1,8 @@ import { PutObjectCommandOutput } from '@aws-sdk/client-s3'; -import config from '../config'; -import { Archive } from '../store/archive'; +import { playerArchive } from '../store/archive'; import { getFullPlayerMatchesWithMetadata } from '../util/buildPlayer'; import { PlayerFetcher } from './base'; -const playerArchive = config.ENABLE_PLAYER_ARCHIVE - ? new Archive('player') - : null; - async function doArchivePlayerMatches( accountId: number, ): Promise { @@ -55,10 +50,12 @@ async function readArchivedPlayerMatches( return arr; } -export class PlayerArchiveFetcher extends PlayerFetcher { +class PlayerArchiveFetcher extends PlayerFetcher { readData = readArchivedPlayerMatches; getOrFetchData = async (accountId: number) => { await doArchivePlayerMatches(accountId); return this.readData(accountId); }; } + +export const playerArchiveFetcher = new PlayerArchiveFetcher(); \ No newline at end of file diff --git a/store/archive.ts b/store/archive.ts index c20b1fb84..148fae48f 100644 --- a/store/archive.ts +++ b/store/archive.ts @@ -19,7 +19,7 @@ async function stream2buffer(stream: any): Promise { }); } -export class Archive { +class Archive { private endpoint: string = ''; private accessKeyId: string = ''; private secretAccessKey: string = ''; @@ -156,3 +156,9 @@ export class Archive { } }; } + +export const blobArchive = new Archive('blob'); +export const playerArchive = config.ENABLE_PLAYER_ARCHIVE + ? new Archive('player') + : null; +export const matchArchive = new Archive('match'); \ No newline at end of file diff --git a/svc/backfill.ts b/svc/backfill.ts index 646c2db78..1bb0ba4c2 100644 --- a/svc/backfill.ts +++ b/svc/backfill.ts @@ -1,6 +1,6 @@ // Fetches old matches from Steam API and writes to blob storage import config from '../config'; -import { Archive } from '../store/archive'; +import { blobArchive } from '../store/archive'; import type { ApiMatch } from '../util/types'; import { SteamAPIUrls, getSteamAPIData, transformMatch, getApiHosts } from '../util/utility'; import fs from 'fs'; @@ -12,8 +12,6 @@ import redis from '../store/redis'; // ARCHIVE_S3_KEY_SECRET: 'minioadmin', // ARCHIVE_S3_ENDPOINT: 'http://localhost:9000', -const blobArchive = new Archive('blob'); - // current run started at 5000000000 const stop = Number(process.env.BACKFILL_STOP) || 6200000000; async function scanApi() { diff --git a/svc/gcdata.ts b/svc/gcdata.ts index 3997aef79..06cc31880 100644 --- a/svc/gcdata.ts +++ b/svc/gcdata.ts @@ -2,12 +2,10 @@ // The parser will also request gcdata if needed import { runQueue } from '../store/queue'; import config from '../config'; -import { ApiFetcher } from '../fetcher/getApiData'; -import { GcdataFetcher } from '../fetcher/getGcData'; +import { apiFetcher } from '../fetcher/getApiData'; +import { gcFetcher } from '../fetcher/getGcData'; import { getPGroup } from '../util/pgroup'; -const apiFetcher = new ApiFetcher(); -const gcFetcher = new GcdataFetcher(); async function processGcData(job: GcDataJob) { const matchId = job.match_id; diff --git a/svc/parser.ts b/svc/parser.ts index 7d5f3916d..8645c8768 100755 --- a/svc/parser.ts +++ b/svc/parser.ts @@ -10,16 +10,12 @@ import { runReliableQueue } from '../store/queue'; import c from 'ansi-colors'; import { buildReplayUrl, redisCount } from '../util/utility'; import redis from '../store/redis'; -import { ApiFetcher } from '../fetcher/getApiData'; -import { ParsedFetcher } from '../fetcher/getParsedData'; -import { GcdataFetcher } from '../fetcher/getGcData'; +import { apiFetcher } from '../fetcher/getApiData'; +import { parsedFetcher } from '../fetcher/getParsedData'; +import { gcFetcher } from '../fetcher/getGcData'; import { getPGroup } from '../util/pgroup'; import moment from 'moment'; -import db from '../store/db'; -const apiFetcher = new ApiFetcher(); -const gcFetcher = new GcdataFetcher(); -const parsedFetcher = new ParsedFetcher(); const { PARSER_PARALLELISM } = config; async function parseProcessor(job: ParseJob, metadata: JobMetadata) { diff --git a/util/archiveUtil.ts b/util/archiveUtil.ts index 26724277f..f858b9a23 100644 --- a/util/archiveUtil.ts +++ b/util/archiveUtil.ts @@ -1,17 +1,13 @@ import config from '../config'; -import { Archive } from '../store/archive'; +import { blobArchive } from '../store/archive'; import cassandra from '../store/cassandra'; import QueryStream from 'pg-query-stream'; import { Client } from 'pg'; import crypto from 'crypto'; import db from '../store/db'; -import { ApiFetcher } from '../fetcher/getApiData'; -import { GcdataFetcher } from '../fetcher/getGcData'; -import { ParsedFetcher } from '../fetcher/getParsedData'; -const apiFetcher = new ApiFetcher(); -const gcFetcher = new GcdataFetcher(); -const parsedFetcher = new ParsedFetcher(); -const blobArchive = new Archive('blob'); +import { apiFetcher } from '../fetcher/getApiData'; +import { gcFetcher } from '../fetcher/getGcData'; +import { parsedFetcher } from '../fetcher/getParsedData'; async function processMatch(matchId: number) { // Check if we should archive the blobs (should be parsed and not archived) diff --git a/util/buildMatch.ts b/util/buildMatch.ts index 850679a81..280542de9 100644 --- a/util/buildMatch.ts +++ b/util/buildMatch.ts @@ -11,19 +11,13 @@ import { import redis from '../store/redis'; import db from '../store/db'; import { ApiMatch } from './types'; -import { ParsedFetcher } from '../fetcher/getParsedData'; -import { ApiFetcher } from '../fetcher/getApiData'; -import { GcdataFetcher } from '../fetcher/getGcData'; -import { ArchivedFetcher } from '../fetcher/getArchivedData'; -import { MetaFetcher } from '../fetcher/getMeta'; +import { parsedFetcher } from '../fetcher/getParsedData'; +import { apiFetcher } from '../fetcher/getApiData'; +import { gcFetcher } from '../fetcher/getGcData'; +import { archivedFetcher } from '../fetcher/getArchivedData'; +import { metaFetcher } from '../fetcher/getMeta'; import { benchmarks } from './benchmarksUtil'; -const apiFetcher = new ApiFetcher(); -const gcFetcher = new GcdataFetcher(); -const parsedFetcher = new ParsedFetcher(); -const archivedFetcher = new ArchivedFetcher(); -const metaFetcher = new MetaFetcher(); - function extendPlayerData( player: Player | ParsedPlayer, match: Match | ParsedMatch, diff --git a/util/buildPlayer.ts b/util/buildPlayer.ts index d295bb278..1a97af791 100644 --- a/util/buildPlayer.ts +++ b/util/buildPlayer.ts @@ -7,9 +7,7 @@ import { deserialize, pick, redisCount, redisCountDistinct } from './utility'; import { gzipSync, gunzipSync } from 'zlib'; import { cacheableCols } from '../routes/playerFields'; import { promises as fs } from 'fs'; -import { PlayerArchiveFetcher } from '../fetcher/getPlayerArchive'; - -const playerArchiveFetcher = new PlayerArchiveFetcher(); +import { playerArchiveFetcher } from '../fetcher/getPlayerArchive'; export async function getPlayerMatches( accountId: number, diff --git a/util/insert.ts b/util/insert.ts index 694d10f64..c1e0df479 100644 --- a/util/insert.ts +++ b/util/insert.ts @@ -27,14 +27,12 @@ import { isRecentlyVisited, } from './queries'; import { getPGroup } from './pgroup'; -import { Archive } from '../store/archive'; +import { blobArchive } from '../store/archive'; import type { ApiMatch, ApiPlayer, InsertMatchInput } from './types'; import { upsertPlayerCaches } from './playerCaches'; moment.relativeTimeThreshold('ss', 0); -const blobArchive = new Archive('blob'); - export async function upsert( db: knex.Knex, table: string,