@@ -2722,6 +2722,10 @@ pub struct ChannelManager<
2722
2722
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
2723
2723
pending_events_processor: AtomicBool,
2724
2724
2725
+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2726
+ /// [`Self::process_pending_htlc_forwards`].
2727
+ pending_htlc_forwards_processor: AtomicBool,
2728
+
2725
2729
/// If we are running during init (either directly during the deserialization method or in
2726
2730
/// block connection methods which run after deserialization but before normal operation) we
2727
2731
/// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3786,6 +3790,7 @@ where
3786
3790
3787
3791
pending_events: Mutex::new(VecDeque::new()),
3788
3792
pending_events_processor: AtomicBool::new(false),
3793
+ pending_htlc_forwards_processor: AtomicBool::new(false),
3789
3794
pending_background_events: Mutex::new(Vec::new()),
3790
3795
total_consistency_lock: RwLock::new(()),
3791
3796
background_events_processed_since_startup: AtomicBool::new(false),
@@ -6365,9 +6370,19 @@ where
6365
6370
///
6366
6371
/// Will regularly be called by the background processor.
6367
6372
pub fn process_pending_htlc_forwards(&self) {
6373
+ if self
6374
+ .pending_htlc_forwards_processor
6375
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6376
+ .is_err()
6377
+ {
6378
+ return;
6379
+ }
6380
+
6368
6381
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
6369
6382
self.internal_process_pending_htlc_forwards()
6370
6383
});
6384
+
6385
+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
6371
6386
}
6372
6387
6373
6388
// Returns whether or not we need to re-persist.
@@ -16445,6 +16460,7 @@ where
16445
16460
16446
16461
pending_events: Mutex::new(pending_events_read),
16447
16462
pending_events_processor: AtomicBool::new(false),
16463
+ pending_htlc_forwards_processor: AtomicBool::new(false),
16448
16464
pending_background_events: Mutex::new(pending_background_events),
16449
16465
total_consistency_lock: RwLock::new(()),
16450
16466
background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments