Skip to content

Commit 372a50a

Browse files
author
Alex Pyattaev
committed
metrics reporting for tpu-client-next
1 parent 0771988 commit 372a50a

14 files changed

+156
-144
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

programs/sbf/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/src/rpc.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -4985,7 +4985,7 @@ pub mod tests {
49854985

49864986
#[test]
49874987
fn test_rpc_request_processor_new_tpu_client_next() {
4988-
rpc_request_processor_new::<TpuClientNextClient<NullTpuInfo>>();
4988+
rpc_request_processor_new::<TpuClientNextClient>();
49894989
}
49904990

49914991
fn rpc_get_balance<Client: ClientWithCreator>() {
@@ -5022,7 +5022,7 @@ pub mod tests {
50225022

50235023
#[test]
50245024
fn test_rpc_get_balance_new_tpu_client_next() {
5025-
rpc_get_balance::<TpuClientNextClient<NullTpuInfo>>();
5025+
rpc_get_balance::<TpuClientNextClient>();
50265026
}
50275027

50285028
fn rpc_get_balance_via_client<Client: ClientWithCreator>() {
@@ -5061,7 +5061,7 @@ pub mod tests {
50615061

50625062
#[test]
50635063
fn test_rpc_get_balance_via_client_tpu_client_next() {
5064-
rpc_get_balance_via_client::<TpuClientNextClient<NullTpuInfo>>();
5064+
rpc_get_balance_via_client::<TpuClientNextClient>();
50655065
}
50665066

50675067
#[test]
@@ -5196,7 +5196,7 @@ pub mod tests {
51965196

51975197
#[test]
51985198
fn test_rpc_get_tx_count_tpu_client_next() {
5199-
rpc_get_tx_count::<TpuClientNextClient<NullTpuInfo>>();
5199+
rpc_get_tx_count::<TpuClientNextClient>();
52005200
}
52015201

52025202
#[test]
@@ -6657,7 +6657,7 @@ pub mod tests {
66576657

66586658
#[test]
66596659
fn test_rpc_send_bad_tx_tpu_client_next() {
6660-
rpc_send_bad_tx::<TpuClientNextClient<NullTpuInfo>>();
6660+
rpc_send_bad_tx::<TpuClientNextClient>();
66616661
}
66626662

66636663
fn rpc_send_transaction_preflight<Client: ClientWithCreator>() {
@@ -6841,7 +6841,7 @@ pub mod tests {
68416841

68426842
#[test]
68436843
fn test_rpc_send_transaction_preflight_with_tpu_client_next() {
6844-
rpc_send_transaction_preflight::<TpuClientNextClient<NullTpuInfo>>();
6844+
rpc_send_transaction_preflight::<TpuClientNextClient>();
68456845
}
68466846

68476847
#[test]
@@ -7054,7 +7054,7 @@ pub mod tests {
70547054

70557055
#[test]
70567056
fn test_rpc_processor_get_block_commitment_with_tpu_client_next() {
7057-
rpc_processor_get_block_commitment::<TpuClientNextClient<NullTpuInfo>>();
7057+
rpc_processor_get_block_commitment::<TpuClientNextClient>();
70587058
}
70597059

70607060
#[test]

send-transaction-service/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ solana-measure = { workspace = true }
2121
solana-metrics = { workspace = true }
2222
solana-runtime = { workspace = true }
2323
solana-sdk = { workspace = true }
24-
solana-tpu-client-next = { workspace = true }
24+
solana-tpu-client-next = { workspace = true, features = ["metrics"] }
2525
tokio = { workspace = true, features = ["full"] }
2626
tokio-util = { workspace = true }
2727

send-transaction-service/src/send_transaction_service.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ mod test {
614614

615615
#[tokio::test(flavor = "multi_thread")]
616616
async fn service_exit_with_tpu_client_next() {
617-
service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
617+
service_exit::<TpuClientNextClient>(Some(Handle::current()));
618618
}
619619

620620
fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -665,7 +665,7 @@ mod test {
665665

666666
#[tokio::test(flavor = "multi_thread")]
667667
async fn validator_exit_with_tpu_client_next() {
668-
validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
668+
validator_exit::<TpuClientNextClient>(Some(Handle::current()));
669669
}
670670

671671
fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -924,7 +924,7 @@ mod test {
924924

925925
#[tokio::test(flavor = "multi_thread")]
926926
async fn process_transactions_with_tpu_client_next() {
927-
process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
927+
process_transactions::<TpuClientNextClient>(Some(Handle::current()));
928928
}
929929

930930
fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -1234,8 +1234,6 @@ mod test {
12341234

12351235
#[tokio::test(flavor = "multi_thread")]
12361236
async fn retry_durable_nonce_transactions_with_tpu_client_next() {
1237-
retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
1238-
Handle::current(),
1239-
));
1237+
retry_durable_nonce_transactions::<TpuClientNextClient>(Some(Handle::current()));
12401238
}
12411239
}

send-transaction-service/src/test_utils.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
4343
}
4444
}
4545

46-
impl CreateClient for TpuClientNextClient<NullTpuInfo> {
46+
impl CreateClient for TpuClientNextClient {
4747
fn create_client(
4848
maybe_runtime: Option<Handle>,
4949
my_tpu_address: SocketAddr,
@@ -52,7 +52,7 @@ impl CreateClient for TpuClientNextClient<NullTpuInfo> {
5252
) -> Self {
5353
let runtime_handle =
5454
maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
55-
Self::new(
55+
Self::new::<NullTpuInfo>(
5656
runtime_handle,
5757
my_tpu_address,
5858
tpu_peers,
@@ -74,10 +74,7 @@ where
7474
fn stop(&self) {}
7575
}
7676

77-
impl<T> Stoppable for TpuClientNextClient<T>
78-
where
79-
T: TpuInfoWithSendStatic + Clone,
80-
{
77+
impl Stoppable for TpuClientNextClient {
8178
fn stop(&self) {
8279
self.cancel().unwrap();
8380
}

send-transaction-service/src/transaction_client.rs

+21-38
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use {
88
solana_measure::measure::Measure,
99
solana_sdk::quic::NotifyKeyUpdate,
1010
solana_tpu_client_next::{
11-
connection_workers_scheduler::{
12-
ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
13-
},
11+
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
1412
leader_updater::LeaderUpdater,
1513
transaction_batch::TransactionBatch,
1614
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
@@ -24,6 +22,7 @@ use {
2422
runtime::Handle,
2523
sync::mpsc::{self},
2624
task::JoinHandle as TokioJoinHandle,
25+
time::interval,
2726
},
2827
tokio_util::sync::CancellationToken,
2928
};
@@ -230,27 +229,20 @@ where
230229
/// scheduler. Most of the complexity of this structure arises from this
231230
/// functionality.
232231
#[derive(Clone)]
233-
pub struct TpuClientNextClient<T>
234-
where
235-
T: TpuInfoWithSendStatic + Clone,
236-
{
232+
pub struct TpuClientNextClient {
237233
runtime_handle: Handle,
238234
sender: mpsc::Sender<TransactionBatch>,
239235
// This handle is needed to implement `NotifyKeyUpdate` trait. It's only
240236
// method takes &self and thus we need to wrap with Mutex.
241237
join_and_cancel: Arc<Mutex<(Option<TpuClientJoinHandle>, CancellationToken)>>,
242-
leader_updater: SendTransactionServiceLeaderUpdater<T>,
243238
leader_forward_count: u64,
244239
}
245240

246241
type TpuClientJoinHandle =
247-
TokioJoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>;
242+
TokioJoinHandle<Result<ConnectionWorkersScheduler, ConnectionWorkersSchedulerError>>;
248243

249-
impl<T> TpuClientNextClient<T>
250-
where
251-
T: TpuInfoWithSendStatic + Clone,
252-
{
253-
pub fn new(
244+
impl TpuClientNextClient {
245+
pub fn new<T>(
254246
runtime_handle: Handle,
255247
my_tpu_address: SocketAddr,
256248
tpu_peers: Option<Vec<SocketAddr>>,
@@ -275,18 +267,18 @@ where
275267
tpu_peers,
276268
};
277269
let config = Self::create_config(identity, leader_forward_count as usize);
278-
let handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
279-
config,
280-
Box::new(leader_updater.clone()),
281-
receiver,
270+
let scheduler = ConnectionWorkersScheduler::new(Box::new(leader_updater), receiver);
271+
// leaking handle to this task, as it will run until the end of life of the process
272+
runtime_handle.spawn(scheduler.stats.clone().report_to_influxdb(
273+
"send-transaction-service-TPU-client",
274+
interval(Duration::from_secs(3)),
282275
cancel.clone(),
283276
));
284-
277+
let handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
285278
Self {
286279
runtime_handle,
287280
join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))),
288281
sender,
289-
leader_updater,
290282
leader_forward_count,
291283
}
292284
}
@@ -322,7 +314,6 @@ where
322314
async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
323315
let runtime_handle = self.runtime_handle.clone();
324316
let config = Self::create_config(Some(identity), self.leader_forward_count as usize);
325-
let leader_updater = self.leader_updater.clone();
326317
let handle = self.join_and_cancel.clone();
327318

328319
let join_handle = {
@@ -340,14 +331,9 @@ where
340331
};
341332

342333
match result {
343-
Ok((_stats, receiver)) => {
334+
Ok(scheduler) => {
344335
let cancel = CancellationToken::new();
345-
let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
346-
config,
347-
Box::new(leader_updater),
348-
receiver,
349-
cancel.clone(),
350-
));
336+
let join_handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
351337

352338
let Ok(mut lock) = handle.lock() else {
353339
return Err("TpuClientNext task panicked.".into());
@@ -363,19 +349,13 @@ where
363349
}
364350
}
365351

366-
impl<T> NotifyKeyUpdate for TpuClientNextClient<T>
367-
where
368-
T: TpuInfoWithSendStatic + Clone,
369-
{
352+
impl NotifyKeyUpdate for TpuClientNextClient {
370353
fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
371354
self.runtime_handle.block_on(self.do_update_key(identity))
372355
}
373356
}
374357

375-
impl<T> TransactionClient for TpuClientNextClient<T>
376-
where
377-
T: TpuInfoWithSendStatic + Clone,
378-
{
358+
impl TransactionClient for TpuClientNextClient {
379359
fn send_transactions_in_batch(
380360
&self,
381361
wire_transactions: Vec<Vec<u8>>,
@@ -415,8 +395,11 @@ where
415395
};
416396
match self.runtime_handle.block_on(handle) {
417397
Ok(result) => match result {
418-
Ok(stats) => {
419-
debug!("tpu-client-next statistics over all the connections: {stats:?}");
398+
Ok(scheduler) => {
399+
debug!(
400+
"tpu-client-next statistics over all the connections: {:?}",
401+
scheduler.stats
402+
);
420403
}
421404
Err(error) => error!("tpu-client-next exits with error {error}."),
422405
},

svm/examples/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tpu-client-next/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ solana-clock = { workspace = true }
1818
solana-connection-cache = { workspace = true }
1919
solana-keypair = { workspace = true }
2020
solana-measure = { workspace = true }
21+
solana-metrics ={ workspace = true, optional = true }
2122
solana-quic-definitions = { workspace = true }
2223
solana-rpc-client = { workspace = true }
2324
solana-streamer = { workspace = true }
@@ -39,3 +40,6 @@ solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }
3940

4041
[package.metadata.docs.rs]
4142
targets = ["x86_64-unknown-linux-gnu"]
43+
44+
[features]
45+
metrics =["dep:solana-metrics"]

0 commit comments

Comments
 (0)