Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
# other than the SDK team. For each one, we add the owning team,
# as well as @temporalio/sdk, so the SDK team can continue to
# manage repo-wide concerns.
/temporalio/contrib/common/ @temporalio/ai-sdk @temporalio/sdk

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, forgot to remove this in a previous PR. This doesn't exist.

/temporalio/contrib/google_adk_agents/ @temporalio/ai-sdk @temporalio/sdk
/temporalio/contrib/langgraph/ @temporalio/ai-sdk @temporalio/sdk
/temporalio/contrib/langsmith/ @temporalio/ai-sdk @temporalio/sdk
/temporalio/contrib/openai_agents/ @temporalio/ai-sdk @temporalio/sdk
/temporalio/contrib/strands/ @temporalio/ai-sdk @temporalio/sdk
/temporalio/contrib/workflow_streams/ @temporalio/ai-sdk @temporalio/sdk
/tests/contrib/google_adk_agents/ @temporalio/ai-sdk @temporalio/sdk
/tests/contrib/langgraph/ @temporalio/ai-sdk @temporalio/sdk
/tests/contrib/langsmith/ @temporalio/ai-sdk @temporalio/sdk
/tests/contrib/openai_agents/ @temporalio/ai-sdk @temporalio/sdk
/tests/contrib/strands/ @temporalio/ai-sdk @temporalio/sdk
/tests/contrib/workflow_streams/ @temporalio/ai-sdk @temporalio/sdk
62 changes: 52 additions & 10 deletions temporalio/contrib/workflow_streams/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
from temporalio.api.common.v1 import Payload
from temporalio.client import (
Client,
WorkflowExecutionDescription,
WorkflowExecutionStatus,
WorkflowHandle,
WorkflowUpdateFailedError,
WorkflowUpdateRPCTimeoutOrCancelledError,
WorkflowUpdateStage,
)
from temporalio.converter import DataConverter, PayloadConverter
from temporalio.service import RPCError, RPCStatusCode

