Skip to content

Commit

Permalink
fix: refactor recon storage and remove sqlx dependency from recon and…
Browse files Browse the repository at this point in the history
… core crates (#254)

* fix: remove sqlx dependency from recon and core crates

Modified the store implementations to be interest/model specific as they have different requirements.
Interests don't have values, so those requests no-op (they error if you specify a value other than []).
Removed the sort_key column from the tables as its purpose was interest/model differentiation.
Renamed the tables accordingly and adjusted the queries.

* fix: remove sort_key from model_block table as well

* fix: Make model_block PK (cid,key)

it's (probably) possible to have keys point to the same CID, so make sure they are different rows

* fix: interest range_with_values query was not updated

In current feed PR, the API was modified to use range instead of range_with_values in this case, but it needed to be fixed here too

* chore: review clean up

* fix: interest store always has a value for a key, not never

---------

Co-authored-by: David Estes <[email protected]>
  • Loading branch information
dav1do and dav1do authored Feb 3, 2024
1 parent e4dc833 commit d5bbf9a
Show file tree
Hide file tree
Showing 17 changed files with 1,494 additions and 1,534 deletions.
75 changes: 51 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ serde_bytes.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
ssi.workspace = true
sqlx.workspace = true
unsigned-varint.workspace = true

[dev-dependencies]
Expand Down
2 changes: 0 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod jws;
mod network;
mod range;
mod signer;
mod sql;
mod stream_id;

pub use bytes::Bytes;
Expand All @@ -20,7 +19,6 @@ pub use jws::{Jws, JwsSignature};
pub use network::Network;
pub use range::RangeOpen;
pub use signer::{JwkSigner, Signer};
pub use sql::{DbTx, SqlitePool};
pub use stream_id::{StreamId, StreamIdType};

pub use cid::Cid;
Expand Down
2 changes: 1 addition & 1 deletion one/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use ceramic_core::SqlitePool;
use ceramic_p2p::SQLiteBlockStore;
use ceramic_store::SqlitePool;
use chrono::{SecondsFormat, Utc};
use cid::{multibase, multihash, Cid};
use clap::{Args, Subcommand};
Expand Down
15 changes: 6 additions & 9 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod network;
use std::{env, num::NonZeroUsize, path::PathBuf, time::Duration};

use anyhow::{anyhow, Result};
use ceramic_core::{EventId, Interest, PeerId, SqlitePool};
use ceramic_core::{EventId, Interest, PeerId};
use ceramic_kubo_rpc::Multiaddr;

use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
Expand All @@ -18,10 +18,7 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
use futures::StreamExt;
use multibase::Base;
use multihash::{Code, Hasher, Multihash, MultihashDigest};
use recon::{
FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a,
StoreMetricsMiddleware,
};
use recon::{FullInterests, Recon, ReconInterestProvider, Server, Sha256a, StoreMetricsMiddleware};
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext};
Expand Down Expand Up @@ -283,12 +280,12 @@ pub async fn run() -> Result<()> {
}
}

type InterestStore = SQLiteStore<Interest, Sha256a>;
type InterestStore = ceramic_store::InterestStore<Sha256a>;
type InterestInterest = FullInterests<Interest>;
type ReconInterest =
Server<Interest, Sha256a, StoreMetricsMiddleware<InterestStore>, InterestInterest>;

type ModelStore = ceramic_store::Store;
type ModelStore = ceramic_store::ModelStore<Sha256a>;
type ModelInterest = ReconInterestProvider<Sha256a>;
type ReconModel = Server<EventId, Sha256a, StoreMetricsMiddleware<ModelStore>, ModelInterest>;

Expand Down Expand Up @@ -424,14 +421,14 @@ impl Daemon {

// Connect to sqlite
let sql_db_path: PathBuf = dir.join("db.sqlite3");
let sql_pool = SqlitePool::connect(&sql_db_path).await?;
let sql_pool = ceramic_store::SqlitePool::connect(&sql_db_path).await?;

// Create recon metrics
let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register);

// Create recon store for interests.
let interest_store = StoreMetricsMiddleware::new(
InterestStore::new(sql_pool.clone(), "interest".to_string()).await?,
InterestStore::new(sql_pool.clone()).await?,
recon_metrics.clone(),
);

Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ backoff.workspace = true
bytes.workspace = true
ceramic-core.workspace = true
ceramic-metrics.workspace = true
ceramic-store.workspace = true
cid.workspace = true
expect-test.workspace = true
futures-util.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,8 @@ mod tests {
use crate::keys::Keypair;

use async_trait::async_trait;
use ceramic_core::{RangeOpen, SqlitePool};
use ceramic_core::RangeOpen;
use ceramic_store::SqlitePool;
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
Expand Down Expand Up @@ -1389,7 +1390,7 @@ mod tests {
rpc_server_addr,
keypair.into(),
None::<(DummyRecon<Interest>, DummyRecon<EventId>)>,
ceramic_store::Store::new(sql_pool).await?,
ceramic_store::ModelStore::new(sql_pool).await?,
metrics,
)
.await?;
Expand Down Expand Up @@ -1421,7 +1422,7 @@ mod tests {
.await
.context("timed out before getting a listening address for the node")??;
let mut dial_addr = addr.clone();
dial_addr.push(Protocol::P2p(peer_id.into()));
dial_addr.push(Protocol::P2p(peer_id));
Ok(TestRunner {
task,
client,
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/sqliteblockstore.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use ceramic_core::SqlitePool;
use ceramic_store::SqlitePool;
use cid::{
multihash::Code::{Keccak256, Sha2_256},
multihash::MultihashDigest,
Expand Down Expand Up @@ -226,7 +226,7 @@ mod tests {
use crate::SQLiteBlockStore;
use anyhow::Error;
use bytes::Bytes;
use ceramic_core::SqlitePool;
use ceramic_store::SqlitePool;
use cid::{Cid, CidGeneric};
use expect_test::expect;
use iroh_bitswap::Store;
Expand Down
1 change: 0 additions & 1 deletion recon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ multihash.workspace = true
prometheus-client.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlx.workspace = true
tokio.workspace = true
tracing.workspace = true
void.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions recon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ pub use crate::{
client::{Client, Server},
metrics::Metrics,
recon::{
btreestore::BTreeStore, sqlitestore::SQLiteStore, store_metrics::StoreMetricsMiddleware,
AssociativeHash, FullInterests, HashCount, InsertResult, InterestProvider, Key, Range,
Recon, ReconInterestProvider, ReconItem, Store, SyncState,
btreestore::BTreeStore, store_metrics::StoreMetricsMiddleware, AssociativeHash,
FullInterests, HashCount, InsertResult, InterestProvider, Key, Range, Recon,
ReconInterestProvider, ReconItem, Store, SyncState,
},
sha256a::Sha256a,
};
Expand Down
11 changes: 10 additions & 1 deletion recon/src/recon.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod btreestore;
pub mod sqlitestore;
pub mod store_metrics;
#[cfg(test)]
pub mod tests;
Expand Down Expand Up @@ -348,6 +347,16 @@ impl<H> HashCount<H> {
pub fn new(hash: H, count: u64) -> Self {
Self { hash, count }
}

/// The hash of the values.
pub fn hash(&self) -> &H {
&self.hash
}

/// The number of values that produced the hash.
pub fn count(&self) -> u64 {
self.count
}
}

impl<H> std::fmt::Debug for HashCount<H>
Expand Down
Loading

0 comments on commit d5bbf9a

Please sign in to comment.