Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Commit

Permalink
Run downloader in a separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Jun 14, 2023
1 parent 3eed637 commit 9ca863b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
8 changes: 8 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::service::app::commands::implementation as commandsimpl;
use crate::service::ports::http;
use service::adapters::sqlite as sqliteadapters;
use service::app::commands::downloader::Downloader;
use std::thread;

fn main() {
let conn = sqlite::Connection::open("/tmp/db.sqlite").unwrap();
Expand Down Expand Up @@ -51,6 +52,13 @@ fn main() {
let _downloader = Downloader::new(transaction_provider);

runner.run(&migrations).unwrap();

let downloader = Downloader::new(transaction_provider);

thread::spawn(|| {
downloader.run().unwrap();
});

server.listen_and_serve();
}

Expand Down
35 changes: 21 additions & 14 deletions src/service/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use crate::service::app::common;
use crate::service::domain;
use sqlite;
use sqlite::State;
use std::cell::Ref;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc,Mutex,MutexGuard};

pub struct TransactionProvider {
conn: SqliteConnectionAdapter,
Expand All @@ -28,7 +26,7 @@ impl common::TransactionProvider for TransactionProvider {
fn start_transaction(&self) -> Result<Box<dyn common::Transaction>> {
let conn = self.conn.clone();

conn.0.borrow().execute("BEGIN TRANSACTION")?;
conn.get()?.execute("BEGIN TRANSACTION")?;

let adapters = self.new_adapters();
let t = Transaction::new(self.conn.clone(), adapters);
Expand Down Expand Up @@ -86,7 +84,7 @@ impl common::RegistrationRepository for RegistrationRepository {
fn save(&self, registration: &domain::Registration) -> Result<()> {
let hex_public_key = registration.pub_key().hex();

let conn = self.conn.0.borrow();
let conn = self.conn.get()?;

let mut statement = conn.prepare(
"INSERT OR REPLACE INTO
Expand Down Expand Up @@ -116,7 +114,7 @@ impl common::RegistrationRepository for RegistrationRepository {
}

fn get_relays(&self) -> Result<Vec<domain::RelayAddress>> {
let conn = self.conn.0.borrow();
let conn = self.conn.get()?;
let query = "SELECT address FROM relays GROUP BY address";
let mut statement = conn.prepare(query)?;

Expand All @@ -132,7 +130,7 @@ impl common::RegistrationRepository for RegistrationRepository {
}

fn get_pub_keys(&self, address: domain::RelayAddress) -> Result<Vec<common::PubKeyInfo>> {
let conn = self.conn.0.borrow();
let conn = self.conn.get()?;
let query = "SELECT public_key FROM relays WHERE address = :address";
let mut statement = conn.prepare(query)?;
statement.bind((":address", address.as_ref()))?;
Expand Down Expand Up @@ -162,15 +160,18 @@ impl RegistrationRepositoryMigration0001 {

impl migrations::MigrationCallable for RegistrationRepositoryMigration0001 {
fn run(&self) -> Result<()> {
self.conn.0.borrow().execute(
let conn = self.conn.get()?;

conn.execute(
"CREATE TABLE registration (
public_key TEXT,
apns_token TEXT,
locale TEXT,
PRIMARY KEY (public_key)
)",
)?;
self.conn.0.borrow().execute(

conn.execute(
"CREATE TABLE relays (
public_key TEXT,
address TEXT,
Expand All @@ -197,11 +198,15 @@ impl<T> common::EventRepository for EventRepository<T> {
}

#[derive(Clone)]
pub struct SqliteConnectionAdapter(pub Rc<RefCell<sqlite::Connection>>);
pub struct SqliteConnectionAdapter(Arc<Mutex<sqlite::Connection>>);

impl SqliteConnectionAdapter {
pub fn new(conn: sqlite::Connection) -> Self {
Self(Rc::new(RefCell::new(conn)))
Self(Arc::new(Mutex::new(conn)))
}

pub fn get(&self) -> Result<MutexGuard<sqlite::Connection>>{
Ok(self.0.lock()?)
}
}

Expand All @@ -216,7 +221,8 @@ impl MigrationStatusRepository {
status TEXT,
PRIMARY KEY (name)
);";
conn.0.borrow().execute(query)?;

conn.get()?.execute(query)?;

Ok(MigrationStatusRepository { conn })
}
Expand All @@ -226,8 +232,9 @@ impl migrations::StatusRepository for MigrationStatusRepository {
fn get_status(&self, name: &str) -> Result<Option<migrations::Status>> {
let query = "SELECT status FROM migration_status WHERE name = :name LIMIT 1";

let conn: Ref<sqlite::Connection> = self.conn.0.borrow();
let conn= self.conn.get()?;
let mut statement = conn.prepare(query)?;

statement.bind((":name", name))?;

if let Ok(sqlite::State::Row) = statement.next() {
Expand All @@ -241,7 +248,7 @@ impl migrations::StatusRepository for MigrationStatusRepository {
fn save_status(&self, name: &str, status: migrations::Status) -> Result<()> {
let persisted_status = status_to_persisted(&status);

let conn: Ref<sqlite::Connection> = self.conn.0.borrow();
let conn = self.conn.get()?;
let mut statement = conn.prepare(
"INSERT OR REPLACE INTO
migration_status(name, status)
Expand Down
4 changes: 2 additions & 2 deletions src/service/app/commands/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T> Downloader<T>
where
T: common::TransactionProvider,
{
fn _run(&mut self) -> Result<()> {
pub fn run(&mut self) -> Result<()> {
let transaction = self._transaction_provider.start_transaction()?;

let adapters = transaction.adapters();
Expand Down Expand Up @@ -119,7 +119,7 @@ mod tests {
fn it_works() -> Result<()> {
let transaction_provider = TransactionProviderMock::new();
let mut downloader = Downloader::new(transaction_provider);
match downloader._run() {
match downloader.run() {
Ok(_) => return Err("should have failed".into()),
Err(_) => return Ok(()),
};
Expand Down

0 comments on commit 9ca863b

Please sign in to comment.