Skip to content

Commit

Permalink
feat(starknet_sequencer_infra,starknet_sequencer_node): add max concu…
Browse files Browse the repository at this point in the history
…rrency limit to remote servers (#3951)

commit-id:52a0746a
  • Loading branch information
lev-starkware authored Feb 5, 2025
1 parent 424c405 commit b0e1ed9
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/starknet_sequencer_infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ starknet_api.workspace = true
starknet_infra_utils.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tower = { workspace = true, features = ["limit"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
validator.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use hyper::{Body, Request as HyperRequest, Response as HyperResponse, Server, St
use serde::de::DeserializeOwned;
use serde::Serialize;
use starknet_infra_utils::type_name::short_type_name;
use tower::limit::ConcurrencyLimitLayer;
use tower::ServiceBuilder;
use tracing::warn;

use crate::component_client::{ClientError, LocalComponentClient};
Expand Down Expand Up @@ -88,10 +90,15 @@ use crate::serde_utils::SerdeWrapper;
/// // Set the ip address and port of the server's socket.
/// let ip_address = std::net::IpAddr::V6(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
/// let port: u16 = 8080;
/// let max_concurrency = 10;
///
/// // Instantiate the server.
/// let mut server =
/// RemoteComponentServer::<MyRequest, MyResponse>::new(local_client, ip_address, port);
/// let mut server = RemoteComponentServer::<MyRequest, MyResponse>::new(
/// local_client,
/// ip_address,
/// port,
/// max_concurrency,
/// );
///
/// // Start the server in a new task.
/// task::spawn(async move {
Expand All @@ -106,6 +113,7 @@ where
{
socket: SocketAddr,
local_client: LocalComponentClient<Request, Response>,
max_concurrency: usize,
}

impl<Request, Response> RemoteComponentServer<Request, Response>
Expand All @@ -117,8 +125,9 @@ where
local_client: LocalComponentClient<Request, Response>,
ip: IpAddr,
port: u16,
max_concurrency: usize,
) -> Self {
Self { local_client, socket: SocketAddr::new(ip, port) }
Self { local_client, socket: SocketAddr::new(ip, port), max_concurrency }
}

async fn remote_component_server_handler(
Expand Down Expand Up @@ -173,14 +182,22 @@ where
async fn start(&mut self) -> Result<(), ComponentServerError> {
let make_svc = make_service_fn(|_conn| {
let local_client = self.local_client.clone();
async {
Ok::<_, hyper::Error>(service_fn(move |req| {
let max_concurrency = self.max_concurrency;
async move {
let app_service = service_fn(move |req| {
Self::remote_component_server_handler(req, local_client.clone())
}))
});

// Apply the ConcurrencyLimitLayer middleware
let service = ServiceBuilder::new()
.layer(ConcurrencyLimitLayer::new(max_concurrency))
.service(app_service);

Ok::<_, hyper::Error>(service)
}
});

Server::bind(&self.socket.clone())
Server::bind(&self.socket)
.serve(make_svc)
.await
.map_err(|err| ComponentServerError::HttpServerStartError(err.to_string()))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,13 @@ async fn setup_concurrent_remote_test() -> RemoteConcurrentComponentClient {
let socket = AVAILABLE_PORTS.lock().await.get_next_local_host_socket();
let config = RemoteClientConfig::default();

let mut concurrent_remote_server =
RemoteComponentServer::new(local_client.clone(), socket.ip(), socket.port());
let max_concurrency = 10;
let mut concurrent_remote_server = RemoteComponentServer::new(
local_client.clone(),
socket.ip(),
socket.port(),
max_concurrency,
);
task::spawn(async move {
let _ = concurrent_remote_server.start().await;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ async fn setup_for_tests(setup_value: ValueB, a_socket: SocketAddr, b_socket: So
let mut component_b_local_server =
LocalComponentServer::new(component_b, rx_b, max_concurrency);

let max_concurrency = 10;
let mut component_a_remote_server =
RemoteComponentServer::new(a_local_client, a_socket.ip(), a_socket.port());
RemoteComponentServer::new(a_local_client, a_socket.ip(), a_socket.port(), max_concurrency);
let mut component_b_remote_server =
RemoteComponentServer::new(b_local_client, b_socket.ip(), b_socket.port());
RemoteComponentServer::new(b_local_client, b_socket.ip(), b_socket.port(), max_concurrency);

task::spawn(async move {
let _ = component_a_local_server.start().await;
Expand Down
40 changes: 30 additions & 10 deletions crates/starknet_sequencer_node/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub struct SequencerNodeServers {
/// initialization if needed.
/// * `$ip` - Remote component server binding address, default "0.0.0.0".
/// * `$port` - Remote component server listening port.
/// * `$max_concurrency` - the maximum number of concurrent connections the server will handle.
///
/// # Returns
///
Expand All @@ -112,7 +113,8 @@ pub struct SequencerNodeServers {
/// &config.components.batcher.execution_mode,
/// || {clients.get_gateway_local_client()},
/// config.components.batcher.ip,
/// config.components.batcher.port
/// config.components.batcher.port,
/// config.components.batcher.max_concurrency
/// );
/// match batcher_remote_server {
/// Some(server) => println!("Remote server created: {:?}", server),
Expand All @@ -121,13 +123,24 @@ pub struct SequencerNodeServers {
/// ```
#[macro_export]
macro_rules! create_remote_server {
($execution_mode:expr, $local_client_getter:expr, $url:expr, $port:expr) => {
(
$execution_mode:expr,
$local_client_getter:expr,
$url:expr,
$port:expr,
$max_concurrency:expr
) => {
match *$execution_mode {
ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
let local_client = $local_client_getter()
.expect("Local client should be set for inbound remote connections.");

Some(Box::new(RemoteComponentServer::new(local_client, $url, $port)))
Some(Box::new(RemoteComponentServer::new(
local_client,
$url,
$port,
$max_concurrency,
)))
}
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::Remote
Expand Down Expand Up @@ -354,49 +367,56 @@ pub fn create_remote_servers(
&config.components.batcher.execution_mode,
|| { clients.get_batcher_local_client() },
config.components.batcher.ip,
config.components.batcher.port
config.components.batcher.port,
config.components.batcher.max_concurrency
);

let class_manager_server = create_remote_server!(
&config.components.class_manager.execution_mode,
|| { clients.get_class_manager_local_client() },
config.components.class_manager.ip,
config.components.class_manager.port
config.components.class_manager.port,
config.components.class_manager.max_concurrency
);

let gateway_server = create_remote_server!(
&config.components.gateway.execution_mode,
|| { clients.get_gateway_local_client() },
config.components.gateway.ip,
config.components.gateway.port
config.components.gateway.port,
config.components.gateway.max_concurrency
);

let l1_provider_server = create_remote_server!(
&config.components.l1_provider.execution_mode,
|| { clients.get_l1_provider_local_client() },
config.components.l1_provider.ip,
config.components.l1_provider.port
config.components.l1_provider.port,
config.components.l1_provider.max_concurrency
);

let mempool_server = create_remote_server!(
&config.components.mempool.execution_mode,
|| { clients.get_mempool_local_client() },
config.components.mempool.ip,
config.components.mempool.port
config.components.mempool.port,
config.components.mempool.max_concurrency
);

let mempool_p2p_propagator_server = create_remote_server!(
&config.components.mempool_p2p.execution_mode,
|| { clients.get_mempool_p2p_propagator_local_client() },
config.components.mempool_p2p.ip,
config.components.mempool_p2p.port
config.components.mempool_p2p.port,
config.components.mempool_p2p.max_concurrency
);

let state_sync_server = create_remote_server!(
&config.components.state_sync.execution_mode,
|| { clients.get_state_sync_local_client() },
config.components.state_sync.ip,
config.components.state_sync.port
config.components.state_sync.port,
config.components.state_sync.max_concurrency
);

RemoteServers {
Expand Down

0 comments on commit b0e1ed9

Please sign in to comment.