From ae0d393157cdc028192cee94c3d6f3df61b72afd Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 14 Jun 2023 14:32:45 +0200 Subject: [PATCH] Run downloader in a separate thread --- src/main.rs | 14 ++- src/service/adapters/sqlite.rs | 136 +++++++++++++++------- src/service/app/commands/downloader.rs | 152 +++++++++++++++++++++++++ 3 files changed, 254 insertions(+), 48 deletions(-) create mode 100644 src/service/app/commands/downloader.rs diff --git a/src/main.rs b/src/main.rs index f2dadb4..4ad407e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,10 +10,9 @@ use crate::service::app; use crate::service::app::commands::implementation as commandsimpl; use crate::service::app::common; use crate::service::ports::http; -use service::adapters::sqlite::{ - self as sqliteadapters, SqliteConnection, SqliteConnectionAdapter, -}; -use std::borrow::Borrow; +use service::adapters::sqlite as sqliteadapters; +use service::app::commands::downloader::Downloader; +use std::thread; fn main() { let conn = sqlite::Connection::open("/tmp/db.sqlite").unwrap(); @@ -69,6 +68,13 @@ fn main() { let server = http::Server::new(&app); runner.run(&migrations).unwrap(); + + let downloader = Downloader::new(transaction_provider); + + thread::spawn(|| { + downloader.run().unwrap(); + }); + server.listen_and_serve(); } diff --git a/src/service/adapters/sqlite.rs b/src/service/adapters/sqlite.rs index 45d099e..eb09e7e 100644 --- a/src/service/adapters/sqlite.rs +++ b/src/service/adapters/sqlite.rs @@ -3,7 +3,8 @@ use crate::migrations; use crate::service::app::common; use crate::service::domain; use sqlite; -use std::borrow::Borrow; +use sqlite::State; +use std::sync::{Arc,Mutex,MutexGuard}; pub type AdaptersFactoryFn = dyn Fn(T) -> A; @@ -13,22 +14,42 @@ pub struct TransactionProvider<'a, T, A> { factory_fn: Box>, } -impl<'a, T, A> TransactionProvider<'_, T, A> -where - T: SqliteConnection, -{ - pub fn new( - conn: &'a T, - factory_fn: Box>, - ) -> TransactionProvider<'a, T, A> - where - T: Borrow, - TA: SqliteConnection, - { - TransactionProvider { - //conn: Mutex::new(conn), - conn, - factory_fn, +impl TransactionProvider { + pub fn new(conn: SqliteConnectionAdapter) -> TransactionProvider { + TransactionProvider { conn } + } + + fn new_adapters(&self) -> common::Adapters { + let registrations = Box::new(RegistrationRepository::new(self.conn.clone())); + let events = Box::new(EventRepository::new(self.conn.clone())); + common::Adapters::new(registrations, events) + } +} + +impl common::TransactionProvider for TransactionProvider { + fn start_transaction(&self) -> Result> { + let conn = self.conn.clone(); + + conn.get()?.execute("BEGIN TRANSACTION")?; + + let adapters = self.new_adapters(); + let t = Transaction::new(self.conn.clone(), adapters); + Ok(Box::new(t)) + } +} + +struct Transaction { + _conn: SqliteConnectionAdapter, + adapters: common::Adapters, + _commited: bool, +} + +impl Transaction { + fn new(conn: SqliteConnectionAdapter, adapters: common::Adapters) -> Self { + Self { + _conn: conn, + adapters, + _commited: false, } } } @@ -80,7 +101,9 @@ where fn save(&self, registration: domain::Registration) -> Result<()> { let hex_public_key = registration.pub_key().hex(); - let mut statement = self.conn.prepare( + let conn = self.conn.get()?; + + let mut statement = conn.prepare( "INSERT OR REPLACE INTO registration(public_key, apns_token, locale) VALUES (:public_key, :apns_token, :locale) @@ -108,6 +131,40 @@ where Ok(()) } + + fn get_relays(&self) -> Result> { + let conn = self.conn.get()?; + let query = "SELECT address FROM relays GROUP BY address"; + let mut statement = conn.prepare(query)?; + + let mut relay_addresses = Vec::new(); + + while let Ok(State::Row) = statement.next() { + let address_string = statement.read::("address")?; + let relay_address = domain::RelayAddress::new(address_string)?; + relay_addresses.push(relay_address); + } + + Ok(relay_addresses) + } + + fn get_pub_keys(&self, address: domain::RelayAddress) -> Result> { + let conn = self.conn.get()?; + let query = "SELECT public_key FROM relays WHERE address = :address"; + let mut statement = conn.prepare(query)?; + statement.bind((":address", address.as_ref()))?; + + let mut results = Vec::new(); + + while let Ok(State::Row) = statement.next() { + let public_key_string = statement.read::("public_key")?; + let pub_key = domain::PubKey::new_from_hex(public_key_string.as_ref())?; + let pub_key_info = common::PubKeyInfo::new(pub_key); + results.push(pub_key_info); + } + + Ok(results) + } } pub struct RegistrationRepositoryMigration0001 { @@ -132,7 +189,9 @@ where T: SqliteConnection, { fn run(&self) -> Result<()> { - self.conn.execute( + let conn = self.conn.get()?; + + conn.execute( "CREATE TABLE registration ( public_key TEXT, apns_token TEXT, @@ -140,7 +199,7 @@ where PRIMARY KEY (public_key) )", )?; - self.conn.execute( + conn.execute( "CREATE TABLE relays ( public_key TEXT, address TEXT, @@ -152,30 +211,16 @@ where } } -pub trait SqliteConnection { - fn execute>(&self, statement: T) -> sqlite::Result<()>; - fn prepare>(&self, statement: T) -> sqlite::Result>; -} - -pub struct SqliteConnectionAdapter(pub sqlite::Connection); - -impl SqliteConnection for SqliteConnectionAdapter { - fn execute>(&self, statement: T) -> sqlite::Result<()> { - self.0.execute(statement) - } +#[derive(Clone)] +pub struct SqliteConnectionAdapter(Arc>); - fn prepare>(&self, statement: T) -> sqlite::Result> { - self.0.prepare(statement) +impl SqliteConnectionAdapter { + pub fn new(conn: sqlite::Connection) -> Self { + Self(Arc::new(Mutex::new(conn))) } -} -impl SqliteConnection for &SqliteConnectionAdapter { - fn execute>(&self, statement: T) -> sqlite::Result<()> { - self.0.execute(statement) - } - - fn prepare>(&self, statement: T) -> sqlite::Result> { - self.0.prepare(statement) + pub fn get(&self) -> Result>{ + Ok(self.0.lock()?) } } @@ -197,7 +242,7 @@ where status TEXT, PRIMARY KEY (name) );"; - conn.execute(query)?; + conn.get()?.execute(query)?; Ok(MigrationStatusRepository { conn }) } @@ -210,7 +255,9 @@ where fn get_status(&self, name: &str) -> Result> { let query = "SELECT status FROM migration_status WHERE name = :name LIMIT 1"; - let mut statement = self.conn.prepare(query)?; + let conn= self.conn.get()?; + let mut statement = conn.prepare(query)?; + statement.bind((":name", name))?; if let Ok(sqlite::State::Row) = statement.next() { @@ -224,7 +271,8 @@ where fn save_status(&self, name: &str, status: migrations::Status) -> Result<()> { let persisted_status = status_to_persisted(&status); - let mut statement = self.conn.prepare( + let conn = self.conn.get()?; + let mut statement = conn.prepare( "INSERT OR REPLACE INTO migration_status(name, status) VALUES (:name, :status) diff --git a/src/service/app/commands/downloader.rs b/src/service/app/commands/downloader.rs new file mode 100644 index 0000000..39c8920 --- /dev/null +++ b/src/service/app/commands/downloader.rs @@ -0,0 +1,152 @@ +use crate::errors::Result; +use crate::service::app::common; + +pub struct Downloader { + _transaction_provider: T, + //relay_downloaders: HashMap>, +} + +impl Downloader { + pub fn new(transaction_provider: T) -> Self { + Self { + _transaction_provider: transaction_provider, + } + } +} + +impl Downloader +where + T: common::TransactionProvider, +{ + pub fn run(&mut self) -> Result<()> { + let transaction = self._transaction_provider.start_transaction()?; + + let adapters = transaction.adapters(); + let registrations = adapters.registrations.borrow(); + + let initial_relays = registrations.get_relays()?; + + //let initial_relays: Vec = self + // .transaction_provider + // .transact(|adapters| adapters.registrations.get_relays()) + // .unwrap(); // todo don't unwrap + + for _initial_relay in initial_relays { + //let v = RelayDownloader::new(initial_relay.clone(), self.transaction_provider); + //self.relay_downloaders.insert(initial_relay.clone(), v); + } + + // todo update the list in a loop + + Err("not implemented".into()) + } +} + +//pub struct RelayDownloader { +// transaction_provider: T, +// r1: PhantomData, +// r2: PhantomData, +// +// relay: domain::RelayAddress, +// +// //run_join: thread::JoinHandle<()>, +// tx: mpsc::Sender<()>, +//} +// +//impl RelayDownloader { +// fn new(relay: domain::RelayAddress, transaction_provider: T) -> RelayDownloader +// where +// R1: common::RegistrationRepository, +// R2: common::EventRepository, +// T: common::TransactionProvider, +// { +// let (tx, rx) = mpsc::channel::<()>(); +// +// //let h = thread::spawn(|| { +// // Self::run(relay, transaction_provider, rx); +// //}); +// +// let v = RelayDownloader { +// transaction_provider, +// r1: PhantomData {}, +// r2: PhantomData {}, +// relay, +// //run_join: h, +// tx, +// }; +// +// return v; +// } +// +// fn run(relay: domain::RelayAddress, transaction_provider: T, rx: mpsc::Receiver<()>) { +// let initial_pub_keys: Vec = Vec::new(); +// +// //let c = |adapters: Adapters| -> Result<()> { +// // let relays = adapters.registrations.get_relays()?; +// +// // for relay in relays { +// // initial_relays.push(relay); +// // } +// +// // return Ok(()); +// //}; +// +// //for initial_relay in initial_relays { +// //} +// +// //self.transaction_provider.transact(&c); +// } +//} + +//impl Drop for RelayDownloader +//where +// T: common::TransactionProvider, R>, +// R1: common::RegistrationRepository, +// R2: common::EventRepository, +//{ +// fn drop(&mut self) { +// self.tx.send(()); +// self.run_join.join(); +// } +//} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::app::common; + + #[test] + fn it_works() -> Result<()> { + let transaction_provider = TransactionProviderMock::new(); + let mut downloader = Downloader::new(transaction_provider); + match downloader.run() { + Ok(_) => return Err("should have failed".into()), + Err(_) => return Ok(()), + }; + + //match APNSToken::new(String::from("")) { + // Ok(_) => panic!("constructor should have returned an error"), + // Err(error) => assert_eq!(error.to_string(), String::from("empty token")), + //} + } + + //fn new_sqlite() -> Result { + // //let conn = SqliteConnectionAdapter::new(sqlite::open(":memory:")?); + // //RegistrationRepositoryMigration0001::new(conn.clone()).run()?; + // Ok(conn) + //} + + struct TransactionProviderMock {} + + impl TransactionProviderMock { + pub fn new() -> Self { + TransactionProviderMock {} + } + } + + impl common::TransactionProvider for TransactionProviderMock { + fn start_transaction(&self) -> Result> { + Err("not implemented".into()) + } + } +}