Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experimenting: seeding worker #218

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@

# Sets the mempool polling interval in milliseconds
# MEMPOOL_POLLING_INTERVAL_MS=30000

# Sets the filter for the SeedingWorker
# SEEDING_WORKER_FILTER='{"isLayer1": true}'
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ services:
- WRITE_TRANSACTION_DB_SIGNATURES=${WRITE_TRANSACTION_DB_SIGNATURES:-}
- ENABLE_DATA_DB_WAL_CLEANUP=${ENABLE_DATA_DB_WAL_CLEANUP:-}
- MAX_DATA_ITEM_QUEUE_SIZE=${MAX_DATA_ITEM_QUEUE_SIZE:-}
- SEEDING_WORKER_FILTER=${SEEDING_WORKER_FILTER:-}
networks:
- ar-io-network
depends_on:
Expand Down
1 change: 1 addition & 0 deletions docs/envs.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ This document describes the environment variables that can be used to configure
| WRITE_TRANSACTION_DB_SIGNATURES | Boolean | true | If true, the transactions signatures will be written to the database. |
| ENABLE_DATA_DB_WAL_CLEANUP | Boolean | false | If true, the data database WAL cleanup worker will be enabled |
| MAX_DATA_ITEM_QUEUE_SIZE | Number | 100000 | Sets the maximum number of data items to queue for indexing before skipping indexing new data items |
| SEEDING_WORKER_FILTER | String | undefined | The filter used to control what is seeded via the seeding worker (example via torrent protocol) |
| ARNS_ROOT_HOST | String | undefined | Domain name for ArNS host |
| SANDBOX_PROTOCOL | String | undefined | Protocol setting in process of creating sandbox domain in ArNS (ARNS_ROOT_HOST needs to be set for this env to have any effect) |
| START_WRITERS | Boolean | true | If true, start indexing blocks, tx, ANS104 bundles |
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"swagger-ui-express": "^4.5.0",
"umzug": "^3.2.1",
"wait": "^0.4.2",
"webtorrent": "^2.5.1",
"winston": "^3.7.2",
"yaml": "^2.3.4",
"yesql": "^7.0.0"
Expand All @@ -70,6 +71,7 @@
"@types/stream-json": "^1.7.2",
"@types/supertest": "^2.0.16",
"@types/swagger-ui-express": "^4.1.3",
"@types/webtorrent": "^0.109.8",
"@typescript-eslint/eslint-plugin": "^5.26.0",
"@typescript-eslint/parser": "^5.26.0",
"c8": "^8.0.1",
Expand Down
10 changes: 10 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ export const ON_DEMAND_RETRIEVAL_ORDER = env
)
.split(',');

// seeding-worker (torrent) filter
export const SEEDING_WORKER_FILTER_STRING = env.varOrUndefined(
'SEEDING_WORKER_FILTER',
);

export const SEEDING_WORKER_FILTER =
SEEDING_WORKER_FILTER_STRING === undefined
? undefined
: createFilter(JSON.parse(SEEDING_WORKER_FILTER_STRING));

//
// Indexing
//
Expand Down
12 changes: 12 additions & 0 deletions src/filters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ export class MatchNestedBundle implements ItemFilter {
}
}

export class MatchLayer1 implements ItemFilter {
async match(item: MatchableItem): Promise<boolean> {
return (
item.parent_id === '' ||
item.parent_id === null ||
item.parent_id === undefined
);
}
}

