diff --git a/digital_asset_types/src/dao/generated/tree_transactions.rs b/digital_asset_types/src/dao/generated/tree_transactions.rs index 3fdddf058..65fd65b27 100644 --- a/digital_asset_types/src/dao/generated/tree_transactions.rs +++ b/digital_asset_types/src/dao/generated/tree_transactions.rs @@ -15,7 +15,7 @@ impl EntityName for Entity { #[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)] pub struct Model { pub signature: String, - pub tree: Vec, + pub tree: String, pub slot: i64, pub created_at: Option, pub processed_at: Option, @@ -49,8 +49,8 @@ impl ColumnTrait for Column { type EntityName = Entity; fn def(&self) -> ColumnDef { match self { - Self::Signature => ColumnType::Char(Some(84u32)).def(), - Self::Tree => ColumnType::Binary.def(), + Self::Signature => ColumnType::Text.def(), + Self::Tree => ColumnType::Text.def(), Self::Slot => ColumnType::BigInteger.def(), Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(), Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(), diff --git a/migration/src/m20231208_103949_create_tree_transactions_table.rs b/migration/src/m20231208_103949_create_tree_transactions_table.rs index 1889e242a..b8aad2270 100644 --- a/migration/src/m20231208_103949_create_tree_transactions_table.rs +++ b/migration/src/m20231208_103949_create_tree_transactions_table.rs @@ -15,11 +15,11 @@ impl MigrationTrait for Migration { .if_not_exists() .col( ColumnDef::new(TreeTransactions::Signature) - .char_len(88) + .string() .not_null() .primary_key(), ) - .col(ColumnDef::new(TreeTransactions::Tree).binary().not_null()) + .col(ColumnDef::new(TreeTransactions::Tree).string().not_null()) .col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null()) .col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()")) .col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone()) @@ -27,14 +27,6 @@ impl MigrationTrait for Migration { ) .await?; - let stmt = Statement::from_sql_and_values( - manager.get_database_backend(), - r#"CREATE INDEX signature_processed_at_not_null_index ON tree_transactions (signature, processed_at) WHERE processed_at IS NOT NULL"#, - [] - ); - - db.execute(stmt).await?; - manager .create_index( Index::create() @@ -48,10 +40,6 @@ impl MigrationTrait for Migration { } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager - .drop_index(Index::drop().name("signature_processed_at_null_index").table(TreeTransactions::Table).to_owned()) - .await?; - manager .drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned()) .await?; diff --git a/tree_backfiller/README.md b/tree_backfiller/README.md index a24902b27..6a3e135df 100644 --- a/tree_backfiller/README.md +++ b/tree_backfiller/README.md @@ -1,6 +1,3 @@ - - - # Tree Backfiller The Tree Backfiller crawls all trees on-chain and backfills any transactions related to a tree that have not already been observed. @@ -18,7 +15,7 @@ Usage: das-tree-backfiller run [OPTIONS] --solana-rpc-url --dat Options: --solana-rpc-url - Solana RPC URL [env: SOLANA_RPC_URL=https://index.rpcpool.com/a4d23a00546272efeba9843a4ae4R] + Solana RPC URL [env: SOLANA_RPC_URL=] --tree-crawler-count Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 100] --signature-channel-size @@ -51,7 +48,6 @@ The Tree Backfiller provides several metrics for monitoring performance and stat Metric | Description --- | --- -transaction.workers | Gauge of active worker threads transaction.failed | Count of failed transaction transaction.queued | Time for a transaction to be queued tree.crawled | Time to crawl a tree diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs index 98d9c9882..2bfbf9e5e 100644 --- a/tree_backfiller/src/backfiller.rs +++ b/tree_backfiller/src/backfiller.rs @@ -11,9 +11,7 @@ use log::{error, info}; use sea_orm::SqlxPostgresConnector; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Signature; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use std::time::Instant; use tokio::sync::{mpsc, Semaphore}; @@ -28,12 +26,15 @@ pub struct Args { pub tree_crawler_count: usize, /// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel. - #[arg(long, env, default_value = "10000")] + #[arg(long, env, default_value = "1000")] pub signature_channel_size: usize, #[arg(long, env, default_value = "1000")] pub queue_channel_size: usize, + #[arg(long, env)] + pub only_trees: Option>, + /// Database configuration #[clap(flatten)] pub database: db::PoolArgs, @@ -47,50 +48,6 @@ pub struct Args { pub metrics: MetricsArgs, } -/// A thread-safe counter. -pub struct Counter(Arc); - -impl Counter { - /// Creates a new counter initialized to zero. - pub fn new() -> Self { - Self(Arc::new(AtomicUsize::new(0))) - } - - /// Increments the counter by one. - pub fn increment(&self) { - self.0.fetch_add(1, Ordering::SeqCst); - } - - /// Decrements the counter by one. - pub fn decrement(&self) { - self.0.fetch_sub(1, Ordering::SeqCst); - } - - /// Returns the current value of the counter. - pub fn get(&self) -> usize { - self.0.load(Ordering::SeqCst) - } - - /// Returns a future that resolves when the counter reaches zero. - /// The future periodically checks the counter value and sleeps for a short duration. - pub fn zero(&self) -> impl std::future::Future { - let counter = self.clone(); - async move { - while counter.get() > 0 { - tokio::time::sleep(Duration::from_millis(100)).await; - } - } - } -} - -impl Clone for Counter { - /// Returns a clone of the counter. - /// The returned counter shares the same underlying atomic integer. - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } -} - /// The main function for running the backfiller. /// /// This function does the following: @@ -124,54 +81,12 @@ pub async fn run(config: Args) -> Result<()> { let queue_metrics = metrics.clone(); let (queue_sender, mut queue_receiver) = mpsc::channel::>(config.queue_channel_size); + let signature_queue_sender = queue_sender.clone(); let (sig_sender, mut sig_receiver) = mpsc::channel::(config.signature_channel_size); - let transaction_worker_count = Counter::new(); - let transaction_worker_count_check = transaction_worker_count.clone(); - - tokio::spawn(async move { - loop { - tokio::select! { - Some(signature) = sig_receiver.recv() => { - let solana_rpc = Arc::clone(&sig_solana_rpc); - let transaction_worker_count_sig = transaction_worker_count.clone(); - let queue_sender = queue_sender.clone(); - let metrics = signature_metrics.clone(); - - transaction_worker_count_sig.increment(); - - if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) { - metrics.gauge("transaction.workers", transaction_workers_running); - } - - let transaction_task = async move { - let timing = Instant::now(); - if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await { - metrics.increment("transaction.failed"); - error!("retrieving transaction: {:?}", e); - } - - transaction_worker_count_sig.decrement(); - - if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) { - metrics.gauge("transaction.workers", transaction_workers_running); - } - - metrics.time("transaction.queued", timing.elapsed()); - }; - - tokio::spawn(transaction_task); - }, - else => break, - } - } - - Ok::<(), anyhow::Error>(()) - }); - - let queue_handler = tokio::spawn(async move { - let mut queue = queue::Queue::setup(config.queue).await?; + let mut queue = queue::Queue::setup(config.queue).await?; + let queue_handle = tokio::spawn(async move { while let Some(data) = queue_receiver.recv().await { if let Err(e) = queue.push(&data).await { queue_metrics.increment("transaction.failed"); @@ -180,8 +95,25 @@ pub async fn run(config: Args) -> Result<()> { queue_metrics.increment("transaction.succeeded"); } } + }); - Ok::<(), anyhow::Error>(()) + let signature_handle = tokio::spawn(async move { + while let Some(signature) = sig_receiver.recv().await { + let solana_rpc = Arc::clone(&sig_solana_rpc); + let queue_sender = signature_queue_sender.clone(); + let metrics = signature_metrics.clone(); + + tokio::spawn(async move { + let timing = Instant::now(); + + if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await { + metrics.increment("transaction.failed"); + error!("sending to queue: {:?}", e); + } else { + metrics.time("transaction.queued", timing.elapsed()); + } + }); + } }); let started = Instant::now(); @@ -196,7 +128,7 @@ pub async fn run(config: Args) -> Result<()> { ); let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count)); - let mut crawl_handlers = Vec::with_capacity(tree_count); + let mut crawl_handles = Vec::with_capacity(tree_count); for tree in trees { let client = Arc::clone(&solana_rpc); @@ -206,7 +138,7 @@ pub async fn run(config: Args) -> Result<()> { let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); let metrics = tree_metrics.clone(); - let crawl_handler = tokio::spawn(async move { + let crawl_handle = tokio::spawn(async move { let _permit = semaphore.acquire().await?; let timing = Instant::now(); @@ -223,12 +155,16 @@ pub async fn run(config: Args) -> Result<()> { Ok::<(), anyhow::Error>(()) }); - crawl_handlers.push(crawl_handler); + crawl_handles.push(crawl_handle); } - futures::future::try_join_all(crawl_handlers).await?; - transaction_worker_count_check.zero().await; - let _ = queue_handler.await?; + futures::future::try_join_all(crawl_handles).await?; + drop(sig_sender); + + signature_handle.await?; + drop(queue_sender); + + queue_handle.await?; metrics.time("job.completed", started.elapsed()); diff --git a/tree_backfiller/src/metrics.rs b/tree_backfiller/src/metrics.rs index e8789888b..99cede4d7 100644 --- a/tree_backfiller/src/metrics.rs +++ b/tree_backfiller/src/metrics.rs @@ -38,12 +38,6 @@ impl Metrics { } } - pub fn gauge(&self, key: &str, amount: u64) { - if let Err(e) = self.0.gauge(key, amount) { - error!("submitting gauge: {:?}", e) - } - } - pub fn increment(&self, key: &str) { if let Err(e) = self.0.count(key, 1) { error!("submitting increment: {:?}", e) diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs index ceef75a66..d534a61ac 100644 --- a/tree_backfiller/src/queue.rs +++ b/tree_backfiller/src/queue.rs @@ -4,8 +4,9 @@ use figment::value::{Dict, Value}; use plerkle_messenger::{ redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType, }; +use std::sync::{Arc, Mutex}; -const TRANSACTION_BACKFILL_STREAM: &str = "TXNFILL"; +const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL"; #[derive(Clone, Debug, Parser)] pub struct QueueArgs { @@ -33,11 +34,10 @@ impl From for MessengerConfig { Self { messenger_type: MessengerType::Redis, - connection_config, + connection_config: connection_config, } } } - #[derive(Debug)] pub struct Queue(RedisMessenger); @@ -46,7 +46,6 @@ impl Queue { let mut messenger = RedisMessenger::new(config.clone().into()).await?; messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?; - messenger .set_buffer_size( TRANSACTION_BACKFILL_STREAM, diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs index 328e25c94..7519a9d89 100644 --- a/tree_backfiller/src/tree.rs +++ b/tree_backfiller/src/tree.rs @@ -3,7 +3,7 @@ use borsh::BorshDeserialize; use clap::Args; use digital_asset_types::dao::tree_transactions; use flatbuffers::FlatBufferBuilder; -use log::info; +use log::{error, info}; use plerkle_serialization::serializer::seralize_encoded_transaction_with_status; use sea_orm::{ sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, @@ -53,6 +53,8 @@ pub enum TreeErrorKind { PlerkleMessenger(#[from] plerkle_messenger::MessengerError), #[error("queue send")] QueueSend(#[from] tokio::sync::mpsc::error::SendError>), + #[error("parse pubkey")] + ParsePubkey(#[from] solana_sdk::pubkey::ParsePubkeyError), } #[derive(Debug, Clone)] pub struct TreeHeaderResponse { @@ -108,7 +110,7 @@ impl TreeResponse { let mut before = None; let until = tree_transactions::Entity::find() - .filter(tree_transactions::Column::Tree.eq(self.pubkey.as_ref())) + .filter(tree_transactions::Column::Tree.eq(self.pubkey.to_string())) .order_by_desc(tree_transactions::Column::Slot) .one(&conn) .await? @@ -133,35 +135,21 @@ impl TreeResponse { let slot = i64::try_from(sig.slot)?; let sig = Signature::from_str(&sig.signature)?; - let tree_transaction_processed = tree_transactions::Entity::find() - .filter( - tree_transactions::Column::Signature - .eq(sig.to_string()) - .and(tree_transactions::Column::ProcessedAt.is_not_null()), - ) - .one(&conn) - .await?; - - if tree_transaction_processed.is_some() { - info!("skipping previously processed transaction {}", sig); - continue; - } - let tree_transaction = tree_transactions::ActiveModel { signature: Set(sig.to_string()), - tree: Set(self.pubkey.as_ref().to_vec()), + tree: Set(self.pubkey.to_string()), slot: Set(slot), ..Default::default() }; - tree_transactions::Entity::insert(tree_transaction) + let _ = tree_transactions::Entity::insert(tree_transaction) .on_conflict( OnConflict::column(tree_transactions::Column::Signature) .do_nothing() .to_owned(), ) .exec(&conn) - .await?; + .await; sender.send(sig).await?;