Skip to content

Commit

Permalink
refactor: Add cache layer to the provider and remove the rpc proxy se…
Browse files Browse the repository at this point in the history
…rver
  • Loading branch information
luis-herasme committed Jul 22, 2024
1 parent 4fcb8b5 commit 1076b56
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 183 deletions.
2 changes: 2 additions & 0 deletions ghost-crab/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ alloy = { version = "0.1.0", features = [
"contract",
"provider-http",
"rpc-types-eth",
"json-rpc",
] }
tokio = { version = "1.37.0", features = ["full"] }
dotenvy = "0.15"
Expand All @@ -29,3 +30,4 @@ ghost-crab-common = { path = "../ghost-crab-common", version = "0.2.0" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
rocksdb = "0.22.0"
tower = "0.4.13"
7 changes: 3 additions & 4 deletions ghost-crab/src/block_handler.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use crate::cache::manager::CacheProvider;
use crate::indexer::TemplateManager;
use crate::latest_block_manager::LatestBlockManager;
use alloy::providers::Provider;
use alloy::providers::RootProvider;
use alloy::rpc::types::eth::Block;
use alloy::rpc::types::eth::BlockNumberOrTag;
use alloy::transports::http::{Client, Http};
use alloy::transports::TransportError;
use async_trait::async_trait;
use ghost_crab_common::config::ExecutionMode;
use std::sync::Arc;
use std::time::Duration;

pub struct BlockContext {
pub provider: RootProvider<Http<Client>>,
pub provider: CacheProvider,
pub templates: TemplateManager,
pub block: Block,
}
Expand All @@ -32,7 +31,7 @@ pub trait BlockHandler {
pub struct ProcessBlocksInput {
pub handler: BlockHandlerInstance,
pub templates: TemplateManager,
pub provider: RootProvider<Http<Client>>,
pub provider: CacheProvider,
}

pub async fn process_blocks(
Expand Down
17 changes: 17 additions & 0 deletions ghost-crab/src/cache/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use rocksdb::DB;

#[derive(Debug)]
pub enum Error {
DB(rocksdb::Error),
CacheFileNotFound(std::io::Error),
}

type Result<T> = core::result::Result<T, Error>;

pub fn load_cache(network: &str) -> Result<DB> {
let current_dir = std::env::current_dir().map_err(|e| Error::CacheFileNotFound(e))?;
let cache_path = current_dir.join("cache").join(network);
let db = DB::open_default(cache_path).map_err(|e| Error::DB(e))?;

Ok(db)
}
157 changes: 157 additions & 0 deletions ghost-crab/src/cache/cache_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use alloy::rpc::json_rpc::{
Id, RequestPacket, Response, ResponsePacket, ResponsePayload, SerializedRequest,
};
use alloy::transports::{RpcError, TransportError, TransportErrorKind};
use core::str;
use rocksdb::DB;
use serde_json::value::RawValue;
use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tower::{Layer, Service};

pub struct CacheLayer {
network: String,
db: Arc<DB>,
}

impl CacheLayer {
pub fn new(network: &str, db: DB) -> Self {
Self { network: network.into(), db: Arc::new(db) }
}
}

impl<S> Layer<S> for CacheLayer {
type Service = CacheService<S>;

fn layer(&self, inner: S) -> Self::Service {
CacheService { inner, network: self.network.clone(), db: Arc::clone(&self.db) }
}
}

#[derive(Debug, Clone)]
pub struct CacheService<S> {
inner: S,
network: String,
db: Arc<DB>,
}

impl<S> CacheService<S> {
fn cache_get(&self, key: &str) -> Option<Vec<u8>> {
let key = self.network.clone() + key;

if let Ok(result) = self.db.get(key) {
return result;
}

None
}

fn convert_to_response(
&self,
raw_response: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<ResponsePacket, TransportError>> + Send>> {
let raw_value = String::from_utf8(raw_response).unwrap();
let raw_value = RawValue::from_string(raw_value).unwrap();

let response_payload: ResponsePayload<Box<RawValue>, Box<RawValue>>;
response_payload = ResponsePayload::Success(raw_value);

let response_single = Response { id: Id::Number(0), payload: response_payload };

let response_packet: ResponsePacket;
response_packet = ResponsePacket::Single(response_single);

let response: Result<ResponsePacket, RpcError<TransportErrorKind>>;
response = Ok(response_packet);

return Box::pin(async move {
return response;
});
}
}

const INVALID_WORDS: &[&[u8]] = &[b"earliest", b"latest", b"safe", b"finalized", b"pending"];

#[inline]
fn contains_invalid_word(input: &[u8]) -> bool {
for search in INVALID_WORDS {
if input.windows(search.len()).any(|x| &x == search) {
return true;
}
}

false
}

fn cacheable_request(request: &SerializedRequest) -> bool {
if !matches!(request.method(), "eth_getBlockByNumber") {
return false;
}

let raw_request = request.serialized().get();

if contains_invalid_word(raw_request.as_bytes()) {
return false;
}

return true;
}

impl<S> Service<RequestPacket> for CacheService<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>,
S::Future: Send + 'static,
S::Response: Send + 'static + Debug,
S::Error: Send + 'static + Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: RequestPacket) -> Self::Future {
if let RequestPacket::Single(single) = &request {
if cacheable_request(single) {
let raw_request = single.serialized().get();

// Set id to zero
let id = single.id();
let id_old = format!("\"id\":{id}");
let id_new = "\"id\":0";
let raw_request = raw_request.replace(&id_old, id_new);

if let Some(raw_data) = self.cache_get(&raw_request) {
return self.convert_to_response(raw_data);
}

let db = Arc::clone(&self.db);
let future = self.inner.call(request);

return Box::pin(async move {
let response = future.await;

if let Ok(response) = &response {
if let ResponsePacket::Single(single) = response {
if let ResponsePayload::Success(payload) = &single.payload {
let raw_response = payload.get();
db.put(raw_request, raw_response).unwrap();
}
}
}

response
});
}
}

Box::pin(self.inner.call(request))
}
}
33 changes: 15 additions & 18 deletions ghost-crab/src/cache/manager.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,37 @@
use super::rpc_proxy::RpcWithCache;
use alloy::providers::ProviderBuilder;
use alloy::providers::RootProvider;
use alloy::rpc::client::ClientBuilder;
use alloy::transports::http::{Client, Http};
use std::collections::HashMap;

use super::cache::load_cache;
use super::cache_layer::CacheLayer;
use super::cache_layer::CacheService;

pub type CacheProvider = RootProvider<CacheService<Http<Client>>>;

pub struct RPCManager {
current_port: u16,
rpcs: HashMap<String, RootProvider<Http<Client>>>,
rpcs: HashMap<String, CacheProvider>,
}

impl RPCManager {
pub fn new() -> Self {
RPCManager { rpcs: HashMap::new(), current_port: 3001 }
RPCManager { rpcs: HashMap::new() }
}

pub async fn get_or_create(
&mut self,
network: String,
rpc_url: String,
) -> RootProvider<Http<Client>> {
pub async fn get_or_create(&mut self, network: String, rpc_url: String) -> CacheProvider {
if let Some(provider) = self.rpcs.get(&rpc_url) {
return provider.clone();
}

let provider = ProviderBuilder::new()
.on_http(format!("http://localhost:{}", self.current_port).parse().unwrap());
let cache = load_cache(&network).unwrap();
let cache_layer = CacheLayer::new(&network, cache);

self.rpcs.insert(rpc_url.clone(), provider.clone());
let rpc_with_cache =
RpcWithCache::new(&network, rpc_url.clone(), self.current_port).unwrap();
let client = ClientBuilder::default().layer(cache_layer).http(rpc_url.parse().unwrap());
let provider = ProviderBuilder::new().on_client(client);

tokio::spawn(async move {
rpc_with_cache.run().await.unwrap();
});
self.rpcs.insert(rpc_url.clone(), provider.clone());

self.current_port += 1;
provider
}
}
3 changes: 2 additions & 1 deletion ghost-crab/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod cache;
pub mod cache_layer;
pub mod manager;
pub mod rpc_proxy;
Loading

0 comments on commit 1076b56

Please sign in to comment.