Skip to content

Commit

Permalink
chore: add export parquet and fixes to fetch-data-gql
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Dec 13, 2024
1 parent 72c9922 commit 035c895
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
test-results.xml
/scripts/import-data/bundles
/scripts/import-data/transactions
/scripts/import-data/parquet

# Generated docs
/docs/sqlite/bundles
Expand Down
106 changes: 33 additions & 73 deletions scripts/import-data/count-fetched-ids.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,52 +83,6 @@ const countIds = async ({
return counter;
};

// const importFromFiles = async ({
// files,
// type,
// }: {
// files: string[];
// type: 'transactions' | 'bundles';
// }) => {
// let counter = 0;
// let folder: string;
// let endpoint: string;
// switch (type) {
// case 'transactions':
// folder = TRANSACTIONS_DIR;
// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-tx`;
// break;
// case 'bundles':
// folder = BUNDLES_DIR;
// endpoint = `${ARIO_ENDPOINT}/ar-io/admin/queue-bundle`;
// break;
// default:
// throw new Error('Invalid type');
// }

// for (const file of files) {
// const filePath = path.join(folder, file);
// const ids = JSON.parse(await fs.readFile(filePath, 'utf-8')) as string[];
// console.log(
// `Importing ${ids.length} ${type} from block ${file.split('.')[0]}`,
// );

// for (const id of ids) {
// counter++;
// await fetchWithRetry(endpoint, {
// method: 'POST',
// headers: {
// 'Content-Type': 'application/json',
// Authorization: `Bearer ${ADMIN_KEY}`,
// },
// body: JSON.stringify({ id }),
// });
// }
// }

// return { queued: counter };
// };

(async () => {
const transactionFiles = await getFilesInRange({
folder: TRANSACTIONS_DIR,
Expand All @@ -141,34 +95,40 @@ const countIds = async ({
max: MAX_BLOCK_HEIGHT,
});

const firstTransactionHeight = parseInt(
transactionFiles[0].split('.')[0],
10,
);
const lastTransactionHeight = parseInt(
transactionFiles[transactionFiles.length - 1].split('.')[0],
10,
);
const transactionCount = await countIds({
folder: TRANSACTIONS_DIR,
files: transactionFiles,
});
console.log({ transactionFiles, bundleFiles });

const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10);
const lastBundleHeight = parseInt(
bundleFiles[bundleFiles.length - 1].split('.')[0],
10,
);
const bundleCount = await countIds({
folder: BUNDLES_DIR,
files: bundleFiles,
});
if (transactionFiles.length > 0) {
const firstTransactionHeight = parseInt(
transactionFiles[0].split('.')[0],
10,
);
const lastTransactionHeight = parseInt(
transactionFiles[transactionFiles.length - 1].split('.')[0],
10,
);
const transactionCount = await countIds({
folder: TRANSACTIONS_DIR,
files: transactionFiles,
});

console.log(
`Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`,
);
console.log(
`Total transactions from ${firstTransactionHeight} to ${lastTransactionHeight}: ${transactionCount}`,
);
}

console.log(
`Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`,
);
if (bundleFiles.length > 0) {
const firstBundleHeight = parseInt(bundleFiles[0].split('.')[0], 10);
const lastBundleHeight = parseInt(
bundleFiles[bundleFiles.length - 1].split('.')[0],
10,
);
const bundleCount = await countIds({
folder: BUNDLES_DIR,
files: bundleFiles,
});

console.log(
`Total bundles from ${firstBundleHeight} to ${lastBundleHeight}: ${bundleCount}`,
);
}
})();
114 changes: 114 additions & 0 deletions scripts/import-data/export-parquet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* AR.IO Gateway
* Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { fetchLatestBlockHeight, fetchWithRetry } from './utils.js';
const args = process.argv.slice(2);
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

let ARIO_ENDPOINT = 'http://localhost:4000';
let ADMIN_KEY: string | undefined;
let OUTPUT_DIR = path.join(__dirname, 'parquet');
let MAX_FILE_ROWS = 1_000_000;
let MIN_BLOCK_HEIGHT = 0;
let MAX_BLOCK_HEIGHT: number | undefined;

args.forEach((arg, index) => {
switch (arg) {
case '--adminKey':
if (args[index + 1]) {
ADMIN_KEY = args[index + 1];
} else {
console.error('Missing value for --adminKey');
process.exit(1);
}
break;
case '--arioNode':
if (args[index + 1]) {
ARIO_ENDPOINT = args[index + 1];
} else {
console.error('Missing value for --arioNode');
process.exit(1);
}
break;
case '--outputDir':
if (args[index + 1]) {
OUTPUT_DIR = args[index + 1];
} else {
console.error('Missing value for --outputDir');
process.exit(1);
}
break;
case '--minHeight':
if (args[index + 1]) {
MIN_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
} else {
console.error('Missing value for --minHeight');
process.exit(1);
}
break;
case '--maxHeight':
if (args[index + 1]) {
MAX_BLOCK_HEIGHT = parseInt(args[index + 1], 10);
} else {
console.error('Missing value for --maxHeight');
process.exit(1);
}
break;

case '--maxFileRows':
if (args[index + 1]) {
MAX_FILE_ROWS = parseInt(args[index + 1], 10);
} else {
console.error('Missing value for --maxFileRows');
process.exit(1);
}
break;
default:
break;
}
});

(async () => {
if (ADMIN_KEY === undefined) {
throw new Error('Missing admin key');
}

if (MAX_BLOCK_HEIGHT === undefined) {
MAX_BLOCK_HEIGHT = await fetchLatestBlockHeight();
}

await fetchWithRetry(`${ARIO_ENDPOINT}/ar-io/admin/export-parquet`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${ADMIN_KEY}`,
},
body: JSON.stringify({
outputDir: OUTPUT_DIR,
startHeight: MIN_BLOCK_HEIGHT,
endHeight: MAX_BLOCK_HEIGHT,
maxFileRows: MAX_FILE_ROWS,
}),
});

console.log(
`Parquet export started from block ${MIN_BLOCK_HEIGHT} to ${MAX_BLOCK_HEIGHT}`,
);
})();
9 changes: 7 additions & 2 deletions scripts/import-data/fetch-data-gql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ const getRootTxId = async (txId: string) => {
}

const { data } = result;
const bundleId = data.transaction.bundledIn?.id;
const bundleId = data.transaction?.bundledIn?.id;

if (bundleId === undefined) {
rootTxId = currentId;
Expand Down Expand Up @@ -243,6 +243,7 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => {
let hasNextPage = true;
const transactions: BlockTransactions = new Map();
const bundles: BlockTransactions = new Map();
const rootTxIdsForBundles: Map<string, string> = new Map();

while (hasNextPage) {
const {
Expand Down Expand Up @@ -270,7 +271,11 @@ const getTransactionsForRange = async ({ min, max }: BlockRange) => {

if (bundleId !== undefined) {
if (BUNDLES_FETCH_ROOT_TX) {
const rootTxId = await getRootTxId(bundleId);
let rootTxId = rootTxIdsForBundles.get(bundleId);
if (rootTxId === undefined) {
rootTxId = await getRootTxId(bundleId);
rootTxIdsForBundles.set(bundleId, rootTxId);
}
bundles.get(blockHeight)?.add(rootTxId);
} else {
bundles.get(blockHeight)?.add(bundleId);
Expand Down
2 changes: 1 addition & 1 deletion scripts/import-data/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const fetchWithRetry = async (
try {
const response = await fetch(url, options);

console.log({ status: response.status });
console.log(response);
if (response.ok) {
return response;
}
Expand Down

0 comments on commit 035c895

Please sign in to comment.