from ._topic_handle import TopicHandle
from ._types import (
STREAM_DRAINING_ERROR_TYPE,
TRUNCATED_OFFSET_ERROR_TYPE,
PollInput,
PollResult,
PublishEntry,
Expand Down Expand Up @@ -127,6 +131,10 @@ def __init__(
self._pending_seq: int = 0
self._pending_since: float | None = None
self._topic_types: dict[str, type[Any]] = {}
# Run id the most recent poll's update was admitted to. Captured before
# waiting for the outcome so a mid-poll continue-as-new can be detected by
# describing that specific run. None until the first poll is admitted.
self._polled_run_id: str | None = None

@classmethod
def create(
Expand Down Expand Up @@ -504,9 +512,11 @@ async def subscribe(
``Payload`` — useful for heterogeneous topics where
the caller dispatches on ``Payload.metadata`` or wants
to forward the bytes without decoding.
poll_cooldown: Minimum interval between polls to avoid
overwhelming the workflow when items arrive faster
than the poll round-trip. Defaults to 100ms.
poll_cooldown: Minimum interval between polls when caught
up (backlogs always drain at full speed). Defaults to
100ms. Avoid ``timedelta(0)``: an idle subscriber
busy-loops, and each poll grows workflow history toward
its limit. Use 0 only in tests.

Yields:
:class:`WorkflowStreamItem` for each matching item.
Expand All @@ -528,22 +538,37 @@ async def subscribe(
offset = from_offset
while True:
try:
result: PollResult = await self._handle.execute_update(
# Wait only for ACCEPTED so the handle (and the run id it was
# admitted to) is available before we block on the outcome; if
# the run continues-as-new mid-poll, result() fails but we still
# know which run to inspect.
handle = await self._handle.start_update(
"__temporal_workflow_stream_poll",
PollInput(topics=topic_filter, from_offset=offset),
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
result_type=PollResult,
)
self._polled_run_id = handle.workflow_run_id
result: PollResult = await handle.result()
except asyncio.CancelledError:
return
except WorkflowUpdateFailedError as e:
cause_type = getattr(e.cause, "type", None)
if cause_type == "TruncatedOffset":
if cause_type == TRUNCATED_OFFSET_ERROR_TYPE:
# Subscriber fell behind truncation. Retry from
# offset 0 which the stream treats as "from the
# beginning of whatever exists" (i.e., from
# base_offset).
offset = 0
continue
if cause_type == STREAM_DRAINING_ERROR_TYPE:
# Workflow is detaching for continue-as-new. Back off and
# retry; the poll lands on the successor run once the
# rollover completes.
cooldown_secs = poll_cooldown.total_seconds()
if cooldown_secs > 0:
await asyncio.sleep(cooldown_secs)
continue
if cause_type == "AcceptedUpdateCompletedWorkflow":
# Workflow returned (or continued-as-new) before
# this poll's update completed. Either follow the
Expand Down Expand Up @@ -586,15 +611,32 @@ async def subscribe(
if not result.more_ready and cooldown_secs > 0:
await asyncio.sleep(cooldown_secs)

async def _describe_polled_run(self) -> WorkflowExecutionDescription:
    """Fetch the description of the run that admitted the latest poll.

    Targeting that exact run, instead of whichever run is currently the
    latest, is what makes a continue-as-new observable: the rolled-over
    run is closed with status CONTINUED_AS_NEW, while the latest run
    would simply report RUNNING.

    Returns:
        The description of the run identified by ``_polled_run_id`` when a
        client is available. If no run id has been captured yet, or there
        is no client to build a run-specific handle with, the description
        comes from the latest run instead.
    """
    client = self._client
    if client is None:
        # No client means no way to address a specific run; use the
        # handle this stream client already holds.
        return await self._handle.describe()
    run_handle = client.get_workflow_handle(
        self._workflow_id, run_id=self._polled_run_id
    )
    return await run_handle.describe()

async def _follow_continue_as_new(self) -> bool:
"""Check if the workflow continued-as-new and re-target the handle.
"""Check if the polled run continued-as-new and re-target the handle.

Returns True if the handle was updated (caller should retry).
Returns True if the handle was updated (caller should retry). The
successor run id is not needed — re-targeting to an unpinned handle
makes the next poll address the latest (successor) run.
"""
if self._client is None:
return False
try:
desc = await self._handle.describe()
desc = await self._describe_polled_run()
except Exception:
return False
if desc.status == WorkflowExecutionStatus.CONTINUED_AS_NEW:
Expand All @@ -603,14 +645,14 @@ async def _follow_continue_as_new(self) -> bool:
return False

async def _workflow_in_terminal_state(self) -> bool:
"""Return True if the workflow has reached a terminal state.
"""Return True if the polled run has reached a terminal state.

Used by ``subscribe()`` to distinguish "workflow finished —
stream is done" from "wrong workflow id" when a poll RPC
returns NOT_FOUND.
"""
try:
desc = await self._handle.describe()
desc = await self._describe_polled_run()
except Exception:
return False
return desc.status in (
Expand Down
18 changes: 13 additions & 5 deletions temporalio/contrib/workflow_streams/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

from ._topic_handle import WorkflowTopicHandle
from ._types import (
STREAM_DRAINING_ERROR_TYPE,
TRUNCATED_OFFSET_ERROR_TYPE,
PollInput,
PollResult,
PublisherState,
Expand Down Expand Up @@ -357,7 +359,6 @@ def truncate(self, up_to_offset: int) -> None:
f"Cannot truncate to offset {up_to_offset}: "
f"valid range is [{self._base_offset}, {self._base_offset + len(self._log)})",
type="TruncateOutOfRange",
non_retryable=True,
)
self._log = self._log[log_index:]
self._base_offset = up_to_offset
Expand Down Expand Up @@ -419,8 +420,7 @@ async def _on_poll(self, payload: PollInput) -> PollResult:
raise ApplicationError(
f"Requested offset {payload.from_offset} has been truncated. "
f"Current base offset is {self._base_offset}.",
type="TruncatedOffset",
non_retryable=True,
type=TRUNCATED_OFFSET_ERROR_TYPE,
)
all_new = self._log[log_offset:]
if payload.topics:
Expand Down Expand Up @@ -460,9 +460,17 @@ async def _on_poll(self, payload: PollInput) -> PollResult:
)

def _validate_poll(self, _payload: PollInput) -> None:
    """Reject new polls when pollers are detached for continue-as-new.

    Uses the well-known ``StreamDraining`` type so a subscriber recognizes
    the rollover-in-progress and retries until its poll lands on the
    successor run, rather than surfacing the rejection as an error.

    Args:
        _payload: The poll request being validated. Unused; rejection
            depends only on whether the stream is detaching.

    Raises:
        ApplicationError: With ``type=STREAM_DRAINING_ERROR_TYPE`` while
            the stream is detaching for continue-as-new.
    """
    if self._detaching:
        # The error must carry the shared type constant: the subscriber
        # matches on it to decide to back off and retry. An untyped
        # error (e.g. RuntimeError) would not be recognized as retryable.
        raise ApplicationError(
            "Workflow pollers are detached for continue-as-new",
            type=STREAM_DRAINING_ERROR_TYPE,
        )
Comment thread
brianstrauch marked this conversation as resolved.

def _on_offset(self) -> int:
"""Return the current global offset (base_offset + log length)."""
Expand Down
7 changes: 7 additions & 0 deletions temporalio/contrib/workflow_streams/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@

T = TypeVar("T")

# Well-known ``ApplicationError.type`` values the stream workflow uses to reject
# polls, and which ``WorkflowStreamClient.subscribe`` recognizes to drive retry
# behavior. Defined here so the raise sites (``_stream.py``) and the handling
# sites (``_client.py``) cannot diverge.
#
# ``StreamDraining``: the poll was rejected because pollers are detached for
# continue-as-new. ``subscribe`` waits for ``poll_cooldown`` (when non-zero) and
# retries so the poll can land on the successor run.
STREAM_DRAINING_ERROR_TYPE = "StreamDraining"
# ``TruncatedOffset``: the requested offset has already been truncated from the
# log. ``subscribe`` restarts from offset 0, which the stream treats as "from
# the beginning of whatever exists".
TRUNCATED_OFFSET_ERROR_TYPE = "TruncatedOffset"


# basedpyright flags _-prefixed module-level functions as unused even when
# sibling modules import them (_stream.py, _client.py). Vanilla pyright does
Expand Down
Loading
Loading