Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tree backfiller patches #116

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions digital_asset_types/src/dao/generated/tree_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl EntityName for Entity {
#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)]
pub struct Model {
pub signature: String,
pub tree: Vec<u8>,
pub tree: String,
pub slot: i64,
pub created_at: Option<DateTimeWithTimeZone>,
pub processed_at: Option<DateTimeWithTimeZone>,
Expand Down Expand Up @@ -49,8 +49,8 @@ impl ColumnTrait for Column {
type EntityName = Entity;
fn def(&self) -> ColumnDef {
match self {
Self::Signature => ColumnType::Char(Some(84u32)).def(),
Self::Tree => ColumnType::Binary.def(),
Self::Signature => ColumnType::Text.def(),
Self::Tree => ColumnType::Text.def(),
Self::Slot => ColumnType::BigInteger.def(),
Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(),
Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(),
Expand Down
16 changes: 2 additions & 14 deletions migration/src/m20231208_103949_create_tree_transactions_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,18 @@ impl MigrationTrait for Migration {
.if_not_exists()
.col(
ColumnDef::new(TreeTransactions::Signature)
.char_len(88)
.string()
.not_null()
.primary_key(),
)
.col(ColumnDef::new(TreeTransactions::Tree).binary().not_null())
.col(ColumnDef::new(TreeTransactions::Tree).string().not_null())
.col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null())
.col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()"))
.col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone())
.to_owned(),
)
.await?;

let stmt = Statement::from_sql_and_values(
manager.get_database_backend(),
r#"CREATE INDEX signature_processed_at_not_null_index ON tree_transactions (signature, processed_at) WHERE processed_at IS NOT NULL"#,
[]
);

db.execute(stmt).await?;

manager
.create_index(
Index::create()
Expand All @@ -48,10 +40,6 @@ impl MigrationTrait for Migration {
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(Index::drop().name("signature_processed_at_null_index").table(TreeTransactions::Table).to_owned())
.await?;

manager
.drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
.await?;
Expand Down
6 changes: 1 addition & 5 deletions tree_backfiller/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@



# Tree Backfiller

The Tree Backfiller crawls all trees on-chain and backfills any transactions related to a tree that have not already been observed.
Expand All @@ -18,7 +15,7 @@ Usage: das-tree-backfiller run [OPTIONS] --solana-rpc-url <SOLANA_RPC_URL> --dat

Options:
--solana-rpc-url <SOLANA_RPC_URL>
Solana RPC URL [env: SOLANA_RPC_URL=https://index.rpcpool.com/a4d23a00546272efeba9843a4ae4R]
Solana RPC URL [env: SOLANA_RPC_URL=]
--tree-crawler-count <TREE_CRAWLER_COUNT>
Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 100]
--signature-channel-size <SIGNATURE_CHANNEL_SIZE>
Expand Down Expand Up @@ -51,7 +48,6 @@ The Tree Backfiller provides several metrics for monitoring performance and stat

Metric | Description
--- | ---
transaction.workers | Gauge of active worker threads
transaction.failed | Count of failed transactions
transaction.queued | Time for a transaction to be queued
tree.crawled | Time to crawl a tree
Expand Down
134 changes: 35 additions & 99 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ use log::{error, info};
use sea_orm::SqlxPostgresConnector;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::{mpsc, Semaphore};

Expand All @@ -28,12 +26,15 @@ pub struct Args {
pub tree_crawler_count: usize,

/// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel.
#[arg(long, env, default_value = "10000")]
#[arg(long, env, default_value = "1000")]
pub signature_channel_size: usize,

#[arg(long, env, default_value = "1000")]
pub queue_channel_size: usize,

#[arg(long, env)]
pub only_trees: Option<Vec<String>>,

/// Database configuration
#[clap(flatten)]
pub database: db::PoolArgs,
Expand All @@ -47,50 +48,6 @@ pub struct Args {
pub metrics: MetricsArgs,
}

/// A thread-safe counter.
pub struct Counter(Arc<AtomicUsize>);

impl Counter {
/// Creates a new counter initialized to zero.
pub fn new() -> Self {
Self(Arc::new(AtomicUsize::new(0)))
}

/// Increments the counter by one.
pub fn increment(&self) {
self.0.fetch_add(1, Ordering::SeqCst);
}

/// Decrements the counter by one.
pub fn decrement(&self) {
self.0.fetch_sub(1, Ordering::SeqCst);
}

/// Returns the current value of the counter.
pub fn get(&self) -> usize {
self.0.load(Ordering::SeqCst)
}

/// Returns a future that resolves when the counter reaches zero.
/// The future periodically checks the counter value and sleeps for a short duration.
pub fn zero(&self) -> impl std::future::Future<Output = ()> {
let counter = self.clone();
async move {
while counter.get() > 0 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}

impl Clone for Counter {
/// Returns a clone of the counter.
/// The returned counter shares the same underlying atomic integer.
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}

/// The main function for running the backfiller.
///
/// This function does the following:
Expand Down Expand Up @@ -124,54 +81,12 @@ pub async fn run(config: Args) -> Result<()> {
let queue_metrics = metrics.clone();

let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size);
let signature_queue_sender = queue_sender.clone();
let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);

let transaction_worker_count = Counter::new();
let transaction_worker_count_check = transaction_worker_count.clone();

tokio::spawn(async move {
loop {
tokio::select! {
Some(signature) = sig_receiver.recv() => {
let solana_rpc = Arc::clone(&sig_solana_rpc);
let transaction_worker_count_sig = transaction_worker_count.clone();
let queue_sender = queue_sender.clone();
let metrics = signature_metrics.clone();

transaction_worker_count_sig.increment();

if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) {
metrics.gauge("transaction.workers", transaction_workers_running);
}

let transaction_task = async move {
let timing = Instant::now();
if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await {
metrics.increment("transaction.failed");
error!("retrieving transaction: {:?}", e);
}

transaction_worker_count_sig.decrement();

if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) {
metrics.gauge("transaction.workers", transaction_workers_running);
}

metrics.time("transaction.queued", timing.elapsed());
};

tokio::spawn(transaction_task);
},
else => break,
}
}

Ok::<(), anyhow::Error>(())
});

let queue_handler = tokio::spawn(async move {
let mut queue = queue::Queue::setup(config.queue).await?;
let mut queue = queue::Queue::setup(config.queue).await?;

let queue_handle = tokio::spawn(async move {
while let Some(data) = queue_receiver.recv().await {
if let Err(e) = queue.push(&data).await {
queue_metrics.increment("transaction.failed");
Expand All @@ -180,8 +95,25 @@ pub async fn run(config: Args) -> Result<()> {
queue_metrics.increment("transaction.succeeded");
}
}
});

Ok::<(), anyhow::Error>(())
let signature_handle = tokio::spawn(async move {
while let Some(signature) = sig_receiver.recv().await {
let solana_rpc = Arc::clone(&sig_solana_rpc);
let queue_sender = signature_queue_sender.clone();
let metrics = signature_metrics.clone();

tokio::spawn(async move {
let timing = Instant::now();

if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await {
metrics.increment("transaction.failed");
error!("sending to queue: {:?}", e);
} else {
metrics.time("transaction.queued", timing.elapsed());
}
});
}
});

let started = Instant::now();
Expand All @@ -196,7 +128,7 @@ pub async fn run(config: Args) -> Result<()> {
);

let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count));
let mut crawl_handlers = Vec::with_capacity(tree_count);
let mut crawl_handles = Vec::with_capacity(tree_count);

for tree in trees {
let client = Arc::clone(&solana_rpc);
Expand All @@ -206,7 +138,7 @@ pub async fn run(config: Args) -> Result<()> {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let metrics = tree_metrics.clone();

let crawl_handler = tokio::spawn(async move {
let crawl_handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();
Expand All @@ -223,12 +155,16 @@ pub async fn run(config: Args) -> Result<()> {
Ok::<(), anyhow::Error>(())
});

crawl_handlers.push(crawl_handler);
crawl_handles.push(crawl_handle);
}

futures::future::try_join_all(crawl_handlers).await?;
transaction_worker_count_check.zero().await;
let _ = queue_handler.await?;
futures::future::try_join_all(crawl_handles).await?;
drop(sig_sender);

signature_handle.await?;
drop(queue_sender);

queue_handle.await?;

metrics.time("job.completed", started.elapsed());

Expand Down
6 changes: 0 additions & 6 deletions tree_backfiller/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ impl Metrics {
}
}

pub fn gauge(&self, key: &str, amount: u64) {
if let Err(e) = self.0.gauge(key, amount) {
error!("submitting gauge: {:?}", e)
}
}

pub fn increment(&self, key: &str) {
if let Err(e) = self.0.count(key, 1) {
error!("submitting increment: {:?}", e)
Expand Down
7 changes: 3 additions & 4 deletions tree_backfiller/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use figment::value::{Dict, Value};
use plerkle_messenger::{
redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType,
};
use std::sync::{Arc, Mutex};

const TRANSACTION_BACKFILL_STREAM: &str = "TXNFILL";
const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL";

#[derive(Clone, Debug, Parser)]
pub struct QueueArgs {
Expand Down Expand Up @@ -33,11 +34,10 @@ impl From<QueueArgs> for MessengerConfig {

Self {
messenger_type: MessengerType::Redis,
connection_config,
connection_config: connection_config,
}
}
}

#[derive(Debug)]
pub struct Queue(RedisMessenger);

Expand All @@ -46,7 +46,6 @@ impl Queue {
let mut messenger = RedisMessenger::new(config.clone().into()).await?;

messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?;

messenger
.set_buffer_size(
TRANSACTION_BACKFILL_STREAM,
Expand Down
26 changes: 7 additions & 19 deletions tree_backfiller/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use borsh::BorshDeserialize;
use clap::Args;
use digital_asset_types::dao::tree_transactions;
use flatbuffers::FlatBufferBuilder;
use log::info;
use log::{error, info};
use plerkle_serialization::serializer::seralize_encoded_transaction_with_status;
use sea_orm::{
sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait,
Expand Down Expand Up @@ -53,6 +53,8 @@ pub enum TreeErrorKind {
PlerkleMessenger(#[from] plerkle_messenger::MessengerError),
#[error("queue send")]
QueueSend(#[from] tokio::sync::mpsc::error::SendError<Vec<u8>>),
#[error("parse pubkey")]
ParsePubkey(#[from] solana_sdk::pubkey::ParsePubkeyError),
}
#[derive(Debug, Clone)]
pub struct TreeHeaderResponse {
Expand Down Expand Up @@ -108,7 +110,7 @@ impl TreeResponse {
let mut before = None;

let until = tree_transactions::Entity::find()
.filter(tree_transactions::Column::Tree.eq(self.pubkey.as_ref()))
.filter(tree_transactions::Column::Tree.eq(self.pubkey.to_string()))
.order_by_desc(tree_transactions::Column::Slot)
.one(&conn)
.await?
Expand All @@ -133,35 +135,21 @@ impl TreeResponse {
let slot = i64::try_from(sig.slot)?;
let sig = Signature::from_str(&sig.signature)?;

let tree_transaction_processed = tree_transactions::Entity::find()
.filter(
tree_transactions::Column::Signature
.eq(sig.to_string())
.and(tree_transactions::Column::ProcessedAt.is_not_null()),
)
.one(&conn)
.await?;

if tree_transaction_processed.is_some() {
info!("skipping previously processed transaction {}", sig);
continue;
}

let tree_transaction = tree_transactions::ActiveModel {
signature: Set(sig.to_string()),
tree: Set(self.pubkey.as_ref().to_vec()),
tree: Set(self.pubkey.to_string()),
slot: Set(slot),
..Default::default()
};

tree_transactions::Entity::insert(tree_transaction)
let _ = tree_transactions::Entity::insert(tree_transaction)
.on_conflict(
OnConflict::column(tree_transactions::Column::Signature)
.do_nothing()
.to_owned(),
)
.exec(&conn)
.await?;
.await;

sender.send(sig).await?;

Expand Down
Loading