Skip to content

feat(pbs): add retry limit for validator registration (#316) #322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use crate::{
},
pbs::{
BuilderEventPublisher, DefaultTimeout, RelayClient, RelayEntry, DEFAULT_PBS_PORT,
LATE_IN_SLOT_TIME_MS,
LATE_IN_SLOT_TIME_MS, REGISTER_VALIDATOR_RETRY_LIMIT,
},
types::{Chain, Jwt, ModuleId},
utils::{
as_eth_str, default_bool, default_host, default_u16, default_u256, default_u64, WEI_PER_ETH,
as_eth_str, default_bool, default_host, default_u16, default_u256, default_u32,
default_u64, WEI_PER_ETH,
},
};

Expand Down Expand Up @@ -122,6 +123,9 @@ pub struct PbsConfig {
pub extra_validation_enabled: bool,
/// Execution Layer RPC url to use for extra validation
pub rpc_url: Option<Url>,
/// Maximum number of retries for validator registration request per relay
#[serde(default = "default_u32::<REGISTER_VALIDATOR_RETRY_LIMIT>")]
pub register_validator_retry_limit: u32,
}

impl PbsConfig {
Expand All @@ -140,6 +144,10 @@ impl PbsConfig {
self.timeout_get_header_ms < self.late_in_slot_time_ms,
"timeout_get_header_ms must be less than late_in_slot_time_ms"
);
ensure!(
self.register_validator_retry_limit > 0,
"register_validator_retry_limit must be greater than 0"
);

ensure!(
self.min_bid_wei < U256::from(WEI_PER_ETH),
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/pbs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ impl DefaultTimeout {
}

pub const LATE_IN_SLOT_TIME_MS: u64 = 2000;

// Maximum number of retries for validator registration request per relay
pub const REGISTER_VALIDATOR_RETRY_LIMIT: u32 = 3;
13 changes: 12 additions & 1 deletion crates/common/src/pbs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,18 @@ impl PbsError {

/// Whether the error is retryable in requests to relays
pub fn should_retry(&self) -> bool {
matches!(self, PbsError::RelayResponse { .. } | PbsError::Reqwest { .. })
match self {
PbsError::Reqwest(err) => {
// Retry on timeout or connection error
err.is_timeout() || err.is_connect()
}
PbsError::RelayResponse { code, .. } => match *code {
500..509 => true, // Retry on server errors
400 | 429 => false, // Do not retry if rate limited or bad request
_ => false,
},
_ => false,
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ pub const fn default_u64<const U: u64>() -> u64 {
U
}

pub const fn default_u32<const U: u32>() -> u32 {
U
}

pub const fn default_u16<const U: u16>() -> u16 {
U
}
Expand Down
13 changes: 11 additions & 2 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn register_validator<S: BuilderApiState>(
relay.clone(),
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
)
.in_current_span(),
));
Expand All @@ -54,6 +55,7 @@ pub async fn register_validator<S: BuilderApiState>(
relay.clone(),
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
)
.in_current_span(),
));
Expand Down Expand Up @@ -85,6 +87,7 @@ async fn send_register_validator_with_timeout(
relay: RelayClient,
headers: HeaderMap,
timeout_ms: u64,
retry_limit: u32,
) -> Result<(), PbsError> {
let url = relay.register_validator_url()?;
let mut remaining_timeout_ms = timeout_ms;
Expand All @@ -106,6 +109,14 @@ async fn send_register_validator_with_timeout(
Ok(_) => return Ok(()),

Err(err) if err.should_retry() => {
retry += 1;
if retry >= retry_limit {
error!(
relay_id = relay.id.as_str(),
retry, "reached retry limit for validator registration"
);
return Err(err);
}
tokio::time::sleep(backoff).await;
backoff += Duration::from_millis(250);

Expand All @@ -119,8 +130,6 @@ async fn send_register_validator_with_timeout(

Err(err) => return Err(err),
};

retry += 1;
}
}

Expand Down
14 changes: 12 additions & 2 deletions tests/src/mock_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, RwLock,
},
};

Expand Down Expand Up @@ -48,6 +48,7 @@ pub struct MockRelayState {
received_get_status: Arc<AtomicU64>,
received_register_validator: Arc<AtomicU64>,
received_submit_block: Arc<AtomicU64>,
response_override: RwLock<Option<StatusCode>>,
}

