diff --git a/Cargo.lock b/Cargo.lock
index 953b09c6..576e8eff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -619,6 +619,27 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "bindgen"
+version = "0.65.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
+dependencies = [
+ "bitflags 1.3.2",
+ "cexpr",
+ "clang-sys",
+ "lazy_static",
+ "lazycell",
+ "peeking_take_while",
+ "prettyplease",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "rustc-hash 1.1.0",
+ "shlex",
+ "syn 2.0.90",
+]
+
 [[package]]
 name = "bindgen"
 version = "0.70.1"
@@ -823,6 +844,17 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "bzip2-sys"
+version = "0.1.11+1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+]
+
 [[package]]
 name = "camino"
 version = "1.1.9"
@@ -3449,6 +3481,12 @@ dependencies = [
  "spin 0.9.8",
 ]
 
+[[package]]
+name = "lazycell"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
+
 [[package]]
 name = "leopard-codec"
 version = "0.1.0"
@@ -3519,6 +3557,22 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "librocksdb-sys"
+version = "0.11.0+8.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e"
+dependencies = [
+ "bindgen 0.65.1",
+ "bzip2-sys",
+ "cc",
+ "glob",
+ "libc",
+ "libz-sys",
+ "lz4-sys",
+ "zstd-sys",
+]
+
 [[package]]
 name = "libz-sys"
 version = "1.1.20"
@@ -3568,6 +3622,16 @@ dependencies = [
  "hashbrown 0.15.2",
 ]
 
+[[package]]
+name = "lz4-sys"
+version = "1.11.1+lz4-1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6"
+dependencies = [
+ "cc",
+ "libc",
+]
+
 [[package]]
 name = "matchers"
 version = "0.1.0"
@@ -4467,6 +4531,12 @@ dependencies = [
  "hmac",
 ]
 
+[[package]]
+name = "peeking_take_while"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
+
 [[package]]
 name = "pem"
 version = "1.1.1"
@@ -4859,7 +4929,9 @@ dependencies = [
  "prism-common",
  "prism-errors",
  "redis",
+ "rocksdb",
  "serde",
+ "tempfile",
 ]
 
 [[package]]
@@ -4876,6 +4948,7 @@ dependencies = [
  "prism-prover",
  "prism-storage",
  "rand",
+ "tempfile",
  "tokio",
 ]
 
@@ -5432,6 +5505,16 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "rocksdb"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe"
+dependencies = [
+ "libc",
+ "librocksdb-sys",
+]
+
 [[package]]
 name = "ron"
 version = "0.8.1"
@@ -6478,7 +6561,7 @@ checksum = "a049cdff6b64bc1cd2bebdf494fd314bc4b45eff9058ea69dace55a0fa77e483"
 dependencies = [
  "anyhow",
  "bincode",
- "bindgen",
+ "bindgen 0.70.1",
  "cc",
  "cfg-if",
  "hex",
@@ -8045,3 +8128,13 @@ dependencies = [
  "sha3",
  "subtle",
 ]
+
+[[package]]
+name = "zstd-sys"
+version = "2.0.13+zstd.1.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa"
+dependencies = [
+ "cc",
+ "pkg-config",
+]
diff --git a/Cargo.toml b/Cargo.toml
index 4d8400fe..61bf8265 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,7 +59,7 @@ utoipa-swagger-ui = { version = "3.1", features = ["axum"] }
 
 # database
 redis = "0.24.0"
-# rocksdb = { version = "0.21.0", features = ["multi-threaded-cf"] } commented until impl is finished
+rocksdb = { version = "0.21.0", features = ["multi-threaded-cf"] }
 
 # async
 async-trait = "0.1.68"
@@ -103,6 +103,7 @@ jmt = { git = "https://github.com/deltadevsde/jmt", branch = "rehashing-circuit"
     "mocks",
 ] }
 sha2 = "0.10.8"
+tempfile = "3.14.0"
 auto_impl = "1.2.0"
 
 # prism
diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml
index 1ebd9133..1f445205 100644
--- a/crates/storage/Cargo.toml
+++ b/crates/storage/Cargo.toml
@@ -21,4 +21,7 @@ jmt = { workspace = true }
 prism-errors = { workspace = true }
 prism-common = { workspace = true }
 auto_impl = { workspace = true }
-# rocksdb = { workspace = true }
+rocksdb = { workspace = true }
+
+[dev-dependencies]
+tempfile.workspace = true
diff --git a/crates/storage/src/rocksdb.rs b/crates/storage/src/rocksdb.rs
index 22f458a1..e0d5add8 100644
--- a/crates/storage/src/rocksdb.rs
+++ b/crates/storage/src/rocksdb.rs
@@ -1,158 +1,314 @@
-// use crate::Database;
-// use anyhow::Result;
-// use jmt::{
-//     storage::{LeafNode, Node, NodeBatch, NodeKey, TreeReader, TreeWriter},
-//     KeyHash, OwnedValue, Version,
-// };
-// use prism_common::digest::Digest;
-// use prism_errors::DatabaseError;
-// use rocksdb::{DBWithThreadMode, MultiThreaded, Options, DB};
-
-// type RocksDB = DBWithThreadMode<MultiThreaded>;
-
-// pub struct RocksDBConnection {
-//     connection: RocksDB,
-//     path: String,
-// }
-
-// impl RocksDBConnection {
-//     pub fn new(path: &str) -> Result<RocksDBConnection> {
-//         let db = DB::open_default(path)?;
-
-//         Ok(Self {
-//             connection: db,
-//             path: path.to_string(),
-//         })
-//     }
-// }
-
-// impl Database for RocksDBConnection {
-//     fn get_commitment(&self, epoch: &u64) -> anyhow::Result<Digest> {
-//         let key = format!("commitments:epoch_{}", epoch);
-//         let raw_bytes = self.connection.get(key.as_bytes())?.ok_or_else(|| {
-//             DatabaseError::NotFoundError(format!("commitment from epoch_{}", epoch))
-//         })?;
-
-//         let value: [u8; 32] =
-//             raw_bytes.try_into().expect("commitment digest should always be 32 bytes");
-
-//         Ok(Digest(value))
-//     }
-
-//     fn set_commitment(&self, epoch: &u64, commitment: &Digest) -> anyhow::Result<()> {
-//         Ok(self.connection.put::<&[u8], [u8; 32]>(
-//             format!("commitments:epoch_{}", epoch).as_bytes(),
-//             commitment.0,
-//         )?)
-//     }
-
-//     fn get_last_synced_height(&self) -> anyhow::Result<u64> {
-//         let res = self
-//             .connection
-//             .get(b"app_state:sync_height")?
-//             .ok_or_else(|| DatabaseError::NotFoundError("current sync height".to_string()))?;
-
-//         Ok(u64::from_be_bytes(res.try_into().unwrap()))
-//     }
-
-//     fn set_last_synced_height(&self, height: &u64) -> anyhow::Result<()> {
-//         Ok(self.connection.put(b"app_state:sync_height", height.to_be_bytes())?)
-//     }
-
-//     fn get_epoch(&self) -> anyhow::Result<u64> {
-//         let res = self
-//             .connection
-//             .get(b"app_state:epoch")?
-//             .ok_or_else(|| DatabaseError::NotFoundError("current epoch".to_string()))?;
-
-//         Ok(u64::from_be_bytes(res.try_into().unwrap()))
-//     }
-
-//     fn set_epoch(&self, epoch: &u64) -> anyhow::Result<()> {
-//         Ok(self.connection.put(b"app_state:epoch", epoch.to_be_bytes())?)
-//     }
-
-//     fn flush_database(&self) -> Result<()> {
-//         Ok(DB::destroy(&Options::default(), &self.path)?)
-//     }
-// }
-
-// impl TreeWriter for RocksDBConnection {
-//     fn write_node_batch(&self, _node_batch: &NodeBatch) -> Result<()> {
-//         todo!()
-//     }
-// }
-
-// impl TreeReader for RocksDBConnection {
-//     fn get_node_option(&self, _node_key: &NodeKey) -> Result<Option<Node>> {
-//         todo!()
-//     }
-
-//     fn get_value_option(
-//         &self,
-//         _max_version: Version,
-//         _key_hash: KeyHash,
-//     ) -> Result<Option<OwnedValue>> {
-//         todo!()
-//     }
-
-//     fn get_rightmost_leaf(&self) -> Result<Option<(NodeKey, LeafNode)>> {
-//         todo!()
-//     }
-// }
-
-// #[cfg(test)]
-// mod tests {
-//     use super::*;
-//     use tempfile::TempDir;
-
-//     #[test]
-//     fn test_get_commitment() {
-//         let temp_dir = TempDir::new().unwrap();
-//         let db = RocksDBConnection::new(temp_dir.path().to_str().unwrap()).unwrap();
-
-//         let epoch = 1;
-//         let commitment = Digest::from([0u8; 32]);
-//         db.set_commitment(&epoch, &commitment).unwrap();
-
-//         let result = db.get_commitment(&epoch).unwrap();
-//         assert_eq!(result, commitment);
-//     }
-
-//     #[test]
-//     fn test_set_commitment() {
-//         let temp_dir = TempDir::new().unwrap();
-//         let db = RocksDBConnection::new(temp_dir.path().to_str().unwrap()).unwrap();
-
-//         let epoch = 1;
-//         let commitment = Digest::from([0u8; 32]);
-//         db.set_commitment(&epoch, &commitment).unwrap();
-
-//         let result = db.get_commitment(&epoch).unwrap();
-//         assert_eq!(result, commitment);
-//     }
-
-//     #[test]
-//     fn test_get_epoch() {
-//         let temp_dir = TempDir::new().unwrap();
-//         let db = RocksDBConnection::new(temp_dir.path().to_str().unwrap()).unwrap();
-
-//         let epoch = 1;
-//         db.set_epoch(&epoch).unwrap();
-
-//         let result = db.get_epoch().unwrap();
-//         assert_eq!(result, epoch);
-//     }
-
-//     #[test]
-//     fn test_set_epoch() {
-//         let temp_dir = TempDir::new().unwrap();
-//         let db = RocksDBConnection::new(temp_dir.path().to_str().unwrap()).unwrap();
-
-//         let epoch = 1;
-//         db.set_epoch(&epoch).unwrap();
-
-//         let result = db.get_epoch().unwrap();
-//         assert_eq!(result, epoch);
-//     }
-// }
+use std::sync::Arc;
+
+use crate::Database;
+use anyhow::{anyhow, Result};
+use bincode;
+use jmt::{
+    storage::{LeafNode, Node, NodeBatch, NodeKey, TreeReader, TreeWriter},
+    KeyHash, OwnedValue, Version,
+};
+use prism_common::digest::Digest;
+use prism_errors::DatabaseError;
+use rocksdb::{DBWithThreadMode, MultiThreaded, Options, DB};
+
+const KEY_PREFIX_COMMITMENTS: &str = "commitments:epoch_";
+const KEY_PREFIX_NODE: &str = "node:";
+const KEY_PREFIX_VALUE_HISTORY: &str = "value_history:";
+
+type RocksDB = DBWithThreadMode<MultiThreaded>;
+
+/// RocksDB-backed storage implementing [`Database`], [`TreeReader`] and [`TreeWriter`].
+///
+/// Clones share one underlying database handle.
+#[derive(Clone)]
+pub struct RocksDBConnection {
+    connection: Arc<RocksDB>,
+    path: String,
+}
+
+impl RocksDBConnection {
+    /// Opens the database at `path` with default options.
+    pub fn new(path: &str) -> Result<RocksDBConnection> {
+        let db = DB::open_default(path)?;
+
+        Ok(Self {
+            connection: Arc::new(db),
+            path: path.to_string(),
+        })
+    }
+}
+
+impl Database for RocksDBConnection {
+    fn get_commitment(&self, epoch: &u64) -> anyhow::Result<Digest> {
+        let key = format!("{KEY_PREFIX_COMMITMENTS}{}", epoch);
+        let raw_bytes = self.connection.get(key.as_bytes())?.ok_or_else(|| {
+            DatabaseError::NotFoundError(format!("commitment from epoch_{}", epoch))
+        })?;
+
+        // A value of the wrong length means corrupted data: report it instead of panicking.
+        let value: [u8; 32] = raw_bytes.try_into().map_err(|bytes: Vec<u8>| {
+            anyhow!("commitment digest should be 32 bytes, got {}", bytes.len())
+        })?;
+
+        Ok(Digest(value))
+    }
+
+    fn set_commitment(&self, epoch: &u64, commitment: &Digest) -> anyhow::Result<()> {
+        Ok(self.connection.put::<&[u8], [u8; 32]>(
+            format!("{KEY_PREFIX_COMMITMENTS}{}", epoch).as_bytes(),
+            commitment.0,
+        )?)
+    }
+
+    fn get_last_synced_height(&self) -> anyhow::Result<u64> {
+        let res = self
+            .connection
+            .get(b"app_state:sync_height")?
+            .ok_or_else(|| DatabaseError::NotFoundError("current sync height".to_string()))?;
+
+        Ok(u64::from_be_bytes(res.try_into().map_err(|e| {
+            anyhow!("failed byte conversion from BigEndian to u64: {:?}", e)
+        })?))
+    }
+
+    fn set_last_synced_height(&self, height: &u64) -> anyhow::Result<()> {
+        Ok(self.connection.put(b"app_state:sync_height", height.to_be_bytes())?)
+    }
+
+    fn get_epoch(&self) -> anyhow::Result<u64> {
+        let res = self
+            .connection
+            .get(b"app_state:epoch")?
+            .ok_or_else(|| DatabaseError::NotFoundError("current epoch".to_string()))?;
+
+        Ok(u64::from_be_bytes(res.try_into().map_err(|e| {
+            anyhow!("failed byte conversion from BigEndian to u64: {:?}", e)
+        })?))
+    }
+
+    fn set_epoch(&self, epoch: &u64) -> anyhow::Result<()> {
+        Ok(self.connection.put(b"app_state:epoch", epoch.to_be_bytes())?)
+    }
+
+    // NOTE(review): this destroys the on-disk database while `self.connection` is still
+    // open; RocksDB presumably refuses that because of its lock file - confirm.
+    fn flush_database(&self) -> Result<()> {
+        Ok(DB::destroy(&Options::default(), &self.path)?)
+    }
+}
+
+impl TreeReader for RocksDBConnection {
+    fn get_node_option(&self, node_key: &NodeKey) -> Result<Option<Node>> {
+        let key = format!(
+            "{KEY_PREFIX_NODE}{}",
+            hex::encode(bincode::serialize(node_key)?)
+        );
+        let value = self.connection.get(key.as_bytes())?;
+
+        match value {
+            Some(data) => Ok(Some(bincode::deserialize(&data)?)),
+            None => Ok(None),
+        }
+    }
+
+    fn get_value_option(
+        &self,
+        max_version: Version,
+        key_hash: KeyHash,
+    ) -> Result<Option<OwnedValue>> {
+        // The trailing ':' makes the prefix match entries of exactly this key hash.
+        let prefix = format!("{KEY_PREFIX_VALUE_HISTORY}{}:", hex::encode(key_hash.0));
+        let iter = self.connection.prefix_iterator(prefix.as_bytes());
+
+        // Newest (version, serialized value) found so far that is not above `max_version`.
+        let mut latest: Option<(Version, Box<[u8]>)> = None;
+
+        for item in iter {
+            let (key, value) = item?;
+            // No prefix extractor is configured, so the iterator does not stop by itself
+            // once it has passed the last key carrying the prefix.
+            if !key.starts_with(prefix.as_bytes()) {
+                break;
+            }
+            let version: Version = bincode::deserialize(&hex::decode(&key[prefix.len()..])?)?;
+
+            // `Option` comparison keeps version 0 eligible.
+            let is_newer = match &latest {
+                Some((seen, _)) => version > *seen,
+                None => true,
+            };
+            if version <= max_version && is_newer {
+                latest = Some((version, value));
+            }
+        }
+
+        // Entries hold an `Option<OwnedValue>`; `None` records a deletion at that version.
+        match latest {
+            Some((_, raw)) => Ok(bincode::deserialize::<Option<OwnedValue>>(&raw)?),
+            None => Ok(None),
+        }
+    }
+
+    fn get_rightmost_leaf(&self) -> Result<Option<(NodeKey, LeafNode)>> {
+        let iter = self.connection.iterator(rocksdb::IteratorMode::From(
+            KEY_PREFIX_NODE.as_bytes(),
+            rocksdb::Direction::Forward,
+        ));
+
+        // Storage keys are hex-encoded serialized node keys, so their order does not follow
+        // the key hashes: every stored leaf has to be compared.
+        let mut rightmost: Option<(NodeKey, LeafNode)> = None;
+
+        for item in iter {
+            // Propagate iterator errors instead of treating them as the end of the data.
+            let (key, value) = item?;
+            if !key.starts_with(KEY_PREFIX_NODE.as_bytes()) {
+                break;
+            }
+
+            let node: Node = bincode::deserialize(&value)?;
+            if let Node::Leaf(leaf) = node {
+                let is_rightmost = match &rightmost {
+                    Some((_, best)) => leaf.key_hash().0 > best.key_hash().0,
+                    None => true,
+                };
+                if is_rightmost {
+                    let node_key: NodeKey =
+                        bincode::deserialize(&hex::decode(&key[KEY_PREFIX_NODE.len()..])?)?;
+                    rightmost = Some((node_key, leaf));
+                }
+            }
+        }
+
+        Ok(rightmost)
+    }
+}
+
+impl TreeWriter for RocksDBConnection {
+    fn write_node_batch(&self, node_batch: &NodeBatch) -> Result<()> {
+        let mut batch = rocksdb::WriteBatch::default();
+
+        for (node_key, node) in node_batch.nodes() {
+            let key = format!(
+                "{KEY_PREFIX_NODE}{}",
+                hex::encode(bincode::serialize(node_key)?)
+            );
+            let value = bincode::serialize(node)?;
+            batch.put(key.as_bytes(), &value);
+        }
+
+        for ((version, key_hash), value) in node_batch.values() {
+            let version_key = format!(
+                "{KEY_PREFIX_VALUE_HISTORY}{}:{}",
+                hex::encode(key_hash.0),
+                hex::encode(bincode::serialize(version)?)
+            );
+
+            // Store the whole `Option`: a `None` is a tombstone that has to shadow older
+            // versions of the key when it is read back.
+            let serialized_value = bincode::serialize(value)?;
+            batch.put(version_key.as_bytes(), &serialized_value);
+        }
+
+        self.connection.write(batch)?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use jmt::{KeyHash, OwnedValue, Version};
+    use tempfile::TempDir;
+
+    fn setup_db() -> (TempDir, RocksDBConnection) {
+        let temp_dir = TempDir::new().unwrap();
+        let db = RocksDBConnection::new(temp_dir.path().to_str().unwrap()).unwrap();
+        (temp_dir, db)
+    }
+
+    #[test]
+    fn test_rw_commitment() {
+        let (_temp_dir, db) = setup_db();
+
+        let epoch = 1;
+        let commitment = Digest([1; 32]);
+
+        db.set_commitment(&epoch, &commitment).unwrap();
+        let read_commitment = db.get_commitment(&epoch).unwrap();
+
+        assert_eq!(read_commitment, commitment);
+    }
+
+    #[test]
+    fn test_rw_epoch() {
+        let (_temp_dir, db) = setup_db();
+
+        let epoch = 1;
+
+        db.set_epoch(&epoch).unwrap();
+        let read_epoch = db.get_epoch().unwrap();
+
+        assert_eq!(read_epoch, epoch);
+    }
+
+    #[test]
+    fn test_write_and_read_value() {
+        let (_temp_dir, db) = setup_db();
+
+        let key_hash = KeyHash([1; 32]);
+        let value: OwnedValue = vec![4, 5, 6];
+        let version: Version = 1;
+
+        let mut batch = NodeBatch::default();
+        batch.insert_value(version, key_hash, value.clone());
+
+        db.write_node_batch(&batch).unwrap();
+
+        let read_value = db.get_value_option(version, key_hash).unwrap();
+        assert_eq!(read_value, Some(value));
+    }
+
+    #[test]
+    fn test_get_value_option_with_multiple_versions() {
+        let (_temp_dir, db) = setup_db();
+
+        let key_hash = KeyHash([2; 32]);
+        let value1: OwnedValue = vec![1, 1, 1];
+        let value2: OwnedValue = vec![2, 2, 2];
+
+        let mut batch = NodeBatch::default();
+        batch.insert_value(1, key_hash, value1.clone());
+        batch.insert_value(2, key_hash, value2.clone());
+
+        db.write_node_batch(&batch).unwrap();
+
+        assert_eq!(db.get_value_option(1, key_hash).unwrap(), Some(value1));
+        assert_eq!(
+            db.get_value_option(2, key_hash).unwrap(),
+            Some(value2.clone())
+        );
+        assert_eq!(db.get_value_option(3, key_hash).unwrap(), Some(value2));
+    }
+
+    #[test]
+    fn test_get_value_option_is_scoped_to_key_hash() {
+        let (_temp_dir, db) = setup_db();
+
+        let key_hash = KeyHash([1; 32]);
+        let other_key_hash = KeyHash([2; 32]);
+        let value: OwnedValue = vec![7, 7, 7];
+
+        let mut batch = NodeBatch::default();
+        batch.insert_value(0, key_hash, value.clone());
+        batch.insert_value(5, other_key_hash, vec![9, 9, 9]);
+
+        db.write_node_batch(&batch).unwrap();
+
+        // Version 0 is a valid version and has to be found.
+        assert_eq!(db.get_value_option(0, key_hash).unwrap(), Some(value.clone()));
+        // Entries of another key hash must not leak into the result.
+        assert_eq!(db.get_value_option(5, key_hash).unwrap(), Some(value));
+        assert_eq!(db.get_value_option(4, other_key_hash).unwrap(), None);
+    }
+}
diff --git a/crates/tests/Cargo.toml b/crates/tests/Cargo.toml
index 19030551..01e1cd9b 100644
--- a/crates/tests/Cargo.toml
+++ b/crates/tests/Cargo.toml
@@ -14,14 +14,15 @@ test_utils = []
 mock_prover = []
 
 [dependencies]
-log = { workspace = true }
-pretty_env_logger = { workspace = true }
-anyhow = { workspace = true }
-keystore-rs = { workspace = true }
+log.workspace = true
+pretty_env_logger.workspace = true
+anyhow.workspace = true
+keystore-rs.workspace = true
 prism-common = { workspace = true, features = ["test_utils"] }
-prism-storage = { workspace = true }
+prism-storage.workspace = true
 prism-prover = { workspace = true, features = ["mock_prover"] }
-prism-lightclient = { workspace = true }
-prism-da = { workspace = true }
-rand = { workspace = true }
-tokio = { workspace = true }
+prism-lightclient.workspace = true
+prism-da.workspace = true
+rand.workspace = true
+tokio.workspace = true
+tempfile.workspace = true
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index f79423f5..88d58c1d 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -12,15 +12,21 @@ use prism_da::{
 };
 use prism_lightclient::LightClient;
 use prism_prover::Prover;
-use prism_storage::{inmemory::InMemoryDatabase, Database};
+use prism_storage::{rocksdb::RocksDBConnection, Database};
 use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::sync::Arc;
 use tokio::{spawn, time::Duration};
 
 use prism_common::test_utils::TestTreeState;
 
+use tempfile::TempDir;
+
 fn setup_db() -> Arc<Box<dyn Database>> {
-    Arc::new(Box::new(InMemoryDatabase::new()) as Box<dyn Database>)
+    // Keep the directory on disk: a `TempDir` dropped here would delete it while the
+    // returned database is still using it.
+    let db_path = TempDir::new().unwrap().into_path();
+    let db = RocksDBConnection::new(db_path.to_str().unwrap()).unwrap();
+    Arc::new(Box::new(db) as Box<dyn Database>)
 }
 
 #[tokio::test]