From 897cb250c8f2bc2ecc62b9a1de6512f2a77fb191 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Mon, 15 Jun 2026 11:29:21 -0700 Subject: [PATCH 1/5] Docs: replace non-existent WaitForExternalEvent API with OnEvent pattern DTFx has no WaitForExternalEvent helper. Update the docs samples and API references to use the real OnEvent + TaskCompletionSource pattern, and add a FIFO job queue sample to the eternal orchestrations guide. --- docs/concepts/deterministic-constraints.md | 1 - docs/concepts/orchestrations.md | 105 ++++++++----- docs/features/eternal-orchestrations.md | 166 ++++++++++++++++++--- docs/features/timers.md | 64 +++++--- docs/getting-started/quickstart.md | 1 - 5 files changed, 249 insertions(+), 88 deletions(-) diff --git a/docs/concepts/deterministic-constraints.md b/docs/concepts/deterministic-constraints.md index 8ef4ff3b3..7f22296d1 100644 --- a/docs/concepts/deterministic-constraints.md +++ b/docs/concepts/deterministic-constraints.md @@ -10,7 +10,6 @@ Durable operations include: - `ScheduleTask` / `ScheduleWithRetry` - `CreateTimer` -- `WaitForExternalEvent` - `CreateSubOrchestrationInstance` - `ContinueAsNew` diff --git a/docs/concepts/orchestrations.md b/docs/concepts/orchestrations.md index 148386e0a..f2a4cfa64 100644 --- a/docs/concepts/orchestrations.md +++ b/docs/concepts/orchestrations.md @@ -79,28 +79,40 @@ await context.CreateTimer(context.CurrentUtcDateTime.AddMinutes(5), true); ### Waiting for External Events -Orchestrations can pause and wait for external events sent from client code or other orchestrations. +Orchestrations can pause and wait for external events sent from client code or other orchestrations. This is done by overriding `OnEvent` and completing a `TaskCompletionSource`. See [External Events](../features/external-events.md) for the full pattern. ```csharp -// Wait indefinitely for an event -var approvalData = await context.WaitForExternalEvent("ApprovalReceived"); +public class ApprovalOrchestration : TaskOrchestration +{ + TaskCompletionSource approvalHandle; -// Wait with timeout -using var cts = new CancellationTokenSource(); -var timerTask = context.CreateTimer(context.CurrentUtcDateTime.AddDays(1), true, cts.Token); -var eventTask = context.WaitForExternalEvent("ApprovalReceived"); + public override async Task RunTask(OrchestrationContext context, string input) + { + this.approvalHandle = new TaskCompletionSource(); + var eventTask = this.approvalHandle.Task; -var winner = await Task.WhenAny(timerTask, eventTask); -if (winner == eventTask) -{ - // Timer cancelled since event was received (this is important) - cts.Cancel(); - var approval = await eventTask; - // Process approval -} -else -{ - // Timeout - escalate or reject + // Wait for the event with a 1-day timeout + using var cts = new CancellationTokenSource(); + var timerTask = context.CreateTimer(context.CurrentUtcDateTime.AddDays(1), true, cts.Token); + + var winner = await Task.WhenAny(timerTask, eventTask); + if (winner == eventTask) + { + cts.Cancel(); // Cancel the timer since the event was received (this is important) + return await eventTask; // Process approval + } + + // Timeout - escalate or reject + return null; + } + + public override void OnEvent(OrchestrationContext context, string name, string input) + { + if (name == "ApprovalReceived") + { + this.approvalHandle?.SetResult(context.MessageDataConverter.Deserialize(input)); + } + } } ``` @@ -162,30 +174,45 @@ public override async Task RunTask(OrchestrationContext context, int[] in ### Human Interaction ```csharp -public override async Task RunTask( - OrchestrationContext context, - ApprovalRequest request) +public class ApprovalOrchestration : TaskOrchestration { - // Send notification to approver - await context.ScheduleTask(typeof(SendApprovalRequestActivity), request); - - // Wait for approval with timeout - using var cts = new CancellationTokenSource(); - var approvalTask = context.WaitForExternalEvent("Approved"); - var timeoutTask = context.CreateTimer( - context.CurrentUtcDateTime.AddDays(7), - true, - cts.Token); - - var winner = await Task.WhenAny(approvalTask, timeoutTask); - - if (winner == approvalTask) + TaskCompletionSource approvalHandle; + + public override async Task RunTask( + OrchestrationContext context, + ApprovalRequest request) + { + // Send notification to approver + await context.ScheduleTask(typeof(SendApprovalRequestActivity), request); + + // Wait for approval with timeout + this.approvalHandle = new TaskCompletionSource(); + var approvalTask = this.approvalHandle.Task; + + using var cts = new CancellationTokenSource(); + var timeoutTask = context.CreateTimer( + context.CurrentUtcDateTime.AddDays(7), + true, + cts.Token); + + var winner = await Task.WhenAny(approvalTask, timeoutTask); + + if (winner == approvalTask) + { + cts.Cancel(); + return new ApprovalResult { Approved = await approvalTask }; + } + + return new ApprovalResult { Approved = false, TimedOut = true }; + } + + public override void OnEvent(OrchestrationContext context, string name, string input) { - cts.Cancel(); - return new ApprovalResult { Approved = await approvalTask }; + if (name == "Approved") + { + this.approvalHandle?.SetResult(context.MessageDataConverter.Deserialize(input)); + } } - - return new ApprovalResult { Approved = false, TimedOut = true }; } ``` diff --git a/docs/features/eternal-orchestrations.md b/docs/features/eternal-orchestrations.md index 5e2747ade..990c43202 100644 --- a/docs/features/eternal-orchestrations.md +++ b/docs/features/eternal-orchestrations.md @@ -164,6 +164,9 @@ public override async Task RunTask( ```csharp public class AggregatorOrchestration : TaskOrchestration { + // Completed by OnEvent when a "NewData" event arrives + TaskCompletionSource dataHandle; + public override async Task RunTask( OrchestrationContext context, AggregatorState state) @@ -172,8 +175,10 @@ public class AggregatorOrchestration : TaskOrchestration(); + var eventTask = this.dataHandle.Task; + using var cts = new CancellationTokenSource(); - var eventTask = context.WaitForExternalEvent("NewData"); var saveTask = context.CreateTimer( context.CurrentUtcDateTime.AddMinutes(5), true, @@ -211,6 +216,14 @@ public class AggregatorOrchestration : TaskOrchestration(input)); + } + } } ``` @@ -257,36 +270,143 @@ public override async Task RunTask( } ``` +## FIFO Job Queues + +You can use orchestrations to implement FIFO job queues. Use one orchestration instance per logical "queue" to serialize jobs. Because an orchestration runs its own logic sequentially, only one job per queue is ever active and jobs run in FIFO order, while different queues run in parallel. Incoming jobs are buffered via the [External Events](external-events.md) pattern. `ContinueAsNew` carries any unprocessed jobs forward so none are lost when history is reset. + +```csharp +// One instance per resource ID applies each update to a primary store then a replica, +// strictly in FIFO order. Expected event: "Enqueue" (ResourceUpdate). +public class ResourceUpdateQueueOrchestration : TaskOrchestration +{ + const int MaxUpdatesPerGeneration = 100; // Bound history; ContinueAsNew after this many. + + // Rebuilt deterministically on replay, since OnEvent replays in original enqueue order. + readonly Queue inbox = new Queue(); + TaskCompletionSource newWork; // Wakes RunTask when an update arrives while parked. + + public override async Task RunTask(OrchestrationContext context, QueueState state) + { + // Re-seed with updates carried over from the previous generation (oldest first, so + // they stay ahead of any newly arriving events and FIFO order is preserved). + foreach (ResourceUpdate update in state?.Backlog ?? Enumerable.Empty()) + { + this.inbox.Enqueue(update); + } + + // Block until the first update arrives. + if (this.inbox.Count == 0) + { + this.newWork = new TaskCompletionSource(); + await this.newWork.Task; + this.newWork = null; + } + + // Drain in FIFO order. Limit the number processed per generation to bound history size; if there are more, we'll pick up the rest in the next generation. + for (int processed = 0; this.inbox.Count > 0 && processed < MaxUpdatesPerGeneration; processed++) + { + ResourceUpdate next = this.inbox.Dequeue(); + await context.ScheduleTask(typeof(UpdatePrimaryStoreActivity), next); + await context.ScheduleTask(typeof(UpdateReplicaStoreActivity), next); + } + + // Reset history, carrying any updates that arrived while we were busy. + context.ContinueAsNew(new QueueState { Backlog = this.inbox.ToArray() }); + return null; + } + + public override void OnEvent(OrchestrationContext context, string name, string input) + { + if (name == "Enqueue") + { + this.inbox.Enqueue(context.MessageDataConverter.Deserialize(input)); + this.newWork?.TrySetResult(true); + } + } +} + +public class QueueState +{ + // Updates carried over from the previous generation, oldest first. + public ResourceUpdate[] Backlog { get; set; } = Array.Empty(); +} +``` + +Producers enqueue by sending an `Enqueue` event to the per-resource instance. The first update atomically creates the queue and delivers the event, so there's no window where it could be lost: + +```csharp +static string InstanceIdFor(string resourceId) => $"resource-update-queue:{resourceId}"; + +public static async Task EnqueueAsync(TaskHubClient client, ResourceUpdate update) +{ + try + { + // First update: atomically create the queue and deliver the update. + await client.CreateOrchestrationInstanceWithRaisedEventAsync( + typeof(ResourceUpdateQueueOrchestration), + InstanceIdFor(update.ResourceId), + new QueueState(), + eventName: "Enqueue", + eventData: update); + } + catch (OrchestrationAlreadyExistsException) + { + // Queue already running for this resource ID; just append the update. + await client.RaiseEventAsync( + new OrchestrationInstance { InstanceId = InstanceIdFor(update.ResourceId) }, + "Enqueue", + update); + } +} +``` + +> [!NOTE] +> On Azure Storage (the default `Carryover` behavior), events that arrive during the brief `ContinueAsNew` transition are automatically re-delivered to the next generation, so nothing is lost. The Service Fabric provider instead uses `Ignore` and drops them — there, treat the producer as the source of truth and re-drive any unacknowledged update (using `UpdateId` for idempotency). + ## Graceful Termination ### Using External Events ```csharp -public override async Task RunTask( - OrchestrationContext context, - Config config) +public class WorkerOrchestration : TaskOrchestration { - using var cts = new CancellationTokenSource(); - - // Check for stop signal - Task stopTask = context.WaitForExternalEvent("Stop"); - Task workTask = DoWorkAsync(context, config); - Task timerTask = context.CreateTimer( - context.CurrentUtcDateTime.AddMinutes(1), - true, - cts.Token); - - Task winner = await Task.WhenAny(stopTask, workTask, timerTask); - cts.Cancel(); - - if (winner == stopTask) + // Completed by OnEvent when a "Stop" event arrives. + TaskCompletionSource stopHandle; + + public override async Task RunTask( + OrchestrationContext context, + Config config) { - // Graceful shutdown - return new Result { StoppedGracefully = true }; + this.stopHandle = new TaskCompletionSource(); + Task stopTask = this.stopHandle.Task; + + using var cts = new CancellationTokenSource(); + Task workTask = DoWorkAsync(context, config); + Task timerTask = context.CreateTimer( + context.CurrentUtcDateTime.AddMinutes(1), + true, + cts.Token); + + Task winner = await Task.WhenAny(stopTask, workTask, timerTask); + cts.Cancel(); + + if (winner == stopTask) + { + // Graceful shutdown + return new Result { StoppedGracefully = true }; + } + + context.ContinueAsNew(config); + return null; + } + + public override void OnEvent(OrchestrationContext context, string name, string input) + { + if (name == "Stop") + { + this.stopHandle?.SetResult(true); + } } - - context.ContinueAsNew(config); - return null; } ``` diff --git a/docs/features/timers.md b/docs/features/timers.md index d413947fd..ad0190a5a 100644 --- a/docs/features/timers.md +++ b/docs/features/timers.md @@ -67,33 +67,49 @@ public override async Task RunTask(OrchestrationContext context, Input i ### Approval with Deadline ```csharp -public override async Task RunTask( - OrchestrationContext context, - ApprovalRequest request) +public class ApprovalOrchestration : TaskOrchestration { - // Send approval request - await context.ScheduleTask(typeof(SendApprovalEmail), request); - - using var cts = new CancellationTokenSource(); - - // Wait for approval event or 7-day timeout - var approvalTask = context.WaitForExternalEvent("Approved"); - var deadlineTask = context.CreateTimer( - context.CurrentUtcDateTime.AddDays(7), - true, - cts.Token); - - var winner = await Task.WhenAny(approvalTask, deadlineTask); - - if (winner == approvalTask) + // Completed by OnEvent when an "Approved" event arrives + TaskCompletionSource approvalHandle; + + public override async Task RunTask( + OrchestrationContext context, + ApprovalRequest request) { - cts.Cancel(); - var approved = await approvalTask; - return new ApprovalResult { Approved = approved }; + // Send approval request + await context.ScheduleTask(typeof(SendApprovalEmail), request); + + this.approvalHandle = new TaskCompletionSource(); + var approvalTask = this.approvalHandle.Task; + + using var cts = new CancellationTokenSource(); + + // Wait for approval event or 7-day timeout + var deadlineTask = context.CreateTimer( + context.CurrentUtcDateTime.AddDays(7), + true, + cts.Token); + + var winner = await Task.WhenAny(approvalTask, deadlineTask); + + if (winner == approvalTask) + { + cts.Cancel(); + var approved = await approvalTask; + return new ApprovalResult { Approved = approved }; + } + else + { + return new ApprovalResult { Approved = false, Expired = true }; + } } - else + + public override void OnEvent(OrchestrationContext context, string name, string input) { - return new ApprovalResult { Approved = false, Expired = true }; + if (name == "Approved") + { + this.approvalHandle?.SetResult(context.MessageDataConverter.Deserialize(input)); + } } } ``` @@ -257,7 +273,7 @@ await context.CreateTimer(DateTime.UtcNow.AddMinutes(5), true); ```csharp using var cts = new CancellationTokenSource(); var timer = context.CreateTimer(deadline, true, cts.Token); -var work = context.WaitForExternalEvent("Event"); +var work = this.eventHandle.Task; // Completed by OnEvent; see External Events var winner = await Task.WhenAny(timer, work); if (winner == work) diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index e4f74c959..ade061b43 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -157,7 +157,6 @@ await context.ScheduleTask(typeof(GreetActivity), input); - `ScheduleTask` — Schedule an activity - `CreateSubOrchestrationInstance` — Start a sub-orchestration - `CreateTimer` — Create a durable timer -- `WaitForExternalEvent` — Wait for an external event ### TaskHubWorker and TaskHubClient From d8c44a550d517040abce9e07899ccbd1d5fa1b64 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Mon, 15 Jun 2026 15:58:15 -0700 Subject: [PATCH 2/5] Fix external event samples: typed OnEvent payloads and TrySetResult Address Copilot review feedback on PR #1368. Use the 4-parameter TaskOrchestration base so the framework deserializes event payloads and OnEvent receives a typed value (the prior string + manual MessageDataConverter.Deserialize pattern would throw for object payloads). Use TrySetResult instead of SetResult for idempotency against duplicate/late events, return default instead of null for generic results, and make the Cancel Unused Timers snippet self-contained. --- docs/concepts/orchestrations.md | 18 ++++++---- docs/features/eternal-orchestrations.md | 24 +++++++++----- docs/features/external-events.md | 44 ++++++++++++------------- docs/features/timers.md | 10 +++--- 4 files changed, 54 insertions(+), 42 deletions(-) diff --git a/docs/concepts/orchestrations.md b/docs/concepts/orchestrations.md index f2a4cfa64..05e1c29be 100644 --- a/docs/concepts/orchestrations.md +++ b/docs/concepts/orchestrations.md @@ -82,7 +82,9 @@ await context.CreateTimer(context.CurrentUtcDateTime.AddMinutes(5), true); Orchestrations can pause and wait for external events sent from client code or other orchestrations. This is done by overriding `OnEvent` and completing a `TaskCompletionSource`. See [External Events](../features/external-events.md) for the full pattern. ```csharp -public class ApprovalOrchestration : TaskOrchestration +// The 4-type-parameter base declares ApprovalData as the event payload type, so the +// framework deserializes incoming events and passes OnEvent a typed value. +public class ApprovalOrchestration : TaskOrchestration { TaskCompletionSource approvalHandle; @@ -103,14 +105,14 @@ public class ApprovalOrchestration : TaskOrchestration } // Timeout - escalate or reject - return null; + return default; } - public override void OnEvent(OrchestrationContext context, string name, string input) + public override void OnEvent(OrchestrationContext context, string name, ApprovalData input) { if (name == "ApprovalReceived") { - this.approvalHandle?.SetResult(context.MessageDataConverter.Deserialize(input)); + this.approvalHandle?.TrySetResult(input); } } } @@ -174,7 +176,9 @@ public override async Task RunTask(OrchestrationContext context, int[] in ### Human Interaction ```csharp -public class ApprovalOrchestration : TaskOrchestration +// The 4-type-parameter base declares bool as the event payload type, so the framework +// deserializes incoming events and passes OnEvent a typed value. +public class ApprovalOrchestration : TaskOrchestration { TaskCompletionSource approvalHandle; @@ -206,11 +210,11 @@ public class ApprovalOrchestration : TaskOrchestration(input)); + this.approvalHandle?.TrySetResult(input); } } } diff --git a/docs/features/eternal-orchestrations.md b/docs/features/eternal-orchestrations.md index 990c43202..25379a8ca 100644 --- a/docs/features/eternal-orchestrations.md +++ b/docs/features/eternal-orchestrations.md @@ -162,7 +162,9 @@ public override async Task RunTask( ### Stateful Aggregator ```csharp -public class AggregatorOrchestration : TaskOrchestration +// The 4-type-parameter base declares the event payload type (DataPoint), so the framework +// deserializes incoming events for us and hands OnEvent a typed value. +public class AggregatorOrchestration : TaskOrchestration { // Completed by OnEvent when a "NewData" event arrives TaskCompletionSource dataHandle; @@ -217,11 +219,12 @@ public class AggregatorOrchestration : TaskOrchestration(input)); + this.dataHandle?.TrySetResult(input); } } } @@ -277,7 +280,9 @@ You can use orchestrations to implement FIFO job queues. Use one orchestration i ```csharp // One instance per resource ID applies each update to a primary store then a replica, // strictly in FIFO order. Expected event: "Enqueue" (ResourceUpdate). -public class ResourceUpdateQueueOrchestration : TaskOrchestration +// The 4-type-parameter base declares ResourceUpdate as the event payload type, so the +// framework deserializes incoming events and passes OnEvent a typed value. +public class ResourceUpdateQueueOrchestration : TaskOrchestration { const int MaxUpdatesPerGeneration = 100; // Bound history; ContinueAsNew after this many. @@ -315,11 +320,11 @@ public class ResourceUpdateQueueOrchestration : TaskOrchestration(input)); + this.inbox.Enqueue(input); this.newWork?.TrySetResult(true); } } @@ -368,7 +373,7 @@ public static async Task EnqueueAsync(TaskHubClient client, ResourceUpdate updat ### Using External Events ```csharp -public class WorkerOrchestration : TaskOrchestration +public class WorkerOrchestration : TaskOrchestration { // Completed by OnEvent when a "Stop" event arrives. TaskCompletionSource stopHandle; @@ -400,11 +405,12 @@ public class WorkerOrchestration : TaskOrchestration return null; } - public override void OnEvent(OrchestrationContext context, string name, string input) + // TrySetResult keeps a duplicate "Stop" event from faulting the orchestration. + public override void OnEvent(OrchestrationContext context, string name, bool input) { if (name == "Stop") { - this.stopHandle?.SetResult(true); + this.stopHandle?.TrySetResult(true); } } } diff --git a/docs/features/external-events.md b/docs/features/external-events.md index 7703f4492..c3b3f6f1b 100644 --- a/docs/features/external-events.md +++ b/docs/features/external-events.md @@ -43,17 +43,21 @@ public class SignalOrchestration : TaskOrchestration public override void OnEvent(OrchestrationContext context, string name, string input) { // Complete the pending task when event arrives - this.resumeHandle?.SetResult(input); + this.resumeHandle?.TrySetResult(input); } } ``` ### Typed Event Data -For strongly-typed event data, deserialize in `OnEvent`: +For strongly-typed event data, declare the payload type as the third type parameter +(`TEvent`). The framework deserializes the incoming event and passes a typed value to +`OnEvent` — no manual deserialization required: ```csharp -public class ApprovalOrchestration : TaskOrchestration +// TaskOrchestration: the third type parameter +// declares the external event payload type. +public class ApprovalOrchestration : TaskOrchestration { TaskCompletionSource approvalHandle; @@ -76,12 +80,11 @@ public class ApprovalOrchestration : TaskOrchestration(input); - this.approvalHandle.SetResult(response); + this.approvalHandle?.TrySetResult(input); } } } @@ -130,7 +133,7 @@ public class TimedApprovalOrchestration : TaskOrchestration { if (name == "UserResponse") { - this.eventHandle?.SetResult(input); + this.eventHandle?.TrySetResult(input); } } } @@ -181,7 +184,7 @@ public class MultiEventOrchestration : TaskOrchestration if (this.eventHandle != null && (name == "Approve" || name == "Reject" || name == "Cancel")) { - this.eventHandle.SetResult((name, input)); + this.eventHandle.TrySetResult((name, input)); } } } @@ -292,7 +295,7 @@ public class WebhookController : ControllerBase ### Human Approval Workflow ```csharp -public class ApprovalWorkflow : TaskOrchestration +public class ApprovalWorkflow : TaskOrchestration { TaskCompletionSource approvalHandle; @@ -352,12 +355,11 @@ public class ApprovalWorkflow : TaskOrchestration(input); - this.approvalHandle.SetResult(response); + this.approvalHandle?.TrySetResult(input); } } } @@ -402,7 +404,7 @@ public class MultiStepOrchestration : TaskOrchestration { if (name == this.currentEventName && this.currentEventHandle != null) { - this.currentEventHandle.SetResult(input); + this.currentEventHandle.TrySetResult(input); } } } @@ -476,20 +478,18 @@ public override async Task RunTask(OrchestrationContext context, Request ### 4. Validate Event Data ```csharp -public override void OnEvent(OrchestrationContext context, string name, string input) +public override void OnEvent(OrchestrationContext context, string name, ApprovalResponse input) { - if (name == "Approval" && this.approvalHandle != null) + if (name == "Approval") { - var response = context.MessageDataConverter.Deserialize(input); - // Validate before completing - if (string.IsNullOrEmpty(response.ApprovedBy)) + if (string.IsNullOrEmpty(input.ApprovedBy)) { // Could log warning or ignore invalid event return; } - - this.approvalHandle.SetResult(response); + + this.approvalHandle?.TrySetResult(input); } } ``` diff --git a/docs/features/timers.md b/docs/features/timers.md index ad0190a5a..a47ac7df9 100644 --- a/docs/features/timers.md +++ b/docs/features/timers.md @@ -67,7 +67,9 @@ public override async Task RunTask(OrchestrationContext context, Input i ### Approval with Deadline ```csharp -public class ApprovalOrchestration : TaskOrchestration +// The 4-type-parameter base declares bool as the event payload type, so the framework +// deserializes incoming events and passes OnEvent a typed value. +public class ApprovalOrchestration : TaskOrchestration { // Completed by OnEvent when an "Approved" event arrives TaskCompletionSource approvalHandle; @@ -104,11 +106,11 @@ public class ApprovalOrchestration : TaskOrchestration(input)); + this.approvalHandle?.TrySetResult(input); } } } @@ -273,7 +275,7 @@ await context.CreateTimer(DateTime.UtcNow.AddMinutes(5), true); ```csharp using var cts = new CancellationTokenSource(); var timer = context.CreateTimer(deadline, true, cts.Token); -var work = this.eventHandle.Task; // Completed by OnEvent; see External Events +var work = context.ScheduleTask(typeof(DoWorkActivity), input); var winner = await Task.WhenAny(timer, work); if (winner == work) From a5b0575391c398c2f3bed5cfbcd455a53a0e2091 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Mon, 15 Jun 2026 16:37:27 -0700 Subject: [PATCH 3/5] Fix external event samples: capture early events and propagate typed payload Address round 2 Copilot review feedback on PR #1368. Create the TaskCompletionSource before awaiting any activity so events raised while an activity runs are captured by OnEvent instead of dropped. Propagate the typed event value in the graceful-termination sample, and correct the MaxUpdatesPerGeneration comment to reflect that ContinueAsNew always runs each generation. --- docs/concepts/orchestrations.md | 9 +++++---- docs/features/eternal-orchestrations.md | 4 ++-- docs/features/external-events.md | 11 ++++++++--- docs/features/timers.md | 8 +++++--- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/docs/concepts/orchestrations.md b/docs/concepts/orchestrations.md index 05e1c29be..2531b71e3 100644 --- a/docs/concepts/orchestrations.md +++ b/docs/concepts/orchestrations.md @@ -186,13 +186,14 @@ public class ApprovalOrchestration : TaskOrchestration(typeof(SendApprovalRequestActivity), request); - - // Wait for approval with timeout + // Create the event handle before any awaited work so an "Approved" event that + // arrives while the activity runs is captured instead of dropped. this.approvalHandle = new TaskCompletionSource(); var approvalTask = this.approvalHandle.Task; + // Send notification to approver + await context.ScheduleTask(typeof(SendApprovalRequestActivity), request); + using var cts = new CancellationTokenSource(); var timeoutTask = context.CreateTimer( context.CurrentUtcDateTime.AddDays(7), diff --git a/docs/features/eternal-orchestrations.md b/docs/features/eternal-orchestrations.md index 25379a8ca..bf0c9a4f6 100644 --- a/docs/features/eternal-orchestrations.md +++ b/docs/features/eternal-orchestrations.md @@ -284,7 +284,7 @@ You can use orchestrations to implement FIFO job queues. Use one orchestration i // framework deserializes incoming events and passes OnEvent a typed value. public class ResourceUpdateQueueOrchestration : TaskOrchestration { - const int MaxUpdatesPerGeneration = 100; // Bound history; ContinueAsNew after this many. + const int MaxUpdatesPerGeneration = 100; // Max updates processed per generation // Rebuilt deterministically on replay, since OnEvent replays in original enqueue order. readonly Queue inbox = new Queue(); @@ -410,7 +410,7 @@ public class WorkerOrchestration : TaskOrchestration(); + // Send approval request await context.ScheduleTask(typeof(SendApprovalEmailActivity), request); // Wait for approval response - this.approvalHandle = new TaskCompletionSource(); var response = await this.approvalHandle.Task; this.approvalHandle = null; @@ -303,6 +306,10 @@ public class ApprovalWorkflow : TaskOrchestration(); + // Step 1: Send approval request email await context.ScheduleTask(typeof(SendApprovalEmailActivity), new EmailData { @@ -312,8 +319,6 @@ public class ApprovalWorkflow : TaskOrchestration(); - using var cts = new CancellationTokenSource(); var approvalTask = this.approvalHandle.Task; var timeoutTask = context.CreateTimer( diff --git a/docs/features/timers.md b/docs/features/timers.md index a47ac7df9..220cff510 100644 --- a/docs/features/timers.md +++ b/docs/features/timers.md @@ -78,12 +78,14 @@ public class ApprovalOrchestration : TaskOrchestration(typeof(SendApprovalEmail), request); - + // Create the event handle before any awaited work so an "Approved" event that + // arrives while the activity runs is captured instead of dropped. this.approvalHandle = new TaskCompletionSource(); var approvalTask = this.approvalHandle.Task; + // Send approval request + await context.ScheduleTask(typeof(SendApprovalEmail), request); + using var cts = new CancellationTokenSource(); // Wait for approval event or 7-day timeout From fe2a00ef7ee073457ef5415abe75986715afd292 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Tue, 23 Jun 2026 20:53:21 -0700 Subject: [PATCH 4/5] Additional doc fixes --- docs/features/eternal-orchestrations.md | 10 +- samples/DurableTask.Samples/README.md | 158 ++++++++---------------- 2 files changed, 58 insertions(+), 110 deletions(-) diff --git a/docs/features/eternal-orchestrations.md b/docs/features/eternal-orchestrations.md index bf0c9a4f6..d3f2e5371 100644 --- a/docs/features/eternal-orchestrations.md +++ b/docs/features/eternal-orchestrations.md @@ -342,6 +342,13 @@ Producers enqueue by sending an `Enqueue` event to the per-resource instance. Th ```csharp static string InstanceIdFor(string resourceId) => $"resource-update-queue:{resourceId}"; +static readonly OrchestrationStatus[] ActiveQueueStatuses = +{ + OrchestrationStatus.Pending, + OrchestrationStatus.Running, + OrchestrationStatus.ContinuedAsNew, +}; + public static async Task EnqueueAsync(TaskHubClient client, ResourceUpdate update) { try @@ -351,12 +358,13 @@ public static async Task EnqueueAsync(TaskHubClient client, ResourceUpdate updat typeof(ResourceUpdateQueueOrchestration), InstanceIdFor(update.ResourceId), new QueueState(), + dedupeStatuses: ActiveQueueStatuses, eventName: "Enqueue", eventData: update); } catch (OrchestrationAlreadyExistsException) { - // Queue already running for this resource ID; just append the update. + // Queue already active for this resource ID; just append the update. await client.RaiseEventAsync( new OrchestrationInstance { InstanceId = InstanceIdFor(update.ResourceId) }, "Enqueue", diff --git a/samples/DurableTask.Samples/README.md b/samples/DurableTask.Samples/README.md index 9da5433ce..f9d13b808 100644 --- a/samples/DurableTask.Samples/README.md +++ b/samples/DurableTask.Samples/README.md @@ -42,134 +42,65 @@ The worker automatically starts and waits for the orchestration to complete. ## Available Samples +Each sample's full source is linked below. For deeper explanations of the patterns +they demonstrate, see the [DTFx documentation](../../docs/README.md). + ### Greetings -A simple "Hello World" orchestration that calls greeting activities. - -```csharp -public class GreetingsOrchestration : TaskOrchestration -{ - public override async Task RunTask(OrchestrationContext context, string input) - { - string greeting = await context.ScheduleTask(typeof(GetUserTask)); - string result = await context.ScheduleTask(typeof(SendGreetingTask), greeting); - return result; - } -} -``` +Two activities run in sequence: `GetUserTask` produces a name, then `SendGreetingTask` +greets it. A minimal introduction to orchestrations and activities. -**Run:** `DurableTask.Samples.exe -s Greetings` +- Source: [Greetings/GreetingsOrchestration.cs](Greetings/GreetingsOrchestration.cs) +- Docs: [Orchestrations](../../docs/concepts/orchestrations.md), [Activities](../../docs/concepts/activities.md) +- **Run:** `DurableTask.Samples.exe -s Greetings` ### Greetings2 -Demonstrates parameterized orchestrations with a configurable number of greetings. +A variation that races the `GetUserTask` activity against a durable timer using +`Task.WhenAny`, so the orchestration proceeds when the user responds or the timeout +elapses, whichever comes first. The parameter is the timeout in seconds. -**Run:** `DurableTask.Samples.exe -s Greetings2 -p 5` +- Source: [Greetings2/GreetingsOrchestration.cs](Greetings2/GreetingsOrchestration.cs) +- Docs: [Timers](../../docs/features/timers.md) +- **Run:** `DurableTask.Samples.exe -s Greetings2 -p 5` ### Cron -An eternal orchestration that runs on a schedule using `CreateTimer` and `ContinueAsNew`. - -```csharp -public class CronOrchestration : TaskOrchestration -{ - public override async Task RunTask(OrchestrationContext context, string schedule) - { - // Execute the scheduled task - await context.ScheduleTask(typeof(CronTask)); - - // Wait until next scheduled time - DateTime nextRun = CalculateNextRun(context.CurrentUtcDateTime, schedule); - await context.CreateTimer(nextRun, true); - - // Continue as new instance - context.ContinueAsNew(schedule); - return "Completed cycle"; - } -} -``` +Schedules `CronTask` to run repeatedly on a [cron](https://en.wikipedia.org/wiki/Cron) +expression (via NCrontab), using `CreateTimer` to wait between runs. The parameter is the +cron schedule and is optional (it falls back to a fixed interval when omitted). -**Run:** `DurableTask.Samples.exe -s Cron -p "0 12 * * *"` - -### AverageCalculator - -Fan-out/fan-in pattern that distributes computation across multiple activities. - -```csharp -public class AverageCalculatorOrchestration : TaskOrchestration -{ - public override async Task RunTask(OrchestrationContext context, int[] numbers) - { - // Fan-out: process chunks in parallel - var tasks = new List>(); - foreach (var chunk in numbers.Chunk(10)) - { - tasks.Add(context.ScheduleTask(typeof(ComputeSumTask), chunk)); - } - - // Fan-in: aggregate results - int[] sums = await Task.WhenAll(tasks); - return sums.Sum() / (double)numbers.Length; - } -} -``` +- Source: [Cron/CronOrchestration.cs](Cron/CronOrchestration.cs) +- Docs: [Timers](../../docs/features/timers.md), [Eternal Orchestrations](../../docs/features/eternal-orchestrations.md) +- **Run:** `DurableTask.Samples.exe -s Cron -p "0 12 * */2 Mon"` + +### Average -**Run:** `DurableTask.Samples.exe -s Average -p "1 50 10"` +A fan-out/fan-in pattern that splits a numeric range into chunks, computes a partial sum +for each chunk in parallel via `ComputeSumTask`, then aggregates the results into an +average. Parameters are ` `. -Parameters: ` ` +- Source: [AverageCalculator/AverageCalculatorOrchestration.cs](AverageCalculator/AverageCalculatorOrchestration.cs) +- Docs: [Orchestrations](../../docs/concepts/orchestrations.md) +- **Run:** `DurableTask.Samples.exe -s Average -p "1 50 10"` ### ErrorHandling -Demonstrates retry policies and exception handling patterns. - -```csharp -public override async Task RunTask(OrchestrationContext context, string input) -{ - var retryOptions = new RetryOptions( - firstRetryInterval: TimeSpan.FromSeconds(5), - maxNumberOfAttempts: 3); - - try - { - return await context.ScheduleWithRetry( - typeof(UnreliableActivity), - retryOptions, - input); - } - catch (TaskFailedException ex) - { - // Handle permanent failure - return $"Failed after retries: {ex.Message}"; - } -} -``` +Demonstrates exception handling with `try`/`catch` around activities and a compensating +`CleanupTask` when an activity fails. -**Run:** `DurableTask.Samples.exe -s ErrorHandling` +- Source: [ErrorHandling/ErrorHandlingOrchestration.cs](ErrorHandling/ErrorHandlingOrchestration.cs) +- Docs: [Error Handling](../../docs/features/error-handling.md), [Retries](../../docs/features/retries.md) +- **Run:** `DurableTask.Samples.exe -s ErrorHandling` ### Signal -Demonstrates external events and human interaction patterns. - -```csharp -public override async Task RunTask(OrchestrationContext context, ApprovalRequest input) -{ - // Send notification - await context.ScheduleTask(typeof(SendApprovalRequest), input); - - // Wait for external event - var approval = await context.WaitForExternalEvent("ApprovalResult"); - - if (approval.IsApproved) - { - await context.ScheduleTask(typeof(ProcessApproval), input); - return "Approved and processed"; - } - - return "Rejected"; -} -``` +Demonstrates external events using the `OnEvent` + `TaskCompletionSource` pattern: the +orchestration waits for an external signal and then sends a greeting with the signaled value. -**Run:** `DurableTask.Samples.exe -s Signal` +- Source: [Signal/SignalOrchestration.cs](Signal/SignalOrchestration.cs) +- Docs: [External Events](../../docs/features/external-events.md) +- **Run:** `DurableTask.Samples.exe -s Signal` To raise an event to a running instance: @@ -177,11 +108,20 @@ To raise an event to a running instance: DurableTask.Samples.exe -n -i -p ``` +You can also start an instance and raise its first event in one step with the +`SignalAndRaise` sample: + +- **Run:** `DurableTask.Samples.exe -s SignalAndRaise -n -p ` + ### SumOfSquares -Another fan-out/fan-in example computing sum of squares from a JSON input file. +A recursive fan-out/fan-in example that walks a nested JSON array +([BagofNumbers.json](SumOfSquares/BagofNumbers.json)), squaring integers via +`SumOfSquaresTask` and recursing into nested arrays as sub-orchestrations. -**Run:** `DurableTask.Samples.exe -s SumOfSquares` +- Source: [SumOfSquares/SumOfSquaresOrchestration.cs](SumOfSquares/SumOfSquaresOrchestration.cs) +- Docs: [Sub-Orchestrations](../../docs/features/sub-orchestrations.md) +- **Run:** `DurableTask.Samples.exe -s SumOfSquares` ## Command Line Options From e6b0f0208b0f02e8489f96383c13104ea8c05414 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Wed, 24 Jun 2026 10:41:38 -0700 Subject: [PATCH 5/5] PR feedback --- docs/concepts/orchestrations.md | 2 +- docs/features/eternal-orchestrations.md | 7 ++++++- samples/DurableTask.Samples/README.md | 9 +++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/concepts/orchestrations.md b/docs/concepts/orchestrations.md index 2531b71e3..bdaeab6e9 100644 --- a/docs/concepts/orchestrations.md +++ b/docs/concepts/orchestrations.md @@ -178,7 +178,7 @@ public override async Task RunTask(OrchestrationContext context, int[] in ```csharp // The 4-type-parameter base declares bool as the event payload type, so the framework // deserializes incoming events and passes OnEvent a typed value. -public class ApprovalOrchestration : TaskOrchestration +public class HumanApprovalOrchestration : TaskOrchestration { TaskCompletionSource approvalHandle; diff --git a/docs/features/eternal-orchestrations.md b/docs/features/eternal-orchestrations.md index d3f2e5371..943f37369 100644 --- a/docs/features/eternal-orchestrations.md +++ b/docs/features/eternal-orchestrations.md @@ -354,6 +354,9 @@ public static async Task EnqueueAsync(TaskHubClient client, ResourceUpdate updat try { // First update: atomically create the queue and deliver the update. + // Note: the dedupeStatuses overload is supported by Azure Storage, Service Bus, and others. + // The Service Fabric provider throws NotSupportedException for dedupeStatuses—omit it + // there and rely on the OrchestrationAlreadyExistsException from a plain create. await client.CreateOrchestrationInstanceWithRaisedEventAsync( typeof(ResourceUpdateQueueOrchestration), InstanceIdFor(update.ResourceId), @@ -374,7 +377,9 @@ public static async Task EnqueueAsync(TaskHubClient client, ResourceUpdate updat ``` > [!NOTE] -> On Azure Storage (the default `Carryover` behavior), events that arrive during the brief `ContinueAsNew` transition are automatically re-delivered to the next generation, so nothing is lost. The Service Fabric provider instead uses `Ignore` and drops them — there, treat the producer as the source of truth and re-drive any unacknowledged update (using `UpdateId` for idempotency). +> Provider differences apply to this sample. The `dedupeStatuses` overload used above is supported by the Azure Storage, Service Bus, and some other providers; the Service Fabric provider does not support it (it throws `NotSupportedException`), so omit that argument and rely on the `OrchestrationAlreadyExistsException` from a plain create instead. +> +> Behavior during the brief `ContinueAsNew` transition also differs: on Azure Storage (the default `Carryover` behavior), events that arrive mid-transition are automatically re-delivered to the next generation, so nothing is lost. The Service Fabric provider instead uses `Ignore` and drops them — there, treat the producer as the source of truth and re-drive any unacknowledged update (using `UpdateId` for idempotency). ## Graceful Termination diff --git a/samples/DurableTask.Samples/README.md b/samples/DurableTask.Samples/README.md index f9d13b808..75836c44d 100644 --- a/samples/DurableTask.Samples/README.md +++ b/samples/DurableTask.Samples/README.md @@ -66,12 +66,13 @@ elapses, whichever comes first. The parameter is the timeout in seconds. ### Cron -Schedules `CronTask` to run repeatedly on a [cron](https://en.wikipedia.org/wiki/Cron) -expression (via NCrontab), using `CreateTimer` to wait between runs. The parameter is the -cron schedule and is optional (it falls back to a fixed interval when omitted). +Runs `CronTask` on a schedule, using `CreateTimer` to wait between runs. The sample loops a +fixed number of times (currently 4) and then returns — it is not an eternal orchestration and +does not use `ContinueAsNew`. The parameter is a [cron](https://en.wikipedia.org/wiki/Cron) +expression (via NCrontab) and is optional (it falls back to a fixed interval when omitted). - Source: [Cron/CronOrchestration.cs](Cron/CronOrchestration.cs) -- Docs: [Timers](../../docs/features/timers.md), [Eternal Orchestrations](../../docs/features/eternal-orchestrations.md) +- Docs: [Timers](../../docs/features/timers.md) - **Run:** `DurableTask.Samples.exe -s Cron -p "0 12 * */2 Mon"` ### Average