Skip to content

Commit

Permalink
Make grpc authentication optional (#46)
Browse files Browse the repository at this point in the history
Authentication is optional to access the grpc endpoint.

Please follow the example code to see how to use it with or without authentication.

If you use an unauthorized pubkey, your request will be rejected.
  • Loading branch information
ebin-mathews authored May 14, 2024
1 parent 38b6803 commit 3708a1e
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 132 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backrun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ spl-memo = "3.0.1"
thiserror = "1.0.40"
tokio = "1"
tonic = { version = "0.10", features = ["tls", "tls-roots", "tls-webpki-roots"] }
uuid = { version = "1.8.0", features = ["v4"] }
7 changes: 6 additions & 1 deletion backrun/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
See the [cli README](../cli/README.md) for setup instructions.

## Usage

```bash
cargo run --bin jito-backrun-example -- \
--block-engine-url <BLOCK_ENGINE_URL> \
Expand All @@ -12,10 +13,13 @@ cargo run --bin jito-backrun-example -- \
--rpc-url http://{RPC_URL}:8899 \
--tip-program-id <TIP_PROGRAM_ID> \
--backrun-accounts <BACKRUN_ACCOUNTS>
# Note: Don't provide --auth-keypair argument if not planning to use authentication
```

## Example

Backrun transactions that write-lock the [Pyth SOL/USDC account](https://solscan.io/account/H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG):

```bash
RUST_LOG=INFO cargo run --bin jito-backrun-example -- \
--block-engine-url https://frankfurt.mainnet.block-engine.jito.wtf \
Expand All @@ -25,4 +29,5 @@ RUST_LOG=INFO cargo run --bin jito-backrun-example -- \
--rpc-url https://api.mainnet-beta.solana.com:8899 \
--tip-program-id T1pyyaTNZsKv2WcRAB8oVnk93mLJw2XzjtVYqCsaHqt \
--backrun-accounts H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG
```
# Note: Don't provide --auth-keypair argument if not planning to use authentication
```
158 changes: 72 additions & 86 deletions backrun/src/event_loops.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use futures_util::StreamExt;
use jito_protos::{
bundle::BundleResult,
searcher::{
mempool_subscription, MempoolSubscription, PendingTxNotification,
SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0,
mempool_subscription, searcher_service_client::SearcherServiceClient, MempoolSubscription,
PendingTxNotification, SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0,
},
};
use jito_searcher_client::get_searcher_client;
use log::info;
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
Expand All @@ -21,11 +20,13 @@ use solana_sdk::{
clock::Slot,
commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Keypair,
};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use tokio::{sync::mpsc::Sender, time::sleep};
use tonic::Streaming;
use tonic::{
codegen::{Body, Bytes, StdError},
Streaming,
};

// slot update subscription loop that attempts to maintain a connection to an RPC server
pub async fn slot_subscribe_loop(pubsub_addr: String, slot_sender: Sender<Slot>) {
Expand Down Expand Up @@ -144,13 +145,18 @@ pub async fn block_subscribe_loop(
}

// attempts to maintain connection to searcher service and stream pending transaction notifications over a channel
pub async fn pending_tx_loop(
block_engine_url: String,
auth_keypair: Arc<Keypair>,
pub async fn pending_tx_loop<T>(
mut searcher_client: SearcherServiceClient<T>,
pending_tx_sender: Sender<PendingTxNotification>,
backrun_pubkeys: Vec<Pubkey>,
) {
let mut num_searcher_connection_errors: usize = 0;
) where
T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + 'static + Clone,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: std::marker::Send,
{
let _num_searcher_connection_errors: usize = 0;
let mut num_pending_tx_sub_errors: usize = 0;
let mut num_pending_tx_stream_errors: usize = 0;
let mut num_pending_tx_stream_disconnects: usize = 0;
Expand All @@ -160,103 +166,83 @@ pub async fn pending_tx_loop(
loop {
sleep(Duration::from_secs(1)).await;

match get_searcher_client(&block_engine_url, &auth_keypair).await {
Ok(mut searcher_client) => {
match searcher_client
.subscribe_mempool(MempoolSubscription {
regions: vec![],
msg: Some(mempool_subscription::Msg::WlaV0Sub(
WriteLockedAccountSubscriptionV0 {
accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(),
},
)),
})
.await
{
Ok(pending_tx_stream_response) => {
let mut pending_tx_stream = pending_tx_stream_response.into_inner();
while let Some(maybe_notification) = pending_tx_stream.next().await {
match maybe_notification {
Ok(notification) => {
if pending_tx_sender.send(notification).await.is_err() {
datapoint_error!(
"pending_tx_send_error",
("errors", 1, i64)
);
return;
}
}
Err(e) => {
num_pending_tx_stream_errors += 1;
datapoint_error!(
"searcher_pending_tx_stream_error",
("errors", num_pending_tx_stream_errors, i64),
("error_str", e.to_string(), String)
);
break;
}
match searcher_client
.subscribe_mempool(MempoolSubscription {
regions: vec![],
msg: Some(mempool_subscription::Msg::WlaV0Sub(
WriteLockedAccountSubscriptionV0 {
accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(),
},
)),
})
.await
{
Ok(pending_tx_stream_response) => {
let mut pending_tx_stream = pending_tx_stream_response.into_inner();
while let Some(maybe_notification) = pending_tx_stream.next().await {
match maybe_notification {
Ok(notification) => {
if pending_tx_sender.send(notification).await.is_err() {
datapoint_error!("pending_tx_send_error", ("errors", 1, i64));
return;
}
}
num_pending_tx_stream_disconnects += 1;
datapoint_error!(
"searcher_pending_tx_stream_disconnect",
("errors", num_pending_tx_stream_disconnects, i64),
);
}
Err(e) => {
num_pending_tx_sub_errors += 1;
datapoint_error!(
"searcher_pending_tx_sub_error",
("errors", num_pending_tx_sub_errors, i64),
("error_str", e.to_string(), String)
);
Err(e) => {
num_pending_tx_stream_errors += 1;
datapoint_error!(
"searcher_pending_tx_stream_error",
("errors", num_pending_tx_stream_errors, i64),
("error_str", e.to_string(), String)
);
break;
}
}
}
num_pending_tx_stream_disconnects += 1;
datapoint_error!(
"searcher_pending_tx_stream_disconnect",
("errors", num_pending_tx_stream_disconnects, i64),
);
}
Err(e) => {
num_searcher_connection_errors += 1;
num_pending_tx_sub_errors += 1;
datapoint_error!(
"searcher_connection_error",
("errors", num_searcher_connection_errors, i64),
"searcher_pending_tx_sub_error",
("errors", num_pending_tx_sub_errors, i64),
("error_str", e.to_string(), String)
);
}
}
}
}

pub async fn bundle_results_loop(
block_engine_url: String,
auth_keypair: Arc<Keypair>,
pub async fn bundle_results_loop<T>(
mut searcher_client: SearcherServiceClient<T>,
bundle_results_sender: Sender<BundleResult>,
) {
let mut connection_errors: usize = 0;
) where
T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + 'static + Clone,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: std::marker::Send,
{
let _connection_errors: usize = 0;
let mut response_errors: usize = 0;

loop {
sleep(Duration::from_millis(1000)).await;
match get_searcher_client(&block_engine_url, &auth_keypair).await {
Ok(mut c) => match c
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await
{
Ok(resp) => {
consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await;
}
Err(e) => {
response_errors += 1;
datapoint_error!(
"searcher_bundle_results_error",
("errors", response_errors, i64),
("msg", e.to_string(), String)
);
}
},
match searcher_client
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await
{
Ok(resp) => {
consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await;
}
Err(e) => {
connection_errors += 1;
response_errors += 1;
datapoint_error!(
"searcher_bundle_results_error",
("errors", connection_errors, i64),
("errors", response_errors, i64),
("msg", e.to_string(), String)
);
}
Expand Down
Loading

0 comments on commit 3708a1e

Please sign in to comment.