diff --git a/Cargo.toml b/Cargo.toml index ce77458..8a9174b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,11 +27,11 @@ integration = [] [dependencies] init4-bin-base = { version = "0.4.1", features = ["perms"] } -signet-constants = { git = "https://github.com/init4tech/signet-sdk", rev = "bd183b627dcb0eb682da801093b13f1f8311446b" } -signet-sim = { git = "https://github.com/init4tech/signet-sdk", rev = "bd183b627dcb0eb682da801093b13f1f8311446b" } -signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", rev = "bd183b627dcb0eb682da801093b13f1f8311446b" } -signet-types = { git = "https://github.com/init4tech/signet-sdk", rev = "bd183b627dcb0eb682da801093b13f1f8311446b" } -signet-zenith = { git = "https://github.com/init4tech/signet-sdk", rev = "bd183b627dcb0eb682da801093b13f1f8311446b" } +signet-constants = { git = "https://github.com/init4tech/signet-sdk", rev = "ba5894f6fac35299d495ae115cd465a1f0791637" } +signet-sim = { git = "https://github.com/init4tech/signet-sdk", rev = "ba5894f6fac35299d495ae115cd465a1f0791637" } +signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", rev = "ba5894f6fac35299d495ae115cd465a1f0791637" } +signet-types = { git = "https://github.com/init4tech/signet-sdk", rev = "ba5894f6fac35299d495ae115cd465a1f0791637" } +signet-zenith = { git = "https://github.com/init4tech/signet-sdk", rev = "ba5894f6fac35299d495ae115cd465a1f0791637" } trevm = { version = "0.23.4", features = ["concurrent-db", "test-utils"] } diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs index abf45d3..71b10f4 100644 --- a/bin/submit_transaction.rs +++ b/bin/submit_transaction.rs @@ -11,7 +11,7 @@ use builder::config::HostProvider; use init4_bin_base::{ deps::{ metrics::{counter, histogram}, - tracing, + tracing::{debug, error}, }, init4, utils::{from_env::FromEnv, signer::LocalOrAwsConfig}, @@ -52,16 +52,17 @@ async fn main() { let _guard = init4(); let config = Config::from_env().unwrap(); - tracing::trace!("connecting to provider"); + debug!(?config.recipient_address, "connecting to provider"); + let provider = config.provider().await; let recipient_address = config.recipient_address; let sleep_time = config.sleep_time; loop { - tracing::debug!("attempting transaction"); + debug!(?recipient_address, "attempting transaction"); send_transaction(&provider, recipient_address).await; - tracing::debug!(sleep_time, "sleeping"); + debug!(sleep_time, "sleeping"); tokio::time::sleep(tokio::time::Duration::from_secs(sleep_time)).await; } } @@ -78,18 +79,17 @@ async fn send_transaction(provider: &HostProvider, recipient_address: Address) { let dispatch_start_time: Instant = Instant::now(); // dispatch the transaction - tracing::debug!("dispatching transaction"); let result = provider.send_transaction(tx).await.unwrap(); // wait for the transaction to mine let receipt = match timeout(Duration::from_secs(240), result.get_receipt()).await { Ok(Ok(receipt)) => receipt, Ok(Err(e)) => { - tracing::error!(error = ?e, "failed to get transaction receipt"); + error!(error = ?e, "failed to get transaction receipt"); return; } Err(_) => { - tracing::error!("timeout waiting for transaction receipt"); + error!("timeout waiting for transaction receipt"); counter!("txn_submitter.tx_timeout").increment(1); return; } @@ -99,6 +99,6 @@ async fn send_transaction(provider: &HostProvider, recipient_address: Address) { // record metrics for how long it took to mine the transaction let mine_time = dispatch_start_time.elapsed().as_secs(); - tracing::debug!(success = receipt.status(), mine_time, hash, "transaction mined"); + debug!(success = receipt.status(), mine_time, hash, "transaction mined"); histogram!("txn_submitter.tx_mine_time").record(mine_time as f64); } diff --git a/src/config.rs b/src/config.rs index 593eceb..f869fad 100644 --- a/src/config.rs +++ b/src/config.rs @@ -72,8 +72,9 @@ pub struct BuilderConfig { /// NOTE: should not include the host_rpc_url value #[from_env( var = "TX_BROADCAST_URLS", - desc = "Additional RPC URLs to which to broadcast transactions", - infallible + desc = "Additional RPC URLs to which the builder broadcasts transactions", + infallible, + optional )] pub tx_broadcast_urls: Vec>, /// address of the Zenith contract on Host. diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index dad6482..6868837 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -94,10 +94,10 @@ impl Simulator { self.config.rollup_block_gas_limit, ); - let block = block_build.build().await; - debug!(block = ?block, "finished block simulation"); + let built_block = block_build.build().await; + debug!(block_number = ?built_block.block_number(), "finished building block"); - Ok(block) + Ok(built_block) } /// Spawns the simulator task, which handles the setup and sets the deadline @@ -155,8 +155,7 @@ impl Simulator { // If no env, skip this run let Some(block_env) = self.block_env.borrow_and_update().clone() else { return }; - - debug!(block_env = ?block_env, "building on block"); + debug!(block_env = ?block_env, "building on block env"); match self.handle_build(constants, sim_cache, finish_by, block_env).await { Ok(block) => { diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 7f9a1dd..e5e002b 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -13,7 +13,7 @@ use alloy::{ sol_types::{SolCall, SolError}, transports::TransportError, }; -use eyre::{bail, eyre}; +use eyre::bail; use init4_bin_base::deps::{ metrics::{counter, histogram}, tracing::{self, Instrument, debug, debug_span, error, info, instrument, warn}, @@ -22,11 +22,18 @@ use signet_sim::BuiltBlock; use signet_types::{SignRequest, SignResponse}; use signet_zenith::{ BundleHelper::{self, BlockHeader, FillPermit2, submitCall}, - Zenith::IncorrectHostBlock, + Zenith::{self, IncorrectHostBlock}, }; -use std::time::Instant; +use std::time::{Instant, UNIX_EPOCH}; use tokio::{sync::mpsc, task::JoinHandle}; +/// Base maximum fee per gas to use as a starting point for retry bumps +pub const BASE_FEE_PER_GAS: u128 = 10 * GWEI_TO_WEI as u128; +/// Base max priority fee per gas to use as a starting point for retry bumps +pub const BASE_MAX_PRIORITY_FEE_PER_GAS: u128 = 2 * GWEI_TO_WEI as u128; +/// Base maximum fee per blob gas to use as a starting point for retry bumps +pub const BASE_MAX_FEE_PER_BLOB_GAS: u128 = GWEI_TO_WEI as u128; + macro_rules! spawn_provider_send { ($provider:expr, $tx:expr) => { { @@ -57,13 +64,10 @@ pub enum ControlFlow { pub struct SubmitTask { /// Zenith pub zenith: ZenithInstance, - /// Quincey pub quincey: Quincey, - /// Config pub config: crate::config::BuilderConfig, - /// Channel over which to send pending transactions pub outbound_tx_channel: mpsc::UnboundedSender, } @@ -91,7 +95,7 @@ impl SubmitTask { }) } - /// Builds blob transaction from the provided header and signature values + /// Builds blob transaction and encodes the sidecar for it from the provided header and signature values fn build_blob_tx( &self, fills: Vec, @@ -99,90 +103,154 @@ impl SubmitTask { v: u8, r: FixedBytes<32>, s: FixedBytes<32>, - in_progress: &BuiltBlock, + block: &BuiltBlock, ) -> eyre::Result { let data = submitCall { fills, header, v, r, s }.abi_encode(); - let sidecar = in_progress.encode_blob::().build()?; - Ok(TransactionRequest::default() - .with_blob_sidecar(sidecar) - .with_input(data) - .with_max_priority_fee_per_gas((GWEI_TO_WEI * 16) as u128)) - } + let sidecar = block.encode_blob::().build()?; - /// Returns the next host block height - async fn next_host_block_height(&self) -> eyre::Result { - let result = self.provider().get_block_number().await?; - let next = result.checked_add(1).ok_or_else(|| eyre!("next host block height overflow"))?; - Ok(next) + Ok(TransactionRequest::default().with_blob_sidecar(sidecar).with_input(data)) } - /// Submits the EIP 4844 transaction to the network + /// Prepares and sends the EIP-4844 transaction with a sidecar encoded with a rollup block to the network. async fn submit_transaction( &self, + retry_count: usize, resp: &SignResponse, - in_progress: &BuiltBlock, + block: &BuiltBlock, ) -> eyre::Result { - let (v, r, s) = extract_signature_components(&resp.sig); + let tx = self.prepare_tx(retry_count, resp, block).await?; - let header = BlockHeader { - hostBlockNumber: resp.req.host_block_number, - rollupChainId: U256::from(self.config.ru_chain_id), - gasLimit: resp.req.gas_limit, - rewardAddress: resp.req.ru_reward_address, - blockDataHash: *in_progress.contents_hash(), - }; + self.send_transaction(resp, tx).await + } - let fills = vec![]; // NB: ignored until fills are implemented - let tx = self - .build_blob_tx(fills, header, v, r, s, in_progress)? - .with_from(self.provider().default_signer_address()) - .with_to(self.config.builder_helper_address) - .with_gas_limit(1_000_000); + /// Prepares the transaction by extracting the signature components, creating the transaction + /// request, and simulating the transaction with a call to the host provider. + async fn prepare_tx( + &self, + retry_count: usize, + resp: &SignResponse, + block: &BuiltBlock, + ) -> Result { + // Create the transaction request with the signature values + let tx: TransactionRequest = self.new_tx_request(retry_count, resp, block).await?; + + // Simulate the transaction with a call to the host provider and report any errors + if let Err(err) = self.sim_with_call(&tx).await { + warn!(%err, "error in transaction simulation"); + } + + Ok(tx) + } + /// Simulates the transaction with a call to the host provider to check for reverts. + async fn sim_with_call(&self, tx: &TransactionRequest) -> eyre::Result<()> { if let Err(TransportError::ErrorResp(e)) = self.provider().call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await { - error!( - code = e.code, - message = %e.message, - data = ?e.data, - "error in transaction submission" - ); - + // NB: These errors are all handled the same way but are logged for debugging purposes if e.as_revert_data() .map(|data| data.starts_with(&IncorrectHostBlock::SELECTOR)) .unwrap_or_default() { - return Ok(ControlFlow::Retry); + debug!(%e, "incorrect host block"); + bail!(e) } - return Ok(ControlFlow::Skip); + if e.as_revert_data() + .map(|data| data.starts_with(&Zenith::BadSignature::SELECTOR)) + .unwrap_or_default() + { + debug!(%e, "bad signature"); + bail!(e) + } + + if e.as_revert_data() + .map(|data| data.starts_with(&Zenith::OneRollupBlockPerHostBlock::SELECTOR)) + .unwrap_or_default() + { + debug!(%e, "one rollup block per host block"); + bail!(e) + } + + error!( + code = e.code, + message = %e.message, + data = ?e.data, + "unknown error in host transaction simulation call" + ); + bail!(e) } - // All validation checks have passed, send the transaction - self.send_transaction(resp, tx).await + Ok(()) } + /// Creates a transaction request for the blob with the given header and signature values. + async fn new_tx_request( + &self, + retry_count: usize, + resp: &SignResponse, + block: &BuiltBlock, + ) -> Result { + // TODO: ENG-1082 Implement fills + let fills = vec![]; + + // manually retrieve nonce + let nonce = + self.provider().get_transaction_count(self.provider().default_signer_address()).await?; + debug!(nonce, "assigned nonce"); + + // Extract the signature components from the response + let (v, r, s) = extract_signature_components(&resp.sig); + + // Calculate gas limits based on retry attempts + let (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) = + calculate_gas_limits( + retry_count, + BASE_FEE_PER_GAS, + BASE_MAX_PRIORITY_FEE_PER_GAS, + BASE_MAX_FEE_PER_BLOB_GAS, + ); + + // Build the block header + let header: BlockHeader = BlockHeader { + hostBlockNumber: resp.req.host_block_number, + rollupChainId: U256::from(self.config.ru_chain_id), + gasLimit: resp.req.gas_limit, + rewardAddress: resp.req.ru_reward_address, + blockDataHash: *block.contents_hash(), + }; + debug!(?header, "built block header"); + + // Create a blob transaction with the blob header and signature values and return it + let tx = self + .build_blob_tx(fills, header, v, r, s, block)? + .with_to(self.config.builder_helper_address) + .with_max_fee_per_gas(max_fee_per_gas) + .with_max_priority_fee_per_gas(max_priority_fee_per_gas) + .with_max_fee_per_blob_gas(max_fee_per_blob_gas) + .with_nonce(nonce); + + Ok(tx) + } + + /// Fills the transaction request with the provider and sends it to the network + /// and any additionally configured broadcast providers. async fn send_transaction( &self, resp: &SignResponse, tx: TransactionRequest, ) -> Result { - debug!( - host_block_number = %resp.req.host_block_number, - gas_limit = %resp.req.gas_limit, - "sending transaction to network" - ); - + // assign the nonce and fill the rest of the values let SendableTx::Envelope(tx) = self.provider().fill(tx).await? else { bail!("failed to fill transaction") }; + debug!(tx_hash = ?tx.hash(), host_block_number = %resp.req.host_block_number, "sending transaction to network"); - // Send the tx via the primary host_provider + // send the tx via the primary host_provider let fut = spawn_provider_send!(self.provider(), &tx); - // Spawn send_tx futures for all additional broadcast host_providers + // spawn send_tx futures on retry attempts for all additional broadcast host_providers for host_provider in self.config.connect_additional_broadcast() { spawn_provider_send!(&host_provider, &tx); } @@ -192,10 +260,15 @@ impl SubmitTask { error!("receipts task gone"); } - // question mark unwraps join error, which would be an internal panic - // then if let checks for rpc error if let Err(e) = fut.await? { - error!(error = %e, "Primary tx broadcast failed. Skipping transaction."); + // Detect and handle transaction underprice errors + if matches!(e, TransportError::ErrorResp(ref err) if err.code == -32603) { + debug!(tx_hash = ?tx.hash(), "underpriced transaction error - retrying tx with gas bump"); + return Ok(ControlFlow::Retry); + } + + // Unknown error, log and skip + error!(error = %e, "Primary tx broadcast failed"); return Ok(ControlFlow::Skip); } @@ -209,9 +282,14 @@ impl SubmitTask { Ok(ControlFlow::Done) } + /// Handles the inbound block by constructing a signature request and submitting the transaction. #[instrument(skip_all)] - async fn handle_inbound(&self, block: &BuiltBlock) -> eyre::Result { - info!(txns = block.tx_count(), "handling inbound block"); + async fn handle_inbound( + &self, + retry_count: usize, + block: &BuiltBlock, + ) -> eyre::Result { + info!(retry_count, txns = block.tx_count(), "handling inbound block"); let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| { error!(error = %e, "error constructing signature request"); }) else { @@ -226,9 +304,10 @@ impl SubmitTask { let signed = self.quincey.get_signature(&sig_request).await?; - self.submit_transaction(&signed, block).await + self.submit_transaction(retry_count, &signed, block).await } + /// Handles the retry logic for the inbound block. async fn retrying_handle_inbound( &self, block: &BuiltBlock, @@ -236,37 +315,54 @@ impl SubmitTask { ) -> eyre::Result { let mut retries = 0; let building_start_time = Instant::now(); + let (current_slot, start, end) = self.calculate_slot_window(); + debug!(current_slot, start, end, "calculating target slot window"); + // Retry loop let result = loop { + // Log the retry attempt let span = debug_span!("SubmitTask::retrying_handle_inbound", retries); - let result = - self.handle_inbound(block).instrument(span.clone()).await.inspect_err(|e| { - error!(error = %e, "error handling inbound block"); - })?; + let inbound_result = + match self.handle_inbound(retries, block).instrument(span.clone()).await { + Ok(control_flow) => control_flow, + Err(err) => { + // Delay until next slot if we get a 403 error + if err.to_string().contains("403 Forbidden") { + let (slot_number, _, _) = self.calculate_slot_window(); + debug!(slot_number, "403 detected - skipping slot"); + return Ok(ControlFlow::Skip); + } else { + error!(error = %err, "error handling inbound block"); + } + + ControlFlow::Retry + } + }; let guard = span.entered(); - match result { + match inbound_result { ControlFlow::Retry => { retries += 1; if retries > retry_limit { counter!("builder.building_too_many_retries").increment(1); + debug!("retries exceeded - skipping block"); return Ok(ControlFlow::Skip); } - error!("error handling inbound block: retrying"); drop(guard); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - + debug!(retries, start, end, "retrying block"); continue; } ControlFlow::Skip => { counter!("builder.skipped_blocks").increment(1); - break result; + debug!(retries, "skipping block"); + break inbound_result; } ControlFlow::Done => { counter!("builder.submitted_successful_blocks").increment(1); - break result; + debug!(retries, "successfully submitted block"); + break inbound_result; } } }; @@ -278,16 +374,55 @@ impl SubmitTask { Ok(result) } + /// Calculates and returns the slot number and its start and end timestamps for the current instant. + fn calculate_slot_window(&self) -> (u64, u64, u64) { + let now_ts = self.now(); + let current_slot = self.config.slot_calculator.calculate_slot(now_ts); + let (start, end) = self.config.slot_calculator.calculate_slot_window(current_slot); + (current_slot, start, end) + } + + /// Returns the current timestamp in seconds since the UNIX epoch. + fn now(&self) -> u64 { + let now = std::time::SystemTime::now(); + now.duration_since(UNIX_EPOCH).unwrap().as_secs() + } + + /// Returns the next host block height. + async fn next_host_block_height(&self) -> eyre::Result { + let block_num = self.provider().get_block_number().await?; + Ok(block_num + 1) + } + + /// Task future for the submit task + /// NB: This task assumes that the simulator will only send it blocks for + /// slots that it's assigned. async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { + // Holds a reference to the last block we attempted to submit + let mut last_block_attempted: u64 = 0; + loop { + // Wait to receive a new block let Some(block) = inbound.recv().await else { debug!("upstream task gone"); break; }; + debug!(block_number = block.block_number(), ?block, "submit channel received block"); - if self.retrying_handle_inbound(&block, 3).await.is_err() { + // Only attempt each block number once + if block.block_number() == last_block_attempted { + debug!("block number is unchanged from last attempt - skipping"); continue; } + + // This means we have encountered a new block, so reset the last block attempted + last_block_attempted = block.block_number(); + debug!(last_block_attempted, "resetting last block attempted"); + + if self.retrying_handle_inbound(&block, 3).await.is_err() { + debug!("error handling inbound block"); + continue; + }; } } @@ -299,3 +434,29 @@ impl SubmitTask { (sender, handle) } } + +// Returns gas parameters based on retry counts. +fn calculate_gas_limits( + retry_count: usize, + base_max_fee_per_gas: u128, + base_max_priority_fee_per_gas: u128, + base_max_fee_per_blob_gas: u128, +) -> (u128, u128, u128) { + let bump_multiplier = 1150u128.pow(retry_count as u32); // 15% bump + let blob_bump_multiplier = 2000u128.pow(retry_count as u32); // 100% bump (double each time) for blob gas + let bump_divisor = 1000u128.pow(retry_count as u32); + + let max_fee_per_gas = base_max_fee_per_gas * bump_multiplier / bump_divisor; + let max_priority_fee_per_gas = base_max_priority_fee_per_gas * bump_multiplier / bump_divisor; + let max_fee_per_blob_gas = base_max_fee_per_blob_gas * blob_bump_multiplier / bump_divisor; + + debug!( + retry_count, + max_fee_per_gas, + max_priority_fee_per_gas, + max_fee_per_blob_gas, + "calculated bumped gas parameters" + ); + + (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) +}