From d5bbf9ad82be8fd2c81b059710a96bfbe80f449f Mon Sep 17 00:00:00 2001
From: David Estes <5317198+dav1do@users.noreply.github.com>
Date: Sat, 3 Feb 2024 10:16:08 -0700
Subject: [PATCH] fix: refactor recon storage and remove sqlx dependency from
 recon and core crates (#254)

* fix: remove sqlx dependency from recon and core crates

Modified the store implementations to be interest/model specific, as the
two have different requirements. Interests don't have values, so those
requests no-op (they error if you specify a value other than []). Removed
the sort_key column from the tables, as its only purpose was
interest/model differentiation. Renamed the tables accordingly and
adjusted the queries.

* fix: remove sort_key from model_block table as well

* fix: Make model_block PK (cid,key)

It's (probably) possible to have keys point to the same CID, so make sure
they are different rows.

* fix: interest range_with_values query was not updated

In the current feed PR, the API was modified to use range instead of
range_with_values in this case, but it needed to be fixed here too.

* chore: review clean up

* fix: interest store always has a value for a key, not never

---------

Co-authored-by: David Estes
---
 Cargo.lock                                 |   75 +-
 core/Cargo.toml                            |    1 -
 core/src/lib.rs                            |    2 -
 one/src/events.rs                          |    2 +-
 one/src/lib.rs                             |   15 +-
 p2p/Cargo.toml                             |    1 +
 p2p/src/node.rs                            |    7 +-
 p2p/src/sqliteblockstore.rs                |    4 +-
 recon/Cargo.toml                           |    1 -
 recon/src/lib.rs                           |    6 +-
 recon/src/recon.rs                         |   11 +-
 recon/src/recon/tests.rs                   |   18 +-
 store/src/lib.rs                           |  709 +---------
 .../src/sqlite/interest.rs                 |  470 +++----
 core/src/sql.rs => store/src/sqlite/mod.rs |    6 +
 store/src/sqlite/model.rs                  | 1185 +++++++++++++++++
 store/src/tests.rs                         |  515 +------
 17 files changed, 1494 insertions(+), 1534 deletions(-)
 rename recon/src/recon/sqlitestore.rs => store/src/sqlite/interest.rs (56%)
 rename core/src/sql.rs => store/src/sqlite/mod.rs (96%)
 create mode 100644 store/src/sqlite/model.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1593813dd..5ae33c9c5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -552,6 +552,16 @@ version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
 
+[[package]]
+name = "atomic-write-file"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edcdbedc2236483ab103a53415653d6b4442ea6141baf1ffa85df29635e88436"
+dependencies = [
+ "nix 0.27.1",
+ "rand 0.8.5",
+]
+
 [[package]]
 name = "attohttpc"
 version = "0.24.1"
@@ -1115,7 +1125,6 @@ dependencies = [
  "serde_bytes",
  "serde_ipld_dagcbor",
  "serde_json",
- "sqlx",
  "ssi",
  "test-log",
  "tokio",
@@ -4901,9 +4910,9 @@ dependencies = [
 
 [[package]]
 name = "libsqlite3-sys"
-version = "0.26.0"
+version = "0.27.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326"
+checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716"
 dependencies = [
  "cc",
  "pkg-config",
@@ -5424,6 +5433,17 @@ dependencies = [
  "pin-utils",
 ]
 
+[[package]]
+name = "nix"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
+dependencies = [
+ "bitflags 2.4.1",
+ "cfg-if",
+ "libc",
+]
+
 [[package]]
 name = "nohash-hasher"
 version = "0.2.0"
@@ -6607,7 +6627,6 @@ dependencies = [
  "serde",
  "serde_cbor",
  "serde_json",
- "sqlx",
  "test-log",
  "tokio",
  "tokio-stream",
@@ -6842,9 +6861,9 @@ dependencies = [
 
 [[package]]
 name = "rsa"
-version = "0.9.3"
+version = "0.9.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "86ef35bf3e7fe15a53c4ab08a998e42271eab13eb0db224126bc7bc4c4bad96d"
+checksum = "af6c4b23d99685a1408194da11270ef8e9809aff951cc70ec9b17350b087e474"
 dependencies = [
  "const-oid 0.9.5",
  "digest 0.10.7",
@@ -7595,20 +7614,20 @@ dependencies = [
 
 [[package]]
 name = "sqlformat"
-version = "0.2.2"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85"
+checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c"
 dependencies = [
- "itertools 0.11.0",
+ "itertools 0.12.0",
  "nom",
  "unicode_categories",
 ]
 
 [[package]]
 name = "sqlx"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e50c216e3624ec8e7ecd14c6a6a6370aad6ee5d8cfc3ab30b5162eeeef2ed33"
+checksum = "dba03c279da73694ef99763320dea58b51095dfe87d001b1d4b5fe78ba8763cf"
 dependencies = [
  "sqlx-core",
  "sqlx-macros",
@@ -7619,9 +7638,9 @@ dependencies = [
 
 [[package]]
 name = "sqlx-core"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d6753e460c998bbd4cd8c6f0ed9a64346fcca0723d6e75e52fdc351c5d2169d"
+checksum = "d84b0a3c3739e220d94b3239fd69fb1f74bc36e16643423bd99de3b43c21bfbd"
 dependencies = [
  "ahash 0.8.6",
  "atoi",
@@ -7659,9 +7678,9 @@ dependencies = [
 
 [[package]]
 name = "sqlx-macros"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a793bb3ba331ec8359c1853bd39eed32cdd7baaf22c35ccf5c92a7e8d1189ec"
+checksum = "89961c00dc4d7dffb7aee214964b065072bff69e36ddb9e2c107541f75e4f2a5"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -7672,10 +7691,11 @@ dependencies = [
 
 [[package]]
 name = "sqlx-macros-core"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a4ee1e104e00dedb6aa5ffdd1343107b0a4702e862a84320ee7cc74782d96fc"
+checksum = "d0bd4519486723648186a08785143599760f7cc81c52334a55d6a83ea1e20841"
 dependencies = [
+ "atomic-write-file",
  "dotenvy",
  "either",
  "heck",
@@ -7697,9 +7717,9 @@ dependencies = [
 
 [[package]]
 name = "sqlx-mysql"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db"
+checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4"
 dependencies = [
  "atoi",
  "base64 0.21.5",
@@ -7725,7 +7745,7 @@ dependencies = [
  "once_cell",
  "percent-encoding 2.3.0",
  "rand 0.8.5",
- "rsa 0.9.3",
+ "rsa 0.9.5",
  "serde",
  "sha1",
  "sha2 0.10.8",
@@ -7739,9 +7759,9 @@ dependencies = [
 
 [[package]]
 name = "sqlx-postgres"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624"
+checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24"
 dependencies = [
  "atoi",
  "base64 0.21.5",
@@ -7778,9 +7798,9 @@ dependencies = [
 
 [[package]]
 name = "sqlx-sqlite"
-version = "0.7.2"
+version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d59dc83cf45d89c555a577694534fcd1b55c545a816c816ce51f20bbe56a4f3f"
+checksum = "210976b7d948c7ba9fced8ca835b11cbb2d677c59c79de41ac0d397e14547490"
 dependencies = [
  "atoi",
  "flume 0.11.0",
@@ -7796,6 +7816,7 @@ dependencies = [
  "sqlx-core",
  "tracing",
  "url",
+ "urlencoding",
 ]
 
 [[package]]
@@ -9046,6 +9067,12 @@ dependencies = [
  "percent-encoding 2.3.0",
 ]
 
+[[package]]
+name = "urlencoding"
+version = "2.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
+
 [[package]]
 name = "utf8-decode"
 version = "1.0.1"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index ce663c222..7aba33249 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -26,7 +26,6 @@ serde_bytes.workspace = true
 serde_ipld_dagcbor.workspace = true
 serde_json.workspace = true
 ssi.workspace = true
-sqlx.workspace = true
 unsigned-varint.workspace = true
 
 [dev-dependencies]
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 3dc15f62d..3680a72c0 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -9,7 +9,6 @@ mod jws;
 mod network;
 mod range;
 mod signer;
-mod sql;
 mod stream_id;
 
 pub use bytes::Bytes;
@@ -20,7 +19,6 @@ pub use jws::{Jws, JwsSignature};
 pub use network::Network;
 pub use range::RangeOpen;
 pub use signer::{JwkSigner, Signer};
-pub use sql::{DbTx, SqlitePool};
 pub use stream_id::{StreamId, StreamIdType};
 
 pub use cid::Cid;
diff --git a/one/src/events.rs b/one/src/events.rs
index ee4b66fe4..0c37998d1 100644
--- a/one/src/events.rs
+++ b/one/src/events.rs
@@ -1,6 +1,6 @@
 use anyhow::Result;
-use ceramic_core::SqlitePool;
 use ceramic_p2p::SQLiteBlockStore;
+use ceramic_store::SqlitePool;
 use chrono::{SecondsFormat, Utc};
 use cid::{multibase, multihash, Cid};
 use clap::{Args, Subcommand};
diff --git a/one/src/lib.rs b/one/src/lib.rs
index dc064696e..46576bf4d 100644
--- a/one/src/lib.rs
+++ b/one/src/lib.rs
@@ -9,7 +9,7 @@ mod network;
 use std::{env, num::NonZeroUsize, path::PathBuf, time::Duration};
 
 use anyhow::{anyhow, Result};
-use ceramic_core::{EventId, Interest, PeerId, SqlitePool};
+use ceramic_core::{EventId, Interest, PeerId};
 use ceramic_kubo_rpc::Multiaddr;
 use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
@@ -18,10 +18,7 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
 use futures::StreamExt;
 use multibase::Base;
 use multihash::{Code, Hasher, Multihash, MultihashDigest};
-use recon::{
-    FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a,
-    StoreMetricsMiddleware,
-};
+use recon::{FullInterests, Recon, ReconInterestProvider, Server, Sha256a, StoreMetricsMiddleware};
 use signal_hook::consts::signal::*;
 use signal_hook_tokio::Signals;
 use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext};
@@ -283,12 +280,12 @@ pub async fn run() -> Result<()> {
     }
 }
 
-type InterestStore = SQLiteStore;
+type InterestStore = ceramic_store::InterestStore;
 type InterestInterest = FullInterests;
 type ReconInterest = Server, InterestInterest>;
 
-type ModelStore = ceramic_store::Store;
+type ModelStore = ceramic_store::ModelStore;
 type ModelInterest = ReconInterestProvider;
 type ReconModel = Server, ModelInterest>;
 
@@ -424,14 +421,14 @@ impl Daemon {
         // Connect to sqlite
         let sql_db_path: PathBuf = dir.join("db.sqlite3");
-        let sql_pool = SqlitePool::connect(&sql_db_path).await?;
+        let sql_pool = ceramic_store::SqlitePool::connect(&sql_db_path).await?;
 
         // Create recon metrics
         let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register);
 
         // Create recon store for interests.
         let interest_store = StoreMetricsMiddleware::new(
-            InterestStore::new(sql_pool.clone(), "interest".to_string()).await?,
+            InterestStore::new(sql_pool.clone()).await?,
             recon_metrics.clone(),
         );
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 9e389d137..46fdd6646 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -17,6 +17,7 @@ backoff.workspace = true
 bytes.workspace = true
 ceramic-core.workspace = true
 ceramic-metrics.workspace = true
+ceramic-store.workspace = true
 cid.workspace = true
 expect-test.workspace = true
 futures-util.workspace = true
diff --git a/p2p/src/node.rs b/p2p/src/node.rs
index 56c21af27..11ae421b5 100644
--- a/p2p/src/node.rs
+++ b/p2p/src/node.rs
@@ -1163,7 +1163,8 @@ mod tests {
     use crate::keys::Keypair;
     use async_trait::async_trait;
-    use ceramic_core::{RangeOpen, SqlitePool};
+    use ceramic_core::RangeOpen;
+    use ceramic_store::SqlitePool;
     use futures::TryStreamExt;
     use rand::prelude::*;
     use rand_chacha::ChaCha8Rng;
@@ -1389,7 +1390,7 @@ mod tests {
             rpc_server_addr,
             keypair.into(),
             None::<(DummyRecon, DummyRecon)>,
-            ceramic_store::Store::new(sql_pool).await?,
+            ceramic_store::ModelStore::new(sql_pool).await?,
             metrics,
         )
         .await?;
@@ -1421,7 +1422,7 @@
         .await
         .context("timed out before getting a listening address for the node")??;
         let mut dial_addr = addr.clone();
-        dial_addr.push(Protocol::P2p(peer_id.into()));
+        dial_addr.push(Protocol::P2p(peer_id));
         Ok(TestRunner {
             task,
             client,
diff --git a/p2p/src/sqliteblockstore.rs b/p2p/src/sqliteblockstore.rs
index c01bc0c1c..fa2be98ad 100644
--- a/p2p/src/sqliteblockstore.rs
+++ b/p2p/src/sqliteblockstore.rs
@@ -1,7 +1,7 @@
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use bytes::Bytes;
-use ceramic_core::SqlitePool;
+use ceramic_store::SqlitePool;
 use cid::{
     multihash::Code::{Keccak256, Sha2_256},
     multihash::MultihashDigest,
@@ -226,7 +226,7 @@ mod tests {
     use crate::SQLiteBlockStore;
     use anyhow::Error;
     use bytes::Bytes;
-    use ceramic_core::SqlitePool;
+    use ceramic_store::SqlitePool;
     use cid::{Cid, CidGeneric};
     use expect_test::expect;
     use iroh_bitswap::Store;
diff --git a/recon/Cargo.toml b/recon/Cargo.toml
index 8a63ac6a7..d3a585aa8 100644
--- a/recon/Cargo.toml
+++ b/recon/Cargo.toml
@@ -22,7 +22,6 @@ multihash.workspace = true
 prometheus-client.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-sqlx.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 void.workspace = true
diff --git a/recon/src/lib.rs b/recon/src/lib.rs
index 5a0591905..9067c8bcc 100644
--- a/recon/src/lib.rs
+++ b/recon/src/lib.rs
@@ -5,9 +5,9 @@ pub use crate::{
     client::{Client, Server},
     metrics::Metrics,
     recon::{
-        btreestore::BTreeStore, sqlitestore::SQLiteStore, store_metrics::StoreMetricsMiddleware,
-        AssociativeHash, FullInterests, HashCount, InsertResult, InterestProvider, Key, Range,
-        Recon, ReconInterestProvider, ReconItem, Store, SyncState,
+        btreestore::BTreeStore, store_metrics::StoreMetricsMiddleware, AssociativeHash,
+        FullInterests, HashCount, InsertResult, InterestProvider, Key, Range, Recon,
+        ReconInterestProvider, ReconItem, Store, SyncState,
     },
     sha256a::Sha256a,
 };
diff --git a/recon/src/recon.rs b/recon/src/recon.rs
index 2b92efd14..431e1ff7b 100644
--- a/recon/src/recon.rs
+++ b/recon/src/recon.rs
@@ -1,5 +1,4 @@
 pub mod btreestore;
-pub mod sqlitestore;
 pub mod store_metrics;
 #[cfg(test)]
 pub mod tests;
@@ -348,6 +347,16 @@ impl HashCount {
     pub fn new(hash: H, count: u64) -> Self {
         Self { hash, count }
     }
+
+    /// The hash of the values.
+    pub fn hash(&self) -> &H {
+        &self.hash
+    }
+
+    /// The number of values that produced the hash.
+    pub fn count(&self) -> u64 {
+        self.count
+    }
 }
 
 impl std::fmt::Debug for HashCount
diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs
index 486def856..d74b8d886 100644
--- a/recon/src/recon/tests.rs
+++ b/recon/src/recon/tests.rs
@@ -58,19 +58,17 @@ impl std::fmt::Debug for MemoryAHash {
                 .field("ahash", &self.ahash)
                 .field("set", &self.set)
                 .finish()
+        } else if self.is_zero() {
+            write!(f, "0")
         } else {
-            if self.is_zero() {
-                write!(f, "0")
-            } else {
-                write!(f, "h(")?;
-                for (i, key) in self.set.iter().enumerate() {
-                    if i != 0 {
-                        write!(f, ", ")?;
-                    }
-                    write!(f, "{key}")?;
+            write!(f, "h(")?;
+            for (i, key) in self.set.iter().enumerate() {
+                if i != 0 {
+                    write!(f, ", ")?;
                 }
-                write!(f, ")")
+                write!(f, "{key}")?;
             }
+            write!(f, ")")
         }
     }
 }
diff --git a/store/src/lib.rs b/store/src/lib.rs
index d8145c3dd..f72a278ec 100644
--- a/store/src/lib.rs
+++ b/store/src/lib.rs
@@ -2,713 +2,8 @@
 //! This unified implementation allows for exposing Recon values as IPFS blocks
 #![warn(missing_docs)]
 
+mod sqlite;
 #[cfg(test)]
 mod tests;
 
-use std::collections::BTreeSet;
-
-use anyhow::{anyhow, Result};
-use async_trait::async_trait;
-use bytes::Bytes;
-use ceramic_core::{DbTx, EventId, RangeOpen, SqlitePool};
-use cid::Cid;
-use iroh_bitswap::Block;
-use iroh_car::{CarHeader, CarReader, CarWriter};
-use itertools::{process_results, Itertools};
-use multihash::{Code, MultihashDigest};
-use recon::{AssociativeHash, HashCount, InsertResult, Key, ReconItem, Sha256a};
-use sqlx::Row;
-use tracing::instrument;
-
-const SORT_KEY: &str = "model";
-
-/// Unified implementation of [`recon::Store`] and [`iroh_bitswap::Store`] that can expose the
-/// individual blocks from the CAR files directly.
-#[derive(Clone, Debug)]
-pub struct Store {
-    pool: SqlitePool,
-}
-
-#[derive(Debug)]
-struct BlockRow {
-    cid: Cid,
-    root: bool,
-    bytes: Vec,
-}
-
-impl Store {
-    /// Create an instance of the store initializing any neccessary tables.
-    pub async fn new(pool: SqlitePool) -> Result {
-        let mut store = Store { pool };
-        store.create_table_if_not_exists().await?;
-        Ok(store)
-    }
-
-    /// Initialize the recon table.
-    async fn create_table_if_not_exists(&mut self) -> Result<()> {
-        // Do we want to remove CID from the table?
-        const CREATE_STORE_KEY_TABLE: &str = "CREATE TABLE IF NOT EXISTS store_key (
-            sort_key TEXT, -- the field in the event header to sort by e.g. model
-            key BLOB, -- network_id sort_value controller StreamID height event_cid
-            ahash_0 INTEGER, -- the ahash is decomposed as [u32; 8]
-            ahash_1 INTEGER,
-            ahash_2 INTEGER,
-            ahash_3 INTEGER,
-            ahash_4 INTEGER,
-            ahash_5 INTEGER,
-            ahash_6 INTEGER,
-            ahash_7 INTEGER,
-            CID TEXT,
-            value_retrieved BOOL, -- indicates if we have the value
-            PRIMARY KEY(sort_key, key)
-        )";
-        const CREATE_VALUE_RETRIEVED_INDEX: &str =
-            "CREATE INDEX IF NOT EXISTS idx_key_value_retrieved
-            ON store_key (sort_key, key, value_retrieved)";
-
-        const CREATE_STORE_BLOCK_TABLE: &str = "CREATE TABLE IF NOT EXISTS store_block (
-            sort_key TEXT, -- the field in the event header to sort by e.g. model
-            key BLOB, -- network_id sort_value controller StreamID height event_cid
-            idx INTEGER, -- the index of the block in the CAR file
-            root BOOL, -- when true the block is a root in the CAR file
-            cid BLOB, -- the cid of the Block as bytes no 0x00 prefix
-            bytes BLOB, -- the Block
-            PRIMARY KEY(cid)
-        )";
-        // TODO should this include idx or not?
- const CREATE_BLOCK_ORDER_INDEX: &str = "CREATE INDEX IF NOT EXISTS idx_block_idx - ON store_block (sort_key, key)"; - - let mut tx = self.pool.tx().await?; - sqlx::query(CREATE_STORE_KEY_TABLE) - .execute(&mut *tx) - .await?; - sqlx::query(CREATE_VALUE_RETRIEVED_INDEX) - .execute(&mut *tx) - .await?; - sqlx::query(CREATE_STORE_BLOCK_TABLE) - .execute(&mut *tx) - .await?; - sqlx::query(CREATE_BLOCK_ORDER_INDEX) - .execute(&mut *tx) - .await?; - - tx.commit().await?; - Ok(()) - } - /// returns (new_key, new_val) tuple - async fn insert_item_int( - &mut self, - item: &ReconItem<'_, EventId>, - conn: &mut DbTx<'_>, - ) -> Result<(bool, bool)> { - // we insert the value first as it's possible we already have the key and can skip that step - // as it happens in a transaction, we'll roll back the value insert if the key insert fails and try again - if let Some(val) = item.value { - // Update the value_retrieved flag, and report if the key already exists. - let key_exists = self.update_value_retrieved_int(item.key, conn).await?; - - // Put each block from the car file - let mut reader = CarReader::new(val).await?; - let roots: BTreeSet = reader.header().roots().iter().cloned().collect(); - let mut idx = 0; - while let Some((cid, data)) = reader.next_block().await? { - self.insert_block_int(item.key, idx, roots.contains(&cid), cid, &data.into(), conn) - .await?; - idx += 1; - } - - if key_exists { - return Ok((false, true)); - } - } - let new_key = self - .insert_key_int(item.key, item.value.is_some(), conn) - .await?; - Ok((new_key, item.value.is_some())) - } - - // set value_retrieved to true and return if the key already exists - async fn update_value_retrieved_int( - &mut self, - key: &EventId, - conn: &mut DbTx<'_>, - ) -> Result { - let update = sqlx::query( - "UPDATE store_key SET value_retrieved = true WHERE sort_key = ? AND key = ?", - ); - let resp = update - .bind(SORT_KEY) - .bind(key.as_bytes()) - .execute(&mut **conn) - .await?; - let rows_affected = resp.rows_affected(); - debug_assert!(rows_affected <= 1); - Ok(rows_affected == 1) - } - - // store a block in the db. - async fn insert_block_int( - &self, - key: &EventId, - idx: i32, - root: bool, - cid: Cid, - blob: &Bytes, - conn: &mut DbTx<'_>, - ) -> Result<()> { - let hash = match cid.hash().code() { - 0x12 => Code::Sha2_256.digest(blob), - 0x1b => Code::Keccak256.digest(blob), - 0x11 => return Err(anyhow!("Sha1 not supported")), - _ => { - return Err(anyhow!( - "multihash type {:#x} not Sha2_256, Keccak256", - cid.hash().code(), - )) - } - }; - if cid.hash().to_bytes() != hash.to_bytes() { - return Err(anyhow!( - "cid did not match blob {} != {}", - hex::encode(cid.hash().to_bytes()), - hex::encode(hash.to_bytes()) - )); - } - - sqlx::query( - "INSERT INTO store_block (sort_key, key, idx, root, cid, bytes) VALUES (?, ?, ?, ?, ?, ?)", - ) - .bind(SORT_KEY) - .bind(key.as_bytes()) - .bind(idx) - .bind(root) - .bind(cid.to_bytes()) - .bind(blob.to_vec()) - .execute(&mut **conn) - .await?; - Ok(()) - } - - async fn insert_key_int( - &mut self, - key: &EventId, - has_value: bool, - conn: &mut DbTx<'_>, - ) -> Result { - let key_insert = sqlx::query( - "INSERT INTO store_key ( - sort_key, key, - ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7, - value_retrieved - ) VALUES ( - ?, ?, - ?, ?, ?, ?, - ?, ?, ?, ?, - ? 
- );", - ); - - let hash = Sha256a::digest(key); - let resp = key_insert - .bind(SORT_KEY) - .bind(key.as_bytes()) - .bind(hash.as_u32s()[0]) - .bind(hash.as_u32s()[1]) - .bind(hash.as_u32s()[2]) - .bind(hash.as_u32s()[3]) - .bind(hash.as_u32s()[4]) - .bind(hash.as_u32s()[5]) - .bind(hash.as_u32s()[6]) - .bind(hash.as_u32s()[7]) - .bind(has_value) - .execute(&mut **conn) - .await; - match resp { - std::result::Result::Ok(_rows) => Ok(true), - Err(sqlx::Error::Database(err)) => { - if err.is_unique_violation() { - Ok(false) - } else { - Err(sqlx::Error::Database(err).into()) - } - } - Err(err) => Err(err.into()), - } - } - - async fn rebuild_car(&mut self, blocks: Vec) -> Result>> { - if blocks.is_empty() { - return Ok(None); - } - - let size = blocks.iter().fold(0, |sum, row| sum + row.bytes.len()); - let roots: Vec = blocks - .iter() - .filter(|row| row.root) - .map(|row| row.cid) - .collect(); - // Reconstruct the car file - // TODO figure out a better capacity calculation - let mut car = Vec::with_capacity(size + 100 * blocks.len()); - let mut writer = CarWriter::new(CarHeader::V1(roots.into()), &mut car); - for BlockRow { - cid, - bytes, - root: _, - } in blocks - { - writer.write(cid, bytes).await?; - } - writer.finish().await?; - Ok(Some(car)) - } -} - -#[async_trait] -impl recon::Store for Store { - type Key = EventId; - type Hash = Sha256a; - - /// Returns true if the key was new. The value is always updated if included - async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { - let mut tx = self.pool.writer().begin().await?; - let (new_key, _new_val) = self.insert_item_int(&item, &mut tx).await?; - tx.commit().await?; - Ok(new_key) - } - - /// Insert new keys into the key space. - /// Returns true if a key did not previously exist. - async fn insert_many<'a, I>(&mut self, items: I) -> Result - where - I: ExactSizeIterator> + Send + Sync, - { - match items.len() { - 0 => Ok(InsertResult::new(vec![], 0)), - _ => { - let mut results = vec![false; items.len()]; - let mut new_val_cnt = 0; - let mut tx = self.pool.writer().begin().await?; - - for (idx, item) in items.enumerate() { - let (new_key, new_val) = self.insert_item_int(&item, &mut tx).await?; - results[idx] = new_key; - if new_val { - new_val_cnt += 1; - } - } - tx.commit().await?; - Ok(InsertResult::new(results, new_val_cnt)) - } - } - } - - /// return the hash and count for a range - #[instrument(skip(self))] - async fn hash_range( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> Result> { - if left_fencepost >= right_fencepost { - return Ok(HashCount::new(Sha256a::identity(), 0)); - } - - let query = sqlx::query( - "SELECT - TOTAL(ahash_0) & 0xFFFFFFFF, TOTAL(ahash_1) & 0xFFFFFFFF, - TOTAL(ahash_2) & 0xFFFFFFFF, TOTAL(ahash_3) & 0xFFFFFFFF, - TOTAL(ahash_4) & 0xFFFFFFFF, TOTAL(ahash_5) & 0xFFFFFFFF, - TOTAL(ahash_6) & 0xFFFFFFFF, TOTAL(ahash_7) & 0xFFFFFFFF, - COUNT(1) - FROM store_key WHERE sort_key = ? AND key > ? 
AND key < ?;", - ); - let row = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .fetch_one(self.pool.reader()) - .await?; - let bytes: [u32; 8] = [ - row.get(0), - row.get(1), - row.get(2), - row.get(3), - row.get(4), - row.get(5), - row.get(6), - row.get(7), - ]; - let count: i64 = row.get(8); // sql int type is signed - let count: u64 = count - .try_into() - .expect("COUNT(1) should never return a negative number"); - Ok(HashCount::new(Sha256a::from(bytes), count)) - } - - #[instrument(skip(self))] - async fn range( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - offset: usize, - limit: usize, - ) -> Result + Send + 'static>> { - let query = sqlx::query( - " - SELECT - key - FROM - store_key - WHERE - sort_key = ? AND - key > ? AND key < ? - ORDER BY - key ASC - LIMIT - ? - OFFSET - ?; - ", - ); - let rows = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .bind(limit as i64) - .bind(offset as i64) - .fetch_all(self.pool.reader()) - .await?; - //debug!(count = rows.len(), "rows"); - Ok(Box::new(rows.into_iter().map(|row| { - let bytes: Vec = row.get(0); - EventId::from(bytes) - }))) - } - #[instrument(skip(self))] - async fn range_with_values( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - let query = sqlx::query( - " - SELECT - store_block.key, store_block.cid, store_block.root, store_block.bytes - FROM ( - SELECT - key - FROM store_key - WHERE - sort_key = ? - AND key > ? AND key < ? - AND value_retrieved = true - ORDER BY - key ASC - LIMIT - ? - OFFSET - ? - ) key - JOIN - store_block - ON - key.key = store_block.key - ;", - ); - let all_blocks = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .bind(limit as i64) - .bind(offset as i64) - .fetch_all(self.pool.reader()) - .await?; - - // Consume all block into groups of blocks by their key. - let all_blocks: Vec<(Self::Key, Vec)> = process_results( - all_blocks.into_iter().map(|row| { - Cid::read_bytes(row.get::<&[u8], _>(1)) - .map_err(anyhow::Error::from) - .map(|cid| { - ( - Self::Key::from(row.get::, _>(0)), - cid, - row.get(2), - row.get(3), - ) - }) - }), - |blocks| { - blocks - .group_by(|(key, _, _, _)| key.clone()) - .into_iter() - .map(|(key, group)| { - ( - key, - group - .map(|(_key, cid, root, bytes)| BlockRow { cid, root, bytes }) - .collect::>(), - ) - }) - .collect() - }, - )?; - - let mut values: Vec<(Self::Key, Vec)> = Vec::new(); - for (key, blocks) in all_blocks { - if let Some(value) = self.rebuild_car(blocks).await? { - values.push((key.clone(), value)); - } - } - Ok(Box::new(values.into_iter())) - } - /// Return the number of keys within the range. - #[instrument(skip(self))] - async fn count( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> Result { - let query = sqlx::query( - " - SELECT - count(key) - FROM - store_key - WHERE - sort_key = ? AND - key > ? AND key < ? - ;", - ); - let row = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .fetch_one(self.pool.reader()) - .await?; - Ok(row.get::<'_, i64, _>(0) as usize) - } - - /// Return the first key within the range. 
- #[instrument(skip(self))] - async fn first( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> Result> { - let query = sqlx::query( - " - SELECT - key - FROM - store_key - WHERE - sort_key = ? AND - key > ? AND key < ? - ORDER BY - key ASC - LIMIT - 1 - ; ", - ); - let rows = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .fetch_all(self.pool.reader()) - .await?; - Ok(rows.first().map(|row| { - let bytes: Vec = row.get(0); - EventId::from(bytes) - })) - } - - #[instrument(skip(self))] - async fn last( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> Result> { - let query = sqlx::query( - " - SELECT - key - FROM - store_key - WHERE - sort_key = ? AND - key > ? AND key < ? - ORDER BY - key DESC - LIMIT - 1 - ;", - ); - let rows = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .fetch_all(self.pool.reader()) - .await?; - Ok(rows.first().map(|row| { - let bytes: Vec = row.get(0); - EventId::from(bytes) - })) - } - - #[instrument(skip(self))] - async fn first_and_last( - &mut self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> Result> { - let query = sqlx::query( - " - SELECT first.key, last.key - FROM - ( - SELECT key - FROM store_key - WHERE - sort_key = ? AND - key > ? AND key < ? - ORDER BY key ASC - LIMIT 1 - ) as first - JOIN - ( - SELECT key - FROM store_key - WHERE - sort_key = ? AND - key > ? AND key < ? - ORDER BY key DESC - LIMIT 1 - ) as last - ;", - ); - let rows = query - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .bind(SORT_KEY) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .fetch_all(self.pool.reader()) - .await?; - if let Some(row) = rows.first() { - let first = EventId::from(row.get::, _>(0)); - let last = EventId::from(row.get::, _>(1)); - Ok(Some((first, last))) - } else { - Ok(None) - } - } - - #[instrument(skip(self))] - async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { - let query = sqlx::query( - " - SELECT - cid, root, bytes - FROM store_block - WHERE - sort_key=? - AND key=? - ORDER BY idx - ;", - ); - let blocks = query - .bind(SORT_KEY) - .bind(key.as_bytes()) - .fetch_all(self.pool.reader()) - .await?; - self.rebuild_car( - blocks - .into_iter() - .map(|row| { - Cid::read_bytes(row.get::<&[u8], _>(0)) - .map_err(anyhow::Error::from) - .map(|cid| BlockRow { - cid, - root: row.get(1), - bytes: row.get(2), - }) - }) - .collect::>>()?, - ) - .await - } - - #[instrument(skip(self))] - async fn keys_with_missing_values( - &mut self, - range: RangeOpen, - ) -> Result> { - if range.start >= range.end { - return Ok(vec![]); - }; - let query = sqlx::query( - " - SELECT key - FROM store_key - WHERE - sort_key=? - AND key > ? - AND key < ? - AND value_retrieved = false - ;", - ); - let row = query - .bind(SORT_KEY) - .bind(range.start.as_bytes()) - .bind(range.end.as_bytes()) - .fetch_all(self.pool.reader()) - .await?; - Ok(row - .into_iter() - .map(|row| EventId::from(row.get::, _>(0))) - .collect()) - } -} - -#[async_trait] -impl iroh_bitswap::Store for Store { - async fn get_size(&self, cid: &Cid) -> Result { - Ok( - sqlx::query("SELECT length(bytes) FROM store_block WHERE cid = ?;") - .bind(cid.to_bytes()) - .fetch_one(self.pool.reader()) - .await? 
- .get::<'_, i64, _>(0) as usize, - ) - } - - async fn get(&self, cid: &Cid) -> Result { - Ok(Block::new( - sqlx::query("SELECT bytes FROM store_block WHERE cid = ?;") - .bind(cid.to_bytes()) - .fetch_one(self.pool.reader()) - .await? - .get::<'_, Vec, _>(0) - .into(), - cid.to_owned(), - )) - } - - async fn has(&self, cid: &Cid) -> Result { - Ok( - sqlx::query("SELECT count(1) FROM store_block WHERE cid = ?;") - .bind(cid.to_bytes()) - .fetch_one(self.pool.reader()) - .await? - .get::<'_, i64, _>(0) - > 0, - ) - } -} +pub use sqlite::{DbTx, InterestStore, ModelStore, SqlitePool}; diff --git a/recon/src/recon/sqlitestore.rs b/store/src/sqlite/interest.rs similarity index 56% rename from recon/src/recon/sqlitestore.rs rename to store/src/sqlite/interest.rs index d3e798c4a..3ec25cc35 100644 --- a/recon/src/recon/sqlitestore.rs +++ b/store/src/sqlite/interest.rs @@ -1,40 +1,34 @@ #![warn(missing_docs, missing_debug_implementations, clippy::all)] -use super::{HashCount, InsertResult, ReconItem}; -use crate::{AssociativeHash, Key, Store}; use anyhow::Result; use async_trait::async_trait; -use ceramic_core::{DbTx, RangeOpen, SqlitePool}; +use ceramic_core::{Interest, RangeOpen}; +use recon::{AssociativeHash, HashCount, InsertResult, Key, ReconItem, Store}; use sqlx::Row; use std::marker::PhantomData; -use std::result::Result::Ok; use tracing::instrument; -/// ReconSQLite is a implementation of Recon store +use crate::{DbTx, SqlitePool}; + #[derive(Debug)] -pub struct SQLiteStore +/// InterestStore is a [`recon::Store`] implementation for Interests. +pub struct InterestStore where - K: Key, H: AssociativeHash, { - key: PhantomData, hash: PhantomData, pool: SqlitePool, - sort_key: String, } -impl SQLiteStore +impl InterestStore where - K: Key, H: AssociativeHash, { - /// Make a new SQLiteStore from a connection and sort_key. - /// This will create the recon table if it does not already exist. - pub async fn new(pool: SqlitePool, sort_key: String) -> Result { - let mut store = SQLiteStore { + /// Make a new InterestSqliteStore from a connection pool. + /// This will create the interest_key table if it does not already exist. + pub async fn new(pool: SqlitePool) -> Result { + let mut store = InterestStore { pool, - sort_key, - key: PhantomData, hash: PhantomData, }; store.create_table_if_not_exists().await?; @@ -42,16 +36,13 @@ where } } -impl SQLiteStore +impl InterestStore where - K: Key, H: AssociativeHash + std::convert::From<[u32; 8]>, { - /// Initialize the recon table. + /// Initialize the interest_key table. async fn create_table_if_not_exists(&mut self) -> Result<()> { - // Do we want to remove CID from the table? - const CREATE_RECON_TABLE: &str = "CREATE TABLE IF NOT EXISTS recon ( - sort_key TEXT, -- the field in the event header to sort by e.g. 
model + const CREATE_INTEREST_KEY_TABLE: &str = "CREATE TABLE IF NOT EXISTS interest_key ( key BLOB, -- network_id sort_value controller StreamID height event_cid ahash_0 INTEGER, -- the ahash is decomposed as [u32; 8] ahash_1 INTEGER, @@ -61,27 +52,11 @@ where ahash_5 INTEGER, ahash_6 INTEGER, ahash_7 INTEGER, - CID TEXT, - value_retrieved BOOL, -- indicates if we still want the value - PRIMARY KEY(sort_key, key) - )"; - const CREATE_VALUE_RETRIEVED_INDEX: &str = - "CREATE INDEX IF NOT EXISTS idx_recon_value_retrieved - ON recon (sort_key, key, value_retrieved)"; - - const CREATE_RECON_VALUE_TABLE: &str = "CREATE TABLE IF NOT EXISTS recon_value ( - sort_key TEXT, - key BLOB, - value BLOB, - PRIMARY KEY(sort_key, key) + PRIMARY KEY(key) )"; let mut tx = self.pool.tx().await?; - sqlx::query(CREATE_RECON_TABLE).execute(&mut *tx).await?; - sqlx::query(CREATE_VALUE_RETRIEVED_INDEX) - .execute(&mut *tx) - .await?; - sqlx::query(CREATE_RECON_VALUE_TABLE) + sqlx::query(CREATE_INTEREST_KEY_TABLE) .execute(&mut *tx) .await?; tx.commit().await?; @@ -91,81 +66,36 @@ where /// returns (new_key, new_val) tuple async fn insert_item_int( &mut self, - item: &ReconItem<'_, K>, + item: &ReconItem<'_, Interest>, conn: &mut DbTx<'_>, ) -> Result<(bool, bool)> { - // we insert the value first as it's possible we already have the key and can skip that step - // as it happens in a transaction, we'll roll back the value insert if the key insert fails and try again + // interests don't have values, if someone gives us something we throw an error but allow None/vec![] if let Some(val) = item.value { - // Update the value_retrieved flag, and report if the key already exists. - let key_exists = self.update_value_retrieved_int(item.key, conn).await?; - self.insert_value_int(item.key, val, conn).await?; - if key_exists { - return Ok((false, true)); + if !val.is_empty() { + return Err(anyhow::anyhow!( + "Interests do not support values! Invalid request." + )); } } - let new_key = self - .insert_key_int(item.key, item.value.is_some(), conn) - .await?; + let new_key = self.insert_key_int(item.key, conn).await?; Ok((new_key, item.value.is_some())) } - // set value_retrieved to true and return if the key already exists - async fn update_value_retrieved_int(&mut self, key: &K, conn: &mut DbTx<'_>) -> Result { - let update = - sqlx::query("UPDATE recon SET value_retrieved = true WHERE sort_key = ? AND key = ?"); - let resp = update - .bind(&self.sort_key) - .bind(key.as_bytes()) - .execute(&mut **conn) - .await?; - let rows_affected = resp.rows_affected(); - debug_assert!(rows_affected <= 1); - Ok(rows_affected == 1) - } - - /// returns true if the key already exists in the recon table - async fn insert_value_int(&mut self, key: &K, val: &[u8], conn: &mut DbTx<'_>) -> Result<()> { - let value_insert = sqlx::query( - r#"INSERT INTO recon_value (value, sort_key, key) - VALUES (?, ?, ?) 
- ON CONFLICT (sort_key, key) DO UPDATE - SET value=excluded.value"#, - ); - - value_insert - .bind(val) - .bind(&self.sort_key) - .bind(key.as_bytes()) - .execute(&mut **conn) - .await?; - - Ok(()) - } - - async fn insert_key_int( - &mut self, - key: &K, - has_value: bool, - conn: &mut DbTx<'_>, - ) -> Result { + async fn insert_key_int(&mut self, key: &Interest, conn: &mut DbTx<'_>) -> Result { let key_insert = sqlx::query( - "INSERT INTO recon ( - sort_key, key, + "INSERT INTO interest_key ( + key, ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7, - value_retrieved + ahash_4, ahash_5, ahash_6, ahash_7 ) VALUES ( - ?, ?, - ?, ?, ?, ?, + ?, ?, ?, ?, ?, - ? + ?, ?, ?, ? );", ); let hash = H::digest(key); let resp = key_insert - .bind(&self.sort_key) .bind(key.as_bytes()) .bind(hash.as_u32s()[0]) .bind(hash.as_u32s()[1]) @@ -175,7 +105,6 @@ where .bind(hash.as_u32s()[5]) .bind(hash.as_u32s()[6]) .bind(hash.as_u32s()[7]) - .bind(has_value) .execute(&mut **conn) .await; match resp { @@ -193,12 +122,11 @@ where } #[async_trait] -impl Store for SQLiteStore +impl Store for InterestStore where - K: Key, H: AssociativeHash, { - type Key = K; + type Key = Interest; type Hash = H; /// Returns true if the key was new. The value is always updated if included @@ -213,7 +141,7 @@ where /// Returns true if a key did not previously exist. async fn insert_many<'a, I>(&mut self, items: I) -> Result where - I: ExactSizeIterator> + Send + Sync, + I: ExactSizeIterator> + Send + Sync, { match items.len() { 0 => Ok(InsertResult::new(vec![], 0)), @@ -243,10 +171,7 @@ where right_fencepost: &Self::Key, ) -> Result> { if left_fencepost >= right_fencepost { - return Ok(HashCount { - hash: H::identity(), - count: 0, - }); + return Ok(HashCount::new(H::identity(), 0)); } let query = sqlx::query( @@ -256,10 +181,9 @@ where TOTAL(ahash_4) & 0xFFFFFFFF, TOTAL(ahash_5) & 0xFFFFFFFF, TOTAL(ahash_6) & 0xFFFFFFFF, TOTAL(ahash_7) & 0xFFFFFFFF, COUNT(1) - FROM recon WHERE sort_key = ? AND key > ? AND key < ?;", + FROM interest_key WHERE key > ? AND key < ?;", ); let row = query - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) .fetch_one(self.pool.reader()) @@ -278,10 +202,7 @@ where let count: u64 = count .try_into() .expect("COUNT(1) should never return a negative number"); - Ok(HashCount { - hash: H::from(bytes), - count, - }) + Ok(HashCount::new(H::from(bytes), count)) } #[instrument(skip(self))] @@ -297,9 +218,8 @@ where SELECT key FROM - recon + interest_key WHERE - sort_key = ? AND key > ? AND key < ? ORDER BY key ASC @@ -310,7 +230,6 @@ where ", ); let rows = query - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) .bind(limit as i64) @@ -320,10 +239,12 @@ where //debug!(count = rows.len(), "rows"); Ok(Box::new(rows.into_iter().map(|row| { let bytes: Vec = row.get(0); - K::from(bytes) + Interest::from(bytes) }))) } + #[instrument(skip(self))] + /// Interests don't have values, so the value will always be an empty vec. Use `range` instead. async fn range_with_values( &mut self, left_fencepost: &Self::Key, @@ -331,37 +252,10 @@ where offset: usize, limit: usize, ) -> Result)> + Send + 'static>> { - let query = sqlx::query( - " - SELECT - key, value - FROM - recon_value - WHERE - sort_key = ? AND - key > ? AND key < ? - AND value IS NOT NULL - ORDER BY - key ASC - LIMIT - ? 
- OFFSET - ?; - ", - ); - let rows = query - .bind(&self.sort_key) - .bind(left_fencepost.as_bytes()) - .bind(right_fencepost.as_bytes()) - .bind(limit as i64) - .bind(offset as i64) - .fetch_all(self.pool.reader()) + let rows = self + .range(left_fencepost, right_fencepost, offset, limit) .await?; - Ok(Box::new(rows.into_iter().map(|row| { - let key: Vec = row.get(0); - let value: Vec = row.get(1); - (K::from(key), value) - }))) + Ok(Box::new(rows.into_iter().map(|key| (key, Vec::new())))) } /// Return the number of keys within the range. #[instrument(skip(self))] @@ -375,14 +269,12 @@ where SELECT count(key) FROM - recon + interest_key WHERE - sort_key = ? AND key > ? AND key < ? ;", ); let row = query - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) .fetch_one(self.pool.reader()) @@ -402,9 +294,8 @@ where SELECT key FROM - recon + interest_key WHERE - sort_key = ? AND key > ? AND key < ? ORDER BY key ASC @@ -413,14 +304,13 @@ where ; ", ); let rows = query - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) .fetch_all(self.pool.reader()) .await?; Ok(rows.first().map(|row| { let bytes: Vec = row.get(0); - K::from(bytes) + Interest::from(bytes) })) } @@ -435,9 +325,8 @@ where SELECT key FROM - recon + interest_key WHERE - sort_key = ? AND key > ? AND key < ? ORDER BY key DESC @@ -446,14 +335,13 @@ where ;", ); let rows = query - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) .fetch_all(self.pool.reader()) .await?; Ok(rows.first().map(|row| { let bytes: Vec = row.get(0); - K::from(bytes) + Interest::from(bytes) })) } @@ -469,9 +357,8 @@ where FROM ( SELECT key - FROM recon + FROM interest_key WHERE - sort_key = ? AND key > ? AND key < ? ORDER BY key ASC LIMIT 1 @@ -479,9 +366,8 @@ where JOIN ( SELECT key - FROM recon + FROM interest_key WHERE - sort_key = ? AND key > ? AND key < ? ORDER BY key DESC LIMIT 1 @@ -489,17 +375,17 @@ where ;", ); let rows = query - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) - .bind(&self.sort_key) .bind(left_fencepost.as_bytes()) .bind(right_fencepost.as_bytes()) .fetch_all(self.pool.reader()) .await?; if let Some(row) = rows.first() { - let first = K::from(row.get(0)); - let last = K::from(row.get(1)); + let f_bytes: Vec = row.get(0); + let l_bytes: Vec = row.get(1); + let first = Interest::from(f_bytes); + let last = Interest::from(l_bytes); Ok(Some((first, last))) } else { Ok(None) @@ -507,92 +393,63 @@ where } #[instrument(skip(self))] - async fn value_for_key(&mut self, key: &Self::Key) -> Result>> { - let query = sqlx::query("SELECT value FROM recon_value WHERE sort_key=? AND key=?;"); - let row = query - .bind(&self.sort_key) - .bind(key.as_bytes()) - .fetch_optional(self.pool.reader()) - .await?; - Ok(row.map(|row| row.get(0))) + async fn value_for_key(&mut self, _key: &Self::Key) -> Result>> { + Ok(Some(vec![])) } #[instrument(skip(self))] async fn keys_with_missing_values( &mut self, - range: RangeOpen, + _range: RangeOpen, ) -> Result> { - if range.start >= range.end { - return Ok(vec![]); - }; - let query = sqlx::query( - " - SELECT key - FROM recon - WHERE - sort_key=? - AND key > ? - AND key < ? 
- AND value_retrieved = false - ;", - ); - let row = query - .bind(&self.sort_key) - .bind(range.start.as_bytes()) - .bind(range.end.as_bytes()) - .fetch_all(self.pool.reader()) - .await?; - Ok(row.into_iter().map(|row| K::from(row.get(0))).collect()) + Ok(vec![]) } } #[cfg(test)] -mod tests { +mod interest_tests { use super::*; - use crate::recon::ReconItem; - use crate::tests::AlphaNumBytes; - use crate::Sha256a; + use recon::{AssociativeHash, Key, ReconItem, Sha256a, Store}; use expect_test::expect; use test_log::test; - async fn new_store() -> SQLiteStore { + async fn new_store() -> InterestStore { let conn = SqlitePool::connect("sqlite::memory:").await.unwrap(); - SQLiteStore::::new(conn, "test".to_string()) - .await - .unwrap() + InterestStore::::new(conn).await.unwrap() } #[test(tokio::test)] async fn test_hash_range_query() { let mut store = new_store().await; store - .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .insert(ReconItem::new_key(&Interest::from("hello".as_bytes()))) .await .unwrap(); store - .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .insert(ReconItem::new_key(&Interest::from("world".as_bytes()))) .await .unwrap(); - let hash: Sha256a = store + let hash_cnt = store .hash_range(&b"a".as_slice().into(), &b"z".as_slice().into()) .await - .unwrap() - .hash; + .unwrap(); expect![[r#"7460F21C83815F5EDC682F7A4154BC09AA3A0AE5DD1A2DEDCD709888A12751CC"#]] - .assert_eq(&hash.to_hex()); + .assert_eq(&hash_cnt.hash().to_hex()); } #[test(tokio::test)] async fn test_range_query() { let mut store = new_store().await; + let hello_interest = Interest::from("hello".as_bytes()); + let world_interest = Interest::from("world".as_bytes()); store - .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .insert(ReconItem::new_key(&hello_interest)) .await .unwrap(); store - .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .insert(ReconItem::new_key(&world_interest)) .await .unwrap(); let ids = store @@ -604,17 +461,60 @@ mod tests { ) .await .unwrap(); - expect![[r#" - [ - Bytes( - "hello", - ), - Bytes( - "world", - ), - ] - "#]] - .assert_debug_eq(&ids.collect::>()); + let interests = ids.collect::>(); + assert_eq!(2, interests.len()); + assert_eq!(vec![hello_interest, world_interest], interests); + // TODO: need to fix bug in interests format impl and regenerate/fix these expects + // expect![[r#" + // [ + // Bytes( + // "hello", + // ), + // Bytes( + // "world", + // ), + // ] + // "#]] + // .assert_debug_eq(&ids.collect::>()); + } + + #[test(tokio::test)] + async fn test_range_with_values_query() { + let mut store = new_store().await; + let hello_interest = Interest::from("hello".as_bytes()); + let world_interest = Interest::from("world".as_bytes()); + store + .insert(ReconItem::new_key(&hello_interest)) + .await + .unwrap(); + store + .insert(ReconItem::new_key(&world_interest)) + .await + .unwrap(); + let ids = store + .range_with_values( + &b"a".as_slice().into(), + &b"z".as_slice().into(), + 0, + usize::MAX, + ) + .await + .unwrap(); + let interests = ids.into_iter().map(|(i, _v)| i).collect::>(); + assert_eq!(2, interests.len()); + assert_eq!(vec![hello_interest, world_interest], interests); + // TODO: need to fix bug in interests format impl and regenerate/fix these expects + // expect![[r#" + // [ + // Bytes( + // "hello", + // ), + // Bytes( + // "world", + // ), + // ] + // "#]] + // .assert_debug_eq(&ids.collect::>()); } #[test(tokio::test)] @@ -631,7 +531,7 @@ mod tests { ] .assert_debug_eq( &store - 
.insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .insert(ReconItem::new_key(&Interest::from("hello".as_bytes()))) .await, ); @@ -645,7 +545,7 @@ mod tests { ] .assert_debug_eq( &store - .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .insert(ReconItem::new_key(&Interest::from("hello".as_bytes()))) .await, ); } @@ -653,37 +553,49 @@ mod tests { #[test(tokio::test)] async fn test_first_and_last() { let mut store = new_store().await; + let hello_interest = Interest::from("hello".as_bytes()); + let world_interest = Interest::from("world".as_bytes()); store - .insert(ReconItem::new_key(&AlphaNumBytes::from("hello"))) + .insert(ReconItem::new_key(&hello_interest)) .await .unwrap(); store - .insert(ReconItem::new_key(&AlphaNumBytes::from("world"))) + .insert(ReconItem::new_key(&world_interest)) .await .unwrap(); // Only one key in range let ret = store - .first_and_last(&AlphaNumBytes::from("a"), &AlphaNumBytes::from("j")) + .first_and_last( + &Interest::from("a".as_bytes()), + &Interest::from("j".as_bytes()), + ) .await + .unwrap() .unwrap(); - expect![[r#" - Some( - ( - Bytes( - "hello", - ), - Bytes( - "hello", - ), - ), - ) - "#]] - .assert_debug_eq(&ret); + + assert_eq!(hello_interest, ret.0); + assert_eq!(hello_interest, ret.1); + // expect![[r#" + // Some( + // ( + // Bytes( + // "hello", + // ), + // Bytes( + // "hello", + // ), + // ), + // ) + // "#]] + // .assert_debug_eq(&ret); // No keys in range let ret = store - .first_and_last(&AlphaNumBytes::from("j"), &AlphaNumBytes::from("p")) + .first_and_last( + &Interest::from("j".as_bytes()), + &Interest::from("p".as_bytes()), + ) .await .unwrap(); expect![[r#" @@ -693,61 +605,60 @@ mod tests { // Two keys in range let ret = store - .first_and_last(&AlphaNumBytes::from("a"), &AlphaNumBytes::from("z")) + .first_and_last( + &Interest::from("a".as_bytes()), + &Interest::from("z".as_bytes()), + ) .await + .unwrap() .unwrap(); - expect![[r#" - Some( - ( - Bytes( - "hello", - ), - Bytes( - "world", - ), - ), - ) - "#]] - .assert_debug_eq(&ret); + // both keys exist + assert_eq!(hello_interest, ret.0); + assert_eq!(world_interest, ret.1); + // expect![[r#" + // Some( + // ( + // Bytes( + // "hello", + // ), + // Bytes( + // "world", + // ), + // ), + // ) + // "#]] + // .assert_debug_eq(&ret); } #[test(tokio::test)] - async fn test_store_value_for_key() { + #[should_panic(expected = "Interests do not support values! 
Invalid request.")] + async fn test_store_value_for_key_error() { let mut store = new_store().await; - let key = AlphaNumBytes::from("hello"); - let store_value = AlphaNumBytes::from("world"); + let key = Interest::from("hello".as_bytes()); + let store_value = Interest::from("world".as_bytes()); store .insert(ReconItem::new_with_value(&key, store_value.as_slice())) .await .unwrap(); - let value = store.value_for_key(&key).await.unwrap().unwrap(); - expect![[r#"776f726c64"#]].assert_eq(hex::encode(&value).as_str()); } + #[test(tokio::test)] async fn test_keys_with_missing_value() { let mut store = new_store().await; - let key = AlphaNumBytes::from("hello"); + let key = Interest::from("hello".as_bytes()); store.insert(ReconItem::new(&key, None)).await.unwrap(); let missing_keys = store - .keys_with_missing_values( - (AlphaNumBytes::min_value(), AlphaNumBytes::max_value()).into(), - ) + .keys_with_missing_values((Interest::min_value(), Interest::max_value()).into()) .await .unwrap(); expect![[r#" - [ - Bytes( - "hello", - ), - ] + [] "#]] .assert_debug_eq(&missing_keys); store.insert(ReconItem::new(&key, Some(&[]))).await.unwrap(); let missing_keys = store - .keys_with_missing_values( - (AlphaNumBytes::min_value(), AlphaNumBytes::max_value()).into(), - ) + .keys_with_missing_values((Interest::min_value(), Interest::max_value()).into()) .await .unwrap(); expect![[r#" @@ -755,4 +666,15 @@ mod tests { "#]] .assert_debug_eq(&missing_keys); } + + #[test(tokio::test)] + async fn test_value_for_key() { + let mut store = new_store().await; + let key = Interest::from("hello".as_bytes()); + store.insert(ReconItem::new(&key, None)).await.unwrap(); + let value = store.value_for_key(&key).await.unwrap(); + let val = value.unwrap(); + let empty: Vec = vec![]; + assert_eq!(empty, val); + } } diff --git a/core/src/sql.rs b/store/src/sqlite/mod.rs similarity index 96% rename from core/src/sql.rs rename to store/src/sqlite/mod.rs index b97712d48..cfbaf210b 100644 --- a/core/src/sql.rs +++ b/store/src/sqlite/mod.rs @@ -1,3 +1,9 @@ +mod interest; +mod model; + +pub use interest::InterestStore; +pub use model::ModelStore; + use std::{path::Path, str::FromStr}; use sqlx::{ diff --git a/store/src/sqlite/model.rs b/store/src/sqlite/model.rs new file mode 100644 index 000000000..00825367a --- /dev/null +++ b/store/src/sqlite/model.rs @@ -0,0 +1,1185 @@ +use std::{collections::BTreeSet, marker::PhantomData}; + +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use ceramic_core::{EventId, RangeOpen}; +use cid::Cid; +use iroh_bitswap::Block; +use iroh_car::{CarHeader, CarReader, CarWriter}; +use itertools::{process_results, Itertools}; +use multihash::{Code, MultihashDigest}; +use recon::{AssociativeHash, HashCount, InsertResult, Key, ReconItem, Sha256a}; +use sqlx::Row; +use tracing::instrument; + +use crate::{DbTx, SqlitePool}; + +/// Unified implementation of [`recon::Store`] and [`iroh_bitswap::Store`] that can expose the +/// individual blocks from the CAR files directly. +#[derive(Clone, Debug)] +pub struct ModelStore +where + H: AssociativeHash, +{ + pool: SqlitePool, + hash: PhantomData, +} + +#[derive(Debug)] +struct BlockRow { + cid: Cid, + root: bool, + bytes: Vec, +} + +impl ModelStore +where + H: AssociativeHash + std::convert::From<[u32; 8]>, +{ + /// Create an instance of the store initializing any neccessary tables. 
+    pub async fn new(pool: SqlitePool) -> Result<Self> {
+        let mut store = ModelStore {
+            pool,
+            hash: PhantomData,
+        };
+        store.create_table_if_not_exists().await?;
+        Ok(store)
+    }
+
+    /// Initialize the recon table.
+    async fn create_table_if_not_exists(&mut self) -> Result<()> {
+        const CREATE_STORE_KEY_TABLE: &str = "CREATE TABLE IF NOT EXISTS model_key (
+            key BLOB, -- network_id sort_value controller StreamID height event_cid
+            ahash_0 INTEGER, -- the ahash is decomposed as [u32; 8]
+            ahash_1 INTEGER,
+            ahash_2 INTEGER,
+            ahash_3 INTEGER,
+            ahash_4 INTEGER,
+            ahash_5 INTEGER,
+            ahash_6 INTEGER,
+            ahash_7 INTEGER,
+            value_retrieved BOOL, -- indicates if we have the value
+            PRIMARY KEY(key)
+        )";
+        const CREATE_VALUE_RETRIEVED_INDEX: &str =
+            "CREATE INDEX IF NOT EXISTS idx_key_value_retrieved
+            ON model_key (key, value_retrieved)";
+
+        const CREATE_MODEL_BLOCK_TABLE: &str = "CREATE TABLE IF NOT EXISTS model_block (
+            key BLOB, -- network_id sort_value controller StreamID height event_cid
+            cid BLOB, -- the cid of the Block as bytes no 0x00 prefix
+            idx INTEGER, -- the index of the block in the CAR file
+            root BOOL, -- when true the block is a root in the CAR file
+            bytes BLOB, -- the Block
+            PRIMARY KEY(key, cid)
+        )";
+        // TODO should this include idx or not?
+        const CREATE_BLOCK_ORDER_INDEX: &str = "CREATE INDEX IF NOT EXISTS idx_model_block_cid
+            ON model_block (cid)";
+
+        let mut tx = self.pool.tx().await?;
+        sqlx::query(CREATE_STORE_KEY_TABLE)
+            .execute(&mut *tx)
+            .await?;
+        sqlx::query(CREATE_VALUE_RETRIEVED_INDEX)
+            .execute(&mut *tx)
+            .await?;
+        sqlx::query(CREATE_MODEL_BLOCK_TABLE)
+            .execute(&mut *tx)
+            .await?;
+        sqlx::query(CREATE_BLOCK_ORDER_INDEX)
+            .execute(&mut *tx)
+            .await?;
+
+        tx.commit().await?;
+        Ok(())
+    }
+
+    /// Returns a (new_key, new_val) tuple.
+    async fn insert_item_int(
+        &mut self,
+        item: &ReconItem<'_, EventId>,
+        conn: &mut DbTx<'_>,
+    ) -> Result<(bool, bool)> {
+        // we insert the value first as it's possible we already have the key and can skip that step
+        // as it happens in a transaction, we'll roll back the value insert if the key insert fails and try again
+        if let Some(val) = item.value {
+            // Update the value_retrieved flag, and report if the key already exists.
+            let key_exists = self.update_value_retrieved_int(item.key, conn).await?;
+
+            // Put each block from the car file
+            let mut reader = CarReader::new(val).await?;
+            let roots: BTreeSet<Cid> = reader.header().roots().iter().cloned().collect();
+            let mut idx = 0;
+            while let Some((cid, data)) = reader.next_block().await? {
+                self.insert_block_int(item.key, idx, roots.contains(&cid), cid, &data.into(), conn)
+                    .await?;
+                idx += 1;
+            }
+
+            if key_exists {
+                return Ok((false, true));
+            }
+        }
+        let new_key = self
+            .insert_key_int(item.key, item.value.is_some(), conn)
+            .await?;
+        Ok((new_key, item.value.is_some()))
+    }
+
+    // Set value_retrieved to true and return whether the key already exists.
+    async fn update_value_retrieved_int(
+        &mut self,
+        key: &EventId,
+        conn: &mut DbTx<'_>,
+    ) -> Result<bool> {
+        let update = sqlx::query("UPDATE model_key SET value_retrieved = true WHERE key = ?");
+        let resp = update.bind(key.as_bytes()).execute(&mut **conn).await?;
+        let rows_affected = resp.rows_affected();
+        debug_assert!(rows_affected <= 1);
+        Ok(rows_affected == 1)
+    }
+
+    // Store a block in the db.
+    async fn insert_block_int(
+        &self,
+        key: &EventId,
+        idx: i32,
+        root: bool,
+        cid: Cid,
+        blob: &Bytes,
+        conn: &mut DbTx<'_>,
+    ) -> Result<()> {
+        let hash = match cid.hash().code() {
+            0x12 => Code::Sha2_256.digest(blob),
+            0x1b => Code::Keccak256.digest(blob),
+            0x11 => return Err(anyhow!("Sha1 not supported")),
+            _ => {
+                return Err(anyhow!(
+                    "multihash type {:#x} not Sha2_256, Keccak256",
+                    cid.hash().code(),
+                ))
+            }
+        };
+        if cid.hash().to_bytes() != hash.to_bytes() {
+            return Err(anyhow!(
+                "cid did not match blob {} != {}",
+                hex::encode(cid.hash().to_bytes()),
+                hex::encode(hash.to_bytes())
+            ));
+        }
+
+        sqlx::query("INSERT INTO model_block (key, idx, root, cid, bytes) VALUES (?, ?, ?, ?, ?)")
+            .bind(key.as_bytes())
+            .bind(idx)
+            .bind(root)
+            .bind(cid.to_bytes())
+            .bind(blob.to_vec())
+            .execute(&mut **conn)
+            .await?;
+        Ok(())
+    }
+
+    async fn insert_key_int(
+        &mut self,
+        key: &EventId,
+        has_value: bool,
+        conn: &mut DbTx<'_>,
+    ) -> Result<bool> {
+        let key_insert = sqlx::query(
+            "INSERT INTO model_key (
+                key,
+                ahash_0, ahash_1, ahash_2, ahash_3,
+                ahash_4, ahash_5, ahash_6, ahash_7,
+                value_retrieved
+            ) VALUES (
+                ?,
+                ?, ?, ?, ?,
+                ?, ?, ?, ?,
+                ?
+            );",
+        );
+
+        let hash = Sha256a::digest(key);
+        let resp = key_insert
+            .bind(key.as_bytes())
+            .bind(hash.as_u32s()[0])
+            .bind(hash.as_u32s()[1])
+            .bind(hash.as_u32s()[2])
+            .bind(hash.as_u32s()[3])
+            .bind(hash.as_u32s()[4])
+            .bind(hash.as_u32s()[5])
+            .bind(hash.as_u32s()[6])
+            .bind(hash.as_u32s()[7])
+            .bind(has_value)
+            .execute(&mut **conn)
+            .await;
+        match resp {
+            std::result::Result::Ok(_rows) => Ok(true),
+            Err(sqlx::Error::Database(err)) => {
+                if err.is_unique_violation() {
+                    Ok(false)
+                } else {
+                    Err(sqlx::Error::Database(err).into())
+                }
+            }
+            Err(err) => Err(err.into()),
+        }
+    }
+
+    async fn rebuild_car(&mut self, blocks: Vec<BlockRow>) -> Result<Option<Vec<u8>>> {
+        if blocks.is_empty() {
+            return Ok(None);
+        }
+
+        let size = blocks.iter().fold(0, |sum, row| sum + row.bytes.len());
+        let roots: Vec<Cid> = blocks
+            .iter()
+            .filter(|row| row.root)
+            .map(|row| row.cid)
+            .collect();
+        // Reconstruct the car file
+        // TODO figure out a better capacity calculation
+        let mut car = Vec::with_capacity(size + 100 * blocks.len());
+        let mut writer = CarWriter::new(CarHeader::V1(roots.into()), &mut car);
+        for BlockRow {
+            cid,
+            bytes,
+            root: _,
+        } in blocks
+        {
+            writer.write(cid, bytes).await?;
+        }
+        writer.finish().await?;
+        Ok(Some(car))
+    }
+}
+
+#[async_trait]
+impl<H> recon::Store for ModelStore<H>
+where
+    H: AssociativeHash,
+{
+    type Key = EventId;
+    type Hash = H;
+
+    /// Returns true if the key was new. The value is always updated if included.
+    async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result<bool> {
+        let mut tx = self.pool.writer().begin().await?;
+        let (new_key, _new_val) = self.insert_item_int(&item, &mut tx).await?;
+        tx.commit().await?;
+        Ok(new_key)
+    }
+
+    /// Insert new keys into the key space.
+    /// Returns true if a key did not previously exist.
+    async fn insert_many<'a, I>(&mut self, items: I) -> Result<InsertResult>
+    where
+        I: ExactSizeIterator<Item = ReconItem<'a, Self::Key>> + Send + Sync,
+    {
+        match items.len() {
+            0 => Ok(InsertResult::new(vec![], 0)),
+            _ => {
+                let mut results = vec![false; items.len()];
+                let mut new_val_cnt = 0;
+                let mut tx = self.pool.writer().begin().await?;
+
+                for (idx, item) in items.enumerate() {
+                    let (new_key, new_val) = self.insert_item_int(&item, &mut tx).await?;
+                    results[idx] = new_key;
+                    if new_val {
+                        new_val_cnt += 1;
+                    }
+                }
+                tx.commit().await?;
+                Ok(InsertResult::new(results, new_val_cnt))
+            }
+        }
+    }
+
+    /// Return the hash and count for a range.
+    #[instrument(skip(self))]
+    async fn hash_range(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<HashCount<Self::Hash>> {
+        if left_fencepost >= right_fencepost {
+            return Ok(HashCount::new(H::identity(), 0));
+        }
+
+        let query = sqlx::query(
+            "SELECT
+                TOTAL(ahash_0) & 0xFFFFFFFF, TOTAL(ahash_1) & 0xFFFFFFFF,
+                TOTAL(ahash_2) & 0xFFFFFFFF, TOTAL(ahash_3) & 0xFFFFFFFF,
+                TOTAL(ahash_4) & 0xFFFFFFFF, TOTAL(ahash_5) & 0xFFFFFFFF,
+                TOTAL(ahash_6) & 0xFFFFFFFF, TOTAL(ahash_7) & 0xFFFFFFFF,
+                COUNT(1)
+            FROM model_key WHERE key > ? AND key < ?;",
+        );
+        let row = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_one(self.pool.reader())
+            .await?;
+        let bytes: [u32; 8] = [
+            row.get(0),
+            row.get(1),
+            row.get(2),
+            row.get(3),
+            row.get(4),
+            row.get(5),
+            row.get(6),
+            row.get(7),
+        ];
+        let count: i64 = row.get(8); // sql int type is signed
+        let count: u64 = count
+            .try_into()
+            .expect("COUNT(1) should never return a negative number");
+        Ok(HashCount::new(H::from(bytes), count))
+    }
+
+    #[instrument(skip(self))]
+    async fn range(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+        offset: usize,
+        limit: usize,
+    ) -> Result<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
+        let query = sqlx::query(
+            "
+        SELECT
+            key
+        FROM
+            model_key
+        WHERE
+            key > ? AND key < ?
+        ORDER BY
+            key ASC
+        LIMIT
+            ?
+        OFFSET
+            ?;
+        ",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .bind(limit as i64)
+            .bind(offset as i64)
+            .fetch_all(self.pool.reader())
+            .await?;
+        //debug!(count = rows.len(), "rows");
+        Ok(Box::new(rows.into_iter().map(|row| {
+            let bytes: Vec<u8> = row.get(0);
+            EventId::from(bytes)
+        })))
+    }
+    #[instrument(skip(self))]
+    async fn range_with_values(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+        offset: usize,
+        limit: usize,
+    ) -> Result<Box<dyn Iterator<Item = (Self::Key, Vec<u8>)> + Send + 'static>> {
+        let query = sqlx::query(
+            "
+        SELECT
+            model_block.key, model_block.cid, model_block.root, model_block.bytes
+        FROM (
+            SELECT
+                key
+            FROM model_key
+            WHERE
+                key > ? AND key < ?
+                AND value_retrieved = true
+            ORDER BY
+                key ASC
+            LIMIT
+                ?
+            OFFSET
+                ?
+        ) key
+        JOIN
+            model_block
+        ON
+            key.key = model_block.key
+        ORDER BY model_block.key, model_block.idx
+        ;",
+        );
+        let all_blocks = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .bind(limit as i64)
+            .bind(offset as i64)
+            .fetch_all(self.pool.reader())
+            .await?;
+
+        // Consume all blocks into groups of blocks by their key.
+        let all_blocks: Vec<(Self::Key, Vec<BlockRow>)> = process_results(
+            all_blocks.into_iter().map(|row| {
+                Cid::read_bytes(row.get::<&[u8], _>(1))
+                    .map_err(anyhow::Error::from)
+                    .map(|cid| {
+                        (
+                            Self::Key::from(row.get::<Vec<u8>, _>(0)),
+                            cid,
+                            row.get(2),
+                            row.get(3),
+                        )
+                    })
+            }),
+            |blocks| {
+                blocks
+                    .group_by(|(key, _, _, _)| key.clone())
+                    .into_iter()
+                    .map(|(key, group)| {
+                        (
+                            key,
+                            group
+                                .map(|(_key, cid, root, bytes)| BlockRow { cid, root, bytes })
+                                .collect::<Vec<BlockRow>>(),
+                        )
+                    })
+                    .collect()
+            },
+        )?;
+
+        let mut values: Vec<(Self::Key, Vec<u8>)> = Vec::new();
+        for (key, blocks) in all_blocks {
+            if let Some(value) = self.rebuild_car(blocks).await? {
+                values.push((key.clone(), value));
+            }
+        }
+        Ok(Box::new(values.into_iter()))
+    }
+    /// Return the number of keys within the range.
+    #[instrument(skip(self))]
+    async fn count(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<usize> {
+        let query = sqlx::query(
+            "
+        SELECT
+            count(key)
+        FROM
+            model_key
+        WHERE
+            key > ? AND key < ?
+        ;",
+        );
+        let row = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_one(self.pool.reader())
+            .await?;
+        Ok(row.get::<'_, i64, _>(0) as usize)
+    }
+
+    /// Return the first key within the range.
+    #[instrument(skip(self))]
+    async fn first(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<Option<Self::Key>> {
+        let query = sqlx::query(
+            "
+        SELECT
+            key
+        FROM
+            model_key
+        WHERE
+            key > ? AND key < ?
+        ORDER BY
+            key ASC
+        LIMIT
+            1
+        ; ",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        Ok(rows.first().map(|row| {
+            let bytes: Vec<u8> = row.get(0);
+            EventId::from(bytes)
+        }))
+    }
+
+    #[instrument(skip(self))]
+    async fn last(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<Option<Self::Key>> {
+        let query = sqlx::query(
+            "
+        SELECT
+            key
+        FROM
+            model_key
+        WHERE
+            key > ? AND key < ?
+        ORDER BY
+            key DESC
+        LIMIT
+            1
+        ;",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        Ok(rows.first().map(|row| {
+            let bytes: Vec<u8> = row.get(0);
+            EventId::from(bytes)
+        }))
+    }
+
+    #[instrument(skip(self))]
+    async fn first_and_last(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<Option<(Self::Key, Self::Key)>> {
+        let query = sqlx::query(
+            "
+        SELECT first.key, last.key
+        FROM
+            (
+                SELECT key
+                FROM model_key
+                WHERE
+                    key > ? AND key < ?
+                ORDER BY key ASC
+                LIMIT 1
+            ) as first
+        JOIN
+            (
+                SELECT key
+                FROM model_key
+                WHERE
+                    key > ? AND key < ?
+                ORDER BY key DESC
+                LIMIT 1
+            ) as last
+        ;",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        if let Some(row) = rows.first() {
+            let first = EventId::from(row.get::<Vec<u8>, _>(0));
+            let last = EventId::from(row.get::<Vec<u8>, _>(1));
+            Ok(Some((first, last)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[instrument(skip(self))]
+    async fn value_for_key(&mut self, key: &Self::Key) -> Result<Option<Vec<u8>>> {
+        let query = sqlx::query(
+            "
+        SELECT
+            cid, root, bytes
+        FROM model_block
+        WHERE
+            key=?
+        ORDER BY idx
+        ;",
+        );
+        let blocks = query
+            .bind(key.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        self.rebuild_car(
+            blocks
+                .into_iter()
+                .map(|row| {
+                    Cid::read_bytes(row.get::<&[u8], _>(0))
+                        .map_err(anyhow::Error::from)
+                        .map(|cid| BlockRow {
+                            cid,
+                            root: row.get(1),
+                            bytes: row.get(2),
+                        })
+                })
+                .collect::<Result<Vec<BlockRow>>>()?,
+        )
+        .await
+    }
+
+    #[instrument(skip(self))]
+    async fn keys_with_missing_values(
+        &mut self,
+        range: RangeOpen<Self::Key>,
+    ) -> Result<Vec<Self::Key>> {
+        if range.start >= range.end {
+            return Ok(vec![]);
+        };
+        let query = sqlx::query(
+            "
+        SELECT key
+        FROM model_key
+        WHERE
+            key > ?
+            AND key < ?
+            AND value_retrieved = false
+        ;",
+        );
+        let row = query
+            .bind(range.start.as_bytes())
+            .bind(range.end.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        Ok(row
+            .into_iter()
+            .map(|row| EventId::from(row.get::<Vec<u8>, _>(0)))
+            .collect())
+    }
+}
+
+#[async_trait]
+impl iroh_bitswap::Store for ModelStore<Sha256a> {
+    async fn get_size(&self, cid: &Cid) -> Result<usize> {
+        Ok(
+            sqlx::query("SELECT length(bytes) FROM model_block WHERE cid = ?;")
+                .bind(cid.to_bytes())
+                .fetch_one(self.pool.reader())
+                .await?
+                .get::<'_, i64, _>(0) as usize,
+        )
+    }
+
+    async fn get(&self, cid: &Cid) -> Result<Block> {
+        Ok(Block::new(
+            sqlx::query("SELECT bytes FROM model_block WHERE cid = ?;")
+                .bind(cid.to_bytes())
+                .fetch_one(self.pool.reader())
+                .await?
+                .get::<'_, Vec<u8>, _>(0)
+                .into(),
+            cid.to_owned(),
+        ))
+    }
+
+    async fn has(&self, cid: &Cid) -> Result<bool> {
+        Ok(
+            sqlx::query("SELECT count(1) FROM model_block WHERE cid = ?;")
+                .bind(cid.to_bytes())
+                .fetch_one(self.pool.reader())
+                .await?
+                .get::<'_, i64, _>(0)
+                > 0,
+        )
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use crate::tests::*;
+
+    use super::*;
+    use expect_test::expect;
+    use recon::{Key, ReconItem};
+    use test_log::test;
+
+    #[test(tokio::test)]
+    async fn hash_range_query() {
+        let mut store = new_store().await;
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_key(&random_event_id(
+                Some(1),
+                Some("baeabeiazgwnti363jifhxaeaegbluw4ogcd2t5hsjaglo46wuwcgajqa5u"),
+            )),
+        )
+        .await
+        .unwrap();
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_key(&random_event_id(
+                Some(2),
+                Some("baeabeihyl35xdlfju3zrkvy2exmnl6wics3rc5ppz7hwg7l7g4brbtnpny"),
+            )),
+        )
+        .await
+        .unwrap();
+        let hash =
+            recon::Store::hash_range(&mut store, &random_event_id_min(), &random_event_id_max())
+                .await
+                .unwrap();
+        expect!["65C7A25327CC05C19AB5812103EEB8D1156595832B453C7BAC6A186F4811FA0A#2"]
+            .assert_eq(&format!("{hash}"));
+    }
+
+    #[test(tokio::test)]
+    async fn range_query() {
+        let mut store = new_store().await;
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_key(&random_event_id(
+                Some(1),
+                Some("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524"),
+            )),
+        )
+        .await
+        .unwrap();
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_key(&random_event_id(
+                Some(2),
+                Some("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty"),
+            )),
+        )
+        .await
+        .unwrap();
+        let ids = recon::Store::range(
+            &mut store,
+            &random_event_id_min(),
+            &random_event_id_max(),
+            0,
+            usize::MAX,
+        )
+        .await
+        .unwrap();
+        expect![[r#"
+            [
+                EventId {
+                    network_id: Some(
+                        2,
+                    ),
+                    separator: Some(
+                        "b51217a029eb540d",
+                    ),
+                    controller: Some(
+                        "4f16d8429ae87f86",
+                    ),
+                    stream_id: Some(
+                        "ead3ca3c",
+                    ),
+                    event_height: Some(
+                        1,
+                    ),
+                    cid: Some(
+                        "baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524",
+                    ),
+                },
+                EventId {
+                    network_id: Some(
+                        2,
+                    ),
+                    separator: Some(
+                        "b51217a029eb540d",
+                    ),
+                    controller: Some(
+                        "4f16d8429ae87f86",
+                    ),
+                    stream_id: Some(
+                        "ead3ca3c",
+                    ),
+                    event_height: Some(
+                        2,
+                    ),
+                    cid: Some(
+                        "baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty",
+                    ),
+                },
+            ]
+        "#]]
+        .assert_debug_eq(&ids.collect::<Vec<EventId>>());
+    }
+
+    #[test(tokio::test)]
+    async fn range_query_with_values() {
+        let mut store = new_store().await;
+        // Write three keys, two with values and one without
+        let one_id = random_event_id(
+            Some(1),
+            Some("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524"),
+        );
+        let two_id = random_event_id(
+            Some(2),
+            Some("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty"),
+        );
+        let (_one_blocks, one_car) = build_car_file(2).await;
+        let (_two_blocks, two_car) = build_car_file(3).await;
+        recon::Store::insert(&mut store, ReconItem::new(&one_id, Some(&one_car)))
+            .await
+            .unwrap();
+        recon::Store::insert(&mut store, ReconItem::new(&two_id, Some(&two_car)))
+            .await
+            .unwrap();
+        // Insert new event without a value to ensure we skip it in the query
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new(
+                &random_event_id(
+                    Some(2),
+                    Some("baeabeicyxeqioadjgy6v6cpy62a3gngylax54sds7rols2b67yetzaw5r4"),
+                ),
+                None,
+            ),
+        )
+        .await
+        .unwrap();
+        let values: Vec<(EventId, Vec<u8>)> = recon::Store::range_with_values(
+            &mut store,
+            &random_event_id_min(),
+            &random_event_id_max(),
+            0,
+            usize::MAX,
+        )
+        .await
+        .unwrap()
+        .collect();
+
+        assert_eq!(vec![(one_id, one_car), (two_id, two_car)], values);
+    }
+
+    #[test(tokio::test)]
+    async fn double_insert() {
+        let mut store = new_store().await;
+        let id = random_event_id(Some(10), None);
+
+        // first insert reports it's a new key
+        expect![
+            r#"
+            Ok(
+                true,
+            )
+            "#
+        ]
+        .assert_debug_eq(&recon::Store::insert(&mut store, ReconItem::new_key(&id)).await);
+
+        // second insert of same key reports it already existed
+        expect![
+            r#"
+            Ok(
+                false,
+            )
+            "#
+        ]
+        .assert_debug_eq(&recon::Store::insert(&mut store, ReconItem::new_key(&id)).await);
+    }
+
+    #[test(tokio::test)]
+    async fn double_insert_with_value() {
+        let mut store = new_store().await;
+        let id = random_event_id(Some(10), None);
+        let (_, car) = build_car_file(2).await;
+
+        let item = ReconItem::new_with_value(&id, &car);
+
+        // accept the first insert of the key with a value
+        expect![
+            r#"
+            Ok(
+                true,
+            )
+            "#
+        ]
+        .assert_debug_eq(&recon::Store::insert(&mut store, item.clone()).await);
+
+        // reject the second insert of same key with value. Do not override values
+        expect![[r#"
+            Err(
+                Database(
+                    SqliteError {
+                        code: 1555,
+                        message: "UNIQUE constraint failed: model_block.key, model_block.cid",
+                    },
+                ),
+            )
+        "#]]
+        .assert_debug_eq(&recon::Store::insert(&mut store, item).await);
+    }
+
+    #[test(tokio::test)]
+    async fn update_missing_value() {
+        let mut store = new_store().await;
+        let id = random_event_id(Some(10), None);
+        let (_, car) = build_car_file(2).await;
+
+        let item_without_value = ReconItem::new_key(&id);
+        let item_with_value = ReconItem::new_with_value(&id, &car);
+
+        // accept the first insert of the key without a value
+        expect![
+            r#"
+            Ok(
+                true,
+            )
+            "#
+        ]
+        .assert_debug_eq(&recon::Store::insert(&mut store, item_without_value).await);
+
+        // accept the second insert of same key with the value
+        expect![[r#"
+            Ok(
+                false,
+            )
+        "#]]
+        .assert_debug_eq(&recon::Store::insert(&mut store, item_with_value).await);
+    }
+
+    #[test(tokio::test)]
+    async fn first_and_last() {
+        let mut store = new_store().await;
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_key(&random_event_id(
+                Some(10),
+                Some("baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau"),
+            )),
+        )
+        .await
+        .unwrap();
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_key(&random_event_id(
+                Some(11),
+                Some("baeabeianftvrst5bja422dod6uf42pmwkwix6rprguanwsxylfut56e3ue"),
+            )),
+        )
+        .await
+        .unwrap();
+
+        // Only one key in range
+        let ret = recon::Store::first_and_last(
+            &mut store,
+            &event_id_builder().with_event_height(9).build_fencepost(),
+            &event_id_builder().with_event_height(11).build_fencepost(),
+        )
+        .await
+        .unwrap();
+        expect![[r#"
+            Some(
+                (
+                    EventId {
+                        network_id: Some(
+                            2,
+                        ),
+                        separator: Some(
+                            "b51217a029eb540d",
+                        ),
+                        controller: Some(
+                            "4f16d8429ae87f86",
+                        ),
+                        stream_id: Some(
+                            "ead3ca3c",
+                        ),
+                        event_height: Some(
+                            10,
+                        ),
+                        cid: Some(
+                            "baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau",
+                        ),
+                    },
+                    EventId {
+                        network_id: Some(
+                            2,
+                        ),
+                        separator: Some(
+                            "b51217a029eb540d",
+                        ),
+                        controller: Some(
+                            "4f16d8429ae87f86",
+                        ),
+                        stream_id: Some(
+                            "ead3ca3c",
+                        ),
+                        event_height: Some(
+                            10,
+                        ),
+                        cid: Some(
+                            "baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau",
+                        ),
+                    },
+                ),
+            )
+        "#]]
+        .assert_debug_eq(&ret);
+
+        // No keys in range
+        let ret = recon::Store::first_and_last(
+            &mut store,
+            &event_id_builder().with_event_height(12).build_fencepost(),
+            &event_id_builder().with_max_event_height().build_fencepost(),
+        )
+        .await
+        .unwrap();
+        expect![[r#"
+            None
+        "#]]
+        .assert_debug_eq(&ret);
+
+        // Two keys in range
+        let ret = recon::Store::first_and_last(
+            &mut store,
+            &random_event_id_min(),
+            &random_event_id_max(),
+        )
+        .await
+        .unwrap();
+        expect![[r#"
+            Some(
+                (
+                    EventId {
+                        network_id: Some(
+                            2,
+                        ),
+                        separator: Some(
+                            "b51217a029eb540d",
+                        ),
+                        controller: Some(
+                            "4f16d8429ae87f86",
+                        ),
+                        stream_id: Some(
+                            "ead3ca3c",
+                        ),
+                        event_height: Some(
+                            10,
+                        ),
+                        cid: Some(
+                            "baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau",
+                        ),
+                    },
+                    EventId {
+                        network_id: Some(
+                            2,
+                        ),
+                        separator: Some(
+                            "b51217a029eb540d",
+                        ),
+                        controller: Some(
+                            "4f16d8429ae87f86",
+                        ),
+                        stream_id: Some(
+                            "ead3ca3c",
+                        ),
+                        event_height: Some(
+                            11,
+                        ),
+                        cid: Some(
+                            "baeabeianftvrst5bja422dod6uf42pmwkwix6rprguanwsxylfut56e3ue",
+                        ),
+                    },
+                ),
+            )
+        "#]]
+        .assert_debug_eq(&ret);
+    }
+
+    #[test(tokio::test)]
+    async fn store_value_for_key() {
+        let mut store = new_store().await;
+        let key = random_event_id(None, None);
+        let (_, store_value) = build_car_file(3).await;
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_with_value(&key, store_value.as_slice()),
+        )
+        .await
+        .unwrap();
+        let value = recon::Store::value_for_key(&mut store, &key)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(hex::encode(store_value), hex::encode(value));
+    }
+    #[test(tokio::test)]
+    async fn keys_with_missing_value() {
+        let mut store = new_store().await;
+        let key = random_event_id(
+            Some(4),
+            Some("baeabeigc5edwvc47ul6belpxk3lgddipri5hw6f347s6ur4pdzwceprqbu"),
+        );
+        recon::Store::insert(&mut store, ReconItem::new(&key, None))
+            .await
+            .unwrap();
+        let missing_keys = recon::Store::keys_with_missing_values(
+            &mut store,
+            (EventId::min_value(), EventId::max_value()).into(),
+        )
+        .await
+        .unwrap();
+        expect![[r#"
+            [
+                EventId {
+                    network_id: Some(
+                        2,
+                    ),
+                    separator: Some(
+                        "b51217a029eb540d",
+                    ),
+                    controller: Some(
+                        "4f16d8429ae87f86",
+                    ),
+                    stream_id: Some(
+                        "ead3ca3c",
+                    ),
+                    event_height: Some(
+                        4,
+                    ),
+                    cid: Some(
+                        "baeabeigc5edwvc47ul6belpxk3lgddipri5hw6f347s6ur4pdzwceprqbu",
+                    ),
+                },
+            ]
+        "#]]
+        .assert_debug_eq(&missing_keys);
+
+        let (_, value) = build_car_file(2).await;
+        recon::Store::insert(&mut store, ReconItem::new(&key, Some(&value)))
+            .await
+            .unwrap();
+        let missing_keys = recon::Store::keys_with_missing_values(
+            &mut store,
+            (EventId::min_value(), EventId::max_value()).into(),
+        )
+        .await
+        .unwrap();
+        expect![[r#"
+            []
+        "#]]
+        .assert_debug_eq(&missing_keys);
+    }
+
+    #[test(tokio::test)]
+    async fn read_value_as_block() {
+        let mut store = new_store().await;
+        let key = random_event_id(None, None);
+        let (blocks, store_value) = build_car_file(3).await;
+        recon::Store::insert(
+            &mut store,
+            ReconItem::new_with_value(&key, store_value.as_slice()),
+        )
+        .await
+        .unwrap();
+        let value = recon::Store::value_for_key(&mut store, &key)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(hex::encode(store_value), hex::encode(value));
+
+        // Read each block from the CAR
+        for block in blocks {
+            let value = iroh_bitswap::Store::get(&store, &block.cid).await.unwrap();
+            assert_eq!(block, value);
+        }
+    }
+}
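A note on the schema above: the eight `ahash_N` columns store each key's `Sha256a` digest as eight `u32` limbs precisely so that `hash_range` can fold an entire key range inside SQLite with `TOTAL(ahash_N) & 0xFFFFFFFF` per column. The sketch below is not part of the patch (`combine` and `limbs_per_key` are illustrative names); it shows the arithmetic that aggregation relies on, per-limb addition modulo 2^32, which is associative and order-independent.

```rust
/// Sketch of the hash_range aggregation: sum each of the 8 u32 limbs across
/// keys and keep only the low 32 bits, mirroring `TOTAL(ahash_N) & 0xFFFFFFFF`.
fn combine(limbs_per_key: &[[u32; 8]]) -> [u32; 8] {
    let mut acc = [0u64; 8]; // widen so the mask below emulates wrap-around
    for limbs in limbs_per_key {
        for (a, l) in acc.iter_mut().zip(limbs.iter()) {
            *a += u64::from(*l); // TOTAL(ahash_N)
        }
    }
    acc.map(|a| (a & 0xFFFF_FFFF) as u32) // & 0xFFFFFFFF
}

fn main() {
    // Order independence is what lets a SQL aggregate compute the range hash.
    let a = [1u32, 2, 3, 4, 5, 6, 7, u32::MAX];
    let b = [7u32, 6, 5, 4, 3, 2, 1, 1];
    assert_eq!(combine(&[a, b]), combine(&[b, a]));
    assert_eq!(combine(&[a, b])[7], 0); // u32::MAX + 1 wraps to 0
}
```
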
diff --git a/store/src/tests.rs b/store/src/tests.rs
index c9c6ba0f1..03f730a7a 100644
--- a/store/src/tests.rs
+++ b/store/src/tests.rs
@@ -1,22 +1,26 @@
+use self::ModelStore;
+
 use super::*;
 
 use std::str::FromStr;
 
 use ceramic_core::{
     event_id::{Builder, WithInit},
-    Network, SqlitePool,
+    EventId, Network,
 };
+
 use cid::Cid;
-use expect_test::expect;
+use iroh_bitswap::Block;
 use iroh_car::{CarHeader, CarWriter};
 use libipld::{ipld, prelude::Encode, Ipld};
 use libipld_cbor::DagCborCodec;
+use multihash::{Code, MultihashDigest};
 use rand::Rng;
-use test_log::test;
+use recon::Sha256a;
 
-async fn new_store() -> Store {
+pub(crate) async fn new_store() -> ModelStore<Sha256a> {
     let conn = SqlitePool::connect("sqlite::memory:").await.unwrap();
-    Store::new(conn).await.unwrap()
+    ModelStore::new(conn).await.unwrap()
 }
 
 #[tokio::test]
@@ -26,15 +30,16 @@ async fn get_nonexistent_block() {
     let store = new_store().await;
     let cid =
         Cid::from_str("bafybeibazl2z4vqp2tmwcfag6wirmtpnomxknqcgrauj7m2yisrz3qjbom").unwrap(); // cspell:disable-line
     let exists = iroh_bitswap::Store::has(&store, &cid).await.unwrap();
-    assert_eq!(false, exists);
+    assert!(!exists);
 }
 
 const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb";
 const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL";
 const INIT_ID: &str = "baeabeiajn5ypv2gllvkk4muvzujvcnoen2orknxix7qtil2daqn6vu6khq";
+const SORT_KEY: &str = "model";
 
 // Return an builder for an event with the same network,model,controller,stream.
-fn event_id_builder() -> Builder<WithInit> {
+pub(crate) fn event_id_builder() -> Builder<WithInit> {
     EventId::builder()
         .with_network(&Network::DevUnstable)
         .with_sort_value(SORT_KEY, MODEL_ID)
@@ -44,33 +49,33 @@
 // Generate an event for the same network,model,controller,stream
 // The event and height are random when when its None.
-fn random_event_id(height: Option<u64>, event: Option<&str>) -> EventId {
+pub(crate) fn random_event_id(height: Option<u64>, event: Option<&str>) -> EventId {
     event_id_builder()
         .with_event_height(height.unwrap_or_else(|| rand::thread_rng().gen()))
         .with_event(
             &event
                 .map(|cid| Cid::from_str(cid).unwrap())
-                .unwrap_or_else(|| random_cid()),
+                .unwrap_or_else(random_cid),
         )
         .build()
 }
 
 // The EventId that is the minumum of all possible random event ids
-fn random_event_id_min() -> EventId {
+pub(crate) fn random_event_id_min() -> EventId {
     event_id_builder().with_min_event_height().build_fencepost()
 }
 
 // The EventId that is the maximum of all possible random event ids
-fn random_event_id_max() -> EventId {
+pub(crate) fn random_event_id_max() -> EventId {
     event_id_builder().with_max_event_height().build_fencepost()
 }
 
-fn random_cid() -> Cid {
+pub(crate) fn random_cid() -> Cid {
     let mut data = [0u8; 8];
    rand::Rng::fill(&mut ::rand::thread_rng(), &mut data);
     let hash = MultihashDigest::digest(&Code::Sha2_256, &data);
     Cid::new_v1(0x00, hash)
 }
 
-async fn build_car_file(count: usize) -> (Vec<Block>, Vec<u8>) {
+pub(crate) async fn build_car_file(count: usize) -> (Vec<Block>, Vec<u8>) {
     let blocks: Vec<Block> = (0..count).map(|_| random_block()).collect();
     let root = ipld!( {
         "links": blocks.iter().map(|block| Ipld::Link(block.cid)).collect::<Vec<Ipld>>(),
     });
@@ -92,7 +97,7 @@ async fn build_car_file(count: usize) -> (Vec<Block>, Vec<u8>) {
     (blocks, car)
 }
 
-fn random_block() -> Block {
+pub(crate) fn random_block() -> Block {
     let mut data = [0u8; 1024];
     rand::Rng::fill(&mut ::rand::thread_rng(), &mut data);
     let hash = ::multihash::MultihashDigest::digest(&::multihash::Code::Sha2_256, &data);
@@ -101,485 +106,3 @@ fn random_block() -> Block {
         data: data.to_vec().into(),
     }
 }
-
-#[test(tokio::test)]
-async fn hash_range_query() {
-    let mut store = new_store().await;
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_key(&random_event_id(
-            Some(1),
-            Some("baeabeiazgwnti363jifhxaeaegbluw4ogcd2t5hsjaglo46wuwcgajqa5u"),
-        )),
-    )
-    .await
-    .unwrap();
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_key(&random_event_id(
-            Some(2),
-            Some("baeabeihyl35xdlfju3zrkvy2exmnl6wics3rc5ppz7hwg7l7g4brbtnpny"),
-        )),
-    )
-    .await
-    .unwrap();
-    let hash = recon::Store::hash_range(&mut store, &random_event_id_min(), &random_event_id_max())
-        .await
-        .unwrap();
-    expect!["65C7A25327CC05C19AB5812103EEB8D1156595832B453C7BAC6A186F4811FA0A#2"]
-        .assert_eq(&format!("{hash}"));
-}
-
-#[test(tokio::test)]
-async fn range_query() {
-    let mut store = new_store().await;
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_key(&random_event_id(
-            Some(1),
-            Some("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524"),
-        )),
-    )
-    .await
-    .unwrap();
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_key(&random_event_id(
-            Some(2),
-            Some("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty"),
-        )),
-    )
-    .await
-    .unwrap();
-    let ids = recon::Store::range(
-        &mut store,
-        &random_event_id_min(),
-        &random_event_id_max(),
-        0,
-        usize::MAX,
-    )
-    .await
-    .unwrap();
-    expect![[r#"
-        [
-            EventId {
-                network_id: Some(
-                    2,
-                ),
-                separator: Some(
-                    "b51217a029eb540d",
-                ),
-                controller: Some(
-                    "4f16d8429ae87f86",
-                ),
-                stream_id: Some(
-                    "ead3ca3c",
-                ),
-                event_height: Some(
-                    1,
-                ),
-                cid: Some(
-                    "baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524",
-                ),
-            },
-            EventId {
-                network_id: Some(
-                    2,
-                ),
-                separator: Some(
-                    "b51217a029eb540d",
-                ),
-                controller: Some(
-                    "4f16d8429ae87f86",
-                ),
-                stream_id: Some(
-                    "ead3ca3c",
-                ),
-                event_height: Some(
-                    2,
-                ),
-                cid: Some(
-                    "baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty",
-                ),
-            },
-        ]
-    "#]]
-    .assert_debug_eq(&ids.collect::<Vec<EventId>>());
-}
-
-#[test(tokio::test)]
-async fn range_query_with_values() {
-    let mut store = new_store().await;
-    // Write three keys, two with values and one without
-    let one_id = random_event_id(
-        Some(1),
-        Some("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524"),
-    );
-    let two_id = random_event_id(
-        Some(2),
-        Some("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty"),
-    );
-    let (_one_blocks, one_car) = build_car_file(2).await;
-    let (_two_blocks, two_car) = build_car_file(3).await;
-    recon::Store::insert(&mut store, ReconItem::new(&one_id, Some(&one_car)))
-        .await
-        .unwrap();
-    recon::Store::insert(&mut store, ReconItem::new(&two_id, Some(&two_car)))
-        .await
-        .unwrap();
-    // Insert new event without a value to ensure we skip it in the query
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new(
-            &random_event_id(
-                Some(2),
-                Some("baeabeicyxeqioadjgy6v6cpy62a3gngylax54sds7rols2b67yetzaw5r4"),
-            ),
-            None,
-        ),
-    )
-    .await
-    .unwrap();
-    let values: Vec<(EventId, Vec<u8>)> = recon::Store::range_with_values(
-        &mut store,
-        &random_event_id_min(),
-        &random_event_id_max(),
-        0,
-        usize::MAX,
-    )
-    .await
-    .unwrap()
-    .collect();
-
-    assert_eq!(vec![(one_id, one_car), (two_id, two_car)], values);
-}
-
-#[test(tokio::test)]
-async fn double_insert() {
-    let mut store = new_store().await;
-    let id = random_event_id(Some(10), None);
-
-    // first insert reports its a new key
-    expect![
-        r#"
-        Ok(
-            true,
-        )
-        "#
-    ]
-    .assert_debug_eq(&recon::Store::insert(&mut store, ReconItem::new_key(&id)).await);
-
-    // second insert of same key reports it already existed
-    expect![
-        r#"
-        Ok(
-            false,
-        )
-        "#
-    ]
-    .assert_debug_eq(&recon::Store::insert(&mut store, ReconItem::new_key(&id)).await);
-}
-
-#[test(tokio::test)]
-async fn double_insert_with_value() {
-    let mut store = new_store().await;
-    let id = random_event_id(Some(10), None);
-    let (_, car) = build_car_file(2).await;
-
-    let item = ReconItem::new_with_value(&id, &car);
-
-    // do take the first one
-    expect![
-        r#"
-        Ok(
-            true,
-        )
-        "#
-    ]
-    .assert_debug_eq(&recon::Store::insert(&mut store, item.clone()).await);
-
-    // reject the second insert of same key with value. Do not override values
-    expect![[r#"
-        Err(
-            Database(
-                SqliteError {
-                    code: 1555,
-                    message: "UNIQUE constraint failed: store_block.cid",
-                },
-            ),
-        )
-    "#]]
-    .assert_debug_eq(&recon::Store::insert(&mut store, item).await);
-}
-
-#[test(tokio::test)]
-async fn update_missing_value() {
-    let mut store = new_store().await;
-    let id = random_event_id(Some(10), None);
-    let (_, car) = build_car_file(2).await;
-
-    let item_without_value = ReconItem::new_key(&id);
-    let item_with_value = ReconItem::new_with_value(&id, &car);
-
-    // do take the first one
-    expect![
-        r#"
-        Ok(
-            true,
-        )
-        "#
-    ]
-    .assert_debug_eq(&recon::Store::insert(&mut store, item_without_value).await);
-
-    // accept the second insert of same key with the value
-    expect![[r#"
-        Ok(
-            false,
-        )
-    "#]]
-    .assert_debug_eq(&recon::Store::insert(&mut store, item_with_value).await);
-}
-
-#[test(tokio::test)]
-async fn first_and_last() {
-    let mut store = new_store().await;
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_key(&random_event_id(
-            Some(10),
-            Some("baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau"),
-        )),
-    )
-    .await
-    .unwrap();
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_key(&random_event_id(
-            Some(11),
-            Some("baeabeianftvrst5bja422dod6uf42pmwkwix6rprguanwsxylfut56e3ue"),
-        )),
-    )
-    .await
-    .unwrap();
-
-    // Only one key in range
-    let ret = recon::Store::first_and_last(
-        &mut store,
-        &event_id_builder().with_event_height(9).build_fencepost(),
-        &event_id_builder().with_event_height(11).build_fencepost(),
-    )
-    .await
-    .unwrap();
-    expect![[r#"
-        Some(
-            (
-                EventId {
-                    network_id: Some(
-                        2,
-                    ),
-                    separator: Some(
-                        "b51217a029eb540d",
-                    ),
-                    controller: Some(
-                        "4f16d8429ae87f86",
-                    ),
-                    stream_id: Some(
-                        "ead3ca3c",
-                    ),
-                    event_height: Some(
-                        10,
-                    ),
-                    cid: Some(
-                        "baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau",
-                    ),
-                },
-                EventId {
-                    network_id: Some(
-                        2,
-                    ),
-                    separator: Some(
-                        "b51217a029eb540d",
-                    ),
-                    controller: Some(
-                        "4f16d8429ae87f86",
-                    ),
-                    stream_id: Some(
-                        "ead3ca3c",
-                    ),
-                    event_height: Some(
-                        10,
-                    ),
-                    cid: Some(
-                        "baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau",
-                    ),
-                },
-            ),
-        )
-    "#]]
-    .assert_debug_eq(&ret);
-
-    // No keys in range
-    let ret = recon::Store::first_and_last(
-        &mut store,
-        &event_id_builder().with_event_height(12).build_fencepost(),
-        &event_id_builder().with_max_event_height().build_fencepost(),
-    )
-    .await
-    .unwrap();
-    expect![[r#"
-        None
-    "#]]
-    .assert_debug_eq(&ret);
-
-    // Two keys in range
-    let ret =
-        recon::Store::first_and_last(&mut store, &random_event_id_min(), &random_event_id_max())
-            .await
-            .unwrap();
-    expect![[r#"
-        Some(
-            (
-                EventId {
-                    network_id: Some(
-                        2,
-                    ),
-                    separator: Some(
-                        "b51217a029eb540d",
-                    ),
-                    controller: Some(
-                        "4f16d8429ae87f86",
-                    ),
-                    stream_id: Some(
-                        "ead3ca3c",
-                    ),
-                    event_height: Some(
-                        10,
-                    ),
-                    cid: Some(
-                        "baeabeie2bcird7765t7646jcoatd72tfn2tscdaap7g6kvvy7k43s34aau",
-                    ),
-                },
-                EventId {
-                    network_id: Some(
-                        2,
-                    ),
-                    separator: Some(
-                        "b51217a029eb540d",
-                    ),
-                    controller: Some(
-                        "4f16d8429ae87f86",
-                    ),
-                    stream_id: Some(
-                        "ead3ca3c",
-                    ),
-                    event_height: Some(
-                        11,
-                    ),
-                    cid: Some(
-                        "baeabeianftvrst5bja422dod6uf42pmwkwix6rprguanwsxylfut56e3ue",
-                    ),
-                },
-            ),
-        )
-    "#]]
-    .assert_debug_eq(&ret);
-}
-
-#[test(tokio::test)]
-async fn store_value_for_key() {
-    let mut store = new_store().await;
-    let key = random_event_id(None, None);
-    let (_, store_value) = build_car_file(3).await;
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_with_value(&key, store_value.as_slice()),
-    )
-    .await
-    .unwrap();
-    let value = recon::Store::value_for_key(&mut store, &key)
-        .await
-        .unwrap()
-        .unwrap();
-    assert_eq!(hex::encode(store_value), hex::encode(value));
-}
-#[test(tokio::test)]
-async fn keys_with_missing_value() {
-    let mut store = new_store().await;
-    let key = random_event_id(
-        Some(4),
-        Some("baeabeigc5edwvc47ul6belpxk3lgddipri5hw6f347s6ur4pdzwceprqbu"),
-    );
-    recon::Store::insert(&mut store, ReconItem::new(&key, None))
-        .await
-        .unwrap();
-    let missing_keys = recon::Store::keys_with_missing_values(
-        &mut store,
-        (EventId::min_value(), EventId::max_value()).into(),
-    )
-    .await
-    .unwrap();
-    expect![[r#"
-        [
-            EventId {
-                network_id: Some(
-                    2,
-                ),
-                separator: Some(
-                    "b51217a029eb540d",
-                ),
-                controller: Some(
-                    "4f16d8429ae87f86",
-                ),
-                stream_id: Some(
-                    "ead3ca3c",
-                ),
-                event_height: Some(
-                    4,
-                ),
-                cid: Some(
-                    "baeabeigc5edwvc47ul6belpxk3lgddipri5hw6f347s6ur4pdzwceprqbu",
-                ),
-            },
-        ]
-    "#]]
-    .assert_debug_eq(&missing_keys);
-
-    let (_, value) = build_car_file(2).await;
-    recon::Store::insert(&mut store, ReconItem::new(&key, Some(&value)))
-        .await
-        .unwrap();
-    let missing_keys = recon::Store::keys_with_missing_values(
-        &mut store,
-        (EventId::min_value(), EventId::max_value()).into(),
-    )
-    .await
-    .unwrap();
-    expect![[r#"
-        []
-    "#]]
-    .assert_debug_eq(&missing_keys);
-}
-
-#[test(tokio::test)]
-async fn read_value_as_block() {
-    let mut store = new_store().await;
-    let key = random_event_id(None, None);
-    let (blocks, store_value) = build_car_file(3).await;
-    recon::Store::insert(
-        &mut store,
-        ReconItem::new_with_value(&key, store_value.as_slice()),
-    )
-    .await
-    .unwrap();
-    let value = recon::Store::value_for_key(&mut store, &key)
-        .await
-        .unwrap()
-        .unwrap();
-    assert_eq!(hex::encode(store_value), hex::encode(value));
-
-    // Read each block from the CAR
-    for block in blocks {
-        let value = iroh_bitswap::Store::get(&store, &block.cid).await.unwrap();
-        assert_eq!(block, value);
-    }
-}
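To close, here is a condensed sketch of the end-to-end flow the moved tests exercise: build a store over an in-memory pool, insert a key with its CAR value, and read the value back. It assumes the store crate exposes `ModelStore` and `SqlitePool` as the test imports suggest (the `ceramic_store` crate name is an assumption), and that `car_bytes` is a valid CAR file, since `insert` re-hashes and verifies every block.

```rust
use anyhow::Result;
use ceramic_core::EventId;
use ceramic_store::{ModelStore, SqlitePool}; // assumed paths; see the store crate in this patch
use recon::{ReconItem, Sha256a, Store as _};

async fn round_trip(event_id: EventId, car_bytes: Vec<u8>) -> Result<()> {
    // In-memory SQLite, exactly as new_store() does in the tests.
    let pool = SqlitePool::connect("sqlite::memory:").await?;
    let mut store: ModelStore<Sha256a> = ModelStore::new(pool).await?;

    // One transaction inserts the model_key row and every block of the CAR.
    let is_new = store
        .insert(ReconItem::new(&event_id, Some(&car_bytes)))
        .await?;
    assert!(is_new);

    // value_for_key rebuilds the CAR from the ordered model_block rows.
    let value = store.value_for_key(&event_id).await?;
    assert_eq!(value.as_deref(), Some(car_bytes.as_slice()));
    Ok(())
}
```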