impl MockRelayState {
Expand All @@ -66,6 +67,9 @@ impl MockRelayState {
pub fn large_body(&self) -> bool {
self.large_body
}
pub fn set_response_override(&self, status: StatusCode) {
*self.response_override.write().unwrap() = Some(status);
}
}

impl MockRelayState {
Expand All @@ -78,6 +82,7 @@ impl MockRelayState {
received_get_status: Default::default(),
received_register_validator: Default::default(),
received_submit_block: Default::default(),
response_override: RwLock::new(None),
}
}

Expand Down Expand Up @@ -130,7 +135,12 @@ async fn handle_register_validator(
) -> impl IntoResponse {
state.received_register_validator.fetch_add(1, Ordering::Relaxed);
debug!("Received {} registrations", validators.len());
StatusCode::OK

if let Some(status) = state.response_override.read().unwrap().as_ref() {
return (*status).into_response();
}

StatusCode::OK.into_response()
}

async fn handle_submit_block(State(state): State<Arc<MockRelayState>>) -> Response {
Expand Down
1 change: 1 addition & 0 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig {
late_in_slot_time_ms: u64::MAX,
extra_validation_enabled: false,
rpc_url: None,
register_validator_retry_limit: u32::MAX,
}
}

Expand Down
115 changes: 109 additions & 6 deletions tests/tests/pbs_post_validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn test_register_validators() -> Result<()> {
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
Expand Down Expand Up @@ -93,7 +93,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul
"message": {
"fee_recipient": "0xaa",
"gas_limit": "100000",
"timestamp": "1000000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
Expand All @@ -115,7 +115,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"timestamp": "1000000",
"pubkey": "0xbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
Expand All @@ -137,7 +137,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccc"
Expand All @@ -159,7 +159,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "10000000000000000000000000000000000000000000000000000000",
"timestamp": "1000000",
"timestamp": "1000000",
"pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
},
"signature": "0xcccc"
Expand All @@ -181,7 +181,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "1000000",
"timestamp": "10000000000000000000000000000000000000000000000000000000",
"timestamp": "10000000000000000000000000000000000000000000000000000000",
"pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
},
"signature": "0xcccc"
Expand All @@ -201,3 +201,106 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul
assert_eq!(mock_state.received_register_validator(), 0);
Ok(())
}

#[tokio::test]
async fn test_register_validators_does_not_retry_on_429() -> Result<()> {
setup_test_env();
let signer = random_secret();
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();

let chain = Chain::Holesky;
let pbs_port = 4200;

// Set up mock relay state and override response to 429
let mock_state = Arc::new(MockRelayState::new(chain, signer));
mock_state.set_response_override(StatusCode::TOO_MANY_REQUESTS);

// Run a mock relay
let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?];
tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1));

// Run the PBS service
let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays);
let state = PbsState::new(config);
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state.clone()));

// Leave some time to start servers
tokio::time::sleep(Duration::from_millis(100)).await;

let mock_validator = MockValidator::new(pbs_port)?;
info!("Sending register validator to test 429 response");

let registration: ValidatorRegistration = serde_json::from_str(
r#"{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
}"#,
)?;

let registrations = vec![registration];
let res = mock_validator.do_register_custom_validators(registrations).await?;

// Should only be called once (no retry)
assert_eq!(mock_state.received_register_validator(), 1);
// Expected to return 429 status code
// But it returns `No relay passed register_validator successfully` with 502
// status code
assert_eq!(res.status(), StatusCode::BAD_GATEWAY);

Ok(())
}

#[tokio::test]
async fn test_register_validators_retries_on_500() -> Result<()> {
setup_test_env();
let signer = random_secret();
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();

let chain = Chain::Holesky;
let pbs_port = 4300;

// Set up internal mock relay with 500 response override
let mock_state = Arc::new(MockRelayState::new(chain, signer));
mock_state.set_response_override(StatusCode::INTERNAL_SERVER_ERROR); // 500

let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?];
tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1));

// Set retry limit to 3
let mut pbs_config = get_pbs_static_config(pbs_port);
pbs_config.register_validator_retry_limit = 3;

let config = to_pbs_config(chain, pbs_config, relays);
let state = PbsState::new(config);
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state.clone()));

tokio::time::sleep(Duration::from_millis(100)).await;

let mock_validator = MockValidator::new(pbs_port)?;
info!("Sending register validator to test retry on 500");

let registration: ValidatorRegistration = serde_json::from_str(
r#"{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
}"#,
)?;

let registrations = vec![registration];
let _ = mock_validator.do_register_custom_validators(registrations).await;

// Should retry 3 times (0, 1, 2) → total 3 calls
assert_eq!(mock_state.received_register_validator(), 3);

Ok(())
}