Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions temporalio/contrib/langgraph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,47 @@ await g.ainvoke({...}, context=Context(user_id="alice"))

Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary.

## Summaries

Summaries are short, human-readable labels that show up in the Temporal UI and CLI, making it easier to see what each step of a run is doing.

### Static summary

`summary` is an ordinary Activity option, so a fixed per-node label works today — pass it like any other option:

```python
g.add_node("plan", plan, metadata={"execute_in": "activity", "summary": "Planning step"})
```

It is attached to the node's scheduled-activity event (`execute_in="activity"` only).

### Dynamic summary (`summary_fn`)

To derive the label from the node's input at runtime, supply a `summary_fn`. It receives the node's `(args, kwargs)` and returns a summary string, or `None`/`""` for no summary. For a `StateGraph` node `args[0]` is the state; for a Functional `@task` it is the task's arguments.

```python
def summarize(args, kwargs) -> str | None:
state = args[0]
return f"stage={state['stage']} doc={state['doc_id']}"

# Graph API: per-node
g.add_node("plan", plan, metadata={"execute_in": "activity", "summary_fn": summarize})

# Functional API: per-task
plugin = LangGraphPlugin(
tasks=[plan],
activity_options={"plan": {"execute_in": "activity", "summary_fn": summarize}},
)

# Plugin-wide default, overridable per-node/per-task
plugin = LangGraphPlugin(graphs={"g": g}, default_summary_fn=summarize)
```

- For `execute_in="activity"` nodes the result sets the activity `summary` (one per scheduled-activity event, visible in history).
- For `execute_in="workflow"` nodes there is no activity, so the result updates the workflow's current details via [`workflow.set_current_details()`](https://python.temporal.io/temporalio.workflow.html#set_current_details). This is a single workflow-level slot (last-writer-wins): it reflects the most recent workflow-bound node and is queryable via `__temporal_workflow_metadata`.

`summary_fn` runs in workflow context on every replay, so it **must be deterministic and must not raise** (an exception fails the workflow task). Setting both a static `summary` and a `summary_fn` on the same node raises `ValueError`; a static `summary` on a node takes precedence over `default_summary_fn`.

## Streaming

When `streaming_topic` is set on `LangGraphPlugin`, calls to `langgraph.config.get_stream_writer()` inside a node publish to the named topic on the workflow's [`WorkflowStream`](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams). Activity-side nodes publish via `WorkflowStreamClient` (a signal carrying batched items, controlled by `streaming_batch_interval`); workflow-side nodes publish synchronously to the in-workflow stream (no signal). External subscribers consume the stream with `WorkflowStreamClient.create(...).topic(...).subscribe(...)`.
Expand Down
13 changes: 10 additions & 3 deletions temporalio/contrib/langgraph/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def thread_safe_writer(value: Any) -> None:
def wrap_execute_activity(
afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]],
task_id: str = "",
summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None] | None = None,
**execute_activity_kwargs: Any,
) -> Callable[..., Any]:
"""Wrap an activity function to be called via workflow.execute_activity with caching."""
Expand Down Expand Up @@ -156,9 +157,15 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
input = ActivityInput(
args=args, kwargs=kwargs, langgraph_config=langgraph_config
)
output = await workflow.execute_activity(
afunc, input, **execute_activity_kwargs
)
# Compute a dynamic activity summary (if configured) on the schedule
# path only; a cache hit above returns before reaching here, so no
# activity is scheduled and no summary is needed.
call_kwargs = dict(execute_activity_kwargs)
if summary_fn is not None:
summary = summary_fn(args, kwargs)
if summary:
call_kwargs["summary"] = summary
output = await workflow.execute_activity(afunc, input, **call_kwargs)
if output.langgraph_interrupts is not None:
raise GraphInterrupt(output.langgraph_interrupts)

Expand Down
47 changes: 43 additions & 4 deletions temporalio/contrib/langgraph/_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
_ACTIVITY_OPTION_KEYS: frozenset[str] = frozenset(
{"execute_in", *inspect.signature(workflow.execute_activity).parameters}
)
# Node/task option keys beyond the raw execute_activity parameters:
# 'summary_fn' is a callable consumed in the workflow (not a Temporal
# option), so it must be split out of Graph API metadata too.
_LANGGRAPH_OPTION_KEYS: frozenset[str] = _ACTIVITY_OPTION_KEYS | frozenset(
{"summary_fn"}
)


