Metrics reporting for tpu-client-next #5338

Open
wants to merge 1 commit into
base: master
from

Conversation

alexpyattaev

@alexpyattaev alexpyattaev commented Mar 17, 2025

Problem

  • tpu-client-next has internal stats, but they are not well designed: their memory consumption is unbounded, and there is no provision for reporting them as metrics
  • much of the internal state of ConnectionWorkersScheduler is managed externally by its callers

Summary of Changes

  • Internalize some of the state of ConnectionWorkersScheduler (including the stats)
  • Remove the (now unneeded) generics from TpuClientNextClient
  • Report ConnectionWorkersScheduler stats to InfluxDB via a small coroutine that periodically reports them on the same tokio runtime
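
For readers unfamiliar with the read-and-reset pattern that bounded stats rely on, here is a minimal sketch. It assumes a fixed set of atomic counters; `SendStatsSketch`, `StatsViewSketch` and the `record_*` helpers are hypothetical names, and only `read_and_reset`, `successfully_sent` and `write_error_stopped` appear in this PR. The real implementation may differ.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical fixed-size counter set. Memory use is constant because
/// there is one atomic per event kind, not one entry per peer.
#[derive(Default)]
pub struct SendStatsSketch {
    successfully_sent: AtomicU64,
    write_error_stopped: AtomicU64,
}

/// Plain-integer snapshot handed to whoever reports the stats.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct StatsViewSketch {
    pub successfully_sent: u64,
    pub write_error_stopped: u64,
}

impl SendStatsSketch {
    /// Hypothetical helper: count one successful send.
    pub fn record_success(&self) {
        self.successfully_sent.fetch_add(1, Ordering::Relaxed);
    }

    /// Hypothetical helper: count one stopped-stream write error.
    pub fn record_write_error_stopped(&self) {
        self.write_error_stopped.fetch_add(1, Ordering::Relaxed);
    }

    /// Swaps each counter with zero and returns the previous values.
    /// Each counter is swapped individually, so an event is counted in
    /// exactly one snapshot, but the snapshot as a whole is not atomic.
    pub fn read_and_reset(&self) -> StatsViewSketch {
        StatsViewSketch {
            successfully_sent: self.successfully_sent.swap(0, Ordering::Relaxed),
            write_error_stopped: self.write_error_stopped.swap(0, Ordering::Relaxed),
        }
    }
}
```

Because the counters are reset on every read, each reported datapoint covers only the interval since the previous report.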

@alexpyattaev alexpyattaev force-pushed the tpu_client_next_stats branch 4 times, most recently from ac534ea to 372a50a Compare March 18, 2025 10:51

mergify bot commented Mar 18, 2025

If this PR represents a change to the public RPC API:

  1. Make sure it includes a complementary update to rpc-client/ (example)
  2. Open a follow-up PR to update the JavaScript client @solana/web3.js (example)

Thank you for keeping the RPC clients in sync with the server API @alexpyattaev.

@alexpyattaev alexpyattaev force-pushed the tpu_client_next_stats branch from 372a50a to 2f28f72 Compare March 18, 2025 11:59
@alexpyattaev alexpyattaev marked this pull request as ready for review March 18, 2025 12:47
{
pub fn new(
impl TpuClientNextClient {
pub fn new<T>(

I see why the struct no longer needs to be generic: the scheduler is now stateful, so the generic parameter can be moved to new alone.

Comment on lines +20 to +45
while !cancel.is_cancelled() {
    interval.tick().await;
    let view = stats.read_and_reset();
    let connect_error = view.connect_error_cids_exhausted
        + view.connect_error_other
        + view.connect_error_invalid_remote_address;
    let connection_error = view.connection_error_reset
        + view.connection_error_cids_exhausted
        + view.connection_error_timed_out
        + view.connection_error_application_closed
        + view.connection_error_transport_error
        + view.connection_error_version_mismatch
        + view.connection_error_locally_closed;
    let write_error = view.write_error_stopped
        + view.write_error_closed_stream
        + view.write_error_connection_lost
        + view.write_error_zero_rtt_rejected;

    datapoint_info!(
        name,
        ("connect_error", connect_error, i64),
        ("connection_error", connection_error, i64),
        ("successfully_sent", view.successfully_sent, i64),
        ("write_error", write_error, i64),
    );
}

Suggested change
loop {
    select! {
        _ = interval.tick() => {
            let view = stats.read_and_reset();
            let connect_error = view.connect_error_cids_exhausted
                + view.connect_error_other
                + view.connect_error_invalid_remote_address;
            let connection_error = view.connection_error_reset
                + view.connection_error_cids_exhausted
                + view.connection_error_timed_out
                + view.connection_error_application_closed
                + view.connection_error_transport_error
                + view.connection_error_version_mismatch
                + view.connection_error_locally_closed;
            let write_error = view.write_error_stopped
                + view.write_error_closed_stream
                + view.write_error_connection_lost
                + view.write_error_zero_rtt_rejected;
            datapoint_info!(
                name,
                ("connect_error", connect_error, i64),
                ("connection_error", connection_error, i64),
                ("successfully_sent", view.successfully_sent, i64),
                ("write_error", write_error, i64),
            );
        }
        _ = cancel.cancelled() => break,
    }
}

Otherwise, a cancellation only takes effect after we have waited out the rest of the reporting interval (3 sec).
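
The same idea, waking on whichever of "interval elapsed" or "cancelled" happens first, can be illustrated without tokio. The sketch below is a std-only analogue built on `mpsc::Receiver::recv_timeout`, not the code proposed in this PR; `spawn_reporter` and its parameters are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns a thread that calls `report` once per `interval` and stops as
/// soon as a message is sent on (or the caller drops) the returned sender.
/// The join handle yields the number of reports that were made.
pub fn spawn_reporter<F>(interval: Duration, mut report: F) -> (Sender<()>, JoinHandle<u32>)
where
    F: FnMut() + Send + 'static,
{
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let mut reports = 0;
        loop {
            // Blocks until the interval elapses or a cancel signal arrives,
            // whichever comes first.
            match cancel_rx.recv_timeout(interval) {
                Err(RecvTimeoutError::Timeout) => {
                    report();
                    reports += 1;
                }
                // Explicit cancel, or the sender was dropped: exit at once
                // without waiting for the rest of the interval.
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        reports
    });
    (cancel_tx, handle)
}
```

A loop that checks a flag and then sleeps for the full interval would only notice the cancellation on the next iteration, which is the delay the suggestion avoids.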

tokio_util::sync::CancellationToken,
};

impl SendTransactionStats {

What if some hypothetical client decides to report metrics somewhere else (not influxdb)? Am I right that in this case the client is expected to write their own function, say report_to_something, because they cannot add an impl for SendTransactionStats?
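
One way a downstream crate could do this is with a free function or an extension trait over the stats snapshot, since Rust allows implementing a local trait for a foreign type. A minimal sketch follows; `StatsViewSketch` is a hypothetical stand-in for whatever `read_and_reset` returns, and `ReportToLog` is an invented trait name.

```rust
/// Hypothetical stand-in for the snapshot type; in a real client this
/// would come from the tpu-client-next crate.
pub struct StatsViewSketch {
    pub successfully_sent: u64,
    pub write_error_stopped: u64,
}

/// Extension trait defined in the client's own crate. Implementing a local
/// trait for a foreign type is allowed, unlike adding an inherent impl.
pub trait ReportToLog {
    fn to_log_line(&self, name: &str) -> String;
}

impl ReportToLog for StatsViewSketch {
    /// Formats the snapshot as a single line for a text-based backend.
    fn to_log_line(&self, name: &str) -> String {
        format!(
            "{} successfully_sent={} write_error={}",
            name, self.successfully_sent, self.write_error_stopped
        )
    }
}
```

The client would then call `read_and_reset` on its own schedule and pass the snapshot to whichever backend it uses.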

@@ -0,0 +1,47 @@
use {

Suggested change
use {
//! If `metrics` feature is activated, this module provides `report_to_influxdb`
//! method for [`SendTransactionStats`] which periodically reports transaction
//! sending statistics to InfluxDB.
use {

In this crate we use module-level doc comments.

};

impl SendTransactionStats {
///Report the statistics to influxdb in a compact form

Suggested change
///Report the statistics to influxdb in a compact form
/// Report the statistics to influxdb in a compact form.

@KirillLykov KirillLykov Mar 23, 2025

A little bit of bike shedding here

Box::new(leader_updater.clone()),
receiver,
let scheduler = ConnectionWorkersScheduler::new(Box::new(leader_updater), receiver);
// leaking handle to this task, as it will run until the end of life of the process

@KirillLykov KirillLykov Mar 23, 2025

It might also be cancelled, for example when the validator identity is updated. In that case the metrics task will be stopped but will not be relaunched in do_update_key.

// leaking handle to this task, as it will run until the end of life of the process
runtime_handle.spawn(scheduler.stats.clone().report_to_influxdb(
"send-transaction-service-TPU-client",
Duration::from_secs(3),

Could you move this to a const at the top of the module?
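
A minimal sketch of what that could look like, assuming both the measurement name and the interval are hoisted. The constant names and the `metrics_config` helper are hypothetical; only the two values come from the diff above.

```rust
use std::time::Duration;

/// Measurement name passed to the reporter (value taken from this PR).
const METRICS_NAME: &str = "send-transaction-service-TPU-client";

/// How often the stats are flushed (3 seconds, as in this PR).
const METRICS_REPORTING_INTERVAL: Duration = Duration::from_secs(3);

/// Hypothetical helper standing in for the spawn call site, which would
/// pass these two values to `report_to_influxdb`.
pub fn metrics_config() -> (&'static str, Duration) {
    (METRICS_NAME, METRICS_REPORTING_INTERVAL)
}
```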

pub struct ConnectionWorkersScheduler {
leader_updater: Box<dyn LeaderUpdater>,
transaction_receiver: TransactionReceiver,
pub stats: Arc<SendTransactionStats>,

@KirillLykov KirillLykov Mar 23, 2025

I would prefer a method for that instead of making the field pub. As soon as we publish this crate, a pub stats field becomes part of the public API, which makes the implementation very hard to change (in this case the extra clone() in transaction_client needs to be removed):

pub fn stats(&self) -> Arc<SendTransactionStats> {
    self.stats.clone()
}

@@ -5,13 +5,8 @@ use {
super::QuicError,
quinn::{ConnectError, ConnectionError, WriteError},
std::{
collections::HashMap,

It looks like the documentation for this module is stale: we don't collect per-IP statistics any longer.

@KirillLykov KirillLykov left a comment

Overall, these changes simplify the code considerably, which I like a lot. I left some comments.
