Skip to content

Commit dd696a5

Browse files
committed
Decouple spending from chain notifications
To prepare for asynchronous processing of the sweep, we need to decouple the spending from the chain notifications. These notifications run in a sync context and wouldn't allow calls into an async trait. Instead we now periodically call into the sweeper, to open up the possibility to do so from an async context if desired.
1 parent f507778 commit dd696a5

File tree

2 files changed

+137
-86
lines changed

2 files changed

+137
-86
lines changed

lightning-background-processor/src/lib.rs

+91-13
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39+
use lightning::sign::{ChangeDestinationSource, OutputSpender};
3940
use lightning::util::logger::Logger;
40-
use lightning::util::persist::Persister;
41+
use lightning::util::persist::{KVStore, Persister};
42+
use lightning::util::sweep::OutputSweeper;
4143
#[cfg(feature = "std")]
4244
use lightning::util::wakers::Sleeper;
4345
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -132,6 +134,11 @@ const REBROADCAST_TIMER: u64 = 30;
132134
#[cfg(test)]
133135
const REBROADCAST_TIMER: u64 = 1;
134136

137+
#[cfg(not(test))]
138+
const SWEEPER_TIMER: u64 = 30;
139+
#[cfg(test)]
140+
const SWEEPER_TIMER: u64 = 1;
141+
135142
#[cfg(feature = "futures")]
136143
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
137144
const fn min_u64(a: u64, b: u64) -> u64 {
@@ -308,6 +315,7 @@ macro_rules! define_run_body {
308315
$channel_manager: ident, $process_channel_manager_events: expr,
309316
$onion_messenger: ident, $process_onion_message_handler_events: expr,
310317
$peer_manager: ident, $gossip_sync: ident,
318+
$process_sweeper: expr,
311319
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
312320
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
313321
) => { {
@@ -322,6 +330,7 @@ macro_rules! define_run_body {
322330
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
323331
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
324332
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
333+
let mut last_sweeper_call = $get_timer(SWEEPER_TIMER);
325334
let mut have_pruned = false;
326335
let mut have_decayed_scorer = false;
327336

@@ -465,6 +474,12 @@ macro_rules! define_run_body {
465474
$chain_monitor.rebroadcast_pending_claims();
466475
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
467476
}
477+
478+
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
479+
log_trace!($logger, "Regenerating sweeper spends if necessary");
480+
let _ = $process_sweeper;
481+
last_sweeper_call = $get_timer(SWEEPER_TIMER);
482+
}
468483
}
469484

470485
// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -627,6 +642,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627642
/// ```
628643
/// # use lightning::io;
629644
/// # use lightning::events::ReplayEvent;
645+
/// # use lightning::util::sweep::OutputSweeper;
630646
/// # use std::sync::{Arc, RwLock};
631647
/// # use std::sync::atomic::{AtomicBool, Ordering};
632648
/// # use std::time::SystemTime;
@@ -666,6 +682,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666682
/// # F: lightning::chain::Filter + Send + Sync + 'static,
667683
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
668684
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
685+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
686+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
687+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
669688
/// # > {
670689
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
671690
/// # event_handler: Arc<EventHandler>,
@@ -677,14 +696,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
677696
/// # persister: Arc<Store>,
678697
/// # logger: Arc<Logger>,
679698
/// # scorer: Arc<Scorer>,
699+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
680700
/// # }
681701
/// #
682702
/// # async fn setup_background_processing<
683703
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
684704
/// # F: lightning::chain::Filter + Send + Sync + 'static,
685705
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
686706
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
687-
/// # >(node: Node<B, F, FE, UL>) {
707+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
708+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
709+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
710+
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
688711
/// let background_persister = Arc::clone(&node.persister);
689712
/// let background_event_handler = Arc::clone(&node.event_handler);
690713
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -695,7 +718,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
695718
/// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
696719
/// let background_logger = Arc::clone(&node.logger);
697720
/// let background_scorer = Arc::clone(&node.scorer);
698-
///
721+
/// let background_sweeper = Arc::clone(&node.sweeper);
699722
/// // Setup the sleeper.
700723
#[cfg_attr(
701724
feature = "std",
@@ -729,6 +752,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
729752
/// background_gossip_sync,
730753
/// background_peer_man,
731754
/// Some(background_liquidity_manager),
755+
/// Some(background_sweeper),
732756
/// background_logger,
733757
/// Some(background_scorer),
734758
/// sleeper,
@@ -767,6 +791,10 @@ pub async fn process_events_async<
767791
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
768792
PM: 'static + Deref,
769793
LM: 'static + Deref,
794+
D: 'static + Deref,
795+
O: 'static + Deref,
796+
K: 'static + Deref,
797+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
770798
S: 'static + Deref<Target = SC> + Send + Sync,
771799
SC: for<'b> WriteableScore<'b>,
772800
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -775,12 +803,12 @@ pub async fn process_events_async<
775803
>(
776804
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
777805
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
778-
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>, sleeper: Sleeper,
779-
mobile_interruptable_platform: bool, fetch_time: FetchTime,
806+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
807+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
780808
) -> Result<(), lightning::io::Error>
781809
where
782810
UL::Target: 'static + UtxoLookup,
783-
CF::Target: 'static + chain::Filter,
811+
CF::Target: 'static + chain::Filter + Sync + Send,
784812
T::Target: 'static + BroadcasterInterface,
785813
F::Target: 'static + FeeEstimator,
786814
L::Target: 'static + Logger,
@@ -790,6 +818,9 @@ where
790818
OM::Target: AOnionMessenger,
791819
PM::Target: APeerManager,
792820
LM::Target: ALiquidityManager,
821+
O::Target: 'static + OutputSpender,
822+
D::Target: 'static + ChangeDestinationSource,
823+
K::Target: 'static + KVStore,
793824
{
794825
let mut should_break = false;
795826
let async_event_handler = |event| {
@@ -833,6 +864,13 @@ where
833864
},
834865
peer_manager,
835866
gossip_sync,
867+
{
868+
if let Some(ref sweeper) = sweeper {
869+
sweeper.regenerate_and_broadcast_spend_if_necessary()
870+
} else {
871+
Ok(())
872+
}
873+
},
836874
logger,
837875
scorer,
838876
should_break,
@@ -953,14 +991,18 @@ impl BackgroundProcessor {
953991
LM: 'static + Deref + Send,
954992
S: 'static + Deref<Target = SC> + Send + Sync,
955993
SC: for<'b> WriteableScore<'b>,
994+
D: 'static + Deref,
995+
O: 'static + Deref,
996+
K: 'static + Deref,
997+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
956998
>(
957999
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
9581000
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
959-
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>,
1001+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
9601002
) -> Self
9611003
where
9621004
UL::Target: 'static + UtxoLookup,
963-
CF::Target: 'static + chain::Filter,
1005+
CF::Target: 'static + chain::Filter + Sync + Send,
9641006
T::Target: 'static + BroadcasterInterface,
9651007
F::Target: 'static + FeeEstimator,
9661008
L::Target: 'static + Logger,
@@ -970,6 +1012,9 @@ impl BackgroundProcessor {
9701012
OM::Target: AOnionMessenger,
9711013
PM::Target: APeerManager,
9721014
LM::Target: ALiquidityManager,
1015+
O::Target: 'static + OutputSpender,
1016+
D::Target: 'static + ChangeDestinationSource,
1017+
K::Target: 'static + KVStore,
9731018
{
9741019
let stop_thread = Arc::new(AtomicBool::new(false));
9751020
let stop_thread_clone = stop_thread.clone();
@@ -1005,6 +1050,13 @@ impl BackgroundProcessor {
10051050
},
10061051
peer_manager,
10071052
gossip_sync,
1053+
{
1054+
if let Some(ref sweeper) = sweeper {
1055+
sweeper.regenerate_and_broadcast_spend_if_necessary()
1056+
} else {
1057+
Ok(())
1058+
}
1059+
},
10081060
logger,
10091061
scorer,
10101062
stop_thread.load(Ordering::Acquire),
@@ -1269,7 +1321,7 @@ mod tests {
12691321
Arc<test_utils::TestBroadcaster>,
12701322
Arc<TestWallet>,
12711323
Arc<test_utils::TestFeeEstimator>,
1272-
Arc<dyn Filter + Sync + Send>,
1324+
Arc<test_utils::TestChainSource>,
12731325
Arc<FilesystemStore>,
12741326
Arc<test_utils::TestLogger>,
12751327
Arc<KeysManager>,
@@ -1648,7 +1700,7 @@ mod tests {
16481700
best_block,
16491701
Arc::clone(&tx_broadcaster),
16501702
Arc::clone(&fee_estimator),
1651-
None::<Arc<dyn Filter + Sync + Send>>,
1703+
None::<Arc<test_utils::TestChainSource>>,
16521704
Arc::clone(&keys_manager),
16531705
wallet,
16541706
Arc::clone(&kv_store),
@@ -1888,6 +1940,7 @@ mod tests {
18881940
nodes[0].p2p_gossip_sync(),
18891941
nodes[0].peer_manager.clone(),
18901942
Some(Arc::clone(&nodes[0].liquidity_manager)),
1943+
Some(nodes[0].sweeper.clone()),
18911944
nodes[0].logger.clone(),
18921945
Some(nodes[0].scorer.clone()),
18931946
);
@@ -1982,6 +2035,7 @@ mod tests {
19822035
nodes[0].no_gossip_sync(),
19832036
nodes[0].peer_manager.clone(),
19842037
Some(Arc::clone(&nodes[0].liquidity_manager)),
2038+
Some(nodes[0].sweeper.clone()),
19852039
nodes[0].logger.clone(),
19862040
Some(nodes[0].scorer.clone()),
19872041
);
@@ -2025,6 +2079,7 @@ mod tests {
20252079
nodes[0].no_gossip_sync(),
20262080
nodes[0].peer_manager.clone(),
20272081
Some(Arc::clone(&nodes[0].liquidity_manager)),
2082+
Some(nodes[0].sweeper.clone()),
20282083
nodes[0].logger.clone(),
20292084
Some(nodes[0].scorer.clone()),
20302085
);
@@ -2058,6 +2113,7 @@ mod tests {
20582113
nodes[0].rapid_gossip_sync(),
20592114
nodes[0].peer_manager.clone(),
20602115
Some(Arc::clone(&nodes[0].liquidity_manager)),
2116+
Some(nodes[0].sweeper.clone()),
20612117
nodes[0].logger.clone(),
20622118
Some(nodes[0].scorer.clone()),
20632119
move |dur: Duration| {
@@ -2095,6 +2151,7 @@ mod tests {
20952151
nodes[0].p2p_gossip_sync(),
20962152
nodes[0].peer_manager.clone(),
20972153
Some(Arc::clone(&nodes[0].liquidity_manager)),
2154+
Some(nodes[0].sweeper.clone()),
20982155
nodes[0].logger.clone(),
20992156
Some(nodes[0].scorer.clone()),
21002157
);
@@ -2125,6 +2182,7 @@ mod tests {
21252182
nodes[0].no_gossip_sync(),
21262183
nodes[0].peer_manager.clone(),
21272184
Some(Arc::clone(&nodes[0].liquidity_manager)),
2185+
Some(nodes[0].sweeper.clone()),
21282186
nodes[0].logger.clone(),
21292187
Some(nodes[0].scorer.clone()),
21302188
);
@@ -2172,6 +2230,7 @@ mod tests {
21722230
nodes[0].no_gossip_sync(),
21732231
nodes[0].peer_manager.clone(),
21742232
Some(Arc::clone(&nodes[0].liquidity_manager)),
2233+
Some(nodes[0].sweeper.clone()),
21752234
nodes[0].logger.clone(),
21762235
Some(nodes[0].scorer.clone()),
21772236
);
@@ -2235,6 +2294,7 @@ mod tests {
22352294
nodes[0].no_gossip_sync(),
22362295
nodes[0].peer_manager.clone(),
22372296
Some(Arc::clone(&nodes[0].liquidity_manager)),
2297+
Some(nodes[0].sweeper.clone()),
22382298
nodes[0].logger.clone(),
22392299
Some(nodes[0].scorer.clone()),
22402300
);
@@ -2280,10 +2340,22 @@ mod tests {
22802340

22812341
advance_chain(&mut nodes[0], 3);
22822342

2343+
let tx_broadcaster = nodes[0].tx_broadcaster.clone();
2344+
let wait_for_sweep_tx = || -> Transaction {
2345+
loop {
2346+
let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
2347+
if let Some(sweep_tx) = sweep_tx {
2348+
return sweep_tx;
2349+
}
2350+
2351+
std::thread::sleep(Duration::from_millis(10));
2352+
}
2353+
};
2354+
22832355
// Check we generate an initial sweeping tx.
22842356
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2357+
let sweep_tx_0 = wait_for_sweep_tx();
22852358
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2286-
let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22872359
match tracked_output.status {
22882360
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22892361
assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
@@ -2294,8 +2366,8 @@ mod tests {
22942366
// Check we regenerate and rebroadcast the sweeping tx each block.
22952367
advance_chain(&mut nodes[0], 1);
22962368
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2369+
let sweep_tx_1 = wait_for_sweep_tx();
22972370
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2298-
let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22992371
match tracked_output.status {
23002372
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
23012373
assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
@@ -2306,8 +2378,8 @@ mod tests {
23062378

23072379
advance_chain(&mut nodes[0], 1);
23082380
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2381+
let sweep_tx_2 = wait_for_sweep_tx();
23092382
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2310-
let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
23112383
match tracked_output.status {
23122384
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
23132385
assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
@@ -2387,6 +2459,7 @@ mod tests {
23872459
nodes[0].no_gossip_sync(),
23882460
nodes[0].peer_manager.clone(),
23892461
Some(Arc::clone(&nodes[0].liquidity_manager)),
2462+
Some(nodes[0].sweeper.clone()),
23902463
nodes[0].logger.clone(),
23912464
Some(nodes[0].scorer.clone()),
23922465
);
@@ -2417,6 +2490,7 @@ mod tests {
24172490
nodes[0].no_gossip_sync(),
24182491
nodes[0].peer_manager.clone(),
24192492
Some(Arc::clone(&nodes[0].liquidity_manager)),
2493+
Some(nodes[0].sweeper.clone()),
24202494
nodes[0].logger.clone(),
24212495
Some(nodes[0].scorer.clone()),
24222496
);
@@ -2513,6 +2587,7 @@ mod tests {
25132587
nodes[0].rapid_gossip_sync(),
25142588
nodes[0].peer_manager.clone(),
25152589
Some(Arc::clone(&nodes[0].liquidity_manager)),
2590+
Some(nodes[0].sweeper.clone()),
25162591
nodes[0].logger.clone(),
25172592
Some(nodes[0].scorer.clone()),
25182593
);
@@ -2546,6 +2621,7 @@ mod tests {
25462621
nodes[0].rapid_gossip_sync(),
25472622
nodes[0].peer_manager.clone(),
25482623
Some(Arc::clone(&nodes[0].liquidity_manager)),
2624+
Some(nodes[0].sweeper.clone()),
25492625
nodes[0].logger.clone(),
25502626
Some(nodes[0].scorer.clone()),
25512627
move |dur: Duration| {
@@ -2709,6 +2785,7 @@ mod tests {
27092785
nodes[0].no_gossip_sync(),
27102786
nodes[0].peer_manager.clone(),
27112787
Some(Arc::clone(&nodes[0].liquidity_manager)),
2788+
Some(nodes[0].sweeper.clone()),
27122789
nodes[0].logger.clone(),
27132790
Some(nodes[0].scorer.clone()),
27142791
);
@@ -2760,6 +2837,7 @@ mod tests {
27602837
nodes[0].no_gossip_sync(),
27612838
nodes[0].peer_manager.clone(),
27622839
Some(Arc::clone(&nodes[0].liquidity_manager)),
2840+
Some(nodes[0].sweeper.clone()),
27632841
nodes[0].logger.clone(),
27642842
Some(nodes[0].scorer.clone()),
27652843
move |dur: Duration| {

0 commit comments

Comments
 (0)