Commit ac534ea

metrics reporting for tpu-client-next

Author: Alex Pyattaev
Parent: 4b9b0b7

13 files changed: +149, -137 lines

Cargo.lock (+1)
(generated file; diff not rendered)

programs/sbf/Cargo.lock (+1)
(generated file; diff not rendered)

send-transaction-service/Cargo.toml (+1, -1)

@@ -21,7 +21,7 @@ solana-measure = { workspace = true }
 solana-metrics = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
-solana-tpu-client-next = { workspace = true }
+solana-tpu-client-next = { workspace = true, features = ["metrics"] }
 tokio = { workspace = true, features = ["full"] }
 tokio-util = { workspace = true }

send-transaction-service/src/send_transaction_service.rs (+4, -6)

@@ -614,7 +614,7 @@ mod test {

     #[tokio::test(flavor = "multi_thread")]
     async fn service_exit_with_tpu_client_next() {
-        service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+        service_exit::<TpuClientNextClient>(Some(Handle::current()));
     }

     fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -665,7 +665,7 @@

     #[tokio::test(flavor = "multi_thread")]
     async fn validator_exit_with_tpu_client_next() {
-        validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+        validator_exit::<TpuClientNextClient>(Some(Handle::current()));
     }

     fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -924,7 +924,7 @@ mod test {

     #[tokio::test(flavor = "multi_thread")]
     async fn process_transactions_with_tpu_client_next() {
-        process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+        process_transactions::<TpuClientNextClient>(Some(Handle::current()));
     }

     fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
@@ -1234,8 +1234,6 @@ mod test {

     #[tokio::test(flavor = "multi_thread")]
     async fn retry_durable_nonce_transactions_with_tpu_client_next() {
-        retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
-            Handle::current(),
-        ));
+        retry_durable_nonce_transactions::<TpuClientNextClient>(Some(Handle::current()));
     }
 }

send-transaction-service/src/test_utils.rs (+3, -6)

@@ -44,7 +44,7 @@ impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
     }
 }

-impl CreateClient for TpuClientNextClient<NullTpuInfo> {
+impl CreateClient for TpuClientNextClient {
     fn create_client(
         maybe_runtime: Option<Handle>,
         my_tpu_address: SocketAddr,
@@ -53,7 +53,7 @@ impl CreateClient for TpuClientNextClient<NullTpuInfo> {
     ) -> Self {
         let runtime_handle =
             maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
-        Self::new(
+        Self::new::<NullTpuInfo>(
             runtime_handle,
             my_tpu_address,
             tpu_peers,
@@ -75,10 +75,7 @@ where
     fn stop(&self) {}
 }

-impl<T> Stoppable for TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
+impl Stoppable for TpuClientNextClient {
     fn stop(&self) {
         self.cancel().unwrap();
    }
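
The pattern in these two files — moving the type parameter off `TpuClientNextClient` and onto its constructor — keeps a single non-generic type while still letting callers pick the `TpuInfo` implementation with a turbofish. A minimal, self-contained sketch of that refactoring idea (illustrative names, not this codebase's API):

// Before: the generic parameter forces bounds onto every impl block.
#[allow(dead_code)]
struct ClientOld<T: Clone> {
    info_source: T,
}

// After: the type is plain; the generic lives only on the constructor
// and is chosen by callers, e.g. `ClientNew::new::<Vec<u8>>()`.
struct ClientNew {
    // the concrete T is consumed (erased) during construction
    description: String,
}

impl ClientNew {
    fn new<T: Default + std::fmt::Debug>() -> Self {
        Self {
            description: format!("{:?}", T::default()),
        }
    }
}

fn main() {
    let client = ClientNew::new::<Vec<u8>>();
    println!("{}", client.description);
}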

send-transaction-service/src/transaction_client.rs (+21, -38)

@@ -8,9 +8,7 @@ use {
     solana_measure::measure::Measure,
     solana_sdk::quic::NotifyKeyUpdate,
     solana_tpu_client_next::{
-        connection_workers_scheduler::{
-            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
-        },
+        connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
         leader_updater::LeaderUpdater,
         transaction_batch::TransactionBatch,
         ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
@@ -24,6 +22,7 @@ use {
         runtime::Handle,
         sync::mpsc::{self},
         task::JoinHandle as TokioJoinHandle,
+        time::interval,
     },
     tokio_util::sync::CancellationToken,
 };
@@ -230,27 +229,20 @@ where
 /// scheduler. Most of the complexity of this structure arises from this
 /// functionality.
 #[derive(Clone)]
-pub struct TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
+pub struct TpuClientNextClient {
     runtime_handle: Handle,
     sender: mpsc::Sender<TransactionBatch>,
     // This handle is needed to implement `NotifyKeyUpdate` trait. It's only
     // method takes &self and thus we need to wrap with Mutex.
     join_and_cancel: Arc<Mutex<(Option<TpuClientJoinHandle>, CancellationToken)>>,
-    leader_updater: SendTransactionServiceLeaderUpdater<T>,
     leader_forward_count: u64,
 }

 type TpuClientJoinHandle =
-    TokioJoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>;
+    TokioJoinHandle<Result<ConnectionWorkersScheduler, ConnectionWorkersSchedulerError>>;

-impl<T> TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
-    pub fn new(
+impl TpuClientNextClient {
+    pub fn new<T>(
         runtime_handle: Handle,
         my_tpu_address: SocketAddr,
         tpu_peers: Option<Vec<SocketAddr>>,
@@ -275,18 +267,18 @@ where
             tpu_peers,
         };
         let config = Self::create_config(identity, leader_forward_count as usize);
-        let handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
-            config,
-            Box::new(leader_updater.clone()),
-            receiver,
+        let scheduler = ConnectionWorkersScheduler::new(Box::new(leader_updater), receiver);
+        // leaking handle to this task, as it will run until the end of life of the process
+        runtime_handle.spawn(scheduler.stats.clone().report_to_influxdb(
+            "send-transaction-service-TPU-client",
+            interval(Duration::from_secs(3)),
             cancel.clone(),
         ));
-
+        let handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
         Self {
             runtime_handle,
             join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))),
             sender,
-            leader_updater,
             leader_forward_count,
         }
     }
@@ -322,7 +314,6 @@ where
     async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
         let runtime_handle = self.runtime_handle.clone();
         let config = Self::create_config(Some(identity), self.leader_forward_count as usize);
-        let leader_updater = self.leader_updater.clone();
         let handle = self.join_and_cancel.clone();

         let join_handle = {
@@ -340,14 +331,9 @@ where
         };

         match result {
-            Ok((_stats, receiver)) => {
+            Ok(scheduler) => {
                 let cancel = CancellationToken::new();
-                let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
-                    config,
-                    Box::new(leader_updater),
-                    receiver,
-                    cancel.clone(),
-                ));
+                let join_handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));

                 let Ok(mut lock) = handle.lock() else {
                     return Err("TpuClientNext task panicked.".into());
@@ -363,19 +349,13 @@ where
     }
 }

-impl<T> NotifyKeyUpdate for TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
+impl NotifyKeyUpdate for TpuClientNextClient {
     fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
         self.runtime_handle.block_on(self.do_update_key(identity))
     }
 }

-impl<T> TransactionClient for TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
+impl TransactionClient for TpuClientNextClient {
     fn send_transactions_in_batch(
         &self,
         wire_transactions: Vec<Vec<u8>>,
@@ -415,8 +395,11 @@ where
         };
         match self.runtime_handle.block_on(handle) {
             Ok(result) => match result {
-                Ok(stats) => {
-                    debug!("tpu-client-next statistics over all the connections: {stats:?}");
+                Ok(scheduler) => {
+                    debug!(
+                        "tpu-client-next statistics over all the connections: {:?}",
+                        scheduler.stats
+                    );
                 }
                 Err(error) => error!("tpu-client-next exits with error {error}."),
             },
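
The restart dance in `do_update_key` works because `run` now consumes the scheduler and hands it back on exit: cancelling the old task returns the scheduler (with its still-queued receiver), so a fresh task can be spawned under the new identity without dropping pending transactions. A self-contained sketch of that ownership pattern in plain tokio (illustrative `Worker` type, not this crate's API):

use {tokio::sync::mpsc, tokio_util::sync::CancellationToken};

struct Worker {
    receiver: mpsc::Receiver<String>,
}

impl Worker {
    // Consumes self while running and returns self on shutdown, so the
    // caller can respawn it (e.g. after a key rotation) without losing
    // messages still queued in the channel.
    async fn run(mut self, cancel: CancellationToken) -> Worker {
        loop {
            tokio::select! {
                Some(msg) = self.receiver.recv() => println!("sending {msg}"),
                _ = cancel.cancelled() => return self,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let cancel = CancellationToken::new();
    let handle = tokio::spawn(Worker { receiver: rx }.run(cancel.clone()));

    tx.send("tx-1".into()).await.unwrap();

    // "Rotation": stop the old task, recover the worker, spawn it again.
    cancel.cancel();
    let worker = handle.await.unwrap();
    let cancel = CancellationToken::new();
    let handle = tokio::spawn(worker.run(cancel.clone()));

    tx.send("tx-2".into()).await.unwrap();
    cancel.cancel();
    handle.await.unwrap();
}
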

svm/examples/Cargo.lock (+1)
(generated file; diff not rendered)

tpu-client-next/Cargo.toml (+4)

@@ -18,6 +18,7 @@ solana-clock = { workspace = true }
 solana-connection-cache = { workspace = true }
 solana-keypair = { workspace = true }
 solana-measure = { workspace = true }
+solana-metrics = { workspace = true, optional = true }
 solana-quic-definitions = { workspace = true }
 solana-rpc-client = { workspace = true }
 solana-streamer = { workspace = true }
@@ -39,3 +40,6 @@ solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }

 [package.metadata.docs.rs]
 targets = ["x86_64-unknown-linux-gnu"]
+
+[features]
+metrics = ["dep:solana-metrics"]
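
With `solana-metrics` optional, the reporting code is compiled only when a dependant opts in, as send-transaction-service does above via `features = ["metrics"]`. A sketch of how such cfg-gating typically looks; the `report_to_influxdb` signature below is inferred from its call site in transaction_client.rs and the stats type is illustrative:

use {
    std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    tokio::time::Interval,
    tokio_util::sync::CancellationToken,
};

#[derive(Debug, Default)]
pub struct Stats {
    pub successfully_sent: AtomicU64,
}

// Compiled only with `--features metrics`, keeping solana-metrics out of
// the dependency graph of consumers that never report.
#[cfg(feature = "metrics")]
impl Stats {
    pub async fn report_to_influxdb(
        self: Arc<Self>,
        name: &'static str,
        mut reporting_interval: Interval,
        cancel: CancellationToken,
    ) {
        loop {
            tokio::select! {
                _ = reporting_interval.tick() => {
                    // the real crate would emit a datapoint via solana-metrics here
                    let sent = self.successfully_sent.load(Ordering::Relaxed);
                    log::debug!("{name}: successfully_sent={sent}");
                }
                _ = cancel.cancelled() => break,
            }
        }
    }
}
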

tpu-client-next/src/connection_workers_scheduler.rs (+30, -27)

@@ -2,7 +2,7 @@
 //! to the upcoming leaders.

 use {
-    super::{leader_updater::LeaderUpdater, SendTransactionStatsPerAddr},
+    super::leader_updater::LeaderUpdater,
     crate::{
         connection_worker::ConnectionWorker,
         quic_networking::{
@@ -21,14 +21,19 @@ use {
     tokio::sync::mpsc,
     tokio_util::sync::CancellationToken,
 };
+pub type TransactionReceiver = mpsc::Receiver<TransactionBatch>;

 /// The [`ConnectionWorkersScheduler`] sends transactions from the provided
 /// receiver channel to upcoming leaders. It obtains information about future
 /// leaders from the implementation of the [`LeaderUpdater`] trait.
 ///
 /// Internally, it enables the management and coordination of multiple network
 /// connections, schedules and oversees connection workers.
-pub struct ConnectionWorkersScheduler;
+pub struct ConnectionWorkersScheduler {
+    leader_updater: Box<dyn LeaderUpdater>,
+    transaction_receiver: TransactionReceiver,
+    pub stats: Arc<SendTransactionStats>,
+}

 /// Errors that arise from running [`ConnectionWorkersSchedulerError`].
 #[derive(Debug, Error, PartialEq)]
@@ -137,12 +142,19 @@ pub trait WorkersBroadcaster {
     ) -> Result<(), ConnectionWorkersSchedulerError>;
 }

-pub type TransactionStatsAndReceiver = (
-    SendTransactionStatsPerAddr,
-    mpsc::Receiver<TransactionBatch>,
-);
-
 impl ConnectionWorkersScheduler {
+    pub fn new(
+        leader_updater: Box<dyn LeaderUpdater>,
+        transaction_receiver: mpsc::Receiver<TransactionBatch>,
+    ) -> Self {
+        let stats = Arc::new(SendTransactionStats::default());
+        Self {
+            leader_updater,
+            transaction_receiver,
+            stats,
+        }
+    }
+
     /// Starts the scheduler, which manages the distribution of transactions to
     /// the network's upcoming leaders.
     ///
@@ -154,18 +166,12 @@ impl ConnectionWorkersScheduler {
     /// will be dropped. The same for transactions that failed to be delivered
     /// over the network.
     pub async fn run(
+        self,
         config: ConnectionWorkersSchedulerConfig,
-        leader_updater: Box<dyn LeaderUpdater>,
-        transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
-        Self::run_with_broadcaster::<NonblockingBroadcaster>(
-            config,
-            leader_updater,
-            transaction_receiver,
-            cancel,
-        )
-        .await
+    ) -> Result<Self, ConnectionWorkersSchedulerError> {
+        self.run_with_broadcaster::<NonblockingBroadcaster>(config, cancel)
+            .await
     }

     /// Starts the scheduler, which manages the distribution of transactions to
@@ -182,6 +188,7 @@ impl ConnectionWorkersScheduler {
     /// Importantly, if some transactions were not delivered due to network
     /// problems, they will not be retried when the problem is resolved.
     pub async fn run_with_broadcaster<Broadcaster: WorkersBroadcaster>(
+        mut self,
         ConnectionWorkersSchedulerConfig {
             bind,
             stake_identity,
@@ -191,20 +198,17 @@ impl ConnectionWorkersScheduler {
             max_reconnect_attempts,
             leaders_fanout,
         }: ConnectionWorkersSchedulerConfig,
-        mut leader_updater: Box<dyn LeaderUpdater>,
-        mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
+    ) -> Result<Self, ConnectionWorkersSchedulerError> {
         let endpoint = Self::setup_endpoint(bind, stake_identity)?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
-        let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

         let mut last_error = None;

         loop {
             let transaction_batch: TransactionBatch = tokio::select! {
-                recv_res = transaction_receiver.recv() => match recv_res {
+                recv_res = self.transaction_receiver.recv() => match recv_res {
                     Some(txs) => txs,
                     None => {
                         debug!("End of `transaction_receiver`: shutting down.");
@@ -217,21 +221,20 @@ impl ConnectionWorkersScheduler {
                 }
             };

-            let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
+            let connect_leaders = self.leader_updater.next_leaders(leaders_fanout.connect);
             let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);

             // add future leaders to the cache to hide the latency of opening
             // the connection.
             for peer in connect_leaders {
                 if !workers.contains(&peer) {
-                    let stats = send_stats_per_addr.entry(peer.ip()).or_default();
                     let worker = Self::spawn_worker(
                         &endpoint,
                         &peer,
                         worker_channel_size,
                         skip_check_transaction_age,
                         max_reconnect_attempts,
-                        stats.clone(),
+                        self.stats.clone(),
                     );
                     maybe_shutdown_worker(workers.push(peer, worker));
                 }
@@ -248,11 +251,11 @@ impl ConnectionWorkersScheduler {
         workers.shutdown().await;

         endpoint.close(0u32.into(), b"Closing connection");
-        leader_updater.stop().await;
+        self.leader_updater.stop().await;
         if let Some(error) = last_error {
             return Err(error);
         }
-        Ok((send_stats_per_addr, transaction_receiver))
+        Ok(self)
     }

     /// Sets up the QUIC endpoint for the scheduler to handle connections.
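
Taken together, the new flow is: build the scheduler, clone its shared stats handle, run it to completion, and get the scheduler back for inspection or restart. A usage sketch based on the signatures in this diff (the config and leader updater are assumed to be built elsewhere):

use {
    solana_tpu_client_next::{
        connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
        leader_updater::LeaderUpdater, transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler,
    },
    tokio::sync::mpsc,
    tokio_util::sync::CancellationToken,
};

async fn run_once(
    config: ConnectionWorkersSchedulerConfig,
    leader_updater: Box<dyn LeaderUpdater>,
) {
    let (tx, rx) = mpsc::channel::<TransactionBatch>(128);
    let cancel = CancellationToken::new();

    let scheduler = ConnectionWorkersScheduler::new(leader_updater, rx);
    // stats sit behind an Arc, so they stay readable while `run` owns self
    let stats = scheduler.stats.clone();

    let task = tokio::spawn(scheduler.run(config, cancel.clone()));

    // ... send TransactionBatch values through `tx`, then shut down ...
    drop(tx);

    match task.await.expect("scheduler task panicked") {
        Ok(scheduler) => println!("final stats: {:?}", scheduler.stats),
        Err(error) => eprintln!("scheduler failed: {error}; stats so far: {stats:?}"),
    }
}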
