Skip to content

Commit aa3fb3d

Browse files
committed
Wire OnionMessageInterceptor into LSPS2 service handler
Define the `OnionMessageInterceptor` trait with `register_peer_for_interception()` and `deregister_peer_for_interception()` methods, and implement it for `OnionMessenger`. This allows external components to register peers for onion message interception via a trait object, without needing to know the concrete `OnionMessenger` type. Wire the trait into `LSPS2ServiceHandler` as an optional `Arc<dyn OnionMessageInterceptor>`. When provided: - On init, all peers with active intercept SCIDs are registered - In `invoice_parameters_generated()`, the counterparty is registered when a new intercept SCID is assigned This ensures that onion messages for LSPS2 clients with active JIT channel sessions are intercepted when those clients are offline, enabling the LSP to store and forward messages when the client reconnects. Co-Authored-By: HAL 9000
1 parent 706d90d commit aa3fb3d

8 files changed

Lines changed: 94 additions & 0 deletions

File tree

fuzz/src/lsps_message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub fn do_test(data: &[u8]) {
8787
Arc::clone(&tx_broadcaster),
8888
None,
8989
None,
90+
None,
9091
)
9192
.unwrap(),
9293
);

lightning-background-processor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,6 +2556,7 @@ mod tests {
25562556
Arc::clone(&tx_broadcaster),
25572557
None,
25582558
None,
2559+
None,
25592560
)
25602561
.unwrap(),
25612562
);

lightning-liquidity/src/lsps2/service.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use lightning::events::HTLCHandlingFailureType;
4545
use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId};
4646
use lightning::ln::msgs::{ErrorAction, LightningError};
4747
use lightning::ln::types::ChannelId;
48+
use lightning::onion_message::messenger::OnionMessageInterceptor;
4849
use lightning::util::errors::APIError;
4950
use lightning::util::logger::Level;
5051
use lightning::util::ser::Writeable;
@@ -717,6 +718,7 @@ where
717718
total_pending_requests: AtomicUsize,
718719
config: LSPS2ServiceConfig,
719720
persistence_in_flight: AtomicUsize,
721+
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
720722
}
721723

722724
impl<CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone> LSPS2ServiceHandler<CM, K, T>
@@ -728,6 +730,7 @@ where
728730
per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
729731
pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K, tx_broadcaster: T,
730732
config: LSPS2ServiceConfig,
733+
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
731734
) -> Result<Self, lightning::io::Error> {
732735
let mut peer_by_intercept_scid = new_hash_map();
733736
let mut peer_by_channel_id = new_hash_map();
@@ -756,6 +759,17 @@ where
756759
}
757760
}
758761

762+
// Register all peers and SCIDs with active intercept SCIDs for onion message
763+
// interception, so that messages for offline peers are held rather than dropped.
764+
// Both peer-based and SCID-based registration are needed to support clients using
765+
// either pubkey or compact SCID encoding in their message blinded paths.
766+
if let Some(ref interceptor) = onion_message_interceptor {
767+
for (scid, node_id) in &peer_by_intercept_scid {
768+
interceptor.register_peer_for_interception(*node_id);
769+
interceptor.register_scid_for_interception(*scid, *node_id);
770+
}
771+
}
772+
759773
Ok(Self {
760774
pending_messages,
761775
pending_events,
@@ -768,6 +782,7 @@ where
768782
kv_store,
769783
tx_broadcaster,
770784
config,
785+
onion_message_interceptor,
771786
})
772787
}
773788

@@ -776,6 +791,33 @@ where
776791
&self.config
777792
}
778793

