Skip to content

Commit 2f28f72

Browse files
author
Alex Pyattaev
committed
metrics reporting for tpu-client-next
1 parent 0771988 commit 2f28f72

14 files changed

+159
-144
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

programs/sbf/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/src/rpc.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -4985,7 +4985,7 @@ pub mod tests {
49854985

49864986
#[test]
49874987
fn test_rpc_request_processor_new_tpu_client_next() {
4988-
rpc_request_processor_new::<TpuClientNextClient<NullTpuInfo>>();
4988+
rpc_request_processor_new::<TpuClientNextClient>();
49894989
}
49904990

49914991
fn rpc_get_balance<Client: ClientWithCreator>() {
@@ -5022,7 +5022,7 @@ pub mod tests {
50225022

50235023
#[test]
50245024
fn test_rpc_get_balance_new_tpu_client_next() {
5025-
rpc_get_balance::<TpuClientNextClient<NullTpuInfo>>();
5025+
rpc_get_balance::<TpuClientNextClient>();
50265026
}
50275027

50285028
fn rpc_get_balance_via_client<Client: ClientWithCreator>() {
@@ -5061,7 +5061,7 @@ pub mod tests {
50615061

50625062
#[test]
50635063
fn test_rpc_get_balance_via_client_tpu_client_next() {
5064-
rpc_get_balance_via_client::<TpuClientNextClient<NullTpuInfo>>();
5064+
rpc_get_balance_via_client::<TpuClientNextClient>();
50655065
}
50665066

50675067
#[test]
@@ -5196,7 +5196,7 @@ pub mod tests {
51965196

51975197
#[test]
51985198
fn test_rpc_get_tx_count_tpu_client_next() {
5199-
rpc_get_tx_count::<TpuClientNextClient<NullTpuInfo>>();
5199+
rpc_get_tx_count::<TpuClientNextClient>();
52005200
}
52015201

52025202
#[test]
@@ -6657,7 +6657,7 @@ pub mod tests {
66576657

66586658
#[test]
66596659
fn test_rpc_send_bad_tx_tpu_client_next() {
6660-
rpc_send_bad_tx::<TpuClientNextClient<NullTpuInfo>>();
6660+
rpc_send_bad_tx::<TpuClientNextClient>();
66616661
}
66626662

66636663
fn rpc_send_transaction_preflight<Client: ClientWithCreator>() {
@@ -6841,7 +6841,7 @@ pub mod tests {
68416841

68426842
#[test]
68436843
fn test_rpc_send_transaction_preflight_with_tpu_client_next() {
6844-
rpc_send_transaction_preflight::<TpuClientNextClient<NullTpuInfo>>();
6844+
rpc_send_transaction_preflight::<TpuClientNextClient>();
68456845
}
68466846

68476847
#[test]
@@ -7054,7 +7054,7 @@ pub mod tests {
70547054

70557055
#[test]
70567056
fn test_rpc_processor_get_block_commitment_with_tpu_client_next() {
7057-
rpc_processor_get_block_commitment::<TpuClientNextClient<NullTpuInfo>>();
7057+
rpc_processor_get_block_commitment::<TpuClientNextClient>();
70587058
}
70597059

70607060
#[test]

send-transaction-service/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ solana-measure = { workspace = true }
2121
solana-metrics = { workspace = true }
2222
solana-runtime = { workspace = true }
2323
solana-sdk = { workspace = true }
24-
solana-tpu-client-next = { workspace = true }
24+
solana-tpu-client-next = { workspace = true, features = ["metrics"] }
2525
tokio = { workspace = true, features = ["full"] }
2626
tokio-util = { workspace = true }
2727

send-transaction-service/src/send_transaction_service.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ mod test {
614614

615615
#[tokio::test(flavor = "multi_thread")]
616616
async fn service_exit_with_tpu_client_next() {
617-
service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
617+
service_exit::<TpuClientNextClient>(Some(Handle::current()));
618618
}
619619

620620
fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -665,7 +665,7 @@ mod test {
665665

666666
#[tokio::test(flavor = "multi_thread")]
667667
async fn validator_exit_with_tpu_client_next() {
668-
validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
668+
validator_exit::<TpuClientNextClient>(Some(Handle::current()));
669669
}
670670

671671
fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -924,7 +924,7 @@ mod test {
924924

925925
#[tokio::test(flavor = "multi_thread")]
926926
async fn process_transactions_with_tpu_client_next() {
927-
process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
927+
process_transactions::<TpuClientNextClient>(Some(Handle::current()));
928928
}
929929

930930
fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -1234,8 +1234,6 @@ mod test {
12341234

12351235
#[tokio::test(flavor = "multi_thread")]
12361236
async fn retry_durable_nonce_transactions_with_tpu_client_next() {
1237-
retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
1238-
Handle::current(),
1239-
));
1237+
retry_durable_nonce_transactions::<TpuClientNextClient>(Some(Handle::current()));
12401238
}
12411239
}

send-transaction-service/src/test_utils.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
4343
}
4444
}
4545

46-
impl CreateClient for TpuClientNextClient<NullTpuInfo> {
46+
impl CreateClient for TpuClientNextClient {
4747
fn create_client(
4848
maybe_runtime: Option<Handle>,
4949
my_tpu_address: SocketAddr,
@@ -52,7 +52,7 @@ impl CreateClient for TpuClientNextClient<NullTpuInfo> {
5252
) -> Self {
5353
let runtime_handle =
5454
maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
55-
Self::new(
55+
Self::new::<NullTpuInfo>(
5656
runtime_handle,
5757
my_tpu_address,
5858
tpu_peers,
@@ -74,10 +74,7 @@ where
7474
fn stop(&self) {}
7575
}
7676

77-
impl<T> Stoppable for TpuClientNextClient<T>
78-
where
79-
T: TpuInfoWithSendStatic + Clone,
80-
{
77+
impl Stoppable for TpuClientNextClient {
8178
fn stop(&self) {
8279
self.cancel().unwrap();
8380
}

send-transaction-service/src/transaction_client.rs

+20-38
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use {
88
solana_measure::measure::Measure,
99
solana_sdk::quic::NotifyKeyUpdate,
1010
solana_tpu_client_next::{
11-
connection_workers_scheduler::{
12-
ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
13-
},
11+
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
1412
leader_updater::LeaderUpdater,
1513
transaction_batch::TransactionBatch,
1614
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
@@ -230,27 +228,20 @@ where
230228
/// scheduler. Most of the complexity of this structure arises from this
231229
/// functionality.
232230
#[derive(Clone)]
233-
pub struct TpuClientNextClient<T>
234-
where
235-
T: TpuInfoWithSendStatic + Clone,
236-
{
231+
pub struct TpuClientNextClient {
237232
runtime_handle: Handle,
238233
sender: mpsc::Sender<TransactionBatch>,
239234
// This handle is needed to implement `NotifyKeyUpdate` trait. It's only
240235
// method takes &self and thus we need to wrap with Mutex.
241236
join_and_cancel: Arc<Mutex<(Option<TpuClientJoinHandle>, CancellationToken)>>,
242-
leader_updater: SendTransactionServiceLeaderUpdater<T>,
243237
leader_forward_count: u64,
244238
}
245239

246240
type TpuClientJoinHandle =
247-
TokioJoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>;
241+
TokioJoinHandle<Result<ConnectionWorkersScheduler, ConnectionWorkersSchedulerError>>;
248242

249-
impl<T> TpuClientNextClient<T>
250-
where
251-
T: TpuInfoWithSendStatic + Clone,
252-
{
253-
pub fn new(
243+
impl TpuClientNextClient {
244+
pub fn new<T>(
254245
runtime_handle: Handle,
255246
my_tpu_address: SocketAddr,
256247
tpu_peers: Option<Vec<SocketAddr>>,
@@ -275,18 +266,18 @@ where
275266
tpu_peers,
276267
};
277268
let config = Self::create_config(identity, leader_forward_count as usize);
278-
let handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
279-
config,
280-
Box::new(leader_updater.clone()),
281-
receiver,
269+
let scheduler = ConnectionWorkersScheduler::new(Box::new(leader_updater), receiver);
270+
// leaking handle to this task, as it will run until the end of life of the process
271+
runtime_handle.spawn(scheduler.stats.clone().report_to_influxdb(
272+
"send-transaction-service-TPU-client",
273+
Duration::from_secs(3),
282274
cancel.clone(),
283275
));
284-
276+
let handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
285277
Self {
286278
runtime_handle,
287279
join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))),
288280
sender,
289-
leader_updater,
290281
leader_forward_count,
291282
}
292283
}
@@ -322,7 +313,6 @@ where
322313
async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
323314
let runtime_handle = self.runtime_handle.clone();
324315
let config = Self::create_config(Some(identity), self.leader_forward_count as usize);
325-
let leader_updater = self.leader_updater.clone();
326316
let handle = self.join_and_cancel.clone();
327317

328318
let join_handle = {
@@ -340,14 +330,9 @@ where
340330
};
341331

342332
match result {
343-
Ok((_stats, receiver)) => {
333+
Ok(scheduler) => {
344334
let cancel = CancellationToken::new();
345-
let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
346-
config,
347-
Box::new(leader_updater),
348-
receiver,
349-
cancel.clone(),
350-
));
335+
let join_handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
351336

352337
let Ok(mut lock) = handle.lock() else {
353338
return Err("TpuClientNext task panicked.".into());
@@ -363,19 +348,13 @@ where
363348
}
364349
}
365350

366-
impl<T> NotifyKeyUpdate for TpuClientNextClient<T>
367-
where
368-
T: TpuInfoWithSendStatic + Clone,
369-
{
351+
impl NotifyKeyUpdate for TpuClientNextClient {
370352
fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
371353
self.runtime_handle.block_on(self.do_update_key(identity))
372354
}
373355
}
374356

375-
impl<T> TransactionClient for TpuClientNextClient<T>
376-
where
377-
T: TpuInfoWithSendStatic + Clone,
378-
{
357+
impl TransactionClient for TpuClientNextClient {
379358
fn send_transactions_in_batch(
380359
&self,
381360
wire_transactions: Vec<Vec<u8>>,
@@ -415,8 +394,11 @@ where
415394
};
416395
match self.runtime_handle.block_on(handle) {
417396
Ok(result) => match result {
418-
Ok(stats) => {
419-
debug!("tpu-client-next statistics over all the connections: {stats:?}");
397+
Ok(scheduler) => {
398+
debug!(
399+
"tpu-client-next statistics over all the connections: {:?}",
400+
scheduler.stats
401+
);
420402
}
421403
Err(error) => error!("tpu-client-next exits with error {error}."),
422404
},

svm/examples/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tpu-client-next/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ solana-clock = { workspace = true }
1818
solana-connection-cache = { workspace = true }
1919
solana-keypair = { workspace = true }
2020
solana-measure = { workspace = true }
21+
solana-metrics ={ workspace = true, optional = true }
2122
solana-quic-definitions = { workspace = true }
2223
solana-rpc-client = { workspace = true }
2324
solana-streamer = { workspace = true }
@@ -39,3 +40,6 @@ solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }
3940

4041
[package.metadata.docs.rs]
4142
targets = ["x86_64-unknown-linux-gnu"]
43+
44+
[features]
45+
metrics =["dep:solana-metrics"]

0 commit comments

Comments
 (0)