Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 251 additions & 1 deletion e2e-tests/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ldk_node::lightning::offers::offer::Offer;
use ldk_node::lightning_invoice::Bolt11Invoice;
use ldk_server_client::client::EventStream;
use ldk_server_client::ldk_server_grpc::api::{
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest, OpenChannelRequest,
Bolt11ReceiveRequest, Bolt12ReceiveRequest, EventKind, OnchainReceiveRequest, OpenChannelRequest,
};
use ldk_server_client::ldk_server_grpc::events::event_envelope::Event;
use ldk_server_client::ldk_server_grpc::events::{
Expand Down Expand Up @@ -586,6 +586,256 @@ async fn test_subscribe_events_channel_state_lifecycle_pending_ready_closed() {
assert_eq!(closed_b.closure_initiator, ChannelClosureInitiator::Remote as i32);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_subscribe_events_no_filter() {
let bitcoind = TestBitcoind::new();
let server = LdkServerHandle::start(&bitcoind).await;

let addr = server.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr, 1.0);
mine_and_sync(&bitcoind, &[&server], 6).await;
wait_for_onchain_balance(server.client(), Duration::from_secs(30)).await;

// Subscribe with no filter => all events delivered
let mut events = server.client().subscribe_events().await.unwrap();

let invoice = server
.client()
.bolt11_receive(Bolt11ReceiveRequest {
amount_msat: Some(10_000),
description: Some(Bolt11InvoiceDescription {
description: Some("test".to_string()),
description_hash: None,
}),
expiry_secs: 3600,
})
.await
.unwrap();

// Trigger a payment from an external node
let payment_node = ldk_node::Builder::from_config(ldk_node::config::Config {
network: ldk_node::bitcoin::Network::Regtest,
..Default::default()
})
.set_chain_source_bitcoind_rpc(
bitcoind.rpc_host(),
bitcoind.rpc_port(),
bitcoind.rpc_user(),
bitcoind.rpc_password(),
)
.set_log_facade_logger()
.build(ldk_node::entropy::generate_entropy_mnemonic(None))
.unwrap();
payment_node.start().unwrap();
payment_node.sync_wallets().unwrap();
payment_node
.bolt11_payment()
.send(&invoice.invoice.parse().unwrap(), None, None)
.unwrap();

// Should receive a payment event (no filter = all events)
tokio::time::timeout(Duration::from_secs(15), async {
while let Some(Ok(ev)) = events.next_message().await {
if matches!(&ev.event, Some(Event::PaymentReceived(_))) {
return;
}
}
panic!("Event stream ended without PaymentReceived");
})
.await
.expect("Timed out waiting for PaymentReceived");

payment_node.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_subscribe_events_payment_only_filter() {
let bitcoind = TestBitcoind::new();
let server = LdkServerHandle::start(&bitcoind).await;

let addr = server.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr, 1.0);
mine_and_sync(&bitcoind, &[&server], 6).await;
wait_for_onchain_balance(server.client(), Duration::from_secs(30)).await;

// Subscribe with payment-only filter
let mut events = server.client().subscribe_events_filtered(&[EventKind::Payment]).await.unwrap();

let invoice = server
.client()
.bolt11_receive(Bolt11ReceiveRequest {
amount_msat: Some(10_000),
description: Some(Bolt11InvoiceDescription {
description: Some("test".to_string()),
description_hash: None,
}),
expiry_secs: 3600,
})
.await
.unwrap();

// Trigger a payment from an external node
let payment_node = ldk_node::Builder::from_config(ldk_node::config::Config {
network: ldk_node::bitcoin::Network::Regtest,
..Default::default()
})
.set_chain_source_bitcoind_rpc(
bitcoind.rpc_host(),
bitcoind.rpc_port(),
bitcoind.rpc_user(),
bitcoind.rpc_password(),
)
.set_log_facade_logger()
.build(ldk_node::entropy::generate_entropy_mnemonic(None))
.unwrap();
payment_node.start().unwrap();
payment_node.sync_wallets().unwrap();
payment_node
.bolt11_payment()
.send(&invoice.invoice.parse().unwrap(), None, None)
.unwrap();

// Should receive a payment event
tokio::time::timeout(Duration::from_secs(15), async {
while let Some(Ok(ev)) = events.next_message().await {
if matches!(&ev.event, Some(Event::PaymentReceived(_))) {
return;
}
}
panic!("Event stream ended without PaymentReceived");
})
.await
.expect("Timed out waiting for PaymentReceived");

payment_node.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_subscribe_events_channel_only_filter_receives_channel_events() {
let bitcoind = TestBitcoind::new();
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr_a, 1.0);
bitcoind.fund_address(&addr_b, 0.1);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;

// Subscribe with channel-only filter
let mut events_a = server_a.client().subscribe_events_filtered(&[EventKind::Channel]).await.unwrap();
let mut events_b = server_b.client().subscribe_events_filtered(&[EventKind::Channel]).await.unwrap();

let open_resp = server_a
.client()
.open_channel(OpenChannelRequest {
node_pubkey: server_b.node_id().to_string(),
address: format!("127.0.0.1:{}", server_b.p2p_port),
channel_amount_sats: 100_000,
push_to_counterparty_msat: None,
channel_config: None,
announce_channel: true,
disable_counterparty_reserve: false,
})
.await
.unwrap();

let pending_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
assert!(matches!(
pending_a.event,
Some(Event::ChannelStateChanged(_))
));

// Drain remaining events and verify none are payment events
tokio::time::timeout(Duration::from_secs(3), async {
loop {
match events_a.next_message().await {
Some(Ok(ev)) => {
assert!(!matches!(&ev.event, Some(Event::PaymentReceived(_))));
assert!(!matches!(&ev.event, Some(Event::PaymentSuccessful(_))));
assert!(!matches!(&ev.event, Some(Event::PaymentFailed(_))));
},
_ => break,
}
}
})
.ok();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_subscribe_events_multiple_filters() {
let bitcoind = TestBitcoind::new();
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr_a, 1.0);
bitcoind.fund_address(&addr_b, 0.1);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;

// Subscribe with both payment and channel filters
let mut events_a = server_a.client()
.subscribe_events_filtered(&[EventKind::Payment, EventKind::Channel])
.await
.unwrap();
let mut events_b = server_b.client()
.subscribe_events_filtered(&[EventKind::Payment, EventKind::Channel])
.await
.unwrap();

let open_resp = server_a
.client()
.open_channel(OpenChannelRequest {
node_pubkey: server_b.node_id().to_string(),
address: format!("127.0.0.1:{}", server_b.p2p_port),
channel_amount_sats: 100_000,
push_to_counterparty_msat: None,
channel_config: None,
announce_channel: true,
disable_counterparty_reserve: false,
})
.await
.unwrap();

// We should receive channel events with multiple filters
let pending_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
assert!(matches!(
pending_a.event,
Some(Event::ChannelStateChanged(_))
));

// Also verify server_b receives the channel event
let pending_b = wait_for_event(&mut events_b, |e| {
matches!(e, Event::ChannelStateChanged(_))
})
.await;
assert!(matches!(
pending_b.event,
Some(Event::ChannelStateChanged(_))
));
}

#[tokio::test]
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_force_closed() {
let bitcoind = TestBitcoind::new();
Expand Down
22 changes: 19 additions & 3 deletions ldk-server-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use ldk_server_grpc::api::{
Bolt12ReceiveRequest, Bolt12ReceiveResponse, Bolt12SendRequest, Bolt12SendResponse,
CloseChannelRequest, CloseChannelResponse, ConnectPeerRequest, ConnectPeerResponse,
DecodeInvoiceRequest, DecodeInvoiceResponse, DecodeOfferRequest, DecodeOfferResponse,
DisconnectPeerRequest, DisconnectPeerResponse, ExportPathfindingScoresRequest,
DisconnectPeerRequest, DisconnectPeerResponse, EventKind, ExportPathfindingScoresRequest,
ExportPathfindingScoresResponse, ForceCloseChannelRequest, ForceCloseChannelResponse,
GetBalancesRequest, GetBalancesResponse, GetNodeInfoRequest, GetNodeInfoResponse,
GetPaymentDetailsRequest, GetPaymentDetailsResponse, GraphGetChannelRequest,
Expand Down Expand Up @@ -424,11 +424,27 @@ impl LdkServerClient {
self.grpc_unary(&request, GRAPH_GET_NODE_PATH).await
}

/// Subscribe to a stream of server events via server-streaming gRPC.
/// Subscribe to a stream of all server events via server-streaming gRPC.
///
/// Returns an [`EventStream`] that yields [`EventEnvelope`] messages as they arrive.
/// All event types are delivered.
pub async fn subscribe_events(&self) -> Result<EventStream, LdkServerError> {
self.grpc_server_streaming(&SubscribeEventsRequest {}, SUBSCRIBE_EVENTS_PATH).await
self.subscribe_events_filtered(&[]).await
}

/// Subscribe to a filtered stream of server events via server-streaming gRPC.
///
/// Returns an [`EventStream`] that yields [`EventEnvelope`] messages as they arrive.
///
/// If `event_kinds` is empty, all events are delivered (backward compatible).
/// If non-empty, only events matching the given kinds are delivered.
pub async fn subscribe_events_filtered(
&self, event_kinds: &[EventKind],
) -> Result<EventStream, LdkServerError> {
let request = SubscribeEventsRequest {
event_kinds: event_kinds.iter().map(|k| *k as i32).collect(),
};
self.grpc_server_streaming(&request, SUBSCRIBE_EVENTS_PATH).await
}

/// Send a unary gRPC request and decode the response.
Expand Down
3 changes: 3 additions & 0 deletions ldk-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub mod error;
/// Request/Response structs required for interacting with the client.
pub use ldk_server_grpc;

/// Re-export EventKind for convenient use by consumers.
pub use ldk_server_grpc::api::EventKind;

/// Default maximum total CLTV expiry delta for payment routing.
pub const DEFAULT_MAX_TOTAL_CLTV_EXPIRY_DELTA: u32 = 1008;
/// Default maximum number of payment paths.
Expand Down
47 changes: 46 additions & 1 deletion ldk-server-grpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1233,10 +1233,55 @@ pub struct DecodeOfferResponse {
#[prost(bool, tag = "12")]
pub is_expired: bool,
}
// NOTE: This section (EventKind + SubscribeEventsRequest) is hand-synced with
// ldk-server-grpc/src/proto/api.proto. If you modify the proto definition, re-run
// RUSTFLAGS="--cfg genproto" cargo build -p ldk-server-grpc
// to regenerate this file. Manual edits will be overwritten.
Comment on lines +1236 to +1239

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you doing this? This file is supposed to be generated


/// EventKind represents the category of events a client can subscribe to.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum EventKind {
/// Default value, not used for filtering.
Unspecified = 0,
/// Payment events (received, successful, failed, forwarded, claimable).
Payment = 1,
/// Channel lifecycle events (pending, ready, closed).
Channel = 2,
}
impl EventKind {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
EventKind::Unspecified => "EVENT_KIND_UNSPECIFIED",
EventKind::Payment => "EVENT_KIND_PAYMENT",
EventKind::Channel => "EVENT_KIND_CHANNEL",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"EVENT_KIND_UNSPECIFIED" => Some(Self::Unspecified),
"EVENT_KIND_PAYMENT" => Some(Self::Payment),
"EVENT_KIND_CHANNEL" => Some(Self::Channel),
_ => None,
}
}
}
/// Subscribe to a stream of server events.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
#[cfg_attr(feature = "serde", serde(default))]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeEventsRequest {}
pub struct SubscribeEventsRequest {
/// If empty, all events are subscribed to (backward-compatible default).
/// If non-empty, only events matching these kinds will be delivered.
#[prost(enumeration = "EventKind", repeated, tag = "1")]
pub event_kinds: ::prost::alloc::vec::Vec<i32>,
}
16 changes: 15 additions & 1 deletion ldk-server-grpc/src/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,22 @@ message DecodeOfferResponse {
bool is_expired = 12;
}

// EventKind represents the category of events a client can subscribe to.
enum EventKind {
// Default value, not used for filtering.
EVENT_KIND_UNSPECIFIED = 0;
// Payment events (received, successful, failed, forwarded, claimable).
EVENT_KIND_PAYMENT = 1;
// Channel lifecycle events (pending, ready, closed).
EVENT_KIND_CHANNEL = 2;
}

// Subscribe to a stream of server events.
message SubscribeEventsRequest {}
message SubscribeEventsRequest {
// If empty, all events are subscribed to (backward-compatible default).
// If non-empty, only events matching these kinds will be delivered.
repeated EventKind event_kinds = 1;
}

service LightningNode {
// Retrieve the latest node info.
Expand Down
Loading