diff --git a/ghost-crab/Cargo.toml b/ghost-crab/Cargo.toml
index 81b302d..63af2f0 100644
--- a/ghost-crab/Cargo.toml
+++ b/ghost-crab/Cargo.toml
@@ -14,6 +14,7 @@ alloy = { version = "0.1.0", features = [
     "contract",
     "provider-http",
     "rpc-types-eth",
+    "json-rpc",
 ] }
 tokio = { version = "1.37.0", features = ["full"] }
 dotenvy = "0.15"
@@ -29,3 +30,4 @@ ghost-crab-common = { path = "../ghost-crab-common", version = "0.2.0" }
 serde = { version = "1.0.203", features = ["derive"] }
 serde_json = "1.0.117"
 rocksdb = "0.22.0"
+tower = "0.4.13"
diff --git a/ghost-crab/src/block_handler.rs b/ghost-crab/src/block_handler.rs
index ed36eb1..b42e5cf 100644
--- a/ghost-crab/src/block_handler.rs
+++ b/ghost-crab/src/block_handler.rs
@@ -1,10 +1,9 @@
+use crate::cache::manager::CacheProvider;
 use crate::indexer::TemplateManager;
 use crate::latest_block_manager::LatestBlockManager;
 use alloy::providers::Provider;
-use alloy::providers::RootProvider;
 use alloy::rpc::types::eth::Block;
 use alloy::rpc::types::eth::BlockNumberOrTag;
-use alloy::transports::http::{Client, Http};
 use alloy::transports::TransportError;
 use async_trait::async_trait;
 use ghost_crab_common::config::ExecutionMode;
@@ -12,7 +11,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 pub struct BlockContext {
-    pub provider: RootProvider<Http<Client>>,
+    pub provider: CacheProvider,
     pub templates: TemplateManager,
     pub block: Block,
 }
@@ -32,7 +31,7 @@ pub trait BlockHandler {
 pub struct ProcessBlocksInput {
     pub handler: BlockHandlerInstance,
     pub templates: TemplateManager,
-    pub provider: RootProvider<Http<Client>>,
+    pub provider: CacheProvider,
 }
 
 pub async fn process_blocks(
diff --git a/ghost-crab/src/cache/cache.rs b/ghost-crab/src/cache/cache.rs
new file mode 100644
index 0000000..6d364bb
--- /dev/null
+++ b/ghost-crab/src/cache/cache.rs
@@ -0,0 +1,17 @@
+use rocksdb::DB;
+
+#[derive(Debug)]
+pub enum Error {
+    DB(rocksdb::Error),
+    CacheFileNotFound(std::io::Error),
+}
+
+type Result<T> = core::result::Result<T, Error>;
+
+pub fn load_cache(network: &str) -> Result<DB> {
+    let current_dir = std::env::current_dir().map_err(|e| Error::CacheFileNotFound(e))?;
+    let cache_path = current_dir.join("cache").join(network);
+    let db = DB::open_default(cache_path).map_err(|e| Error::DB(e))?;
+
+    Ok(db)
+}
diff --git a/ghost-crab/src/cache/cache_layer.rs b/ghost-crab/src/cache/cache_layer.rs
new file mode 100644
index 0000000..f4fcf56
--- /dev/null
+++ b/ghost-crab/src/cache/cache_layer.rs
@@ -0,0 +1,157 @@
+use alloy::rpc::json_rpc::{
+    Id, RequestPacket, Response, ResponsePacket, ResponsePayload, SerializedRequest,
+};
+use alloy::transports::{RpcError, TransportError, TransportErrorKind};
+use core::str;
+use rocksdb::DB;
+use serde_json::value::RawValue;
+use std::{
+    fmt::Debug,
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+use tower::{Layer, Service};
+
+pub struct CacheLayer {
+    network: String,
+    db: Arc<DB>,
+}
+
+impl CacheLayer {
+    pub fn new(network: &str, db: DB) -> Self {
+        Self { network: network.into(), db: Arc::new(db) }
+    }
+}
+
+impl<S> Layer<S> for CacheLayer {
+    type Service = CacheService<S>;
+
+    fn layer(&self, inner: S) -> Self::Service {
+        CacheService { inner, network: self.network.clone(), db: Arc::clone(&self.db) }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct CacheService<S> {
+    inner: S,
+    network: String,
+    db: Arc<DB>,
+}
+
+impl<S> CacheService<S> {
+    fn cache_get(&self, key: &str) -> Option<Vec<u8>> {
+        let key = self.network.clone() + key;
+
+        if let Ok(result) = self.db.get(key) {
+            return result;
+        }
+
+        None
+    }
+
+    fn convert_to_response(
+        &self,
+        raw_response: Vec<u8>,
+    ) -> Pin<Box<dyn Future<Output = Result<ResponsePacket, TransportError>> + Send>> {
+        let raw_value = String::from_utf8(raw_response).unwrap();
+        let raw_value = RawValue::from_string(raw_value).unwrap();
+
+        let response_payload: ResponsePayload<Box<RawValue>, Box<RawValue>>;
+        response_payload = ResponsePayload::Success(raw_value);
+
+        let response_single = Response { id: Id::Number(0), payload: response_payload };
+
+        let response_packet: ResponsePacket;
+        response_packet = ResponsePacket::Single(response_single);
+
+        let response: Result<ResponsePacket, RpcError<TransportErrorKind>>;
+        response = Ok(response_packet);
+
+        return Box::pin(async move {
+            return response;
+        });
+    }
+}
+
+const INVALID_WORDS: &[&[u8]] = &[b"earliest", b"latest", b"safe", b"finalized", b"pending"];
+
+#[inline]
+fn contains_invalid_word(input: &[u8]) -> bool {
+    for search in INVALID_WORDS {
+        if input.windows(search.len()).any(|x| &x == search) {
+            return true;
+        }
+    }
+
+    false
+}
+
+fn cacheable_request(request: &SerializedRequest) -> bool {
+    if !matches!(request.method(), "eth_getBlockByNumber") {
+        return false;
+    }
+
+    let raw_request = request.serialized().get();
+
+    if contains_invalid_word(raw_request.as_bytes()) {
+        return false;
+    }
+
+    return true;
+}
+
+impl<S> Service<RequestPacket> for CacheService<S>
+where
+    S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>,
+    S::Future: Send + 'static,
+    S::Response: Send + 'static + Debug,
+    S::Error: Send + 'static + Debug,
+{
+    type Response = S::Response;
+    type Error = S::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx)
+    }
+
+    fn call(&mut self, request: RequestPacket) -> Self::Future {
+        if let RequestPacket::Single(single) = &request {
+            if cacheable_request(single) {
+                let raw_request = single.serialized().get();
+
+                // Set id to zero
+                let id = single.id();
+                let id_old = format!("\"id\":{id}");
+                let id_new = "\"id\":0";
+                let raw_request = raw_request.replace(&id_old, id_new);
+
+                if let Some(raw_data) = self.cache_get(&raw_request) {
+                    return self.convert_to_response(raw_data);
+                }
+
+                let db = Arc::clone(&self.db);
+                let future = self.inner.call(request);
+
+                return Box::pin(async move {
+                    let response = future.await;
+
+                    if let Ok(response) = &response {
+                        if let ResponsePacket::Single(single) = response {
+                            if let ResponsePayload::Success(payload) = &single.payload {
+                                let raw_response = payload.get();
+                                db.put(raw_request, raw_response).unwrap();
+                            }
+                        }
+                    }
+
+                    response
+                });
+            }
+        }
+
+        Box::pin(self.inner.call(request))
+    }
+}
diff --git a/ghost-crab/src/cache/manager.rs b/ghost-crab/src/cache/manager.rs
index cc07772..d66a654 100644
--- a/ghost-crab/src/cache/manager.rs
+++ b/ghost-crab/src/cache/manager.rs
@@ -1,40 +1,37 @@
-use super::rpc_proxy::RpcWithCache;
 use alloy::providers::ProviderBuilder;
 use alloy::providers::RootProvider;
+use alloy::rpc::client::ClientBuilder;
 use alloy::transports::http::{Client, Http};
 use std::collections::HashMap;
 
+use super::cache::load_cache;
+use super::cache_layer::CacheLayer;
+use super::cache_layer::CacheService;
+
+pub type CacheProvider = RootProvider<CacheService<Http<Client>>>;
+
 pub struct RPCManager {
-    current_port: u16,
-    rpcs: HashMap<String, RootProvider<Http<Client>>>,
+    rpcs: HashMap<String, CacheProvider>,
 }
 
 impl RPCManager {
     pub fn new() -> Self {
-        RPCManager { rpcs: HashMap::new(), current_port: 3001 }
+        RPCManager { rpcs: HashMap::new() }
     }
 
-    pub async fn get_or_create(
-        &mut self,
-        network: String,
-        rpc_url: String,
-    ) -> RootProvider<Http<Client>> {
+    pub async fn get_or_create(&mut self, network: String, rpc_url: String) -> CacheProvider {
         if let Some(provider) = self.rpcs.get(&rpc_url) {
             return provider.clone();
        }
 
-        let provider = ProviderBuilder::new()
-            .on_http(format!("http://localhost:{}", self.current_port).parse().unwrap());
+        let cache = load_cache(&network).unwrap();
+        let cache_layer = CacheLayer::new(&network, cache);
 
-        self.rpcs.insert(rpc_url.clone(), provider.clone());
-        let rpc_with_cache =
-            RpcWithCache::new(&network, rpc_url.clone(), self.current_port).unwrap();
+        let client = ClientBuilder::default().layer(cache_layer).http(rpc_url.parse().unwrap());
+        let provider = ProviderBuilder::new().on_client(client);
 
-        tokio::spawn(async move {
-            rpc_with_cache.run().await.unwrap();
-        });
+        self.rpcs.insert(rpc_url.clone(), provider.clone());
 
-        self.current_port += 1;
         provider
     }
 }
diff --git a/ghost-crab/src/cache/mod.rs b/ghost-crab/src/cache/mod.rs
index 5ff5693..8e26f24 100644
--- a/ghost-crab/src/cache/mod.rs
+++ b/ghost-crab/src/cache/mod.rs
@@ -1,2 +1,3 @@
+pub mod cache;
+pub mod cache_layer;
 pub mod manager;
-pub mod rpc_proxy;
diff --git a/ghost-crab/src/cache/rpc_proxy.rs b/ghost-crab/src/cache/rpc_proxy.rs
deleted file mode 100644
index 3af23ac..0000000
--- a/ghost-crab/src/cache/rpc_proxy.rs
+++ /dev/null
@@ -1,152 +0,0 @@
-use blake3;
-use bytes::Bytes;
-use http_body_util::BodyExt;
-use http_body_util::Full;
-use hyper::server::conn::http1;
-use hyper::service::service_fn;
-use hyper::{Request, Response};
-use hyper_tls::HttpsConnector;
-use hyper_util::client::legacy::connect::HttpConnector;
-use hyper_util::rt::TokioIo;
-use hyper_util::rt::TokioTimer;
-use hyper_util::{client::legacy::Client, rt::TokioExecutor};
-use rocksdb::DB;
-use std::net::SocketAddr;
-use std::sync::Arc;
-use tokio::net::TcpListener;
-
-pub struct RpcWithCache {
-    rpc_url: Arc<String>,
-    cache: Arc<DB>,
-    port: u16,
-}
-
-#[derive(Debug)]
-pub enum Error {
-    DB(rocksdb::Error),
-    CacheFileNotFound(std::io::Error),
-}
-
-type Result<T> = core::result::Result<T, Error>;
-
-fn load_cache(network: &str) -> Result<DB> {
-    let current_dir = std::env::current_dir().map_err(|e| Error::CacheFileNotFound(e))?;
-    let cache_path = current_dir.join("cache").join(network);
-    let db = DB::open_default(cache_path).map_err(|e| Error::DB(e))?;
-
-    Ok(db)
-}
-
-impl RpcWithCache {
-    pub fn new(network: &str, rpc_url: String, port: u16) -> Result<Self> {
-        Ok(Self { rpc_url: Arc::new(rpc_url), cache: Arc::new(load_cache(network)?), port })
-    }
-
-    pub async fn run(&self) -> core::result::Result<(), Box<dyn std::error::Error>> {
-        let addr: SocketAddr = ([127, 0, 0, 1], self.port).into();
-        let listener = TcpListener::bind(addr).await?;
-        let https = HttpsConnector::new();
-        let client = Client::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
-
-        loop {
-            let (tcp, _) = listener.accept().await?;
-            let io = TokioIo::new(tcp);
-            let db = Arc::clone(&self.cache);
-            let rpc_url = Arc::clone(&self.rpc_url);
-            let client = client.clone();
-
-            tokio::task::spawn(async move {
-                if let Err(err) = http1::Builder::new()
-                    .timer(TokioTimer::new())
-                    .serve_connection(
-                        io,
-                        service_fn(|request| {
-                            handler(request, Arc::clone(&rpc_url), Arc::clone(&db), client.clone())
-                        }),
-                    )
-                    .await
-                {
-                    println!("Error serving connection: {:?}", err);
-                }
-            });
-        }
-    }
-}
-
-fn divide_request_by_id(input: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
-    const ID_FIELD: &[u8; 5] = b"\"id\":";
-    let id_field_index = input.windows(ID_FIELD.len()).position(|x| x == ID_FIELD)?;
-
-    let value_start = id_field_index + ID_FIELD.len();
-    let value_end = input[value_start..].iter().position(|&x| x == b',')?;
-
-    Some((
-        &input[..value_start],
-        &input[value_start..value_start + value_end],
-        &input[value_start + value_end..],
-    ))
-}
-
-const INVALID_WORDS: &[&[u8]] =
-    &[b"eth_blockNumber", b"earliest", b"latest", b"safe", b"finalized", b"pending"];
-
-#[inline]
-fn contains_invalid_word(input: &[u8]) -> bool {
-    for search in INVALID_WORDS {
-        if input.windows(search.len()).any(|x| &x == search) {
-            return true;
-        }
-    }
-
-    false
-}
-
-async fn handler(
-    request: Request<hyper::body::Incoming>,
-    rpc_url: Arc<String>,
-    db: Arc<DB>,
-    client: Client<HttpsConnector<HttpConnector>, Full<Bytes>>,
-) -> core::result::Result<Response<Full<Bytes>>, hyper::Error> {
-    let request_received = request.collect().await?.to_bytes();
-
-    if contains_invalid_word(&request_received) {
-        let rpc_request = hyper::Request::builder()
-            .method("POST")
-            .uri(rpc_url.as_str())
-            .header("Content-Type", "application/json")
-            .body(Full::new(request_received))
-            .unwrap();
-
-        let rpc_response = client.request(rpc_request).await.unwrap().collect().await?.to_bytes();
-        return Ok(Response::new(Full::new(rpc_response)));
-    }
-
-    // Sets the JSON RPC id to zero
-    let (start, _value, end) = divide_request_by_id(&request_received).unwrap();
-    let request_received = Bytes::from([start, b"0", end].concat());
-
-    let request_hash = blake3::hash(&request_received).to_string();
-
-    if let Ok(Some(value)) = db.get(&request_hash) {
-        return Ok(Response::new(Full::new(Bytes::from(value))));
-    }
-
-    let rpc_request = hyper::Request::builder()
-        .method("POST")
-        .uri(rpc_url.as_str())
-        .header("Content-Type", "application/json")
-        .body(Full::new(request_received))
-        .unwrap();
-
-    let rpc_response = client.request(rpc_request).await.unwrap().collect().await?.to_bytes();
-    let rpc_response_string = String::from_utf8_lossy(&rpc_response);
-
-    // Avoid caching errors
-    if !rpc_response_string.contains(r#""error":{"code":-"#) {
-        if let Err(err) = db.put(request_hash, rpc_response_string.to_string()) {
-            println!("WARNING: Error saving value to cache {err}");
-        };
-    }
-
-    Ok(Response::new(Full::new(rpc_response)))
-}
diff --git a/ghost-crab/src/event_handler.rs b/ghost-crab/src/event_handler.rs
index 73dc273..cb02d6e 100644
--- a/ghost-crab/src/event_handler.rs
+++ b/ghost-crab/src/event_handler.rs
@@ -1,10 +1,10 @@
+use crate::cache::manager::CacheProvider;
 use crate::indexer::TemplateManager;
 use crate::latest_block_manager::LatestBlockManager;
 use alloy::primitives::Address;
-use alloy::providers::{Provider, RootProvider};
+use alloy::providers::Provider;
 use alloy::rpc::types::eth::Filter;
 use alloy::rpc::types::eth::Log;
-use alloy::transports::http::{Client, Http};
 use alloy::transports::TransportError;
 use async_trait::async_trait;
 use ghost_crab_common::config::ExecutionMode;
@@ -13,7 +13,7 @@ use std::time::Duration;
 
 pub struct EventContext {
     pub log: Log,
-    pub provider: RootProvider<Http<Client>>,
+    pub provider: CacheProvider,
     pub templates: TemplateManager,
     pub contract_address: Address,
 }
@@ -40,7 +40,7 @@ pub struct ProcessEventsInput {
     pub step: u64,
     pub handler: EventHandlerInstance,
     pub templates: TemplateManager,
-    pub provider: RootProvider<Http<Client>>,
+    pub provider: CacheProvider,
 }
 
 pub async fn process_events(
diff --git a/ghost-crab/src/latest_block_manager.rs b/ghost-crab/src/latest_block_manager.rs
index 71c65f9..9d1cb1c 100644
--- a/ghost-crab/src/latest_block_manager.rs
+++ b/ghost-crab/src/latest_block_manager.rs
@@ -1,17 +1,18 @@
-use alloy::providers::{Provider, RootProvider};
-use alloy::transports::http::{Client, Http};
+use alloy::providers::Provider;
 use alloy::transports::TransportError;
 use std::time::{Duration, Instant};
 
+use crate::cache::manager::CacheProvider;
+
 pub struct LatestBlockManager {
-    provider: RootProvider<Http<Client>>,
+    provider: CacheProvider,
     cache_duration: Duration,
     block_number: Option<u64>,
     last_fetch: Instant,
 }
 
 impl LatestBlockManager {
-    pub fn new(provider: RootProvider<Http<Client>>, cache_duration: Duration) -> Self {
+    pub fn new(provider: CacheProvider, cache_duration: Duration) -> Self {
         Self { provider, cache_duration, block_number: None, last_fetch: Instant::now() }
     }
 
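
Note: this patch removes the per-network localhost HTTP proxy (rpc_proxy.rs) and moves caching into the alloy transport stack as a tower layer, so cached eth_getBlockByNumber responses (with numeric block tags only) are served straight from RocksDB without a network round trip. The sketch below shows how a cached provider ends up being wired, mirroring RPCManager::get_or_create in this diff; the "mainnet" network name and the example RPC URL are illustrative and not part of the change.

    use alloy::providers::ProviderBuilder;
    use alloy::rpc::client::ClientBuilder;

    use crate::cache::cache::load_cache;
    use crate::cache::cache_layer::CacheLayer;
    use crate::cache::manager::CacheProvider;

    fn build_cached_provider() -> CacheProvider {
        // Opens (or creates) ./cache/mainnet as the RocksDB cache for this network.
        let db = load_cache("mainnet").unwrap();

        // Wrap the plain HTTP transport with the caching service; only
        // eth_getBlockByNumber requests for concrete block numbers are cached.
        let layer = CacheLayer::new("mainnet", db);
        let client = ClientBuilder::default()
            .layer(layer)
            .http("https://eth.example.org".parse().unwrap());

        ProviderBuilder::new().on_client(client)
    }

Because the cache key is the serialized request with its JSON-RPC id normalized to zero, identical block queries issued by different handlers hit the same RocksDB entry regardless of request id.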