Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Commit

Permalink
Run downloader in a separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Jun 14, 2023
1 parent 8936eb4 commit ae0d393
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 48 deletions.
14 changes: 10 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ use crate::service::app;
use crate::service::app::commands::implementation as commandsimpl;
use crate::service::app::common;
use crate::service::ports::http;
use service::adapters::sqlite::{
self as sqliteadapters, SqliteConnection, SqliteConnectionAdapter,
};
use std::borrow::Borrow;
use service::adapters::sqlite as sqliteadapters;
use service::app::commands::downloader::Downloader;
use std::thread;

fn main() {
let conn = sqlite::Connection::open("/tmp/db.sqlite").unwrap();
Expand Down Expand Up @@ -69,6 +68,13 @@ fn main() {
let server = http::Server::new(&app);

runner.run(&migrations).unwrap();

let downloader = Downloader::new(transaction_provider);

thread::spawn(|| {
downloader.run().unwrap();
});

server.listen_and_serve();
}

Expand Down
136 changes: 92 additions & 44 deletions src/service/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::migrations;
use crate::service::app::common;
use crate::service::domain;
use sqlite;
use std::borrow::Borrow;
use sqlite::State;
use std::sync::{Arc,Mutex,MutexGuard};

pub type AdaptersFactoryFn<T, A> = dyn Fn(T) -> A;

Expand All @@ -13,22 +14,42 @@ pub struct TransactionProvider<'a, T, A> {
factory_fn: Box<AdaptersFactoryFn<&'a T, A>>,
}