794+
/// Cleans up `peer_by_intercept_scid` entries for the given SCIDs, and deregisters the peer
795+
/// from onion message interception if they have no remaining active intercept SCIDs.
796+
fn cleanup_intercept_scids(
797+
&self, counterparty_node_id: &PublicKey, pruned_scids: &[u64], has_remaining_channels: bool,
798+
) {
799+
if pruned_scids.is_empty() {
800+
return;
801+
}
802+
803+
{
804+
let mut peer_by_intercept_scid = self.peer_by_intercept_scid.write().unwrap();
805+
for scid in pruned_scids {
806+
peer_by_intercept_scid.remove(scid);
807+
}
808+
}
809+
810+
if let Some(ref interceptor) = self.onion_message_interceptor {
811+
for scid in pruned_scids {
812+
interceptor.deregister_scid_for_interception(*scid);
813+
}
814+
815+
if !has_remaining_channels {
816+
interceptor.deregister_peer_for_interception(counterparty_node_id);
817+
}
818+
}
819+
}
820+
779821
/// Returns whether the peer has any active LSPS2 requests.
780822
pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool {
781823
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -921,6 +963,14 @@ where
921963
peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
922964
}
923965

966+
if let Some(ref interceptor) = self.onion_message_interceptor {
967+
interceptor.register_peer_for_interception(*counterparty_node_id);
968+
interceptor.register_scid_for_interception(
969+
intercept_scid,
970+
*counterparty_node_id,
971+
);
972+
}
973+
924974
let outbound_jit_channel = OutboundJITChannel::new(
925975
buy_request.payment_size_msat,
926976
buy_request.opening_fee_params,
@@ -1858,6 +1908,31 @@ where
18581908
debug_assert!(false);
18591909
}
18601910
}
1911+
if future_opt.is_some() {
1912+
// Clean up handler-level maps for the removed peer.
1913+
let removed_scids: Vec<u64> = {
1914+
let mut peer_by_intercept_scid =
1915+
self.peer_by_intercept_scid.write().unwrap();
1916+
let scids: Vec<u64> = peer_by_intercept_scid
1917+
.iter()
1918+
.filter(|(_, nid)| **nid == counterparty_node_id)
1919+
.map(|(scid, _)| *scid)
1920+
.collect();
1921+
peer_by_intercept_scid
1922+
.retain(|_, node_id| *node_id != counterparty_node_id);
1923+
scids
1924+
};
1925+
self.peer_by_channel_id
1926+
.write()
1927+
.unwrap()
1928+
.retain(|_, node_id| *node_id != counterparty_node_id);
1929+
if let Some(ref interceptor) = self.onion_message_interceptor {
1930+
for scid in &removed_scids {
1931+
interceptor.deregister_scid_for_interception(*scid);
1932+
}
1933+
interceptor.deregister_peer_for_interception(&counterparty_node_id);
1934+
}
1935+
}
18611936
if let Some(future) = future_opt {
18621937
future.await?;
18631938
did_persist = true;

lightning-liquidity/src/manager.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use lightning::ln::channelmanager::AChannelManager;
4747
use lightning::ln::msgs::{ErrorAction, LightningError};
4848
use lightning::ln::peer_handler::CustomMessageHandler;
4949
use lightning::ln::wire::CustomMessageReader;
50+
use lightning::onion_message::messenger::OnionMessageInterceptor;
5051
use lightning::sign::{EntropySource, NodeSigner};
5152
use lightning::util::logger::Level;
5253
use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper};
@@ -310,6 +311,7 @@ where
310311
entropy_source: ES, node_signer: NS, channel_manager: CM, kv_store: K,
311312
transaction_broadcaster: T, service_config: Option<LiquidityServiceConfig>,
312313
client_config: Option<LiquidityClientConfig>,
314+
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
313315
) -> Result<Self, lightning::io::Error> {
314316
Self::new_with_custom_time_provider(
315317
entropy_source,
@@ -320,6 +322,7 @@ where
320322
service_config,
321323
client_config,
322324
DefaultTimeProvider,
325+
onion_message_interceptor,
323326
)
324327
.await
325328
}
@@ -349,6 +352,7 @@ where
349352
entropy_source: ES, node_signer: NS, channel_manager: CM, transaction_broadcaster: T,
350353
kv_store: K, service_config: Option<LiquidityServiceConfig>,
351354
client_config: Option<LiquidityClientConfig>, time_provider: TP,
355+
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
352356
) -> Result<Self, lightning::io::Error> {
353357
let pending_msgs_or_needs_persist_notifier = Arc::new(Notifier::new());
354358
let pending_messages =
@@ -391,6 +395,7 @@ where
391395
kv_store.clone(),
392396
transaction_broadcaster.clone(),
393397
lsps2_service_config.clone(),
398+
onion_message_interceptor.clone(),
394399
)?)
395400
} else {
396401
None
@@ -940,6 +945,7 @@ where
940945
entropy_source: ES, node_signer: NS, channel_manager: CM, kv_store_sync: KS,
941946
transaction_broadcaster: T, service_config: Option<LiquidityServiceConfig>,
942947
client_config: Option<LiquidityClientConfig>,
948+
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
943949
) -> Result<Self, lightning::io::Error> {
944950
let kv_store = KVStoreSyncWrapper(kv_store_sync);
945951

@@ -951,6 +957,7 @@ where
951957
transaction_broadcaster,
952958
service_config,
953959
client_config,
960+
onion_message_interceptor,
954961
));
955962

