From f81a92ee390b524eb4893761c8bafd433e6d1b7a Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Wed, 27 Nov 2024 15:35:05 +0100 Subject: [PATCH] fix(constraints-api): don't read from db for checking proposer duties, use auctioneer instead --- crates/api/src/constraints/api.rs | 110 ++++++++++++++---- crates/api/src/constraints/tests.rs | 2 +- crates/api/src/router.rs | 2 +- crates/api/src/service.rs | 2 +- crates/api/src/test_utils.rs | 23 ++-- crates/common/src/api/constraints_api.rs | 8 +- crates/common/src/proofs.rs | 10 +- crates/housekeeper/src/chain_event_updater.rs | 2 +- 8 files changed, 114 insertions(+), 45 deletions(-) diff --git a/crates/api/src/constraints/api.rs b/crates/api/src/constraints/api.rs index ba42bef..32522ab 100644 --- a/crates/api/src/constraints/api.rs +++ b/crates/api/src/constraints/api.rs @@ -3,7 +3,7 @@ use axum::{ http::{Request, StatusCode}, Extension, }; -use ethereum_consensus::{deneb::Slot, ssz}; +use ethereum_consensus::{deneb::Slot, phase0::mainnet::SLOTS_PER_EPOCH, ssz}; use helix_common::{ api::constraints_api::{ SignableBLS, SignedDelegation, SignedRevocation, DELEGATION_ACTION, @@ -14,15 +14,22 @@ use helix_common::{ proofs::{ConstraintsMessage, SignedConstraints, SignedConstraintsWithProofData}, ConstraintSubmissionTrace, ConstraintsApiConfig, }; -use helix_database::DatabaseService; use helix_datastore::Auctioneer; +use helix_housekeeper::{ChainUpdate, SlotUpdate}; use helix_utils::signing::{verify_signed_message, COMMIT_BOOST_DOMAIN}; use std::{ collections::HashSet, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; -use tokio::{sync::broadcast, time::Instant}; +use tokio::{ + sync::{ + broadcast, + mpsc::{self, error::SendError, Sender}, + RwLock, + }, + time::Instant, +}; use tracing::{error, info, trace, warn}; use uuid::Uuid; @@ -34,14 +41,15 @@ use super::error::Conflict; pub(crate) const MAX_REQUEST_LENGTH: usize = 1024 * 1024 * 5; #[derive(Clone)] -pub struct ConstraintsApi +pub struct ConstraintsApi where A: Auctioneer + 'static, - DB: DatabaseService + 'static, { auctioneer: Arc, - db: Arc, chain_info: Arc, + /// Information about the current head slot and next proposer duty + curr_slot_info: Arc>, + constraints_api_config: Arc, constraints_handle: ConstraintsHandle, @@ -60,26 +68,44 @@ impl ConstraintsHandle { } } -impl ConstraintsApi +impl ConstraintsApi where A: Auctioneer + 'static, - DB: DatabaseService + 'static, { pub fn new( auctioneer: Arc, - db: Arc, chain_info: Arc, + slot_update_subscription: Sender>, constraints_handle: ConstraintsHandle, constraints_api_config: Arc, ) -> Self { - Self { auctioneer, db, chain_info, constraints_handle, constraints_api_config } + let api = Self { + auctioneer, + chain_info, + curr_slot_info: Arc::new(RwLock::new(Default::default())), + constraints_handle, + constraints_api_config, + }; + + // Spin up the housekeep task + let api_clone = api.clone(); + tokio::spawn(async move { + if let Err(err) = api_clone.housekeep(slot_update_subscription).await { + error!( + error = %err, + "ConstraintsApi. housekeep task encountered an error", + ); + } + }); + + api } /// Handles the submission of batch of signed constraints. /// /// Implements this API: pub async fn submit_constraints( - Extension(api): Extension>>, + Extension(api): Extension>>, req: Request, ) -> Result { let request_id = Uuid::new_v4(); @@ -108,14 +134,18 @@ where return Err(ConstraintsApiError::InvalidConstraints) } - // PERF: can we avoid calling the db? - let maybe_validator_pubkey = api.db.get_proposer_duties().await?.iter().find_map(|d| { - if d.slot == first_constraints.slot { - Some(d.entry.registration.message.public_key.clone()) + let maybe_validator_pubkey = + if let Some(duties) = api.curr_slot_info.read().await.new_duties.as_ref() { + duties.iter().find_map(|d| { + if d.slot == first_constraints.slot { + Some(d.entry.registration.message.public_key.clone()) + } else { + None + } + }) } else { None - } - }); + }; let Some(validator_pubkey) = maybe_validator_pubkey else { error!(request_id = %request_id, slot = first_constraints.slot, "Missing proposer info"); @@ -200,7 +230,7 @@ where /// /// Implements this API: pub async fn delegate( - Extension(api): Extension>>, + Extension(api): Extension>>, req: Request, ) -> Result { let request_id = Uuid::new_v4(); @@ -274,7 +304,7 @@ where /// /// Implements this API: pub async fn revoke( - Extension(api): Extension>>, + Extension(api): Extension>>, req: Request, ) -> Result { let request_id = Uuid::new_v4(); @@ -347,10 +377,9 @@ where } // Helpers -impl ConstraintsApi +impl ConstraintsApi where A: Auctioneer + 'static, - DB: DatabaseService + 'static, { async fn save_constraints_to_auctioneer( &self, @@ -379,6 +408,45 @@ where } } +// STATE SYNC +impl ConstraintsApi +where + A: Auctioneer + 'static, +{ + /// Subscribes to slot head updater. + /// Updates the current slot and next proposer duty. + pub async fn housekeep( + &self, + slot_update_subscription: Sender>, + ) -> Result<(), SendError>> { + let (tx, mut rx) = mpsc::channel(20); + slot_update_subscription.send(tx).await?; + + while let Some(slot_update) = rx.recv().await { + if let ChainUpdate::SlotUpdate(slot_update) = slot_update { + self.handle_new_slot(slot_update).await; + } + } + + Ok(()) + } + + /// Handle a new slot update. + /// Updates the next proposer duty for the new slot. + async fn handle_new_slot(&self, slot_update: SlotUpdate) { + let epoch = slot_update.slot / self.chain_info.seconds_per_slot; + info!( + epoch = epoch, + slot = slot_update.slot, + slot_start_next_epoch = (epoch + 1) * SLOTS_PER_EPOCH, + next_proposer_duty = ?slot_update.next_duty, + "ConstraintsApi - housekeep: Updated head slot", + ); + + *self.curr_slot_info.write().await = slot_update + } +} + /// Checks if the constraints for the given slot conflict with the existing constraints. /// Returns a [Conflict] in case of a conflict, None otherwise. /// diff --git a/crates/api/src/constraints/tests.rs b/crates/api/src/constraints/tests.rs index 1516c0e..3f7f2bb 100644 --- a/crates/api/src/constraints/tests.rs +++ b/crates/api/src/constraints/tests.rs @@ -152,7 +152,7 @@ async fn send_request(req_url: &str, encoding: Encoding, req_payload: Vec) - async fn start_api_server() -> ( oneshot::Sender<()>, HttpServiceConfig, - Arc>, + Arc>, Arc>, Receiver>, ) { diff --git a/crates/api/src/router.rs b/crates/api/src/router.rs index a245009..6cf8f9a 100644 --- a/crates/api/src/router.rs +++ b/crates/api/src/router.rs @@ -46,7 +46,7 @@ pub type ProposerApiProd = ProposerApi< pub type DataApiProd = DataApi; -pub type ConstraintsApiProd = ConstraintsApi; +pub type ConstraintsApiProd = ConstraintsApi; pub fn build_router( router_config: &mut RouterConfig, diff --git a/crates/api/src/service.rs b/crates/api/src/service.rs index 7dffc27..fc5fbcb 100644 --- a/crates/api/src/service.rs +++ b/crates/api/src/service.rs @@ -190,8 +190,8 @@ impl ApiService { let constraints_api = Arc::new(ConstraintsApiProd::new( auctioneer.clone(), - db.clone(), chain_info.clone(), + slot_update_sender.clone(), constraints_handle, constraints_api_config, )); diff --git a/crates/api/src/test_utils.rs b/crates/api/src/test_utils.rs index fac23de..923e3b0 100644 --- a/crates/api/src/test_utils.rs +++ b/crates/api/src/test_utils.rs @@ -298,7 +298,7 @@ pub fn data_api_app() -> (Router, Arc>, Arc ( Router, - Arc>, + Arc>, Arc>, Receiver>, ) { @@ -322,14 +322,13 @@ pub fn constraints_api_app() -> ( ); let builder_api_service = Arc::new(builder_api_service); - let constraints_api_service = - Arc::new(ConstraintsApi::::new( - auctioneer.clone(), - database.clone(), - Arc::new(ChainInfo::for_mainnet()), - handler, - Arc::new(ConstraintsApiConfig::default()), - )); + let constraints_api_service = Arc::new(ConstraintsApi::::new( + auctioneer.clone(), + Arc::new(ChainInfo::for_mainnet()), + slot_update_sender, + handler, + Arc::new(ConstraintsApiConfig::default()), + )); let router = Router::new() .route( @@ -358,15 +357,15 @@ pub fn constraints_api_app() -> ( ) .route( &Route::SubmitBuilderConstraints.path(), - post(ConstraintsApi::::submit_constraints), + post(ConstraintsApi::::submit_constraints), ) .route( &Route::DelegateSubmissionRights.path(), - post(ConstraintsApi::::delegate), + post(ConstraintsApi::::delegate), ) .route( &Route::RevokeSubmissionRights.path(), - post(ConstraintsApi::::revoke), + post(ConstraintsApi::::revoke), ) .layer(RequestBodyLimitLayer::new(MAX_PAYLOAD_LENGTH)) .layer(Extension(builder_api_service.clone())) diff --git a/crates/common/src/api/constraints_api.rs b/crates/common/src/api/constraints_api.rs index c244847..280c51f 100644 --- a/crates/common/src/api/constraints_api.rs +++ b/crates/common/src/api/constraints_api.rs @@ -31,8 +31,8 @@ impl SignableBLS for DelegationMessage { fn digest(&self) -> [u8; 32] { let mut hasher = Sha256::new(); hasher.update([self.action]); - hasher.update(&self.validator_pubkey.to_vec()); - hasher.update(&self.delegatee_pubkey.to_vec()); + hasher.update(self.validator_pubkey.as_slice()); + hasher.update(self.delegatee_pubkey.as_slice()); hasher.finalize().into() } @@ -55,8 +55,8 @@ impl SignableBLS for RevocationMessage { fn digest(&self) -> [u8; 32] { let mut hasher = Sha256::new(); hasher.update([self.action]); - hasher.update(&self.validator_pubkey.to_vec()); - hasher.update(&self.delegatee_pubkey.to_vec()); + hasher.update(self.validator_pubkey.as_slice()); + hasher.update(self.delegatee_pubkey.as_slice()); hasher.finalize().into() } diff --git a/crates/common/src/proofs.rs b/crates/common/src/proofs.rs index 33a52ac..7388cc4 100644 --- a/crates/common/src/proofs.rs +++ b/crates/common/src/proofs.rs @@ -5,7 +5,7 @@ use ethereum_consensus::{ primitives::{BlsPublicKey, BlsSignature}, ssz::prelude::*, }; -use reth_primitives::{PooledTransactionsElement, TxHash, B256}; +use reth_primitives::{Bytes, PooledTransactionsElement, TxHash, B256}; use sha2::{Digest, Sha256}; use tree_hash::Hash256; @@ -90,9 +90,11 @@ impl TryFrom for SignedConstraintsWithProofData { fn try_from(value: SignedConstraints) -> Result { let mut transactions = Vec::with_capacity(value.message.transactions.len()); - for transaction in value.message.transactions.to_vec().iter() { - let tx = PooledTransactionsElement::decode_enveloped(transaction.to_vec().into()) - .map_err(|e| ProofError::DecodingFailed(e.to_string()))?; + for transaction in value.message.transactions.iter() { + let tx = PooledTransactionsElement::decode_enveloped(Bytes::copy_from_slice( + transaction.as_slice(), + )) + .map_err(|e| ProofError::DecodingFailed(e.to_string()))?; let tx_hash = *tx.hash(); diff --git a/crates/housekeeper/src/chain_event_updater.rs b/crates/housekeeper/src/chain_event_updater.rs index 1df5fd0..8e1b29c 100644 --- a/crates/housekeeper/src/chain_event_updater.rs +++ b/crates/housekeeper/src/chain_event_updater.rs @@ -36,7 +36,7 @@ pub struct PayloadAttributesUpdate { } /// Payload for head event updates sent to subscribers. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct SlotUpdate { pub slot: u64, pub next_duty: Option,