Skip to content

Commit

Permalink
export fetchers from definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Jan 7, 2025
1 parent 537edaf commit b426878
Show file tree
Hide file tree
Showing 16 changed files with 53 additions and 72 deletions.
6 changes: 2 additions & 4 deletions dev/archiveTest.mts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { ArchivedFetcher } from '../fetcher/getArchivedData.js';
import { archivedFetcher } from '../fetcher/getArchivedData.js';

const { Archive } = await import('../store/archive.js');
const archivedFetcher = new ArchivedFetcher();
const { matchArchive } = await import('../store/archive.js');

// Read some match data
// const match = await getMatchDataFromBlob(7465883253);
// const blob = Buffer.from(JSON.stringify(match));

const archive = new Archive('match');
// Archive it
// const putRes = await archive.archivePut(match.match_id?.toString() ?? '', blob);
// console.log(putRes);
Expand Down
4 changes: 1 addition & 3 deletions dev/legacyArchive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ import cassandra from '../store/cassandra';
import config from '../config';
import db from '../store/db';
import { deserialize, redisCount } from '../util/utility';
import { Archive } from '../store/archive';

const matchArchive = new Archive('match');
import { matchArchive } from '../store/archive';

function randomBigInt(byteCount: number) {
return BigInt(`0x${crypto.randomBytes(byteCount).toString('hex')}`);
Expand Down
8 changes: 5 additions & 3 deletions fetcher/getApiData.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { SteamAPIUrls, getSteamAPIData, redisCount } from '../util/utility';
import { Archive } from '../store/archive';
import { blobArchive } from '../store/archive';
import cassandra from '../store/cassandra';
import type { ApiMatch } from '../util/types';
import { MatchFetcher } from './base';
import { insertMatch } from '../util/insert';

const blobArchive = new Archive('blob');
/**
* Return API data by reading it without fetching.
* @param matchId
Expand Down Expand Up @@ -89,10 +88,13 @@ async function getOrFetchApiData(matchId: number): Promise<{
};
}

export class ApiFetcher extends MatchFetcher<ApiMatch> {
class ApiFetcher extends MatchFetcher<ApiMatch> {
readData = readApiData;
getOrFetchData = getOrFetchApiData;
checkAvailable = () => {
throw new Error('not implemented');
};
}

export const apiFetcher = new ApiFetcher();

8 changes: 4 additions & 4 deletions fetcher/getArchivedData.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { isDataComplete, redisCount } from '../util/utility';
import { Archive } from '../store/archive';
import { matchArchive } from '../store/archive';
import db from '../store/db';
import { MatchFetcher } from './base';
import { getMatchDataFromBlobWithMetadata } from '../util/buildMatch';

const matchArchive = new Archive('match');

/**
* Consolidates separate match data blobs and stores as a single blob in archive
* @param matchId
Expand Down Expand Up @@ -42,7 +40,7 @@ async function doArchiveMatchFromBlobs(matchId: number) {
}
}

export class ArchivedFetcher extends MatchFetcher<ParsedMatch> {
class ArchivedFetcher extends MatchFetcher<ParsedMatch> {
readData = async (matchId: number): Promise<ParsedMatch | null> => {
try {
// Check if the parsed data is archived
Expand Down Expand Up @@ -79,3 +77,5 @@ export class ArchivedFetcher extends MatchFetcher<ParsedMatch> {
);
};
}

export const archivedFetcher = new ArchivedFetcher();
9 changes: 5 additions & 4 deletions fetcher/getGcData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import { getRandomRetrieverUrl, redisCount } from '../util/utility';
import axios from 'axios';
import retrieverMatch from '../test/data/retriever_match.json';
import { insertMatch, upsertPlayer } from '../util/insert';
import { Archive } from '../store/archive';
import { blobArchive } from '../store/archive';
import { MatchFetcher } from './base';

const blobArchive = new Archive('blob');

/**
* Return GC data by reading it without fetching.
* @param matchId
Expand Down Expand Up @@ -238,11 +236,14 @@ async function getOrFetchGcDataWithRetry(
return { data, error };
}

export class GcdataFetcher extends MatchFetcher<GcMatch> {
class GcdataFetcher extends MatchFetcher<GcMatch> {
readData = readGcData;
getOrFetchData = getOrFetchGcData;
getOrFetchDataWithRetry = getOrFetchGcDataWithRetry;
checkAvailable = async (matchId: number) => {
throw new Error('not implemented');
};
}

export const gcFetcher = new GcdataFetcher();

7 changes: 4 additions & 3 deletions fetcher/getMeta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { exec } from 'child_process';
import { promisify } from 'util';
import { buildReplayUrl, redisCount } from '../util/utility';
import { MatchFetcher } from './base';
import { GcdataFetcher } from './getGcData';
import { gcFetcher } from './getGcData';
const execPromise = promisify(exec);

// Get a sample meta file
Expand All @@ -14,7 +14,6 @@ const builder = root.loadSync('./proto/dota_match_metadata.proto', {
keepCase: true,
});
const CDOTAMatchMetadataFile = builder.lookupType('CDOTAMatchMetadataFile');
const gcFetcher = new GcdataFetcher();

async function getMeta(matchId: number) {
const gcdata = await gcFetcher.readData(matchId, false);
Expand Down Expand Up @@ -71,7 +70,7 @@ export async function getMetaFromUrl(url: string) {
}
}

export class MetaFetcher extends MatchFetcher<Record<string, any>> {
class MetaFetcher extends MatchFetcher<Record<string, any>> {
readData = () => {
throw new Error('not implemented');
};
Expand All @@ -80,3 +79,5 @@ export class MetaFetcher extends MatchFetcher<Record<string, any>> {
throw new Error('not implemented');
};
}

export const metaFetcher = new MetaFetcher();
8 changes: 4 additions & 4 deletions fetcher/getParsedData.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import config from '../config';
import { getRandomParserUrl, redisCount } from '../util/utility';
import { Archive } from '../store/archive';
import { blobArchive } from '../store/archive';
import cassandra from '../store/cassandra';
import db from '../store/db';
import { insertMatch } from '../util/insert';
import axios from 'axios';
import { MatchFetcher } from './base';

const blobArchive = new Archive('blob');

/**
* Return parse data by reading it without fetching.
* @param matchId
Expand Down Expand Up @@ -117,7 +115,7 @@ async function getOrFetchParseData(
return { data: null, skipped: false, error };
}

export class ParsedFetcher extends MatchFetcher<ParserMatch> {
class ParsedFetcher extends MatchFetcher<ParserMatch> {
readData = readParsedData;
getOrFetchData = getOrFetchParseData;
checkAvailable = async (matchId: number) => {
Expand All @@ -130,3 +128,5 @@ export class ParsedFetcher extends MatchFetcher<ParserMatch> {
);
};
}

export const parsedFetcher = new ParsedFetcher();
11 changes: 4 additions & 7 deletions fetcher/getPlayerArchive.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import { PutObjectCommandOutput } from '@aws-sdk/client-s3';
import config from '../config';
import { Archive } from '../store/archive';
import { playerArchive } from '../store/archive';
import { getFullPlayerMatchesWithMetadata } from '../util/buildPlayer';
import { PlayerFetcher } from './base';

const playerArchive = config.ENABLE_PLAYER_ARCHIVE
? new Archive('player')
: null;

async function doArchivePlayerMatches(
accountId: number,
): Promise<PutObjectCommandOutput | null> {
Expand Down Expand Up @@ -55,10 +50,12 @@ async function readArchivedPlayerMatches(
return arr;
}

export class PlayerArchiveFetcher extends PlayerFetcher<ParsedPlayerMatch[]> {
class PlayerArchiveFetcher extends PlayerFetcher<ParsedPlayerMatch[]> {
readData = readArchivedPlayerMatches;
getOrFetchData = async (accountId: number) => {
await doArchivePlayerMatches(accountId);
return this.readData(accountId);
};
}

export const playerArchiveFetcher = new PlayerArchiveFetcher();
8 changes: 7 additions & 1 deletion store/archive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function stream2buffer(stream: any): Promise<Buffer> {
});
}

export class Archive {
class Archive {
private endpoint: string = '';
private accessKeyId: string = '';
private secretAccessKey: string = '';
Expand Down Expand Up @@ -156,3 +156,9 @@ export class Archive {
}
};
}

export const blobArchive = new Archive('blob');
export const playerArchive = config.ENABLE_PLAYER_ARCHIVE
? new Archive('player')
: null;
export const matchArchive = new Archive('match');
4 changes: 1 addition & 3 deletions svc/backfill.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Fetches old matches from Steam API and writes to blob storage
import config from '../config';
import { Archive } from '../store/archive';
import { blobArchive } from '../store/archive';
import type { ApiMatch } from '../util/types';
import { SteamAPIUrls, getSteamAPIData, transformMatch, getApiHosts } from '../util/utility';
import fs from 'fs';
Expand All @@ -12,8 +12,6 @@ import redis from '../store/redis';
// ARCHIVE_S3_KEY_SECRET: 'minioadmin',
// ARCHIVE_S3_ENDPOINT: 'http://localhost:9000',

const blobArchive = new Archive('blob');

// current run started at 5000000000
const stop = Number(process.env.BACKFILL_STOP) || 6200000000;
async function scanApi() {
Expand Down
6 changes: 2 additions & 4 deletions svc/gcdata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
// The parser will also request gcdata if needed
import { runQueue } from '../store/queue';
import config from '../config';
import { ApiFetcher } from '../fetcher/getApiData';
import { GcdataFetcher } from '../fetcher/getGcData';
import { apiFetcher } from '../fetcher/getApiData';
import { gcFetcher } from '../fetcher/getGcData';
import { getPGroup } from '../util/pgroup';

const apiFetcher = new ApiFetcher();
const gcFetcher = new GcdataFetcher();

async function processGcData(job: GcDataJob) {
const matchId = job.match_id;
Expand Down
10 changes: 3 additions & 7 deletions svc/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@ import { runReliableQueue } from '../store/queue';
import c from 'ansi-colors';
import { buildReplayUrl, redisCount } from '../util/utility';
import redis from '../store/redis';
import { ApiFetcher } from '../fetcher/getApiData';
import { ParsedFetcher } from '../fetcher/getParsedData';
import { GcdataFetcher } from '../fetcher/getGcData';
import { apiFetcher } from '../fetcher/getApiData';
import { parsedFetcher } from '../fetcher/getParsedData';
import { gcFetcher } from '../fetcher/getGcData';
import { getPGroup } from '../util/pgroup';
import moment from 'moment';
import db from '../store/db';

const apiFetcher = new ApiFetcher();
const gcFetcher = new GcdataFetcher();
const parsedFetcher = new ParsedFetcher();
const { PARSER_PARALLELISM } = config;

async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
Expand Down
12 changes: 4 additions & 8 deletions util/archiveUtil.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import config from '../config';
import { Archive } from '../store/archive';
import { blobArchive } from '../store/archive';
import cassandra from '../store/cassandra';
import QueryStream from 'pg-query-stream';
import { Client } from 'pg';
import crypto from 'crypto';
import db from '../store/db';
import { ApiFetcher } from '../fetcher/getApiData';
import { GcdataFetcher } from '../fetcher/getGcData';
import { ParsedFetcher } from '../fetcher/getParsedData';
const apiFetcher = new ApiFetcher();
const gcFetcher = new GcdataFetcher();
const parsedFetcher = new ParsedFetcher();
const blobArchive = new Archive('blob');
import { apiFetcher } from '../fetcher/getApiData';
import { gcFetcher } from '../fetcher/getGcData';
import { parsedFetcher } from '../fetcher/getParsedData';

async function processMatch(matchId: number) {
// Check if we should archive the blobs (should be parsed and not archived)
Expand Down
16 changes: 5 additions & 11 deletions util/buildMatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,13 @@ import {
import redis from '../store/redis';
import db from '../store/db';
import { ApiMatch } from './types';
import { ParsedFetcher } from '../fetcher/getParsedData';
import { ApiFetcher } from '../fetcher/getApiData';
import { GcdataFetcher } from '../fetcher/getGcData';
import { ArchivedFetcher } from '../fetcher/getArchivedData';
import { MetaFetcher } from '../fetcher/getMeta';
import { parsedFetcher } from '../fetcher/getParsedData';
import { apiFetcher } from '../fetcher/getApiData';
import { gcFetcher } from '../fetcher/getGcData';
import { archivedFetcher } from '../fetcher/getArchivedData';
import { metaFetcher } from '../fetcher/getMeta';
import { benchmarks } from './benchmarksUtil';

const apiFetcher = new ApiFetcher();
const gcFetcher = new GcdataFetcher();
const parsedFetcher = new ParsedFetcher();
const archivedFetcher = new ArchivedFetcher();
const metaFetcher = new MetaFetcher();

function extendPlayerData(
player: Player | ParsedPlayer,
match: Match | ParsedMatch,
Expand Down
4 changes: 1 addition & 3 deletions util/buildPlayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import { deserialize, pick, redisCount, redisCountDistinct } from './utility';
import { gzipSync, gunzipSync } from 'zlib';
import { cacheableCols } from '../routes/playerFields';
import { promises as fs } from 'fs';
import { PlayerArchiveFetcher } from '../fetcher/getPlayerArchive';

const playerArchiveFetcher = new PlayerArchiveFetcher();
import { playerArchiveFetcher } from '../fetcher/getPlayerArchive';

export async function getPlayerMatches(
accountId: number,
Expand Down
4 changes: 1 addition & 3 deletions util/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ import {
isRecentlyVisited,
} from './queries';
import { getPGroup } from './pgroup';
import { Archive } from '../store/archive';
import { blobArchive } from '../store/archive';
import type { ApiMatch, ApiPlayer, InsertMatchInput } from './types';
import { upsertPlayerCaches } from './playerCaches';

moment.relativeTimeThreshold('ss', 0);

const blobArchive = new Archive('blob');

export async function upsert(
db: knex.Knex,
table: string,
Expand Down

0 comments on commit b426878

Please sign in to comment.