class LangGraphPlugin(SimplePlugin):
Expand Down Expand Up @@ -70,6 +76,18 @@ class LangGraphPlugin(SimplePlugin):
default_activity_options: Activity options applied to every
activity-bound node and task, overridable per-node (Graph API
``metadata``) or per-task (``activity_options[name]``).
default_summary_fn: Callable applied to every node and task to
compute a summary, overridable per-node (Graph API
``metadata['summary_fn']``) or per-task
(``activity_options[name]['summary_fn']``). It receives the
node's ``(args, kwargs)`` and returns a summary string (or
``None`` for no summary). For ``execute_in='activity'`` nodes
the result sets the activity ``summary`` (shown on each
scheduled-activity event); for ``execute_in='workflow'`` nodes
it updates the workflow's current details (last-writer-wins).
Must be deterministic and must not raise, as it runs in
workflow context on every replay. Cannot be combined with a
static ``summary`` on the same node.
streaming_topic: When set, ``langgraph.config.get_stream_writer()``
inside a node publishes to this topic on the workflow's
:class:`WorkflowStream`. The workflow must construct
Expand Down Expand Up @@ -103,6 +121,8 @@ def __init__(
# TODO: Remove activity_options when we have support for @task(metadata=...)
activity_options: dict[str, dict[str, Any]] | None = None,
default_activity_options: dict[str, Any] | None = None,
default_summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None]
| None = None,
streaming_topic: str | None = None,
streaming_batch_interval: timedelta = timedelta(milliseconds=100),
):
Expand Down Expand Up @@ -133,6 +153,7 @@ def __init__(
self.activities: list = []
self._streaming_topic = streaming_topic
self._streaming_batch_interval = streaming_batch_interval
self._default_summary_fn = default_summary_fn

# Graph API: Wrap graph nodes as Temporal Activities.
if graphs:
Expand Down Expand Up @@ -168,12 +189,14 @@ def __init__(
# the node function via config["metadata"].
node_meta = node.metadata or {}
node_opts = {
k: v for k, v in node_meta.items() if k in _ACTIVITY_OPTION_KEYS
k: v
for k, v in node_meta.items()
if k in _LANGGRAPH_OPTION_KEYS
}
node.metadata = {
k: v
for k, v in node_meta.items()
if k not in _ACTIVITY_OPTION_KEYS
if k not in _LANGGRAPH_OPTION_KEYS
}
if "execute_in" not in node_opts:
raise ValueError(
Expand Down Expand Up @@ -253,6 +276,18 @@ def execute(
"""Prepare a node or task to execute as an activity or inline in the workflow."""
opts = kwargs or {}
execute_in = opts.pop("execute_in")
# Remove control keys before opts is splatted into execute_activity
# below; summary_fn is consumed here, not a Temporal activity option.
node_summary_fn = opts.pop("summary_fn", None)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pop from the opts if we're already passing the opts? Should this be a get()?

@DABH DABH Jun 23, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I believe it has to be pop, not get: a few lines down the leftover opts is splatted into wrap_execute_activity(..., **opts), which forwards them to workflow.execute_activity(...). summary_fn isn't a valid execute_activity kwarg, and we also pass it explicitly as summary_fn=summary_fn, so leaving it in opts would give some error about an unexpected/duplicate keyword. It's the same reason execute_in is popped on the line right above.

Pushed a comment to make that explicit in case that's helpful!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is summary a valid kwarg? I wonder if we could normalize summary and summary_fn into the relevant kwarg

if node_summary_fn is not None and opts.get("summary") is not None:
raise ValueError(
f"{activity_name}: set either 'summary' or 'summary_fn', not both."
)
# Per-node summary_fn wins; a static summary suppresses the plugin
# default; otherwise fall back to the plugin-wide default_summary_fn.
summary_fn = node_summary_fn or (
None if opts.get("summary") is not None else self._default_summary_fn
)

if execute_in == "activity":
wrapped = wrap_activity(
Expand All @@ -262,9 +297,13 @@ def execute(
)
a = activity.defn(name=activity_name)(wrapped)
self.activities.append(a)
return wrap_execute_activity(a, task_id=task_id(func), **opts)
return wrap_execute_activity(
a, task_id=task_id(func), summary_fn=summary_fn, **opts
)
elif execute_in == "workflow":
return wrap_workflow(func, streaming_topic=self._streaming_topic)
return wrap_workflow(
func, streaming_topic=self._streaming_topic, summary_fn=summary_fn
)
else:
raise ValueError(f"Invalid execute_in value: {execute_in}")

Expand Down
10 changes: 10 additions & 0 deletions temporalio/contrib/langgraph/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def wrap_workflow(
func: Callable[..., Any],
*,
streaming_topic: str | None = None,
summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None] | None = None,
) -> Callable[..., Awaitable[Any]]:
"""Wrap a function as a workflow-side LangGraph node.

Expand All @@ -28,9 +29,18 @@ def wrap_workflow(
function with the writer installed. Workflow-side nodes publish
synchronously to the in-workflow ``WorkflowStream`` (no signal
round-trip); activity-side nodes go through ``WorkflowStreamClient``.

Workflow-side nodes have no activity to carry a summary, so a
truthy ``summary_fn`` result updates the workflow's current details
via :func:`temporalio.workflow.set_current_details` (last-writer-wins).
"""

async def wrapper(*args: Any, **kwargs: Any) -> Any:
if summary_fn is not None:
summary = summary_fn(args, kwargs)
if summary:
workflow.set_current_details(summary)
Comment on lines +33 to +42

async def run(stream_writer: Callable[[Any], None] | None) -> Any:
token = None
if stream_writer is not None:
Expand Down
Loading
Loading