impl<'a, T, A> TransactionProvider<'_, T, A>
where
T: SqliteConnection,
{
pub fn new<TA>(
conn: &'a T,
factory_fn: Box<AdaptersFactoryFn<&'a T, A>>,
) -> TransactionProvider<'a, T, A>
where
T: Borrow<TA>,
TA: SqliteConnection,
{
TransactionProvider {
//conn: Mutex::new(conn),
conn,
factory_fn,
impl TransactionProvider {
pub fn new(conn: SqliteConnectionAdapter) -> TransactionProvider {
TransactionProvider { conn }
}

fn new_adapters(&self) -> common::Adapters {
let registrations = Box::new(RegistrationRepository::new(self.conn.clone()));
let events = Box::new(EventRepository::new(self.conn.clone()));
common::Adapters::new(registrations, events)
}
}

impl common::TransactionProvider for TransactionProvider {
fn start_transaction(&self) -> Result<Box<dyn common::Transaction>> {
let conn = self.conn.clone();

conn.get()?.execute("BEGIN TRANSACTION")?;

let adapters = self.new_adapters();
let t = Transaction::new(self.conn.clone(), adapters);
Ok(Box::new(t))
}
}

struct Transaction {
_conn: SqliteConnectionAdapter,
adapters: common::Adapters,
_commited: bool,
}

impl Transaction {
fn new(conn: SqliteConnectionAdapter, adapters: common::Adapters) -> Self {
Self {
_conn: conn,
adapters,
_commited: false,
}
}
}
Expand Down Expand Up @@ -80,7 +101,9 @@ where
fn save(&self, registration: domain::Registration) -> Result<()> {
let hex_public_key = registration.pub_key().hex();

let mut statement = self.conn.prepare(
let conn = self.conn.get()?;

let mut statement = conn.prepare(
"INSERT OR REPLACE INTO
registration(public_key, apns_token, locale)
VALUES (:public_key, :apns_token, :locale)
Expand Down Expand Up @@ -108,6 +131,40 @@ where

Ok(())
}

fn get_relays(&self) -> Result<Vec<domain::RelayAddress>> {
let conn = self.conn.get()?;
let query = "SELECT address FROM relays GROUP BY address";
let mut statement = conn.prepare(query)?;

let mut relay_addresses = Vec::new();

while let Ok(State::Row) = statement.next() {
let address_string = statement.read::<String, _>("address")?;
let relay_address = domain::RelayAddress::new(address_string)?;
relay_addresses.push(relay_address);
}

Ok(relay_addresses)
}

fn get_pub_keys(&self, address: domain::RelayAddress) -> Result<Vec<common::PubKeyInfo>> {
let conn = self.conn.get()?;
let query = "SELECT public_key FROM relays WHERE address = :address";
let mut statement = conn.prepare(query)?;
statement.bind((":address", address.as_ref()))?;

let mut results = Vec::new();

while let Ok(State::Row) = statement.next() {
let public_key_string = statement.read::<String, _>("public_key")?;
let pub_key = domain::PubKey::new_from_hex(public_key_string.as_ref())?;
let pub_key_info = common::PubKeyInfo::new(pub_key);
results.push(pub_key_info);
}

Ok(results)
}
}

pub struct RegistrationRepositoryMigration0001<T> {
Expand All @@ -132,15 +189,17 @@ where
T: SqliteConnection,
{
fn run(&self) -> Result<()> {
self.conn.execute(
let conn = self.conn.get()?;

conn.execute(
"CREATE TABLE registration (
public_key TEXT,
apns_token TEXT,
locale TEXT,
PRIMARY KEY (public_key)
)",
)?;
self.conn.execute(
conn.execute(
"CREATE TABLE relays (
public_key TEXT,
address TEXT,
Expand All @@ -152,30 +211,16 @@ where
}
}

pub trait SqliteConnection {
fn execute<T: AsRef<str>>(&self, statement: T) -> sqlite::Result<()>;
fn prepare<T: AsRef<str>>(&self, statement: T) -> sqlite::Result<sqlite::Statement<'_>>;
}

pub struct SqliteConnectionAdapter(pub sqlite::Connection);

impl SqliteConnection for SqliteConnectionAdapter {
fn execute<T: AsRef<str>>(&self, statement: T) -> sqlite::Result<()> {
self.0.execute(statement)
}
#[derive(Clone)]
pub struct SqliteConnectionAdapter(Arc<Mutex<sqlite::Connection>>);

fn prepare<T: AsRef<str>>(&self, statement: T) -> sqlite::Result<sqlite::Statement<'_>> {
self.0.prepare(statement)
impl SqliteConnectionAdapter {
pub fn new(conn: sqlite::Connection) -> Self {
Self(Arc::new(Mutex::new(conn)))
}
}

impl SqliteConnection for &SqliteConnectionAdapter {
fn execute<T: AsRef<str>>(&self, statement: T) -> sqlite::Result<()> {
self.0.execute(statement)
}

fn prepare<T: AsRef<str>>(&self, statement: T) -> sqlite::Result<sqlite::Statement<'_>> {
self.0.prepare(statement)
pub fn get(&self) -> Result<MutexGuard<sqlite::Connection>>{
Ok(self.0.lock()?)
}
}

Expand All @@ -197,7 +242,7 @@ where
status TEXT,
PRIMARY KEY (name)
);";
conn.execute(query)?;
conn.get()?.execute(query)?;

Ok(MigrationStatusRepository { conn })
}
Expand All @@ -210,7 +255,9 @@ where
fn get_status(&self, name: &str) -> Result<Option<migrations::Status>> {
let query = "SELECT status FROM migration_status WHERE name = :name LIMIT 1";

let mut statement = self.conn.prepare(query)?;
let conn= self.conn.get()?;
let mut statement = conn.prepare(query)?;

statement.bind((":name", name))?;

if let Ok(sqlite::State::Row) = statement.next() {
Expand All @@ -224,7 +271,8 @@ where
fn save_status(&self, name: &str, status: migrations::Status) -> Result<()> {
let persisted_status = status_to_persisted(&status);

let mut statement = self.conn.prepare(
let conn = self.conn.get()?;
let mut statement = conn.prepare(
"INSERT OR REPLACE INTO
migration_status(name, status)
VALUES (:name, :status)
Expand Down
152 changes: 152 additions & 0 deletions src/service/app/commands/downloader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use crate::errors::Result;
use crate::service::app::common;

pub struct Downloader<T> {
_transaction_provider: T,
//relay_downloaders: HashMap<domain::RelayAddress, RelayDownloader<T, R1, R2>>,
}

impl<T> Downloader<T> {
pub fn new(transaction_provider: T) -> Self {
Self {
_transaction_provider: transaction_provider,
}
}
}

impl<T> Downloader<T>
where
T: common::TransactionProvider,
{
pub fn run(&mut self) -> Result<()> {
let transaction = self._transaction_provider.start_transaction()?;

let adapters = transaction.adapters();
let registrations = adapters.registrations.borrow();

let initial_relays = registrations.get_relays()?;

//let initial_relays: Vec<domain::RelayAddress> = self
// .transaction_provider
// .transact(|adapters| adapters.registrations.get_relays())
// .unwrap(); // todo don't unwrap

for _initial_relay in initial_relays {
//let v = RelayDownloader::new(initial_relay.clone(), self.transaction_provider);
//self.relay_downloaders.insert(initial_relay.clone(), v);
}

// todo update the list in a loop

Err("not implemented".into())
}
}

//pub struct RelayDownloader<T, R1, R2> {
// transaction_provider: T,
// r1: PhantomData<R1>,
// r2: PhantomData<R2>,
//
// relay: domain::RelayAddress,
//
// //run_join: thread::JoinHandle<()>,
// tx: mpsc::Sender<()>,
//}
//
//impl<T, R1, R2> RelayDownloader<T, R1, R2> {
// fn new<R>(relay: domain::RelayAddress, transaction_provider: T) -> RelayDownloader<T, R1, R2>
// where
// R1: common::RegistrationRepository,
// R2: common::EventRepository,
// T: common::TransactionProvider,
// {
// let (tx, rx) = mpsc::channel::<()>();
//
// //let h = thread::spawn(|| {
// // Self::run(relay, transaction_provider, rx);
// //});
//
// let v = RelayDownloader {
// transaction_provider,
// r1: PhantomData {},
// r2: PhantomData {},
// relay,
// //run_join: h,
// tx,
// };
//
// return v;
// }
//
// fn run(relay: domain::RelayAddress, transaction_provider: T, rx: mpsc::Receiver<()>) {
// let initial_pub_keys: Vec<domain::RelayAddress> = Vec::new();
//
// //let c = |adapters: Adapters<R1, R2>| -> Result<()> {
// // let relays = adapters.registrations.get_relays()?;
//
// // for relay in relays {
// // initial_relays.push(relay);
// // }
//
// // return Ok(());
// //};
//
// //for initial_relay in initial_relays {
// //}
//
// //self.transaction_provider.transact(&c);
// }
//}

//impl<T, R1, R2> Drop for RelayDownloader<T, R1, R2>
//where
// T: common::TransactionProvider<common::Adapters<R1, R2>, R>,
// R1: common::RegistrationRepository,
// R2: common::EventRepository,
//{
// fn drop(&mut self) {
// self.tx.send(());
// self.run_join.join();
// }
//}

#[cfg(test)]
mod tests {
use super::*;
use crate::service::app::common;

#[test]
fn it_works() -> Result<()> {
let transaction_provider = TransactionProviderMock::new();
let mut downloader = Downloader::new(transaction_provider);
match downloader.run() {
Ok(_) => return Err("should have failed".into()),
Err(_) => return Ok(()),
};

//match APNSToken::new(String::from("")) {
// Ok(_) => panic!("constructor should have returned an error"),
// Err(error) => assert_eq!(error.to_string(), String::from("empty token")),
//}
}

//fn new_sqlite() -> Result<SqliteConnectionAdapter> {
// //let conn = SqliteConnectionAdapter::new(sqlite::open(":memory:")?);
// //RegistrationRepositoryMigration0001::new(conn.clone()).run()?;
// Ok(conn)
//}

struct TransactionProviderMock {}

impl TransactionProviderMock {
pub fn new() -> Self {
TransactionProviderMock {}
}
}

impl common::TransactionProvider for TransactionProviderMock {
fn start_transaction(&self) -> Result<Box<dyn common::Transaction>> {
Err("not implemented".into())
}
}
}

0 comments on commit ae0d393

Please sign in to comment.