diff --git a/src/maker/api.rs b/src/maker/api.rs index e73caf3a..da86dfb5 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -803,95 +803,101 @@ pub(crate) fn recover_from_swap( // Check for contract confirmations and broadcast timelocked transaction let mut timelock_boardcasted = Vec::new(); - while !maker.shutdown.load(Relaxed) { - for ((outgoing_reedemscript, contract), (timelock, timelocked_tx)) in outgoings.iter() { - // We have already broadcasted this tx, so skip - if timelock_boardcasted.contains(&timelocked_tx) { - continue; - } - // Check if the contract tx has reached required maturity - // Failure here means the transaction hasn't been broadcasted yet. So do nothing and try again. - let tx_from_chain = if let Ok(result) = maker - .wallet - .read()? - .rpc - .get_raw_transaction_info(&contract.compute_txid(), None) - { - log::info!( - "[{}] Contract Txid : {} reached confirmation : {:?}, Required Confirmation : {}", - maker.config.network_port, - contract.compute_txid(), - result.confirmations, - timelock - ); - result - } else { - continue; - }; - - if let Some(confirmation) = tx_from_chain.confirmations { - // Now the transaction is confirmed in a block, check for required maturity - if confirmation > (*timelock as u32) { - log::info!( - "[{}] Timelock maturity of {} blocks reached for Contract Txid : {}", - maker.config.network_port, - timelock, - contract.compute_txid() - ); - log::info!( - "[{}] Broadcasting timelocked tx: {}", - maker.config.network_port, - timelocked_tx.compute_txid() - ); - maker - .wallet - .read()? - .rpc - .send_raw_transaction(timelocked_tx) - .map_err(WalletError::Rpc)?; - timelock_boardcasted.push(timelocked_tx); + let trigger_count = if cfg!(feature = "integration-test") { + 10 / HEART_BEAT_INTERVAL.as_secs() // triggers every 10 secs for tests + } else { + 60 / HEART_BEAT_INTERVAL.as_secs() // triggers every 60 secs for prod + }; - let outgoing_removed = maker - .wallet - .write()? - .remove_outgoing_swapcoin(outgoing_reedemscript)? - .expect("outgoing swapcoin expected"); + let mut i = 0; + while !maker.shutdown.load(Relaxed) { + if i >= trigger_count || i == 0 { + for ((outgoing_reedemscript, contract), (timelock, timelocked_tx)) in outgoings.iter() { + // We have already broadcasted this tx, so skip + if timelock_boardcasted.contains(&timelocked_tx) { + continue; + } + // Check if the contract tx has reached required maturity + // Failure here means the transaction hasn't been broadcasted yet. So do nothing and try again. + let tx_from_chain = if let Ok(result) = maker + .wallet + .read()? + .rpc + .get_raw_transaction_info(&contract.compute_txid(), None) + { log::info!( - "[{}] Removed Outgoing Swapcoin from Wallet, Contract Txid: {}", + "[{}] Contract Txid : {} reached confirmation : {:?}, Required Confirmation : {}", maker.config.network_port, - outgoing_removed.contract_tx.compute_txid() + contract.compute_txid(), + result.confirmations, + timelock ); + result + } else { + continue; + }; + + if let Some(confirmation) = tx_from_chain.confirmations { + // Now the transaction is confirmed in a block, check for required maturity + if confirmation > (*timelock as u32) { + log::info!( + "[{}] Timelock maturity of {} blocks reached for Contract Txid : {}", + maker.config.network_port, + timelock, + contract.compute_txid() + ); + log::info!( + "[{}] Broadcasting timelocked tx: {}", + maker.config.network_port, + timelocked_tx.compute_txid() + ); + maker + .wallet + .read()? + .rpc + .send_raw_transaction(timelocked_tx) + .map_err(WalletError::Rpc)?; + timelock_boardcasted.push(timelocked_tx); + + let outgoing_removed = maker + .wallet + .write()? + .remove_outgoing_swapcoin(outgoing_reedemscript)? + .expect("outgoing swapcoin expected"); - log::info!("initializing Wallet Sync."); - { - let mut wallet_write = maker.wallet.write()?; - wallet_write.sync()?; - wallet_write.save_to_disk()?; + log::info!( + "[{}] Removed Outgoing Swapcoin from Wallet, Contract Txid: {}", + maker.config.network_port, + outgoing_removed.contract_tx.compute_txid() + ); + + log::info!("initializing Wallet Sync."); + { + let mut wallet_write = maker.wallet.write()?; + wallet_write.sync()?; + wallet_write.save_to_disk()?; + } + log::info!("Completed Wallet Sync."); } - log::info!("Completed Wallet Sync."); } } - } - if timelock_boardcasted.len() == outgoings.len() { - // For tests, terminate the maker at this stage. - #[cfg(feature = "integration-test")] - maker.shutdown.store(true, Relaxed); + if timelock_boardcasted.len() == outgoings.len() { + // For tests, terminate the maker at this stage. + #[cfg(feature = "integration-test")] + maker.shutdown.store(true, Relaxed); - log::info!( - "All outgoing transactions claimed back via timelock. Recovery loop exiting." - ); - break; + log::info!( + "All outgoing transactions claimed back via timelock. Recovery loop exiting." + ); + break; + } + // Reset counter + i = 0; } - - // Sleep before next blockchain scan - let block_lookup_interval = if cfg!(feature = "integration-test") { - Duration::from_secs(10) - } else { - Duration::from_secs(60) - }; - std::thread::sleep(block_lookup_interval); + i += 1; + std::thread::sleep(HEART_BEAT_INTERVAL); } Ok(()) } diff --git a/src/maker/config.rs b/src/maker/config.rs index ed0b9da2..e8c225dd 100644 --- a/src/maker/config.rs +++ b/src/maker/config.rs @@ -37,7 +37,8 @@ impl Default for MakerConfig { rpc_port: 6103, min_swap_amount: MIN_SWAP_AMOUNT, socks_port: 19050, - directory_server_address: "127.0.0.1:8080".to_string(), + directory_server_address: + "bhbzkndgad52ojm75w4goii7xsi6ou73fzyvorxas7swg2snlto4c4ad.onion:8080".to_string(), #[cfg(feature = "integration-test")] fidelity_amount: 5_000_000, // 0.05 BTC for tests #[cfg(feature = "integration-test")] diff --git a/src/maker/server.rs b/src/maker/server.rs index edd6d86d..7871feb7 100644 --- a/src/maker/server.rs +++ b/src/maker/server.rs @@ -46,7 +46,7 @@ use crate::utill::monitor_log_for_completion; use crate::maker::error::MakerError; // Default values for Maker configurations -pub(crate) const _DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS: u64 = 60 * 60 * 12; // 12 Hours +pub(crate) const DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS: u64 = 60 * 15; // 15 minutes /// Fetches the Maker and DNS address, and sends maker address to the DNS server. /// Depending upon ConnectionType and test/prod environment, different maker address and DNS addresses are returned. @@ -157,54 +157,63 @@ fn network_bootstrap(maker: Arc) -> Result, MakerError> { metadata: dns_metadata, }; - // Loop until shoutdown is initiated. - while !maker.shutdown.load(Relaxed) { - let stream = match maker.config.connection_type { - ConnectionType::CLEARNET => TcpStream::connect(&dns_address), - #[cfg(feature = "tor")] - ConnectionType::TOR => Socks5Stream::connect( - format!("127.0.0.1:{}", maker.config.socks_port), - dns_address.as_str(), - ) - .map(|stream| stream.into_inner()), - }; + thread::spawn(move || { + let trigger_count = DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS / HEART_BEAT_INTERVAL.as_secs(); + let mut i = 0; - log::info!( - "[{}] Connecting to DNS: {}", - maker.config.network_port, - dns_address - ); + while !maker.shutdown.load(Relaxed) { + if i >= trigger_count || i == 0 { + let stream = match maker.config.connection_type { + ConnectionType::CLEARNET => TcpStream::connect(&dns_address), + #[cfg(feature = "tor")] + ConnectionType::TOR => Socks5Stream::connect( + format!("127.0.0.1:{}", maker.config.socks_port), + dns_address.as_str(), + ) + .map(|stream| stream.into_inner()), + }; - let mut stream = match stream { - Ok(s) => s, - Err(e) => { - log::warn!( - "[{}] TCP connection error with directory, reattempting: {}", + log::info!( + "[{}] Connecting to DNS: {}", + maker.config.network_port, + dns_address + ); + + let mut stream = match stream { + Ok(s) => s, + Err(e) => { + log::warn!( + "[{}] TCP connection error with directory, reattempting: {}", + maker_port, + e + ); + thread::sleep(HEART_BEAT_INTERVAL); + continue; + } + }; + + if let Err(e) = send_message(&mut stream, &request) { + log::warn!( + "[{}] Failed to send our address to directory, reattempting: {}", + maker_port, + e + ); + thread::sleep(HEART_BEAT_INTERVAL); + continue; + } + + log::info!( + "[{}] Successfully sent our address to DNS at {}", maker_port, - e + dns_address ); - thread::sleep(HEART_BEAT_INTERVAL); - continue; + // Reset counter when success + i = 0; } - }; - - if let Err(e) = send_message(&mut stream, &request) { - log::warn!( - "[{}] Failed to send our address to directory, reattempting: {}", - maker_port, - e - ); + i += 1; thread::sleep(HEART_BEAT_INTERVAL); - continue; - }; - - log::info!( - "[{}] Successfully sent our address to dns at {}", - maker_port, - dns_address - ); - break; - } + } + }); Ok(tor_handle) } @@ -337,34 +346,37 @@ fn check_connection_with_core( accepting_clients: Arc, ) -> Result<(), MakerError> { let mut rpc_ping_success = false; + let mut i = 0; while !maker.shutdown.load(Relaxed) { // If connection is disrupted keep trying at heart_beat_interval (3 sec). // If connection is live, keep tring at rpc_ping_interval (60 sec). - match rpc_ping_success { - true => { - sleep(RPC_PING_INTERVAL); - } - false => { - sleep(HEART_BEAT_INTERVAL); - } - } - if let Err(e) = maker.wallet.read()?.rpc.get_blockchain_info() { - log::error!( - "[{}] RPC Connection failed. Reattempting {}", - maker.config.network_port, - e - ); - rpc_ping_success = false; - } else { - if !rpc_ping_success { - log::info!( - "[{}] Bitcoin Core RPC connection is live.", - maker.config.network_port + let trigger_count = match rpc_ping_success { + true => RPC_PING_INTERVAL.as_secs() / HEART_BEAT_INTERVAL.as_secs(), + false => 1, + }; + + if i >= trigger_count || i == 0 { + if let Err(e) = maker.wallet.read()?.rpc.get_blockchain_info() { + log::error!( + "[{}] RPC Connection failed. Reattempting {}", + maker.config.network_port, + e ); + rpc_ping_success = false; + } else { + if !rpc_ping_success { + log::info!( + "[{}] Bitcoin Core RPC connection is back online.", + maker.config.network_port + ); + } + rpc_ping_success = true; } - rpc_ping_success = true; + accepting_clients.store(rpc_ping_success, Relaxed); + i = 0; } - accepting_clients.store(rpc_ping_success, Relaxed); + i += 1; + thread::sleep(HEART_BEAT_INTERVAL); } Ok(()) diff --git a/src/market/directory.rs b/src/market/directory.rs index 1be8d0a8..1cf1fc37 100644 --- a/src/market/directory.rs +++ b/src/market/directory.rs @@ -23,16 +23,15 @@ use std::{ collections::HashMap, convert::TryFrom, fs::{self, File}, - io::{BufRead, BufReader, Write}, + io::Write, net::{Ipv4Addr, TcpListener, TcpStream}, path::{Path, PathBuf}, - str::FromStr, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard, }, thread::{self, sleep}, - time::Duration, + time::{Duration, Instant}, }; use crate::error::NetError; @@ -129,7 +128,7 @@ pub struct DirectoryServer { /// Shutdown flag to stop the directory server pub shutdown: AtomicBool, /// A store of all the received maker addresses indexed by fidelity bond outpoints. - pub addresses: Arc>>, + pub addresses: Arc>>, } impl Default for DirectoryServer { @@ -211,9 +210,7 @@ impl DirectoryServer { config_file.write_all(content.as_bytes())?; } - // Load all addresses from address.dat file - let address_file = data_dir.join("addresses.dat"); - let addresses = Arc::new(RwLock::new(read_addresses_from_file(&address_file)?)); + let addresses = Arc::new(RwLock::new(HashMap::new())); let default_dns = Self::default(); Ok(DirectoryServer { @@ -240,7 +237,7 @@ impl DirectoryServer { if let Some(existing_key) = write_lock .iter() - .find_map(|(k, v)| if v == &metadata.0 { Some(*k) } else { None }) + .find_map(|(k, v)| if v.0 == metadata.0 { Some(*k) } else { None }) { // Update the fielity for the existing address if existing_key != metadata.1 { @@ -251,28 +248,40 @@ impl DirectoryServer { metadata.1 ); write_lock.remove(&existing_key); - write_lock.insert(metadata.1, metadata.0); + write_lock.insert(metadata.1, (metadata.0, Instant::now())); } else { - log::info!("Maker data already exist for {}", metadata.0); + log::info!( + "Maker data already exist for {} | restarted counter", + metadata.0 + ); + write_lock + .entry(metadata.1) + .and_modify(|(_, instant)| *instant = Instant::now()); } } else if write_lock.contains_key(&metadata.1) { // Update the address for the existing fidelity - if write_lock[&metadata.1] != metadata.0 { + if write_lock[&metadata.1].0 != metadata.0 { let old_addr = write_lock - .insert(metadata.1, metadata.0.clone()) + .insert(metadata.1, (metadata.0.clone(), Instant::now())) .expect("value expected"); log::info!( - "Address updated for fidelity: {} | old address {} | new address {}", + "Address updated for fidelity: {} | old address {:?} | new address {}", metadata.1, old_addr, metadata.0 ); } else { - log::info!("Maker data already exist for {}", metadata.0); + log::info!( + "Maker data already exist for {} | restarted counter", + metadata.0 + ); + write_lock + .entry(metadata.1) + .and_modify(|(_, instant)| *instant = Instant::now()); } } else { // Add a new entry if both fidelity and address are new - write_lock.insert(metadata.1, metadata.0.clone()); + write_lock.insert(metadata.1, (metadata.0.clone(), Instant::now())); log::info!( "Added new maker info: Fidelity {} | Address {}", metadata.1, @@ -302,65 +311,28 @@ fn write_default_directory_config(config_path: &Path) -> Result<(), DirectorySer pub(crate) fn start_address_writer_thread( directory: Arc, ) -> Result<(), DirectoryServerError> { - let address_file = directory.data_dir.join("addresses.dat"); - - let interval = if cfg!(feature = "integration-test") { - 3 // 3 seconds for tests - } else { - 600 // 10 minutes for production - }; + let interval = 60 * 15; loop { sleep(Duration::from_secs(interval)); - - if let Err(e) = write_addresses_to_file(&directory, &address_file) { - log::error!("Error writing addresses: {:?}", e); + let mut directory_address_book = directory.addresses.write()?; + let ttl = Duration::from_secs(60 * 30); + + let expired_outpoints: Vec<_> = directory_address_book + .iter() + .filter(|(_, (_, timestamp))| timestamp.elapsed() > ttl) + .map(|(outpoint, _)| *outpoint) + .collect(); + for outpoint in &expired_outpoints { + log::info!( + "No update for 30 mins from maker with fidelity : {}", + outpoint + ); + directory_address_book.remove(outpoint); + log::info!("Maker entry removed"); } } } -/// Write in-memory address data to file. -pub(crate) fn write_addresses_to_file( - directory: &Arc, - address_file: &Path, -) -> Result<(), DirectoryServerError> { - let file_content = directory - .addresses - .read()? - .iter() - .map(|(op, addr)| format!("{},{}\n", op, addr)) - .collect::>() - .join(""); - - let mut file = File::create(address_file)?; - file.write_all(file_content.as_bytes())?; - file.flush()?; - Ok(()) -} - -/// Read address data from file and return the HashMap -pub fn read_addresses_from_file( - path: &Path, -) -> Result, DirectoryServerError> { - if !path.exists() { - return Ok(HashMap::new()); - } - let reader = BufReader::new(File::open(path)?); - - reader - .lines() - .map(|line| { - let line = line?; - let (outpoint, addr) = - line.split_once(',') - .ok_or(DirectoryServerError::AddressFileCorrupted( - "deliminator missing in address.dat file".to_string(), - ))?; - let op = OutPoint::from_str(outpoint)?; - Ok((op, addr.to_string())) - }) - .collect::, DirectoryServerError>>() -} - /// Initializes and starts the Directory Server with the provided configuration. /// /// This function configures the Directory Server based on the specified `directory` and optional `rpc_config`. @@ -437,7 +409,6 @@ pub fn start_directory_server( start_rpc_server_thread(directory_clone) }); - let address_file = directory.data_dir.join("addresses.dat"); let directory_clone = directory.clone(); let address_writer_thread = thread::spawn(move || { log::info!("Spawning Address Writer Thread"); @@ -483,8 +454,6 @@ pub fn start_directory_server( } } - write_addresses_to_file(&directory, &address_file)?; - Ok(()) } @@ -528,16 +497,21 @@ fn handle_client( } DnsRequest::Get => { log::info!("Received GET"); + let addresses = directory.addresses.read()?; + let response = addresses .iter() - .fold(String::new(), |acc, (_, addr)| acc + addr + "\n"); + .filter(|(_, (_, timestamp))| timestamp.elapsed() <= Duration::from_secs(30 * 60)) + .fold(String::new(), |acc, (_, addr)| acc + &addr.0 + "\n"); + log::debug!("Sending Addresses: {}", response); send_message(stream, &response)?; } #[cfg(feature = "integration-test")] // Used for IT, only checks the updated_address_map() function. DnsRequest::Dummy { url, vout } => { + use std::str::FromStr; log::info!("Got new maker address: {}", &url); // Create a constant txid for tests diff --git a/src/market/rpc/server.rs b/src/market/rpc/server.rs index 119770a9..f2b7f369 100644 --- a/src/market/rpc/server.rs +++ b/src/market/rpc/server.rs @@ -12,11 +12,11 @@ use std::{ net::{TcpListener, TcpStream}, sync::{atomic::Ordering::Relaxed, Arc, RwLock}, thread::sleep, - time::Duration, + time::{Duration, Instant}, }; fn handle_request( socket: &mut TcpStream, - address: Arc>>, + address: Arc>>, ) -> Result<(), DirectoryServerError> { let req_bytes = read_message(socket)?; let rpc_request: RpcMsgReq = serde_cbor::from_slice(&req_bytes).map_err(NetError::Cbor)?; @@ -28,7 +28,7 @@ fn handle_request( address .read()? .iter() - .map(|(op, address)| (*op, address.clone())) + .map(|(op, address)| (*op, address.0.clone())) .collect::>(), ); send_message(socket, &resp)?; diff --git a/src/taker/config.rs b/src/taker/config.rs index b23021f6..31fc62f6 100644 --- a/src/taker/config.rs +++ b/src/taker/config.rs @@ -24,7 +24,8 @@ impl Default for TakerConfig { Self { network_port: 8000, socks_port: 19070, - directory_server_address: "directoryhiddenserviceaddress.onion:8080".to_string(), + directory_server_address: + "bhbzkndgad52ojm75w4goii7xsi6ou73fzyvorxas7swg2snlto4c4ad.onion:8080".to_string(), connection_type: { #[cfg(feature = "tor")] { @@ -131,7 +132,6 @@ mod tests { let contents = r#" network_port = 8000 socks_port = 19070 - directory_server_address = directoryhiddenserviceaddress.onion:8080 connection_type = "TOR" rpc_port = 8081 "#; diff --git a/tests/dns.rs b/tests/dns.rs index 7cec48d0..db4ee8de 100644 --- a/tests/dns.rs +++ b/tests/dns.rs @@ -82,22 +82,12 @@ fn test_dns() { thread::sleep(Duration::from_secs(10)); verify_addresses(&initial_addresses); - // Persistence check - process.kill().expect("Failed to kill directoryd process"); - process.wait().unwrap(); - - let mut process = start_dns(&data_dir, &bitcoind); - // Replace address 8082 to 8083 registered for Bond index 2. // Add a new entry with a new bond index let additional_addresses = vec![("127.0.0.1:8083", 2), ("127.0.0.1:8084", 3)]; send_addresses(&additional_addresses); thread::sleep(Duration::from_secs(10)); - process.kill().expect("Failed to kill directoryd process"); - process.wait().unwrap(); - - let mut process = start_dns(&data_dir, &bitcoind); let all_addresses = vec![ ("127.0.0.1:8080", 0), ("127.0.0.1:8081", 1), @@ -106,6 +96,7 @@ fn test_dns() { ]; verify_addresses(&all_addresses); + // Persistence check process.kill().expect("Failed to kill directoryd process"); process.wait().unwrap(); }