Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(constraints-api): don't read from db for checking proposer duties #30

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 89 additions & 21 deletions crates/api/src/constraints/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use axum::{
http::{Request, StatusCode},
Extension,
};
use ethereum_consensus::{deneb::Slot, ssz};
use ethereum_consensus::{deneb::Slot, phase0::mainnet::SLOTS_PER_EPOCH, ssz};
use helix_common::{
api::constraints_api::{
SignableBLS, SignedDelegation, SignedRevocation, DELEGATION_ACTION,
Expand All @@ -14,15 +14,22 @@ use helix_common::{
proofs::{ConstraintsMessage, SignedConstraints, SignedConstraintsWithProofData},
ConstraintSubmissionTrace, ConstraintsApiConfig,
};
use helix_database::DatabaseService;
use helix_datastore::Auctioneer;
use helix_housekeeper::{ChainUpdate, SlotUpdate};
use helix_utils::signing::{verify_signed_message, COMMIT_BOOST_DOMAIN};
use std::{
collections::HashSet,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::{sync::broadcast, time::Instant};
use tokio::{
sync::{
broadcast,
mpsc::{self, error::SendError, Sender},
RwLock,
},
time::Instant,
};
use tracing::{error, info, trace, warn};
use uuid::Uuid;

Expand All @@ -34,14 +41,15 @@ use super::error::Conflict;
pub(crate) const MAX_REQUEST_LENGTH: usize = 1024 * 1024 * 5;

#[derive(Clone)]
pub struct ConstraintsApi<A, DB>
pub struct ConstraintsApi<A>
where
A: Auctioneer + 'static,
DB: DatabaseService + 'static,
{
auctioneer: Arc<A>,
db: Arc<DB>,
chain_info: Arc<ChainInfo>,
/// Information about the current head slot and next proposer duty
curr_slot_info: Arc<RwLock<SlotUpdate>>,

constraints_api_config: Arc<ConstraintsApiConfig>,

constraints_handle: ConstraintsHandle,
Expand All @@ -60,26 +68,44 @@ impl ConstraintsHandle {
}
}

impl<A, DB> ConstraintsApi<A, DB>
impl<A> ConstraintsApi<A>
where
A: Auctioneer + 'static,
DB: DatabaseService + 'static,
{
pub fn new(
auctioneer: Arc<A>,
db: Arc<DB>,
chain_info: Arc<ChainInfo>,
slot_update_subscription: Sender<Sender<ChainUpdate>>,
constraints_handle: ConstraintsHandle,
constraints_api_config: Arc<ConstraintsApiConfig>,
) -> Self {
Self { auctioneer, db, chain_info, constraints_handle, constraints_api_config }
let api = Self {
auctioneer,
chain_info,
curr_slot_info: Arc::new(RwLock::new(Default::default())),
constraints_handle,
constraints_api_config,
};

// Spin up the housekeep task
let api_clone = api.clone();
tokio::spawn(async move {
if let Err(err) = api_clone.housekeep(slot_update_subscription).await {
error!(
error = %err,
"ConstraintsApi. housekeep task encountered an error",
);
}
});

api
}

/// Handles the submission of batch of signed constraints.
///
/// Implements this API: <https://docs.boltprotocol.xyz/technical-docs/api/builder#constraints>
pub async fn submit_constraints(
Extension(api): Extension<Arc<ConstraintsApi<A, DB>>>,
Extension(api): Extension<Arc<ConstraintsApi<A>>>,
req: Request<Body>,
) -> Result<StatusCode, ConstraintsApiError> {
let request_id = Uuid::new_v4();
Expand Down Expand Up @@ -108,14 +134,18 @@ where
return Err(ConstraintsApiError::InvalidConstraints)
}

// PERF: can we avoid calling the db?
let maybe_validator_pubkey = api.db.get_proposer_duties().await?.iter().find_map(|d| {
if d.slot == first_constraints.slot {
Some(d.entry.registration.message.public_key.clone())
let maybe_validator_pubkey =
if let Some(duties) = api.curr_slot_info.read().await.new_duties.as_ref() {
thedevbirb marked this conversation as resolved.
Show resolved Hide resolved
duties.iter().find_map(|d| {
if d.slot == first_constraints.slot {
Some(d.entry.registration.message.public_key.clone())
} else {
None
}
})
} else {
None
}
});
};

let Some(validator_pubkey) = maybe_validator_pubkey else {
error!(request_id = %request_id, slot = first_constraints.slot, "Missing proposer info");
Expand Down Expand Up @@ -200,7 +230,7 @@ where
///
/// Implements this API: <https://docs.boltprotocol.xyz/technical-docs/api/builder#delegate>
pub async fn delegate(
Extension(api): Extension<Arc<ConstraintsApi<A, DB>>>,
Extension(api): Extension<Arc<ConstraintsApi<A>>>,
req: Request<Body>,
) -> Result<StatusCode, ConstraintsApiError> {
let request_id = Uuid::new_v4();
Expand Down Expand Up @@ -274,7 +304,7 @@ where
///
/// Implements this API: <https://docs.boltprotocol.xyz/technical-docs/api/builder#revoke>
pub async fn revoke(
Extension(api): Extension<Arc<ConstraintsApi<A, DB>>>,
Extension(api): Extension<Arc<ConstraintsApi<A>>>,
req: Request<Body>,
) -> Result<StatusCode, ConstraintsApiError> {
let request_id = Uuid::new_v4();
Expand Down Expand Up @@ -347,10 +377,9 @@ where
}

// Helpers
impl<A, DB> ConstraintsApi<A, DB>
impl<A> ConstraintsApi<A>
where
A: Auctioneer + 'static,
DB: DatabaseService + 'static,
{
async fn save_constraints_to_auctioneer(
&self,
Expand Down Expand Up @@ -379,6 +408,45 @@ where
}
}

// STATE SYNC
impl<A> ConstraintsApi<A>
where
A: Auctioneer + 'static,
{
/// Subscribes to slot head updater.
/// Updates the current slot and next proposer duty.
pub async fn housekeep(
&self,
slot_update_subscription: Sender<Sender<ChainUpdate>>,
) -> Result<(), SendError<Sender<ChainUpdate>>> {
let (tx, mut rx) = mpsc::channel(20);
slot_update_subscription.send(tx).await?;

while let Some(slot_update) = rx.recv().await {
if let ChainUpdate::SlotUpdate(slot_update) = slot_update {
self.handle_new_slot(slot_update).await;
}
}

Ok(())
}

/// Handle a new slot update.
/// Updates the next proposer duty for the new slot.
async fn handle_new_slot(&self, slot_update: SlotUpdate) {
let epoch = slot_update.slot / self.chain_info.seconds_per_slot;
info!(
epoch = epoch,
slot = slot_update.slot,
slot_start_next_epoch = (epoch + 1) * SLOTS_PER_EPOCH,
next_proposer_duty = ?slot_update.next_duty,
"ConstraintsApi - housekeep: Updated head slot",
);

*self.curr_slot_info.write().await = slot_update
}
}

/// Checks if the constraints for the given slot conflict with the existing constraints.
/// Returns a [Conflict] in case of a conflict, None otherwise.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/constraints/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn send_request(req_url: &str, encoding: Encoding, req_payload: Vec<u8>) -
async fn start_api_server() -> (
oneshot::Sender<()>,
HttpServiceConfig,
Arc<ConstraintsApi<MockAuctioneer, MockDatabaseService>>,
Arc<ConstraintsApi<MockAuctioneer>>,
Arc<BuilderApi<MockAuctioneer, MockDatabaseService, MockSimulator, MockGossiper>>,
Receiver<Sender<ChainUpdate>>,
) {
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub type ProposerApiProd = ProposerApi<

pub type DataApiProd = DataApi<PostgresDatabaseService>;

pub type ConstraintsApiProd = ConstraintsApi<RedisCache, PostgresDatabaseService>;
pub type ConstraintsApiProd = ConstraintsApi<RedisCache>;

pub fn build_router(
router_config: &mut RouterConfig,
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ impl ApiService {

let constraints_api = Arc::new(ConstraintsApiProd::new(
auctioneer.clone(),
db.clone(),
chain_info.clone(),
slot_update_sender.clone(),
constraints_handle,
constraints_api_config,
));
Expand Down
23 changes: 11 additions & 12 deletions crates/api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ pub fn data_api_app() -> (Router, Arc<DataApi<MockDatabaseService>>, Arc<MockDat
#[allow(clippy::type_complexity)]
pub fn constraints_api_app() -> (
Router,
Arc<ConstraintsApi<MockAuctioneer, MockDatabaseService>>,
Arc<ConstraintsApi<MockAuctioneer>>,
Arc<BuilderApi<MockAuctioneer, MockDatabaseService, MockSimulator, MockGossiper>>,
Receiver<Sender<ChainUpdate>>,
) {
Expand All @@ -322,14 +322,13 @@ pub fn constraints_api_app() -> (
);
let builder_api_service = Arc::new(builder_api_service);

let constraints_api_service =
Arc::new(ConstraintsApi::<MockAuctioneer, MockDatabaseService>::new(
auctioneer.clone(),
database.clone(),
Arc::new(ChainInfo::for_mainnet()),
handler,
Arc::new(ConstraintsApiConfig::default()),
));
let constraints_api_service = Arc::new(ConstraintsApi::<MockAuctioneer>::new(
auctioneer.clone(),
Arc::new(ChainInfo::for_mainnet()),
slot_update_sender,
handler,
Arc::new(ConstraintsApiConfig::default()),
));

let router = Router::new()
.route(
Expand Down Expand Up @@ -358,15 +357,15 @@ pub fn constraints_api_app() -> (
)
.route(
&Route::SubmitBuilderConstraints.path(),
post(ConstraintsApi::<MockAuctioneer, MockDatabaseService>::submit_constraints),
post(ConstraintsApi::<MockAuctioneer>::submit_constraints),
)
.route(
&Route::DelegateSubmissionRights.path(),
post(ConstraintsApi::<MockAuctioneer, MockDatabaseService>::delegate),
post(ConstraintsApi::<MockAuctioneer>::delegate),
)
.route(
&Route::RevokeSubmissionRights.path(),
post(ConstraintsApi::<MockAuctioneer, MockDatabaseService>::revoke),
post(ConstraintsApi::<MockAuctioneer>::revoke),
)
.layer(RequestBodyLimitLayer::new(MAX_PAYLOAD_LENGTH))
.layer(Extension(builder_api_service.clone()))
Expand Down
8 changes: 4 additions & 4 deletions crates/common/src/api/constraints_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl SignableBLS for DelegationMessage {
fn digest(&self) -> [u8; 32] {
let mut hasher = Sha256::new();
hasher.update([self.action]);
hasher.update(&self.validator_pubkey.to_vec());
hasher.update(&self.delegatee_pubkey.to_vec());
hasher.update(self.validator_pubkey.as_slice());
hasher.update(self.delegatee_pubkey.as_slice());

hasher.finalize().into()
}
Expand All @@ -55,8 +55,8 @@ impl SignableBLS for RevocationMessage {
fn digest(&self) -> [u8; 32] {
let mut hasher = Sha256::new();
hasher.update([self.action]);
hasher.update(&self.validator_pubkey.to_vec());
hasher.update(&self.delegatee_pubkey.to_vec());
hasher.update(self.validator_pubkey.as_slice());
hasher.update(self.delegatee_pubkey.as_slice());

hasher.finalize().into()
}
Expand Down
10 changes: 6 additions & 4 deletions crates/common/src/proofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethereum_consensus::{
primitives::{BlsPublicKey, BlsSignature},
ssz::prelude::*,
};
use reth_primitives::{PooledTransactionsElement, TxHash, B256};
use reth_primitives::{Bytes, PooledTransactionsElement, TxHash, B256};
use sha2::{Digest, Sha256};
use tree_hash::Hash256;

Expand Down Expand Up @@ -90,9 +90,11 @@ impl TryFrom<SignedConstraints> for SignedConstraintsWithProofData {

fn try_from(value: SignedConstraints) -> Result<Self, ProofError> {
let mut transactions = Vec::with_capacity(value.message.transactions.len());
for transaction in value.message.transactions.to_vec().iter() {
let tx = PooledTransactionsElement::decode_enveloped(transaction.to_vec().into())
.map_err(|e| ProofError::DecodingFailed(e.to_string()))?;
for transaction in value.message.transactions.iter() {
let tx = PooledTransactionsElement::decode_enveloped(Bytes::copy_from_slice(
transaction.as_slice(),
))
.map_err(|e| ProofError::DecodingFailed(e.to_string()))?;

let tx_hash = *tx.hash();

Expand Down
2 changes: 1 addition & 1 deletion crates/housekeeper/src/chain_event_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct PayloadAttributesUpdate {
}

/// Payload for head event updates sent to subscribers.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct SlotUpdate {
pub slot: u64,
pub next_duty: Option<BuilderGetValidatorsResponseEntry>,
Expand Down