diff --git a/digital_asset_types/Cargo.toml b/digital_asset_types/Cargo.toml
index 8e4b18bb9..07a08f468 100644
--- a/digital_asset_types/Cargo.toml
+++ b/digital_asset_types/Cargo.toml
@@ -7,10 +7,17 @@ publish = false
 
 [dependencies]
 spl-concurrent-merkle-tree = "0.2.0"
-sea-orm = { optional = true, version = "0.10.6", features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] }
+sea-orm = { optional = true, version = "0.10.6", features = [
+    "macros",
+    "runtime-tokio-rustls",
+    "sqlx-postgres",
+    "with-chrono",
+] }
 sea-query = { version = "0.28.1", features = ["postgres-array"] }
 serde = { version = "1.0.137", optional = true }
-serde_json = { version = "1.0.81", optional = true, features=["preserve_order"] }
+serde_json = { version = "1.0.81", optional = true, features = [
+    "preserve_order",
+] }
 bs58 = "0.4.0"
 borsh = { version = "~0.10.3", optional = true }
 borsh-derive = { version = "~0.10.3", optional = true }
diff --git a/digital_asset_types/src/dao/generated/mod.rs b/digital_asset_types/src/dao/generated/mod.rs
index 5db9a8690..64fef9216 100644
--- a/digital_asset_types/src/dao/generated/mod.rs
+++ b/digital_asset_types/src/dao/generated/mod.rs
@@ -16,3 +16,4 @@ pub mod sea_orm_active_enums;
 pub mod tasks;
 pub mod token_accounts;
 pub mod tokens;
+pub mod tree_transactions;
diff --git a/digital_asset_types/src/dao/generated/prelude.rs b/digital_asset_types/src/dao/generated/prelude.rs
index 79759cd1c..76403b9e1 100644
--- a/digital_asset_types/src/dao/generated/prelude.rs
+++ b/digital_asset_types/src/dao/generated/prelude.rs
@@ -13,3 +13,4 @@ pub use super::raw_txn::Entity as RawTxn;
 pub use super::tasks::Entity as Tasks;
 pub use super::token_accounts::Entity as TokenAccounts;
 pub use super::tokens::Entity as Tokens;
+pub use super::tree_transactions::Entity as TreeTransactions;
diff --git a/digital_asset_types/src/dao/generated/tree_transactions.rs b/digital_asset_types/src/dao/generated/tree_transactions.rs
new file mode 100644
index 000000000..d1eae60f8
--- /dev/null
+++ b/digital_asset_types/src/dao/generated/tree_transactions.rs
@@ -0,0 +1,67 @@
+//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5
+
+use sea_orm::entity::prelude::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(Copy, Clone, Default, Debug, DeriveEntity)]
+pub struct Entity;
+
+impl EntityName for Entity {
+    fn table_name(&self) -> &str {
+        "tree_transactions"
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)]
+pub struct Model {
+    pub signature: String,
+    pub tree: Vec<u8>,
+    pub slot: i64,
+    pub created_at: Option<DateTimeWithTimeZone>,
+    pub processed_at: Option<DateTimeWithTimeZone>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
+pub enum Column {
+    Signature,
+    Tree,
+    Slot,
+    CreatedAt,
+    ProcessedAt,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
+pub enum PrimaryKey {
+    Signature,
+}
+
+impl PrimaryKeyTrait for PrimaryKey {
+    type ValueType = String;
+    fn auto_increment() -> bool {
+        false
+    }
+}
+
+#[derive(Copy, Clone, Debug, EnumIter)]
+pub enum Relation {}
+
+impl ColumnTrait for Column {
+    type EntityName = Entity;
+    fn def(&self) -> ColumnDef {
+        match self {
+            Self::Signature => ColumnType::Char(Some(64u32)).def(),
+            Self::Tree => ColumnType::Binary.def(),
+            Self::Slot => ColumnType::BigInteger.def(),
+            Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(),
+            Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(),
+        }
+    }
+}
+
+impl RelationTrait for Relation {
+    fn def(&self) -> RelationDef {
+        panic!("No RelationDef")
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}
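Not part of the diff: a minimal sketch of how a crawler might write rows through the new `tree_transactions` entity. The helper name `record_signature` is hypothetical, and the sketch assumes sea-orm 0.10's `on_conflict`/`exec_without_returning` API so that re-inserting an already-seen signature is a no-op rather than a duplicate-key error.

```rust
use digital_asset_types::dao::tree_transactions;
use sea_orm::{sea_query::OnConflict, DatabaseConnection, DbErr, EntityTrait, Set};

// Hypothetical helper: record a crawled signature, ignoring duplicates so
// that re-crawling the same tree stays idempotent.
async fn record_signature(
    conn: &DatabaseConnection,
    signature: String,
    tree: Vec<u8>,
    slot: i64,
) -> Result<(), DbErr> {
    let row = tree_transactions::ActiveModel {
        signature: Set(signature),
        tree: Set(tree),
        slot: Set(slot),
        // created_at/processed_at stay NotSet and fall back to column defaults.
        ..Default::default()
    };

    tree_transactions::Entity::insert(row)
        .on_conflict(
            OnConflict::column(tree_transactions::Column::Signature)
                .do_nothing()
                .to_owned(),
        )
        .exec_without_returning(conn)
        .await?;

    Ok(())
}
```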
diff --git a/migration/src/lib.rs b/migration/src/lib.rs
index 7e38ac93d..0ff8f26c6 100644
--- a/migration/src/lib.rs
+++ b/migration/src/lib.rs
@@ -30,6 +30,7 @@ mod m20230724_120101_add_group_info_seq;
 mod m20230726_013107_remove_not_null_constraint_from_group_value;
 mod m20230918_182123_add_raw_name_symbol;
 mod m20230919_072154_cl_audits;
+mod m20231208_103949_create_tree_transactions_table;
 
 pub struct Migrator;
 
@@ -67,6 +68,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20230726_013107_remove_not_null_constraint_from_group_value::Migration),
             Box::new(m20230918_182123_add_raw_name_symbol::Migration),
             Box::new(m20230919_072154_cl_audits::Migration),
+            Box::new(m20231208_103949_create_tree_transactions_table::Migration),
         ]
     }
 }
diff --git a/migration/src/m20231208_103949_create_tree_transactions_table.rs b/migration/src/m20231208_103949_create_tree_transactions_table.rs
new file mode 100644
index 000000000..e592d0c7a
--- /dev/null
+++ b/migration/src/m20231208_103949_create_tree_transactions_table.rs
@@ -0,0 +1,61 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .create_table(
+                Table::create()
+                    .table(TreeTransactions::Table)
+                    .if_not_exists()
+                    .col(
+                        ColumnDef::new(TreeTransactions::Signature)
+                            .char_len(64)
+                            .not_null()
+                            .primary_key(),
+                    )
+                    .col(ColumnDef::new(TreeTransactions::Tree).binary().not_null())
+                    .col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null())
+                    .col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()"))
+                    .col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone())
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("tree_slot_index")
+                    .table(TreeTransactions::Table)
+                    .col(TreeTransactions::Tree)
+                    .col(TreeTransactions::Slot)
+                    .unique()
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
+            .await?;
+
+        manager
+            .drop_table(Table::drop().table(TreeTransactions::Table).to_owned())
+            .await
+    }
+}
+
+/// Learn more at https://docs.rs/sea-query#iden
+#[derive(Iden)]
+enum TreeTransactions {
+    Table,
+    Signature,
+    Tree,
+    CreatedAt,
+    ProcessedAt,
+    Slot,
+}
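For illustration (also not in the diff), the migration registered above is applied like any other entry in `Migrator`; a one-off runner using sea-orm-migration's `MigratorTrait` might look like the sketch below. `apply_pending` is an assumption, since in this repo migrations are normally applied through the `migration` crate's own binary.

```rust
use migration::Migrator;
use sea_orm::Database;
use sea_orm_migration::MigratorTrait;

// Hypothetical one-off runner: applies every migration not yet recorded in
// the `seaql_migrations` table, including the tree_transactions one above.
async fn apply_pending(database_url: &str) -> anyhow::Result<()> {
    let db = Database::connect(database_url).await?;
    Migrator::up(&db, None).await?;
    Ok(())
}
```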
"offline", + "json", +] } thiserror = "1.0.31" -tokio = { version = "1.23.0", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1.23.0", features = [ + "fs", + "macros", + "rt-multi-thread", + "sync", + "time", +] } txn_forwarder = { path = "../txn_forwarder" } diff --git a/tree_backfiller/Cargo.toml b/tree_backfiller/Cargo.toml index 4d4b1285d..db24fa44b 100644 --- a/tree_backfiller/Cargo.toml +++ b/tree_backfiller/Cargo.toml @@ -36,7 +36,6 @@ sea-orm = { version = "0.10.6", features = [ "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", - "mock", ] } sea-query = { version = "0.28.1", features = ["postgres-array"] } chrono = "0.4.19" diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs index 6c21ddd9b..4e477424a 100644 --- a/tree_backfiller/src/backfiller.rs +++ b/tree_backfiller/src/backfiller.rs @@ -1,31 +1,96 @@ +use crate::db; use crate::tree; use anyhow::Result; use clap::Parser; +use digital_asset_types::dao::tree_transactions; +use futures::FutureExt; use log::{debug, error, info}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Signature; +use std::panic::{self, AssertUnwindSafe}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::{mpsc, Mutex, Semaphore}; +use tokio::time::{timeout, Duration}; #[derive(Debug, Parser, Clone)] -pub struct Config { +pub struct Args { /// Solana RPC URL #[arg(long, env)] pub solana_rpc_url: String, /// Number of tree crawler workers - #[arg(long, env, default_value = "100")] + #[arg(long, env, default_value = "1")] pub tree_crawler_count: usize, /// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel. - #[arg(long, env, default_value = "1000")] + #[arg(long, env, default_value = "1")] pub signature_channel_size: usize, + + #[arg(long, env, default_value = "3000")] + pub transaction_check_timeout: u64, + + /// Database configuration + #[clap(flatten)] + pub database: db::PoolArgs, +} + +/// A thread-safe counter. +pub struct Counter(Arc); + +impl Counter { + /// Creates a new counter initialized to zero. + pub fn new() -> Self { + Self(Arc::new(AtomicUsize::new(0))) + } + + /// Increments the counter by one. + pub fn increment(&self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + + /// Decrements the counter by one. + pub fn decrement(&self) { + self.0.fetch_sub(1, Ordering::SeqCst); + } + + /// Returns the current value of the counter. + pub fn get(&self) -> usize { + self.0.load(Ordering::SeqCst) + } + + /// Returns a future that resolves when the counter reaches zero. + /// The future periodically checks the counter value and sleeps for a short duration. + pub fn zero(&self) -> impl std::future::Future { + let counter = self.clone(); + async move { + while counter.get() > 0 { + println!("Counter value: {}", counter.get()); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } +} + +impl Clone for Counter { + /// Returns a clone of the counter. + /// The returned counter shares the same underlying atomic integer. + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } } -pub async fn run(config: Config) -> Result<()> { +/// Runs the tree backfiller. +/// +/// This function takes a `Config` as input and returns a `Result<()>`. +/// It creates an `RpcClient` and retrieves all trees. 
diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs
index 6c21ddd9b..4e477424a 100644
--- a/tree_backfiller/src/backfiller.rs
+++ b/tree_backfiller/src/backfiller.rs
@@ -1,31 +1,96 @@
+use crate::db;
 use crate::tree;
 use anyhow::Result;
 use clap::Parser;
+use digital_asset_types::dao::tree_transactions;
+use futures::FutureExt;
 use log::{debug, error, info};
 use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_sdk::signature::Signature;
+use std::panic::{self, AssertUnwindSafe};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use tokio::sync::{mpsc, Mutex, Semaphore};
+use tokio::time::{timeout, Duration};
 
 #[derive(Debug, Parser, Clone)]
-pub struct Config {
+pub struct Args {
     /// Solana RPC URL
     #[arg(long, env)]
     pub solana_rpc_url: String,
 
     /// Number of tree crawler workers
-    #[arg(long, env, default_value = "100")]
+    #[arg(long, env, default_value = "1")]
     pub tree_crawler_count: usize,
 
     /// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel.
-    #[arg(long, env, default_value = "1000")]
+    #[arg(long, env, default_value = "1")]
     pub signature_channel_size: usize,
+
+    #[arg(long, env, default_value = "3000")]
+    pub transaction_check_timeout: u64,
+
+    /// Database configuration
+    #[clap(flatten)]
+    pub database: db::PoolArgs,
 }
 
+/// A thread-safe counter.
+pub struct Counter(Arc<AtomicUsize>);
+
+impl Counter {
+    /// Creates a new counter initialized to zero.
+    pub fn new() -> Self {
+        Self(Arc::new(AtomicUsize::new(0)))
+    }
+
+    /// Increments the counter by one.
+    pub fn increment(&self) {
+        self.0.fetch_add(1, Ordering::SeqCst);
+    }
+
+    /// Decrements the counter by one.
+    pub fn decrement(&self) {
+        self.0.fetch_sub(1, Ordering::SeqCst);
+    }
+
+    /// Returns the current value of the counter.
+    pub fn get(&self) -> usize {
+        self.0.load(Ordering::SeqCst)
+    }
+
+    /// Returns a future that resolves when the counter reaches zero.
+    /// The future periodically checks the counter value and sleeps for a short duration.
+    pub fn zero(&self) -> impl std::future::Future<Output = ()> {
+        let counter = self.clone();
+        async move {
+            while counter.get() > 0 {
+                println!("Counter value: {}", counter.get());
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+        }
+    }
+}
+
+impl Clone for Counter {
+    /// Returns a clone of the counter.
+    /// The returned counter shares the same underlying atomic integer.
+    fn clone(&self) -> Self {
+        Self(Arc::clone(&self.0))
+    }
+}
 
-pub async fn run(config: Config) -> Result<()> {
+/// Runs the tree backfiller.
+///
+/// This function takes an `Args` as input and returns a `Result<()>`.
+/// It creates an `RpcClient` and retrieves all trees.
+/// It then spawns a task for each tree and a separate task to handle transaction workers.
+/// The function waits for all tasks to finish before returning.
+pub async fn run(config: Args) -> Result<()> {
     let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url));
+    let conn = db::connect(config.database).await?;
+
     let trees = tree::all(&solana_rpc).await?;
     let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count));
@@ -34,28 +99,47 @@ pub async fn run(config: Config) -> Result<()> {
     let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
     let sig_solana_rpc = Arc::clone(&solana_rpc);
 
-    let transaction_worker_count = Arc::new(AtomicUsize::new(0));
-    let transaction_worker_count_check = Arc::clone(&transaction_worker_count);
+    let transaction_worker_count = Counter::new();
+    let transaction_worker_count_check = transaction_worker_count.clone();
 
     tokio::spawn(async move {
         loop {
             tokio::select! {
                 Some(signature) = sig_receiver.recv() => {
-                    // TODO: possibly limit spawn rate. currently limited by the sign channel size
                     let solana_rpc = Arc::clone(&sig_solana_rpc);
-                    let transaction_worker_count = Arc::clone(&transaction_worker_count);
+                    let transaction_worker_count_sig = transaction_worker_count.clone();
+                    let transaction_worker_count_guard = transaction_worker_count.clone();
 
-                    transaction_worker_count.fetch_add(1, Ordering::SeqCst);
+                    transaction_worker_count_sig.increment();
 
-                    tokio::spawn(async move {
+                    let transaction_task = async move {
                         match tree::transaction(solana_rpc, signature).await {
                             Ok(builder) => {}
-                            Err(e) => error!("retrieving transaction: {:?}", e),
+                            Err(e) => println!("error retrieving transaction: {:?}", e),
                         }
 
-                        transaction_worker_count.fetch_sub(1, Ordering::SeqCst);
+                        transaction_worker_count_sig.decrement()
+                    };
+
+                    let guarded_task = AssertUnwindSafe(transaction_task).catch_unwind();
+                    let timed_task = tokio::spawn(async move {
+                        timeout(Duration::from_millis(config.transaction_check_timeout), guarded_task).await
                     });
-                }
+
+                    let _ = tokio::spawn(async move {
+                        match timed_task.await {
+                            Ok(Ok(_)) => {}
+                            Ok(Err(_)) => {
+                                println!("Task timed out");
+                                transaction_worker_count_guard.decrement()
+                            },
+                            Err(_) => {
+                                println!("Task panicked");
+                                transaction_worker_count_guard.decrement()
+                            }
+                        }});
+
+                },
                 else => break,
             }
         }
@@ -67,12 +151,13 @@ pub async fn run(config: Config) -> Result<()> {
         let solana_rpc = Arc::clone(&solana_rpc);
         let semaphore = Arc::clone(&semaphore);
         let sig_sender = sig_sender.clone();
+        let conn = conn.clone();
 
         let crawl_handler = tokio::spawn(async move {
             let _permit = semaphore.acquire().await?;
 
-            if let Err(e) = tree::crawl(solana_rpc, sig_sender, tree).await {
-                error!("crawling tree: {:?}", e);
+            if let Err(e) = tree::crawl(solana_rpc, sig_sender, &conn, tree).await {
+                println!("error crawling tree: {:?}", e);
             }
 
             Ok::<(), anyhow::Error>(())
@@ -82,10 +167,7 @@ pub async fn run(config: Config) -> Result<()> {
     }
 
     futures::future::try_join_all(crawl_handlers).await?;
-
-    while transaction_worker_count_check.load(Ordering::SeqCst) > 0 {
-        std::thread::sleep(std::time::Duration::from_millis(100));
-    }
+    transaction_worker_count_check.zero().await;
 
     Ok(())
 }
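A usage sketch (not part of the diff) for the `Counter` added in backfiller.rs: each worker increments before it starts and decrements when it finishes, so `zero()` gives the shutdown path something to await. `wait_for_workers` is a hypothetical wrapper.

```rust
use std::time::Duration;

async fn wait_for_workers() {
    let workers = Counter::new();

    for _ in 0..4 {
        let guard = workers.clone();
        guard.increment();
        tokio::spawn(async move {
            // Stand-in for real work, e.g. fetching a transaction.
            tokio::time::sleep(Duration::from_millis(10)).await;
            guard.decrement();
        });
    }

    // Resolves once every worker has decremented the counter back to zero.
    workers.zero().await;
}
```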
diff --git a/tree_backfiller/src/db.rs b/tree_backfiller/src/db.rs
new file mode 100644
index 000000000..9490cfed6
--- /dev/null
+++ b/tree_backfiller/src/db.rs
@@ -0,0 +1,23 @@
+use anyhow::Result;
+use clap::Parser;
+use sea_orm::{ConnectOptions, Database, DatabaseConnection, DbErr};
+
+#[derive(Debug, Parser, Clone)]
+pub struct PoolArgs {
+    #[arg(long, env)]
+    pub database_url: String,
+    #[arg(long, env, default_value = "125")]
+    pub database_max_connections: u32,
+    #[arg(long, env, default_value = "5")]
+    pub database_min_connections: u32,
+}
+
+pub async fn connect(config: PoolArgs) -> Result<DatabaseConnection, DbErr> {
+    let mut options = ConnectOptions::new(config.database_url);
+
+    options
+        .min_connections(config.database_min_connections)
+        .max_connections(config.database_max_connections);
+
+    Database::connect(options).await
+}
diff --git a/tree_backfiller/src/main.rs b/tree_backfiller/src/main.rs
index 82d561439..7e8cf520f 100644
--- a/tree_backfiller/src/main.rs
+++ b/tree_backfiller/src/main.rs
@@ -1,43 +1,31 @@
 mod backfiller;
+mod db;
 mod tree;
 
 use anyhow::Result;
 use clap::{Parser, Subcommand};
 use log::{debug, error, info};
-use solana_client::nonblocking::rpc_client::RpcClient;
-use solana_sdk::signature::Signature;
-use std::sync::Arc;
-use tokio::sync::Semaphore;
 
 #[derive(Debug, Parser)]
 #[clap(author, version)]
 struct Args {
     #[command(subcommand)]
-    action: ArgsAction,
+    command: Command,
 }
 
 #[derive(Debug, Clone, Subcommand)]
-enum ArgsAction {
+enum Command {
     /// The 'run' command is used to cross-reference the index against on-chain accounts.
     /// It crawls through trees and backfills any missed tree transactions.
     /// This is particularly useful for ensuring data consistency and completeness.
     #[command(name = "run")]
-    Run(backfiller::Config),
+    Run(backfiller::Args),
 }
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    // lookup all trees
-    // fetch transaction for tree
-    // tx already exist in db and processed then next else write transaction to database
-    // fetch and parse block for transaction
-
-    // tree::lookup_all(rpc, tree_channel)
-    // tree::crawl_tree(rpc, conn, tree_channel)
-    // forward::send(rpc, conn, tree_channel)
-
     let args = Args::parse();
-    match args.action {
-        ArgsAction::Run(config) => backfiller::run(config).await,
+    match args.command {
+        Command::Run(config) => backfiller::run(config).await,
     }
 }
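Illustration only: how `PoolArgs` and `connect` fit together, assuming the `db` module from this diff is in scope. Clap derives `--database-url` (or the `DATABASE_URL` environment variable) plus the min/max connection flags from the field names; this `main` is not part of the diff.

```rust
use clap::Parser;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Parses --database-url, --database-max-connections and
    // --database-min-connections, falling back to matching env vars.
    let args = db::PoolArgs::parse();
    let conn = db::connect(args).await?;
    println!("connected: {:?}", conn);
    Ok(())
}
```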
deserialize")] - AchorDeserialize(#[from] anchor_client::anchor_lang::error::Error), + #[error("solana rpc")] + Rpc(#[from] solana_client::client_error::ClientError), + #[error("anchor")] + Achor(#[from] anchor_client::anchor_lang::error::Error), #[error("perkle serialize")] PerkleSerialize(#[from] plerkle_serialization::error::PlerkleSerializationError), } @@ -104,8 +107,7 @@ pub async fn all(client: &Arc) -> Result, TreeError Ok(client .get_program_accounts_with_config(&id(), config) - .await - .map_err(TreeErrorKind::FetchAll)? + .await? .into_iter() .filter_map(|(pubkey, account)| TreeResponse::from_rpc(pubkey, account).ok()) .collect()) @@ -114,21 +116,26 @@ pub async fn all(client: &Arc) -> Result, TreeError pub async fn crawl( client: Arc, sig_sender: Sender, + conn: &DatabaseConnection, tree: TreeResponse, ) -> Result<()> { - println!("crawl tree: {:?}", tree.pubkey); + let mut before = None; - // TODO: check db for tree_transactions picking the sig of the last processed transaction. `SELECT signature FROM tree_transactions WHERE tree = $1 ORDER BY position ASC LIMIT 1` - let mut last_sig = None; - loop { - let before = last_sig; + let until = tree_transactions::Entity::find() + .filter(tree_transactions::Column::Tree.eq(tree.pubkey.as_ref())) + .order_by_desc(tree_transactions::Column::Slot) + .one(conn) + .await? + .map(|t| Signature::from_str(&t.signature).ok()) + .flatten(); + loop { let sigs = client .get_signatures_for_address_with_config( &tree.pubkey, GetConfirmedSignaturesForAddress2Config { before, - until: None, + until, ..GetConfirmedSignaturesForAddress2Config::default() }, ) @@ -136,11 +143,11 @@ pub async fn crawl( for sig in sigs.iter() { let sig = Signature::from_str(&sig.signature)?; - println!("send signature: {:?}", sig.clone()); + println!("sig: {}", sig); sig_sender.send(sig.clone()).await?; - last_sig = Some(sig); + before = Some(sig); } if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT { @@ -156,11 +163,16 @@ pub async fn transaction<'a>( signature: Signature, ) -> Result, TreeErrorKind> { let transaction = client - .get_transaction(&signature, UiTransactionEncoding::Base58) + .get_transaction_with_config( + &signature, + RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Base58), + max_supported_transaction_version: Some(0), + ..RpcTransactionConfig::default() + }, + ) .await?; - println!("transaction: {:?}", signature); - Ok(seralize_encoded_transaction_with_status( FlatBufferBuilder::new(), transaction,