Skip to content

Commit 493b686

Browse files
committed
Convert sweeper to use an async ChangeDestinationSource and provide
synchronous wrappers for usage in a sync context.
1 parent b5ec74a commit 493b686

File tree

5 files changed

+306
-62
lines changed

5 files changed

+306
-62
lines changed

lightning-background-processor/src/lib.rs

+19-12
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,18 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39-
use lightning::sign::{ChangeDestinationSource, OutputSpender};
39+
#[cfg(feature = "futures")]
40+
use lightning::sign::ChangeDestinationSource;
41+
#[cfg(feature = "std")]
42+
use lightning::sign::ChangeDestinationSourceSync;
43+
use lightning::sign::OutputSpender;
4044
use lightning::util::logger::Logger;
4145
use lightning::util::persist::{KVStore, Persister};
46+
#[cfg(feature = "futures")]
4247
use lightning::util::sweep::OutputSweeper;
4348
#[cfg(feature = "std")]
49+
use lightning::util::sweep::OutputSweeperSync;
50+
#[cfg(feature = "std")]
4451
use lightning::util::wakers::Sleeper;
4552
use lightning_rapid_gossip_sync::RapidGossipSync;
4653

@@ -866,7 +873,7 @@ where
866873
gossip_sync,
867874
{
868875
if let Some(ref sweeper) = sweeper {
869-
sweeper.regenerate_and_broadcast_spend_if_necessary()
876+
sweeper.regenerate_and_broadcast_spend_if_necessary().await
870877
} else {
871878
Ok(())
872879
}
@@ -994,7 +1001,7 @@ impl BackgroundProcessor {
9941001
D: 'static + Deref,
9951002
O: 'static + Deref,
9961003
K: 'static + Deref,
997-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
1004+
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send + Sync,
9981005
>(
9991006
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
10001007
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
@@ -1012,8 +1019,8 @@ impl BackgroundProcessor {
10121019
OM::Target: AOnionMessenger,
10131020
PM::Target: APeerManager,
10141021
LM::Target: ALiquidityManager,
1022+
D::Target: ChangeDestinationSourceSync,
10151023
O::Target: 'static + OutputSpender,
1016-
D::Target: 'static + ChangeDestinationSource,
10171024
K::Target: 'static + KVStore,
10181025
{
10191026
let stop_thread = Arc::new(AtomicBool::new(false));
@@ -1179,7 +1186,7 @@ mod tests {
11791186
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
11801187
use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
11811188
use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1182-
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
1189+
use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager};
11831190
use lightning::types::features::{ChannelFeatures, NodeFeatures};
11841191
use lightning::types::payment::PaymentHash;
11851192
use lightning::util::config::UserConfig;
@@ -1191,7 +1198,7 @@ mod tests {
11911198
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
11921199
};
11931200
use lightning::util::ser::Writeable;
1194-
use lightning::util::sweep::{OutputSpendStatus, OutputSweeper, PRUNE_DELAY_BLOCKS};
1201+
use lightning::util::sweep::{OutputSpendStatus, OutputSweeperSync, PRUNE_DELAY_BLOCKS};
11951202
use lightning::util::test_utils;
11961203
use lightning::{get_event, get_event_msg};
11971204
use lightning_liquidity::LiquidityManager;
@@ -1317,7 +1324,7 @@ mod tests {
13171324
best_block: BestBlock,
13181325
scorer: Arc<LockingWrapper<TestScorer>>,
13191326
sweeper: Arc<
1320-
OutputSweeper<
1327+
OutputSweeperSync<
13211328
Arc<test_utils::TestBroadcaster>,
13221329
Arc<TestWallet>,
13231330
Arc<test_utils::TestFeeEstimator>,
@@ -1618,7 +1625,7 @@ mod tests {
16181625

16191626
struct TestWallet {}
16201627

1621-
impl ChangeDestinationSource for TestWallet {
1628+
impl ChangeDestinationSourceSync for TestWallet {
16221629
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
16231630
Ok(ScriptBuf::new())
16241631
}
@@ -1696,7 +1703,7 @@ mod tests {
16961703
IgnoringMessageHandler {},
16971704
));
16981705
let wallet = Arc::new(TestWallet {});
1699-
let sweeper = Arc::new(OutputSweeper::new(
1706+
let sweeper = Arc::new(OutputSweeperSync::new(
17001707
best_block,
17011708
Arc::clone(&tx_broadcaster),
17021709
Arc::clone(&fee_estimator),
@@ -2113,7 +2120,7 @@ mod tests {
21132120
nodes[0].rapid_gossip_sync(),
21142121
nodes[0].peer_manager.clone(),
21152122
Some(Arc::clone(&nodes[0].liquidity_manager)),
2116-
Some(nodes[0].sweeper.clone()),
2123+
Some(nodes[0].sweeper.sweeper_async()),
21172124
nodes[0].logger.clone(),
21182125
Some(nodes[0].scorer.clone()),
21192126
move |dur: Duration| {
@@ -2621,7 +2628,7 @@ mod tests {
26212628
nodes[0].rapid_gossip_sync(),
26222629
nodes[0].peer_manager.clone(),
26232630
Some(Arc::clone(&nodes[0].liquidity_manager)),
2624-
Some(nodes[0].sweeper.clone()),
2631+
Some(nodes[0].sweeper.sweeper_async()),
26252632
nodes[0].logger.clone(),
26262633
Some(nodes[0].scorer.clone()),
26272634
move |dur: Duration| {
@@ -2837,7 +2844,7 @@ mod tests {
28372844
nodes[0].no_gossip_sync(),
28382845
nodes[0].peer_manager.clone(),
28392846
Some(Arc::clone(&nodes[0].liquidity_manager)),
2840-
Some(nodes[0].sweeper.clone()),
2847+
Some(nodes[0].sweeper.sweeper_async()),
28412848
nodes[0].logger.clone(),
28422849
Some(nodes[0].scorer.clone()),
28432850
move |dur: Duration| {

lightning/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//! * `grind_signatures`
3131
3232
#![cfg_attr(not(any(test, fuzzing, feature = "_test_utils")), deny(missing_docs))]
33-
#![cfg_attr(not(any(test, feature = "_test_utils")), forbid(unsafe_code))]
3433

3534
#![deny(rustdoc::broken_intra_doc_links)]
3635
#![deny(rustdoc::private_intra_doc_links)]

lightning/src/sign/mod.rs

+38
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::ln::msgs::{UnsignedChannelAnnouncement, UnsignedGossipMessage};
5757
use crate::ln::script::ShutdownScript;
5858
use crate::offers::invoice::UnsignedBolt12Invoice;
5959
use crate::types::payment::PaymentPreimage;
60+
use crate::util::async_poll::AsyncResult;
6061
use crate::util::ser::{ReadableArgs, Writeable};
6162
use crate::util::transaction_utils;
6263

@@ -67,6 +68,7 @@ use crate::sign::ecdsa::EcdsaChannelSigner;
6768
use crate::sign::taproot::TaprootChannelSigner;
6869
use crate::util::atomic_counter::AtomicCounter;
6970
use core::convert::TryInto;
71+
use core::ops::Deref;
7072
use core::sync::atomic::{AtomicUsize, Ordering};
7173
#[cfg(taproot)]
7274
use musig2::types::{PartialSignature, PublicNonce};
@@ -981,11 +983,47 @@ pub trait ChangeDestinationSource {
981983
/// Returns a script pubkey which can be used as a change destination for
982984
/// [`OutputSpender::spend_spendable_outputs`].
983985
///
986+
/// This method should return a different value each time it is called, to avoid linking
987+
/// on-chain funds controlled to the same user.
988+
fn get_change_destination_script<'a>(&self) -> AsyncResult<'a, ScriptBuf>;
989+
}
990+
991+
/// A synchronous helper trait that describes an on-chain wallet capable of returning a (change) destination script.
992+
pub trait ChangeDestinationSourceSync {
984993
/// This method should return a different value each time it is called, to avoid linking
985994
/// on-chain funds controlled to the same user.
986995
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()>;
987996
}
988997

998+
/// A wrapper around [`ChangeDestinationSource`] to allow for async calls.
999+
#[cfg(any(test, feature = "_test_utils"))]
1000+
pub struct ChangeDestinationSourceSyncWrapper<T: Deref>(T)
1001+
where
1002+
T::Target: ChangeDestinationSourceSync;
1003+
#[cfg(not(any(test, feature = "_test_utils")))]
1004+
pub(crate) struct ChangeDestinationSourceSyncWrapper<T: Deref>(T)
1005+
where
1006+
T::Target: ChangeDestinationSourceSync;
1007+
1008+
impl<T: Deref> ChangeDestinationSourceSyncWrapper<T>
1009+
where
1010+
T::Target: ChangeDestinationSourceSync,
1011+
{
1012+
/// Creates a new [`ChangeDestinationSourceSyncWrapper`].
1013+
pub fn new(source: T) -> Self {
1014+
Self(source)
1015+
}
1016+
}
1017+
impl<T: Deref> ChangeDestinationSource for ChangeDestinationSourceSyncWrapper<T>
1018+
where
1019+
T::Target: ChangeDestinationSourceSync,
1020+
{
1021+
fn get_change_destination_script<'a>(&self) -> AsyncResult<'a, ScriptBuf> {
1022+
let script = self.0.get_change_destination_script();
1023+
Box::pin(async move { script })
1024+
}
1025+
}
1026+
9891027
mod sealed {
9901028
use bitcoin::secp256k1::{Scalar, SecretKey};
9911029

lightning/src/util/async_poll.rs

+25-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::prelude::*;
1313
use core::future::Future;
1414
use core::marker::Unpin;
1515
use core::pin::Pin;
16-
use core::task::{Context, Poll};
16+
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

1818
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
1919
Pending(F),
@@ -74,3 +74,27 @@ impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
7474
}
7575
}
7676
}
77+
78+
// If we want to poll a future without an async context to figure out if it has completed or
79+
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
80+
// but sadly there's a good bit of boilerplate here.
81+
//
82+
// Waker::noop() would be preferable, but requires an MSRV of 1.85.
83+
fn dummy_waker_clone(_: *const ()) -> RawWaker {
84+
RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
85+
}
86+
fn dummy_waker_action(_: *const ()) {}
87+
88+
const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
89+
dummy_waker_clone,
90+
dummy_waker_action,
91+
dummy_waker_action,
92+
dummy_waker_action,
93+
);
94+
95+
pub(crate) fn dummy_waker() -> Waker {
96+
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
97+
}
98+
99+
/// A type alias for a future that returns a result of type T.
100+
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;

0 commit comments

Comments
 (0)