From 035c895cf3290302d3352b32efcebe8deeb7e44a Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Fri, 13 Dec 2024 13:37:03 -0300 Subject: [PATCH] chore: add export parquet and fixes to fetch-data-gql --- .gitignore | 1 + scripts/import-data/count-fetched-ids.ts | 106 +++++++-------------- scripts/import-data/export-parquet.ts | 114 +++++++++++++++++++++++ scripts/import-data/fetch-data-gql.ts | 9 +- scripts/import-data/utils.ts | 2 +- 5 files changed, 156 insertions(+), 76 deletions(-) create mode 100644 scripts/import-data/export-parquet.ts diff --git a/.gitignore b/.gitignore index b2b5210d..51c23f50 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ test-results.xml /scripts/import-data/bundles /scripts/import-data/transactions +/scripts/import-data/parquet # Generated docs /docs/sqlite/bundles diff --git a/scripts/import-data/count-fetched-ids.ts b/scripts/import-data/count-fetched-ids.ts index 7f9dbd69..69a3c851 100644 --- a/scripts/import-data/count-fetched-ids.ts +++ b/scripts/import-data/count-fetched-ids.ts @@ -83,52 +83,6 @@ const countIds = async ({ return counter; }; -// const importFromFiles = async ({ -// files, -// type, -// }: { -// files: string[]; -// type: 'transactions' | 'bundles'; -// }) => { -// let counter = 0; -// let folder: string; -// let endpoint: string; -// switch (type) { -// case 'transactions': -// folder = TRANSACTIONS_DIR; -// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`; -// break; -// case 'bundles': -// folder = BUNDLES_DIR; -// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`; -// break; -// default: -// throw new Error('Invalid type'); -// } - -// for (const file of files) { -// const filePath = path.join(folder, file); -// const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[]; -// console.log( -// `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`, -// ); - -// for (const id of ids) { -// counter++; -// await fetchWithRetry(endpoint, { -// method: 'POST', -// headers: { -// 'Content-Type': 'application/json', -// Authorization: `Bearer ${ADMIN_KEY}`, -// }, -// body: JSON.stringify({ id }), -// }); -// } -// } - -// return { queued: counter }; -// }; - (async () => { const transactionFiles = await getFilesInRange({ folder: TRANSACTIONS_DIR, @@ -141,34 +95,40 @@ const countIds = async ({ max: MAX_BLOCK_HEIGHT, }); - const firstTransactionHeight = parseInt( - transactionFiles[0].split('.')[0], - 10, - ); - const lastTransactionHeight = parseInt( - transactionFiles[transactionFiles.length - 1].split('.')[0], - 10, - ); - const transactionCount = await countIds({ - folder: TRANSACTIONS_DIR, - files: transactionFiles, - }); + console.log({ transactionFiles, bundleFiles }); - const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10); - const lastBundleHeight = parseInt( - bundleFiles[bundleFiles.length - 1].split('.')[0], - 10, - ); - const bundleCount = await countIds({ - folder: BUNDLES_DIR, - files: bundleFiles, - }); + if (transactionFiles.length > 0) { + const firstTransactionHeight = parseInt( + transactionFiles[0].split('.')[0], + 10, + ); + const lastTransactionHeight = parseInt( + transactionFiles[transactionFiles.length - 1].split('.')[0], + 10, + ); + const transactionCount = await countIds({ + folder: TRANSACTIONS_DIR, + files: transactionFiles, + }); - console.log( - `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`, - ); + console.log( + `Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`, + ); + } - console.log( - `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`, - ); + if (bundleFiles.length > 0) { + const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10); + const lastBundleHeight = parseInt( + bundleFiles[bundleFiles.length - 1].split('.')[0], + 10, + ); + const bundleCount = await countIds({ + folder: BUNDLES_DIR, + files: bundleFiles, + }); + + console.log( + `Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`, + ); + } })(); diff --git a/scripts/import-data/export-parquet.ts b/scripts/import-data/export-parquet.ts new file mode 100644 index 00000000..4e041542 --- /dev/null +++ b/scripts/import-data/export-parquet.ts @@ -0,0 +1,114 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js'; +const args = process.argv.slice(2); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +let ARIO_ENDPOINT = 'http://localhost:4000'; +let ADMIN_KEY: string | undefined; +let OUTPUT_DIR = path.join(__dirname, 'parquet'); +let MAX_FILE_ROWS = 1_000_000; +let MIN_BLOCK_HEIGHT = 0; +let MAX_BLOCK_HEIGHT: number | undefined; + +args.forEach((arg, index) => { + switch (arg) { + case '--adminKey': + if (args[index + 1]) { + ADMIN_KEY = args[index + 1]; + } else { + console.error('Missing value for --adminKey'); + process.exit(1); + } + break; + case '--arioNode': + if (args[index + 1]) { + ARIO_ENDPOINT = args[index + 1]; + } else { + console.error('Missing value for --arioNode'); + process.exit(1); + } + break; + case '--outputDir': + if (args[index + 1]) { + OUTPUT_DIR = args[index + 1]; + } else { + console.error('Missing value for --outputDir'); + process.exit(1); + } + break; + case '--minHeight': + if (args[index + 1]) { + MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --minHeight'); + process.exit(1); + } + break; + case '--maxHeight': + if (args[index + 1]) { + MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxHeight'); + process.exit(1); + } + break; + + case '--maxFileRows': + if (args[index + 1]) { + MAX_FILE_ROWS = parseInt(args[index + 1], 10); + } else { + console.error('Missing value for --maxFileRows'); + process.exit(1); + } + break; + default: + break; + } +}); + +(async () => { + if (ADMIN_KEY === undefined) { + throw new Error('Missing admin key'); + } + + if (MAX_BLOCK_HEIGHT === undefined) { + MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight(); + } + + await fetchWithRetry(`${ARIO_ENDPOINT}/ar-io/admin/export-parquet`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${ADMIN_KEY}`, + }, + body: JSON.stringify({ + outputDir: OUTPUT_DIR, + startHeight: MIN_BLOCK_HEIGHT, + endHeight: MAX_BLOCK_HEIGHT, + maxFileRows: MAX_FILE_ROWS, + }), + }); + + console.log( + `Parquet export started from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`, + ); +})(); diff --git a/scripts/import-data/fetch-data-gql.ts b/scripts/import-data/fetch-data-gql.ts index 7b6d4945..69bf3f70 100644 --- a/scripts/import-data/fetch-data-gql.ts +++ b/scripts/import-data/fetch-data-gql.ts @@ -199,7 +199,7 @@ const getRootTxId = async (txId: string) => { } const { data } = result; - const bundleId = data.transaction.bundledIn?.id; + const bundleId = data.transaction?.bundledIn?.id; if (bundleId === undefined) { rootTxId = currentId; @@ -243,6 +243,7 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { let hasNextPage = true; const transactions: BlockTransactions = new Map(); const bundles: BlockTransactions = new Map(); + const rootTxIdsForBundles: Map = new Map(); while (hasNextPage) { const { @@ -270,7 +271,11 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => { if (bundleId !== undefined) { if (BUNDLES_FETCH_ROOT_TX) { - const rootTxId = await getRootTxId(bundleId); + let rootTxId = rootTxIdsForBundles.get(bundleId); + if (rootTxId === undefined) { + rootTxId = await getRootTxId(bundleId); + rootTxIdsForBundles.set(bundleId, rootTxId); + } bundles.get(blockHeight)?.add(rootTxId); } else { bundles.get(blockHeight)?.add(bundleId); diff --git a/scripts/import-data/utils.ts b/scripts/import-data/utils.ts index ae4b13f5..59315f4f 100644 --- a/scripts/import-data/utils.ts +++ b/scripts/import-data/utils.ts @@ -31,7 +31,7 @@ export const fetchWithRetry = async ( try { const response = await fetch(url, options); - console.log({ status: response.status }); + console.log(response); if (response.ok) { return response; }