/**
* Examples:
*
Expand Down Expand Up @@ -202,6 +212,8 @@ export function createFilter(filter: any): ItemFilter {
return new MatchAttributes(filter.attributes);
} else if (filter?.isNestedBundle) {
return new MatchNestedBundle();
} else if (filter?.isLayer1) {
return new MatchLayer1();
} else if (filter?.not) {
return new NegateMatch(createFilter(filter.not));
} else if (filter?.and) {
Expand Down
2 changes: 1 addition & 1 deletion src/store/fs-data-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class FsDataStore implements ContiguousDataStore {
return `${this.baseDir}/data/${hashPrefix}`;
}

private dataPath(hash: string) {
public dataPath(hash: string) {
return `${this.dataDir(hash)}/${hash}`;
}

Expand Down
21 changes: 20 additions & 1 deletion src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import { S3DataSource } from './data/s3-data-source.js';
import { connect } from '@permaweb/aoconnect';
import { DataContentAttributeImporter } from './workers/data-content-attribute-importer.js';
import { SignatureFetcher } from './data/signature-fetcher.js';
import { SeedingWorker } from './workers/seeding-worker.js';
import { SQLiteWalCleanupWorker } from './workers/sqlite-wal-cleanup-worker.js';
import { KvArnsStore } from './store/kv-arns-store.js';
import { parquetExporter } from './routes/ar-io.js';
Expand Down Expand Up @@ -240,6 +241,16 @@ eventEmitter.on(events.TX_INDEXED, async (tx: MatchableItem) => {
eventEmitter.emit(events.ANS104_TX_INDEXED, tx);
eventEmitter.emit(events.ANS104_BUNDLE_INDEXED, tx);
}

const seedingWorkerFilter = config.SEEDING_WORKER_FILTER;

if (
seedingWorkerFilter !== undefined &&
tx.id !== undefined &&
(await seedingWorkerFilter.match(tx))
) {
seedingWorker.seed(tx.id);
}
});

eventEmitter.on(
Expand Down Expand Up @@ -383,13 +394,15 @@ metrics.registerQueueLengthGauge('dataContentAttributeImporter', {
length: () => dataContentAttributeImporter.queueDepth(),
});

const fsDataStore = new FsDataStore({ log, baseDir: 'data/contiguous' });

export const contiguousDataSource = new ReadThroughDataCache({
log,
dataSource: new SequentialDataSource({
log,
dataSources,
}),
dataStore: new FsDataStore({ log, baseDir: 'data/contiguous' }),
dataStore: fsDataStore,
contiguousDataIndex,
dataContentAttributeImporter,
});
Expand Down Expand Up @@ -616,6 +629,12 @@ if (dataSqliteWalCleanupWorker !== undefined) {
dataSqliteWalCleanupWorker.start();
}

export const seedingWorker = new SeedingWorker({
log,
contiguousDataSource,
fsDataStore,
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding seedingWorker to the shutdown process.

The seedingWorker has been correctly initialized and integrated into the system. However, it's not included in the shutdown process. To ensure proper cleanup and resource management, consider adding the seedingWorker to the shutdown function.

Add the following line to the shutdown function, along with the other worker shutdown calls:

await seedingWorker.stop();

This will ensure that the seedingWorker is properly stopped when the system is shutting down.

let isShuttingDown = false;

export const shutdown = async (express: Server) => {
Expand Down
67 changes: 67 additions & 0 deletions src/workers/seeding-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* AR.IO Gateway
* Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import { Logger } from 'winston';
import WebTorrent from 'webtorrent';
import { FsDataStore } from '../store/fs-data-store.js';
import { ContiguousDataSource } from '../types.js';

export class SeedingWorker {
private log: Logger;
private contiguousDataSource: ContiguousDataSource;
private fsDataStore: FsDataStore;

public webTorrentClient: WebTorrent.Instance;

constructor({
log,
contiguousDataSource,
fsDataStore,
}: {
log: Logger;
contiguousDataSource: ContiguousDataSource;
fsDataStore: FsDataStore;
}) {
this.webTorrentClient = new WebTorrent();
this.contiguousDataSource = contiguousDataSource;
this.fsDataStore = fsDataStore;
this.log = log.child({ class: 'SeedingWorker' });
}

async seed(txId: string) {
this.log.debug(`Seeding ${txId}`);
await this.contiguousDataSource.getData({ id: txId });
const dataPath = this.fsDataStore.dataPath(txId);
await new Promise<void>((resolve) =>
this.webTorrentClient.seed(
dataPath,
{
announce: [
'wss://tracker.btorrent.xyz',
'wss://tracker.openwebtorrent.com',
'wss://tracker.webtorrent.io',
],
},
(torrent: WebTorrent.Torrent) => {
this.log.debug(`Seeding ${txId} started: ${torrent.magnetURI}`);
resolve();
},
),
);
}
Comment on lines +46 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling and Promise management in the seed method.

While the seed method implements the core functionality, there are several areas for improvement:

  1. Error handling: The method doesn't handle potential errors from getData or dataPath operations.
  2. Promise handling: The method creates a new Promise but doesn't properly handle rejections.
  3. Async/await usage: The method is declared as async but doesn't use await consistently.

Consider refactoring the method as follows:

async seed(txId: string): Promise<void> {
  this.log.debug(`Seeding ${txId}`);
  try {
    await this.contiguousDataSource.getData({ id: txId });
    const dataPath = this.fsDataStore.dataPath(txId);
    
    return new Promise<void>((resolve, reject) => {
      this.webTorrentClient.seed(
        dataPath,
        {
          announce: [
            'wss://tracker.btorrent.xyz',
            'wss://tracker.openwebtorrent.com',
            'wss://tracker.webtorrent.io',
          ],
        },
        (torrent: WebTorrent.Torrent) => {
          this.log.debug(`Seeding ${txId} started: ${torrent.magnetURI}`);
          resolve();
        }
      ).on('error', (error) => {
        this.log.error(`Error seeding ${txId}`, { error });
        reject(error);
      });
    });
  } catch (error) {
    this.log.error(`Error preparing data for seeding ${txId}`, { error });
    throw error;
  }
}

This refactored version:

  • Adds error handling for getData and dataPath operations.
  • Properly handles Promise rejection for WebTorrent seeding errors.
  • Uses async/await consistently.
  • Returns the Promise to allow proper asynchronous handling by the caller.

}
Loading