956963
let mut waker = dummy_waker();
@@ -986,6 +993,7 @@ where
986993
entropy_source: ES, node_signer: NS, channel_manager: CM, kv_store_sync: KS,
987994
transaction_broadcaster: T, service_config: Option<LiquidityServiceConfig>,
988995
client_config: Option<LiquidityClientConfig>, time_provider: TP,
996+
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
989997
) -> Result<Self, lightning::io::Error> {
990998
let kv_store = KVStoreSyncWrapper(kv_store_sync);
991999
let mut fut = pin!(LiquidityManager::new_with_custom_time_provider(
@@ -997,6 +1005,7 @@ where
9971005
service_config,
9981006
client_config,
9991007
time_provider,
1008+
onion_message_interceptor,
10001009
));
10011010

10021011
let mut waker = dummy_waker();

lightning-liquidity/tests/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>(
3636
Some(service_config),
3737
None,
3838
Arc::clone(&time_provider),
39+
None,
3940
)
4041
.unwrap();
4142

@@ -48,6 +49,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>(
4849
None,
4950
Some(client_config),
5051
time_provider,
52+
None,
5153
)
5254
.unwrap();
5355

lightning-liquidity/tests/lsps1_integration_tests.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ fn lsps1_service_handler_persistence_across_restarts() {
434434
Some(service_config),
435435
None,
436436
Arc::clone(&time_provider),
437+
None,
437438
)
438439
.unwrap();
439440

@@ -454,6 +455,7 @@ fn lsps1_service_handler_persistence_across_restarts() {
454455
None,
455456
Some(client_config),
456457
time_provider,
458+
None,
457459
)
458460
.unwrap();
459461

@@ -1087,6 +1089,7 @@ fn lsps1_expired_orders_are_pruned_and_not_persisted() {
10871089
Some(service_config),
10881090
None,
10891091
Arc::clone(&time_provider),
1092+
None,
10901093
)
10911094
.unwrap();
10921095

@@ -1106,6 +1109,7 @@ fn lsps1_expired_orders_are_pruned_and_not_persisted() {
11061109
None,
11071110
Some(client_config),
11081111
time_provider,
1112+
None,
11091113
)
11101114
.unwrap();
11111115

lightning-liquidity/tests/lsps2_integration_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,7 @@ fn lsps2_service_handler_persistence_across_restarts() {
10791079
Some(service_config),
10801080
None,
10811081
time_provider,
1082+
None,
10821083
)
10831084
.unwrap();
10841085

lightning-liquidity/tests/lsps5_integration_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1604,6 +1604,7 @@ fn lsps5_service_handler_persistence_across_restarts() {
16041604
Some(service_config),
16051605
None,
16061606
Arc::clone(&time_provider),
1607+
None,
16071608
)
16081609
.unwrap();
16091610

0 commit comments

Comments
 (0)