Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 71 additions & 35 deletions lightning-liquidity/src/lsps2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,14 @@ impl PeerState {
self.needs_persist |= true;
}

/// Removes the outbound JIT channel stored under `intercept_scid`, together with every
/// `intercept_scid_by_user_channel_id` and `intercept_scid_by_channel_id` entry that maps
/// to that SCID.
///
/// If no channel is stored for `intercept_scid` this is a no-op and `needs_persist` is left
/// untouched; otherwise the peer state is flagged as needing persistence.
fn remove_outbound_channel(&mut self, intercept_scid: u64) {
	if self.outbound_channels_by_intercept_scid.remove(&intercept_scid).is_none() {
		// Nothing was registered for this SCID, so there is nothing to clean up or persist.
		return;
	}

	// Drop both reverse indices so no entry keeps pointing at the channel we just removed.
	self.intercept_scid_by_channel_id.retain(|_, mapped_scid| *mapped_scid != intercept_scid);
	self.intercept_scid_by_user_channel_id
		.retain(|_, mapped_scid| *mapped_scid != intercept_scid);
	self.needs_persist = true;
}

fn prune_pending_requests(&mut self) {
self.pending_requests.retain(|_, entry| {
match entry {
Expand Down Expand Up @@ -988,19 +996,19 @@ where
payment_hash: PaymentHash,
) -> Result<(), APIError> {
let event_queue_notifier = self.pending_events.notifier();
let mut should_persist = None;
let should_persist;

if let Some(counterparty_node_id) =
self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid)
{
let counterparty_node_id =
self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid).copied();
if let Some(counterparty_node_id) = counterparty_node_id {
let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
match outer_state_lock.get(&counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state = inner_state_lock.lock().unwrap();
if let Some(jit_channel) =
peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
{
should_persist = Some(*counterparty_node_id);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need all these changes from reference to copy suddenly?

should_persist = Some(counterparty_node_id);
let htlc = InterceptedHTLC {
intercept_id,
expected_outbound_amount_msat,
Expand All @@ -1009,7 +1017,7 @@ where
match jit_channel.htlc_intercepted(htlc) {
Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => {
let event = LSPS2ServiceEvent::OpenChannel {
their_network_key: counterparty_node_id.clone(),
their_network_key: counterparty_node_id,
amt_to_forward_msat: open_channel_params.amt_to_forward_msat,
opening_fee_msat: open_channel_params.opening_fee_msat,
user_channel_id: jit_channel.user_channel_id,
Expand All @@ -1021,7 +1029,7 @@ where
self.channel_manager.get_cm().forward_intercepted_htlc(
intercept_id,
&channel_id,
*counterparty_node_id,
counterparty_node_id,
expected_outbound_amount_msat,
)?;
},
Expand All @@ -1038,7 +1046,7 @@ where
self.channel_manager.get_cm().forward_intercepted_htlc(
intercept_id,
&channel_id,
*counterparty_node_id,
counterparty_node_id,
amount_to_forward_msat,
)?;
}
Expand All @@ -1048,13 +1056,36 @@ where
self.channel_manager
.get_cm()
.fail_intercepted_htlc(intercept_id)?;
peer_state
.outbound_channels_by_intercept_scid
peer_state.remove_outbound_channel(intercept_scid);
let err = e.err;
drop(peer_state);
drop(outer_state_lock);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need these explicit drops?

self.peer_by_intercept_scid
.write()
.unwrap()
.remove(&intercept_scid);
Comment thread
joostjager marked this conversation as resolved.
// TODO: cleanup peer_by_intercept_scid
return Err(APIError::APIMisuseError { err: e.err });
if let Err(e) = self.persist_peer_state(counterparty_node_id).await

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we need to eagerly persist here?

{
return Err(APIError::APIMisuseError {
err: format!(
"Failed to persist peer state for {} after HTLC interception failed with '{}': {}",
counterparty_node_id, err, e
),
});
}
return Err(APIError::APIMisuseError { err });
},
}
} else {
// `peer_by_intercept_scid` is a separate index from the per-peer

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the 'per-peer channel map'? In which case would pruning leave an entry in peer_by_intercept_scid?

// channel map, so pruning can leave the SCID pointing here after
// the channel entry was removed.
drop(peer_state);
drop(outer_state_lock);
self.peer_by_intercept_scid.write().unwrap().remove(&intercept_scid);
Comment thread
joostjager marked this conversation as resolved.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two notes on this branch:

  1. Correction to my earlier deadlock comment: I previously flagged peer_by_intercept_scid.write() here as deadlocking against the if let read guard. That is not the case in this code — the guard is created in the separate let statement at line 1002 (...read().unwrap().get(&intercept_scid).copied()), so it is dropped at that statement's ;, well before this block runs. There is no held read guard here. Please disregard the deadlock concern.

  2. Genuine (minor) issue: this branch is reachable — prune_expired_request_state removes entries from outbound_channels_by_intercept_scid but leaves the SCID in the global peer_by_intercept_scid, so an HTLC can arrive for a stale SCID and land here. Unlike the Err path above (which calls fail_intercepted_htlc), this branch returns an error without failing the held HTLC. The HTLC then stays stuck in ChannelManager until CLTV timeout. Since LSPS2 did recognize the SCID, consider failing it back here too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tnull what do you think of 2, should we fail back because it did recognize the scid?

return Err(APIError::APIMisuseError {
err: format!("No channel found for scid: {}", intercept_scid),
});
}

peer_state.needs_persist |= should_persist.is_some();
Expand Down Expand Up @@ -1257,6 +1288,8 @@ where
/// This removes the intercept SCID, any outbound channel state, and associated
/// channel-ID mappings for the specified `user_channel_id`, but only while no payment
/// has been forwarded yet and no channel has been opened on-chain.
/// Any held HTLCs for the pending flow are failed backwards before the local state
/// is removed.
///
/// Returns an error if:
/// - there is no channel matching `user_channel_id`, or
Expand All @@ -1270,7 +1303,7 @@ where
pub async fn channel_open_abandoned(
&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
) -> Result<(), APIError> {
{
let intercept_scid = {
let outer_state_lock = self.per_peer_state.read().unwrap();
let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
APIError::APIMisuseError {
Expand All @@ -1292,32 +1325,35 @@ where

let jit_channel = peer_state
.outbound_channels_by_intercept_scid
.get(&intercept_scid)
.get_mut(&intercept_scid)
.ok_or_else(|| APIError::APIMisuseError {
err: format!(
"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
intercept_scid, user_channel_id,
),
})?;
err: format!(
"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
intercept_scid, user_channel_id,
),
})?;

let is_pending = matches!(
jit_channel.state,
OutboundJITChannelState::PendingInitialPayment { .. }
| OutboundJITChannelState::PendingChannelOpen { .. }
);
let intercepted_htlcs = match &mut jit_channel.state {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need all this seemingly unrelated refactoring?

OutboundJITChannelState::PendingInitialPayment { payment_queue }
| OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } => payment_queue.clear(),
_ => {
return Err(APIError::APIMisuseError {
err: "Cannot abandon channel open after channel creation or payment forwarding"
.to_string(),
});
},
};

if !is_pending {
return Err(APIError::APIMisuseError {
err: "Cannot abandon channel open after channel creation or payment forwarding"
.to_string(),
});
for htlc in intercepted_htlcs {
// A missing intercept has already been released; still remove the LSPS2 state.
let _ = self.channel_manager.get_cm().fail_intercepted_htlc(htlc.intercept_id);
}

peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
peer_state.needs_persist |= true;
}
peer_state.remove_outbound_channel(intercept_scid);
intercept_scid
};

self.peer_by_intercept_scid.write().unwrap().remove(&intercept_scid);

self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
Expand Down
175 changes: 175 additions & 0 deletions lightning-liquidity/tests/lsps2_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,181 @@ fn channel_open_abandoned() {
assert!(result.is_err());
}

fn intercept_htlc_to_scid(
lsps_nodes: &LSPSNodesWithPayer, intercept_scid: u64, cltv_expiry_delta: u32,
payment_size_msat: Option<u64>,
) -> (InterceptId, PaymentHash, u64) {
let service_node = &lsps_nodes.service_node;
let client_node = &lsps_nodes.client_node;
let payer_node = &lsps_nodes.payer_node;

let payer_node_id = payer_node.node.get_our_node_id();
let service_node_id = service_node.inner.node.get_our_node_id();

let invoice = create_jit_invoice(
client_node,
service_node_id,
intercept_scid,
cltv_expiry_delta,
payment_size_msat,
"intercept-htlc",
3600,
)
.unwrap();

payer_node
.node
.pay_for_bolt11_invoice(
&invoice,
PaymentId(invoice.payment_hash().0),
None,
OptionalBolt11PaymentParams::default(),
)
.unwrap();

check_added_monitors(payer_node, 1);
let events = payer_node.node.get_and_clear_pending_msg_events();
let ev = SendEvent::from_event(events[0].clone());
service_node.inner.node.handle_update_add_htlc(payer_node_id, &ev.msgs[0]);
do_commitment_signed_dance(&service_node.inner, payer_node, &ev.commitment_msg, false, true);
service_node.inner.node.process_pending_htlc_forwards();

let events = service_node.inner.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match &events[0] {
Event::HTLCIntercepted {
intercept_id,
requested_next_hop_scid,
payment_hash,
expected_outbound_amount_msat,
..
} => {
assert_eq!(*requested_next_hop_scid, intercept_scid);
(*intercept_id, *payment_hash, *expected_outbound_amount_msat)
},
other => panic!("Expected HTLCIntercepted, got {:?}", other),
}
}

/// Checks that abandoning a pending JIT channel open fails back the HTLC that LSPS2 was
/// holding for it: after `channel_open_abandoned`, a manual `fail_intercepted_htlc` for the
/// same intercept id must error, and the service node must have emitted a single
/// `HTLCHandlingFailed` event for the intercept SCID.
#[test]
fn channel_open_abandoned_releases_intercepted_htlcs() {
	// Only the first node config enables interception of HTLCs sent to intercept SCIDs.
	let mut intercepting_config = test_default_channel_config();
	intercepting_config.htlc_interception_flags = HTLCInterceptionFlags::ToInterceptSCIDs as u8;

	let chanmon_cfgs = create_chanmon_cfgs(3);
	let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
	let node_chanmgrs =
		create_node_chanmgrs(3, &node_cfgs, &[Some(intercepting_config), None, None]);
	let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
	let (lsps_nodes, promise_secret) = setup_test_lsps2_nodes_with_payer(nodes);
	let service_node = &lsps_nodes.service_node;
	let payer_node = &lsps_nodes.payer_node;

	let client_node_id = lsps_nodes.client_node.inner.node.get_our_node_id();
	let service_handler = service_node.liquidity_manager.lsps2_service_handler().unwrap();
	create_chan_between_nodes_with_value(payer_node, &service_node.inner, 2_000_000, 100_000);

	let intercept_scid = service_node.node.get_intercept_scid();
	let user_channel_id = 42u128;
	let cltv_expiry_delta: u32 = 144;
	let payment_size_msat = Some(1_000_000);
	let fee_base_msat: u64 = 1_000;

	// Negotiate the JIT channel parameters so LSPS2 knows about `intercept_scid`.
	execute_lsps2_dance(
		&lsps_nodes,
		intercept_scid,
		user_channel_id,
		cltv_expiry_delta,
		promise_secret,
		payment_size_msat,
		fee_base_msat,
	);

	// Get a real HTLC held by the channel manager and hand it to the LSPS2 handler.
	let (intercept_id, payment_hash, expected_outbound_amount_msat) =
		intercept_htlc_to_scid(&lsps_nodes, intercept_scid, cltv_expiry_delta, payment_size_msat);
	service_handler
		.htlc_intercepted(intercept_scid, intercept_id, expected_outbound_amount_msat, payment_hash)
		.unwrap();

	let liquidity_event = service_node.liquidity_manager.next_event().unwrap();
	if !matches!(
		liquidity_event,
		LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::OpenChannel { .. })
	) {
		panic!("Unexpected event: {:?}", liquidity_event);
	}

	service_handler.channel_open_abandoned(&client_node_id, user_channel_id).unwrap();

	// Abandoning must already have released the intercept, so failing it again is an error.
	let second_fail_attempt = service_node.inner.node.fail_intercepted_htlc(intercept_id);
	assert!(
		second_fail_attempt.is_err(),
		"channel_open_abandoned must release the intercepted HTLC via fail_intercepted_htlc, but the entry is still pending: {:?}",
		second_fail_attempt,
	);

	let failure_events = service_node.inner.node.get_and_clear_pending_events();
	assert_eq!(failure_events.len(), 1);
	if let Event::HTLCHandlingFailed {
		failure_type: HTLCHandlingFailureType::InvalidForward { requested_forward_scid },
		..
	} = &failure_events[0]
	{
		assert_eq!(*requested_forward_scid, intercept_scid);
	} else {
		panic!("Expected HTLCHandlingFailed, got {:?}", &failure_events[0]);
	}
}

/// Checks that `htlc_intercepted` rejects an SCID that LSPS2 never registered with an
/// "Unknown scid provided" misuse error, and that it leaves the HTLC held by the channel
/// manager: a later manual `fail_intercepted_htlc` must still succeed and produce a single
/// `HTLCHandlingFailed` event for that SCID.
#[test]
fn htlc_intercepted_unknown_scid_does_not_fail_htlc() {
	// Only the first node config enables interception of HTLCs sent to intercept SCIDs.
	let mut intercepting_config = test_default_channel_config();
	intercepting_config.htlc_interception_flags = HTLCInterceptionFlags::ToInterceptSCIDs as u8;

	let chanmon_cfgs = create_chanmon_cfgs(3);
	let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
	let node_chanmgrs =
		create_node_chanmgrs(3, &node_cfgs, &[Some(intercepting_config), None, None]);
	let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
	let (lsps_nodes, _) = setup_test_lsps2_nodes_with_payer(nodes);
	let service_node = &lsps_nodes.service_node;
	let payer_node = &lsps_nodes.payer_node;

	let service_handler = service_node.liquidity_manager.lsps2_service_handler().unwrap();
	create_chan_between_nodes_with_value(payer_node, &service_node.inner, 2_000_000, 100_000);

	// No LSPS2 negotiation is run for this SCID, so the handler has no record of it.
	let unknown_intercept_scid = service_node.node.get_intercept_scid();
	let cltv_expiry_delta: u32 = 144;
	let payment_size_msat = Some(1_000_000);
	let (intercept_id, payment_hash, expected_outbound_amount_msat) = intercept_htlc_to_scid(
		&lsps_nodes,
		unknown_intercept_scid,
		cltv_expiry_delta,
		payment_size_msat,
	);

	// ChannelManager is holding a real intercept, but LSPS2 never registered this SCID.
	let intercept_result = service_handler.htlc_intercepted(
		unknown_intercept_scid,
		intercept_id,
		expected_outbound_amount_msat,
		payment_hash,
	);
	assert!(intercept_result.is_err());
	let intercept_err = intercept_result.unwrap_err();
	if let APIError::APIMisuseError { err } = intercept_err {
		assert!(err.contains("Unknown scid provided"));
	} else {
		panic!("Unexpected error type: {:?}", intercept_err);
	}

	// LSPS2 must leave HTLCs for unknown SCIDs available to other intercept handlers.
	service_node.inner.node.fail_intercepted_htlc(intercept_id).unwrap();
	let failure_events = service_node.inner.node.get_and_clear_pending_events();
	assert_eq!(failure_events.len(), 1);
	if let Event::HTLCHandlingFailed {
		failure_type: HTLCHandlingFailureType::InvalidForward { requested_forward_scid },
		..
	} = &failure_events[0]
	{
		assert_eq!(*requested_forward_scid, unknown_intercept_scid);
	} else {
		panic!("Expected HTLCHandlingFailed, got {:?}", &failure_events[0]);
	}
}

#[test]
fn channel_open_abandoned_nonexistent_channel() {
let chanmon_cfgs = create_chanmon_cfgs(2);
Expand Down