From 9dc8f74443e885b134e04a6a2414baca43f5fd98 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:37:08 -0700 Subject: [PATCH 1/4] :boom: enable lambda worker async configure --- .../contrib/aws/lambda_worker/README.md | 18 + .../contrib/aws/lambda_worker/_run_worker.py | 349 ++++++++++++------ .../aws/lambda_worker/test_lambda_worker.py | 156 +++++++- 3 files changed, 395 insertions(+), 128 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/README.md b/temporalio/contrib/aws/lambda_worker/README.md index f9166b13d..c12e7037d 100644 --- a/temporalio/contrib/aws/lambda_worker/README.md +++ b/temporalio/contrib/aws/lambda_worker/README.md @@ -48,6 +48,24 @@ pre-populated with Lambda-appropriate defaults. Override any field directly in the callback. The `task_queue` key in `worker_config` is pre-populated from the `TEMPORAL_TASK_QUEUE` environment variable if set. +### Sync, async, and async-generator configure + +The configure callback runs **once per invocation, inside that invocation's +event loop**, before the client connects. It may be: + +- a plain function `def configure(config) -> None`; +- an `async def configure(config) -> None` coroutine — awaited for setup (pair + with `shutdown_hooks` for teardown); +- an `async def configure(config): ...; yield; ...` async generator — statements + before the single `yield` run before the client connects, the worker runs + while the generator is suspended at the `yield`, and statements after the + `yield` run as teardown once the worker has stopped. + +The callback runs per invocation (rather than once at process start) because +event-loop-bound resources cannot be created before an event loop exists and +cannot be shared across invocations — each invocation runs under a fresh +`asyncio.run` loop. + ## Lambda-tuned worker defaults The package applies conservative concurrency limits suited to Lambda's resource diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index 6a2cc75a3..52775cda5 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -1,13 +1,15 @@ from __future__ import annotations import asyncio +import inspect import logging import os import sys -from collections.abc import Awaitable, Callable +from collections.abc import AsyncGenerator, Awaitable, Callable +from contextlib import AbstractAsyncContextManager, asynccontextmanager from dataclasses import dataclass, field from datetime import timedelta -from typing import Any +from typing import Any, TypeAlias import temporalio.client import temporalio.worker @@ -28,6 +30,16 @@ logger = logging.getLogger(__name__) +# A plain, ``async def`` coroutine, ``async def`` generator, or async-context-manager +# callback. See run_worker. +ConfigureCallback: TypeAlias = Callable[ + [LambdaWorkerConfig], + None + | Awaitable[None] + | AsyncGenerator[None, None] + | AbstractAsyncContextManager[None], +] + @dataclass class _WorkerDeps: @@ -62,54 +74,118 @@ def _default_extract_lambda_ctx( return None +def _validate_task_queue(config: LambdaWorkerConfig) -> None: + """Raise if no task queue has been configured.""" + if not config.worker_config.get("task_queue"): + raise ValueError( + "task queue not configured: set " + 'worker_config["task_queue"] or the ' + "TEMPORAL_TASK_QUEUE environment variable" + ) + + def run_worker( version: WorkerDeploymentVersion, - configure: Callable[[LambdaWorkerConfig], None], + configure: ConfigureCallback, ) -> Callable[[Any, Any], None]: """Create a Temporal worker Lambda handler. - Calls the *configure* callback to collect workflow/activity registrations and option overrides, - then returns a Lambda handler function. On each invocation the handler connects to the Temporal - server, starts a worker with Lambda-tuned defaults, polls for tasks until the invocation - deadline approaches, and then gracefully shuts down. - - The *version* parameter identifies this worker's deployment version. ``run_worker`` always - enables Worker Deployment Versioning (``use_worker_versioning=True``). To provide a default - versioning behavior for workflows that do not specify one at registration time, set - ``deployment_config`` in ``worker_config`` in the configure callback. - - The returned handler has the signature ``handler(event, context)`` and should be set as your - Lambda function's handler entry point. + Calls the *configure* callback to collect workflow/activity registrations and option + overrides, then returns a Lambda handler function. On each invocation the handler + connects to the Temporal server, starts a worker with Lambda-tuned defaults, polls for + tasks until the invocation deadline approaches, and then gracefully shuts down. + + The *configure* callback is invoked **once per invocation**, inside that invocation's + event loop, before the client connects. It may be synchronous or asynchronous: + + * **Synchronous** ``def configure(config) -> None`` — runs per invocation. Use for + static worker definition (task queue, registrations, option tuning) and resources + that are not bound to an event loop. + * **Async** ``async def configure(config) -> None`` — awaited per invocation. Use when + setup must ``await`` (for example, opening an async client). Pair with + ``shutdown_hooks`` for teardown. + * **Async generator** ``async def configure(config): ...; yield; ...`` (or an + equivalent ``@contextlib.asynccontextmanager``-decorated function) — entered per + invocation. Statements before the single ``yield`` run before the client connects; + the worker runs while the generator is suspended at the ``yield``; statements after + the ``yield`` run as teardown once the worker has stopped. This is the recommended + shape for event-loop-bound resources that must live for the duration of the + invocation, such as an ``aioboto3`` S3 client backing the external-storage data + converter (see the async example below). + + The callback runs per invocation (not once at cold start) because event-loop-bound + resources cannot be created at cold start (there is no running loop) and cannot be + shared across invocations (each invocation runs under a fresh ``asyncio.run`` loop). + + The *version* parameter identifies this worker's deployment version. ``run_worker`` + always enables Worker Deployment Versioning (``use_worker_versioning=True``). To + provide a default versioning behavior for workflows that do not specify one at + registration time, set ``deployment_config`` in ``worker_config`` in the configure + callback. + + The returned handler has the signature ``handler(event, context)`` and should be set as + your Lambda function's handler entry point. Args: version: The worker deployment version. Required. configure: A callback that receives a :py:class:`LambdaWorkerConfig` (pre-populated with Lambda defaults) and configures workflows, - activities, and options on it. + activities, and options on it. May be sync, async, or an async + generator (see above). Returns: A Lambda handler function. - Example:: + Example: + Synchronous configure (static worker definition):: - from temporalio.common import WorkerDeploymentVersion - from temporalio.contrib.aws.lambda_worker import ( - LambdaWorkerConfig, - run_worker, - ) + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import ( + LambdaWorkerConfig, + run_worker, + ) - def configure(config: LambdaWorkerConfig) -> None: - config.worker_config["task_queue"] = "my-task-queue" - config.worker_config["workflows"] = [MyWorkflow] - config.worker_config["activities"] = [my_activity] - - lambda_handler = run_worker( - WorkerDeploymentVersion( - deployment_name="my-service", - build_id="v1.0", - ), - configure, - ) + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0"), + configure, + ) + + Async generator configure, bracketing an ``aioboto3`` S3 client that backs the + external-storage data converter (opened before connect, closed after the worker + stops):: + + import aioboto3 + import dataclasses + from temporalio.contrib.aws.s3driver import S3StorageDriver + from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client + from temporalio.converter import DataConverter, ExternalStorage + + async def configure(config: LambdaWorkerConfig): + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + async with aioboto3.Session().client("s3") as s3_client: + driver = S3StorageDriver( + client=new_aioboto3_client(s3_client), bucket="my-payloads", + ) + config.client_connect_config["data_converter"] = dataclasses.replace( + DataConverter.default, + external_storage=ExternalStorage(drivers=[driver]), + ) + yield + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0"), + configure, + ) """ deps = _WorkerDeps() try: @@ -121,7 +197,7 @@ def configure(config: LambdaWorkerConfig) -> None: def _run_worker_internal( version: WorkerDeploymentVersion, - configure: Callable[[LambdaWorkerConfig], None], + configure: ConfigureCallback, deps: _WorkerDeps, ) -> Callable[[Any, Any], None]: """Core logic with injected dependencies for testability.""" @@ -133,46 +209,38 @@ def _run_worker_internal( # Load client config from envconfig / TOML. load_config = deps.load_config or (lambda: _default_load_config(deps.getenv)) profile = load_config() - connect_config: ClientConnectConfig = {**profile.to_client_connect_config()} + base_connect_config: ClientConnectConfig = {**profile.to_client_connect_config()} - # Build worker config with Lambda defaults. - worker_config: WorkerConfig = {} - apply_lambda_worker_defaults(worker_config) + # Build base worker config with Lambda defaults. + base_worker_config: WorkerConfig = {} + apply_lambda_worker_defaults(base_worker_config) # Always enable deployment versioning. - worker_config["deployment_config"] = WorkerDeploymentConfig( + base_worker_config["deployment_config"] = WorkerDeploymentConfig( version=version, use_worker_versioning=True, ) # Calculate default shutdown buffer. - graceful_timeout = worker_config.get( + graceful_timeout = base_worker_config.get( "graceful_shutdown_timeout", timedelta(seconds=5) ) shutdown_buffer = graceful_timeout + DEFAULT_SHUTDOWN_HOOK_BUFFER - # Pre-populate config with defaults. - config = LambdaWorkerConfig( - client_connect_config=connect_config, - worker_config=worker_config, - shutdown_deadline_buffer=shutdown_buffer, - ) - - # Pre-populate task queue from environment if available. env_tq = deps.getenv("TEMPORAL_TASK_QUEUE") - if env_tq: - config.worker_config["task_queue"] = env_tq - - # Call user configure callback with pre-populated config. - configure(config) - # Validate task queue. - if not config.worker_config.get("task_queue"): - raise ValueError( - "task queue not configured: set " - 'worker_config["task_queue"] or the ' - "TEMPORAL_TASK_QUEUE environment variable" + def _new_config() -> LambdaWorkerConfig: + """Fresh config per invocation; dicts/hooks are copied so nothing leaks across + invocations. + """ + config = LambdaWorkerConfig( + client_connect_config={**base_connect_config}, + worker_config={**base_worker_config}, + shutdown_deadline_buffer=shutdown_buffer, ) + if env_tq: + config.worker_config["task_queue"] = env_tq + return config extract_lambda_ctx = deps.extract_lambda_ctx or _default_extract_lambda_ctx @@ -180,7 +248,8 @@ def _handler(_event: Any, lambda_context: Any) -> None: asyncio.run( _invocation_handler( lambda_context=lambda_context, - config=config, + configure=configure, + new_config=_new_config, deps=deps, extract_lambda_ctx=extract_lambda_ctx, ) @@ -189,71 +258,107 @@ def _handler(_event: Any, lambda_context: Any) -> None: return _handler +@asynccontextmanager +async def _invocation_config_scope( + configure: ConfigureCallback, + new_config: Callable[[], LambdaWorkerConfig], +) -> AsyncGenerator[LambdaWorkerConfig, None]: + """Run *configure* (see run_worker for the forms) against a fresh per-invocation config + and yield it. For the generator / context-manager forms, post-``yield`` teardown runs + when the caller's block exits, including on error. Task queue is validated after + setup. + """ + config = new_config() + if inspect.isasyncgenfunction(configure): + # Wrap the bare async generator so it drives like a context manager: setup on + # enter, teardown on exit. + cm: Any = asynccontextmanager(configure)(config) + else: + result = configure(config) + if inspect.isawaitable(result): + await result + # A @asynccontextmanager-decorated callback returns the context manager directly. + cm = result if result is not None and hasattr(result, "__aenter__") else None + + if cm is not None: + async with cm: + _validate_task_queue(config) + yield config + else: + _validate_task_queue(config) + yield config + + async def _invocation_handler( *, lambda_context: Any, - config: LambdaWorkerConfig, + configure: ConfigureCallback, + new_config: Callable[[], LambdaWorkerConfig], deps: _WorkerDeps, extract_lambda_ctx: Callable[[Any], tuple[str, str] | None], ) -> None: """Handle a single Lambda invocation.""" - shutdown_buffer = config.shutdown_deadline_buffer - - # Check deadline feasibility. - remaining_ms_fn = getattr(lambda_context, "get_remaining_time_in_millis", None) - deadline_available = remaining_ms_fn is not None - if deadline_available: - assert remaining_ms_fn is not None - remaining = timedelta(milliseconds=remaining_ms_fn()) - work_time = remaining - shutdown_buffer - if work_time <= timedelta(seconds=1): - raise RuntimeError( - f"Lambda timeout is too short: {remaining.total_seconds():.1f}s " - f"remaining but {shutdown_buffer.total_seconds():.1f}s is " - f"reserved for shutdown, leaving no time for work. " - f"Increase the function timeout or decrease the shutdown " - f"deadline buffer" - ) - elif work_time < timedelta(seconds=5): - logger.warning( - "Lambda timeout leaves less than 5s for work after " - "shutdown buffer; consider increasing the function " - "timeout or decreasing the shutdown deadline buffer " - "(work_time=%s, shutdown_buffer=%s)", - work_time, - shutdown_buffer, - ) - - # Build per-invocation connect kwargs with identity from Lambda context. - invocation_connect_kwargs: ClientConnectConfig = {**config.client_connect_config} - if "identity" not in invocation_connect_kwargs: - ctx_info = extract_lambda_ctx(lambda_context) - if ctx_info is not None: - request_id, function_arn = ctx_info - invocation_connect_kwargs["identity"] = build_lambda_identity( - request_id, function_arn - ) - - # Connect to Temporal. - client = await deps.connect(**invocation_connect_kwargs) - - # Create the worker. - worker = deps.create_worker(client, **config.worker_config) - - # Run the worker until the deadline approaches or context is done. - if deadline_available: - assert remaining_ms_fn is not None - work_time_secs = ( - timedelta(milliseconds=remaining_ms_fn()) - shutdown_buffer - ).total_seconds() - if work_time_secs > 0: - try: - await asyncio.wait_for(worker.run(), timeout=work_time_secs) - except asyncio.TimeoutError: - pass - else: - # No deadline - run until cancelled. - await worker.run() - - # Run shutdown hooks after worker has stopped. - await _run_shutdown_hooks(config) + async with _invocation_config_scope(configure, new_config) as config: + shutdown_buffer = config.shutdown_deadline_buffer + + # Check deadline feasibility. + remaining_ms_fn = getattr(lambda_context, "get_remaining_time_in_millis", None) + deadline_available = remaining_ms_fn is not None + if deadline_available: + assert remaining_ms_fn is not None + remaining = timedelta(milliseconds=remaining_ms_fn()) + work_time = remaining - shutdown_buffer + if work_time <= timedelta(seconds=1): + raise RuntimeError( + f"Lambda timeout is too short: {remaining.total_seconds():.1f}s " + f"remaining but {shutdown_buffer.total_seconds():.1f}s is " + f"reserved for shutdown, leaving no time for work. " + f"Increase the function timeout or decrease the shutdown " + f"deadline buffer" + ) + elif work_time < timedelta(seconds=5): + logger.warning( + "Lambda timeout leaves less than 5s for work after " + "shutdown buffer; consider increasing the function " + "timeout or decreasing the shutdown deadline buffer " + "(work_time=%s, shutdown_buffer=%s)", + work_time, + shutdown_buffer, + ) + + # Build per-invocation connect kwargs with identity from Lambda context. + invocation_connect_kwargs: ClientConnectConfig = { + **config.client_connect_config + } + if "identity" not in invocation_connect_kwargs: + ctx_info = extract_lambda_ctx(lambda_context) + if ctx_info is not None: + request_id, function_arn = ctx_info + invocation_connect_kwargs["identity"] = build_lambda_identity( + request_id, function_arn + ) + + # Connect to Temporal. + client = await deps.connect(**invocation_connect_kwargs) + + # Create the worker. + worker = deps.create_worker(client, **config.worker_config) + + # Run the worker until the deadline approaches or context is done. + if deadline_available: + assert remaining_ms_fn is not None + work_time_secs = ( + timedelta(milliseconds=remaining_ms_fn()) - shutdown_buffer + ).total_seconds() + if work_time_secs > 0: + try: + await asyncio.wait_for(worker.run(), timeout=work_time_secs) + except asyncio.TimeoutError: + pass + else: + # No deadline - run until cancelled. + await worker.run() + + # Run shutdown hooks after worker has stopped, before any async-generator + # configure teardown (which unwinds on scope exit). + await _run_shutdown_hooks(config) diff --git a/tests/contrib/aws/lambda_worker/test_lambda_worker.py b/tests/contrib/aws/lambda_worker/test_lambda_worker.py index cda1cd12f..9d2ec78a7 100644 --- a/tests/contrib/aws/lambda_worker/test_lambda_worker.py +++ b/tests/contrib/aws/lambda_worker/test_lambda_worker.py @@ -2,6 +2,9 @@ from __future__ import annotations +import dataclasses +import itertools +from contextlib import asynccontextmanager from datetime import timedelta from pathlib import Path from typing import Any @@ -126,8 +129,6 @@ def second_hook() -> None: assert second_called def test_is_dataclass(self) -> None: - import dataclasses - assert dataclasses.is_dataclass(LambdaWorkerConfig) def test_default_field_independence(self) -> None: @@ -278,14 +279,17 @@ def test_configure_callback_error(self) -> None: def bad_configure(_config: LambdaWorkerConfig) -> None: raise RuntimeError("bad config") + # configure runs per invocation, so the error surfaces when the handler is invoked. + handler = _run_worker_internal(TEST_VERSION, bad_configure, deps) with pytest.raises(RuntimeError, match="bad config"): - _run_worker_internal(TEST_VERSION, bad_configure, deps) + handler({}, _make_lambda_context()) def test_missing_task_queue(self) -> None: deps = _make_test_deps() deps.getenv = lambda _: None # type: ignore[assignment] + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) with pytest.raises(ValueError, match="task queue not configured"): - _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler({}, _make_lambda_context()) def test_missing_version(self) -> None: deps = _make_test_deps() @@ -494,7 +498,8 @@ def test_task_queue_pre_populated_from_env(self) -> None: def configure(config: LambdaWorkerConfig) -> None: task_queues.append(config.worker_config.get("task_queue")) - _run_worker_internal(TEST_VERSION, configure, deps) + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) assert task_queues[0] == "test-queue" def test_config_pre_populated_with_defaults(self) -> None: @@ -505,7 +510,8 @@ def test_config_pre_populated_with_defaults(self) -> None: def configure(config: LambdaWorkerConfig) -> None: captured.append(config) - _run_worker_internal(TEST_VERSION, configure, deps) + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) wc = captured[0].worker_config assert wc.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES assert wc.get("disable_eager_activity_execution") is True @@ -522,3 +528,141 @@ def test_no_deadline_runs_until_complete(self) -> None: ctx.aws_request_id = "req-123" ctx.invoked_function_arn = "arn:aws:lambda:us-east-1:123:function:f" handler({}, ctx) + + +class TestAsyncConfigure: + """configure may be sync, an async coroutine, or an async generator; all run once per + invocation inside that invocation's event loop.""" + + def test_async_configure_called_per_invocation(self) -> None: + deps = _make_test_deps() + calls = 0 + + async def configure(config: LambdaWorkerConfig) -> None: + nonlocal calls + calls += 1 + config.worker_config["task_queue"] = "async-queue" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + assert calls == 2 + + def test_async_configure_can_set_data_converter(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + sentinel = object() + + async def configure(config: LambdaWorkerConfig) -> None: + config.client_connect_config["data_converter"] = sentinel # type: ignore[typeddict-item] + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert connect_capture[0]["data_converter"] is sentinel + + def test_async_generator_setup_runs_before_connect_teardown_after(self) -> None: + order: list[str] = [] + deps = _make_test_deps() + + original_connect = deps.connect + + async def tracking_connect(**kwargs: Any) -> Any: + order.append("connect") + return await original_connect(**kwargs) + + deps.connect = tracking_connect + + async def configure(config: LambdaWorkerConfig): + order.append("setup") + config.worker_config["task_queue"] = "gen-queue" + yield + order.append("teardown") + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert order == ["setup", "connect", "teardown"] + + def test_async_generator_teardown_runs_on_error(self) -> None: + """Teardown after the yield runs even when the worker run raises.""" + torn_down = False + deps = _make_test_deps() + + async def failing_run() -> None: + raise RuntimeError("worker boom") + + def fake_create_worker(_client: Any, **_kwargs: Any) -> Any: + w = MagicMock() + w.run = failing_run + return w + + deps.create_worker = fake_create_worker + + async def configure(config: LambdaWorkerConfig): + nonlocal torn_down + config.worker_config["task_queue"] = "gen-queue" + try: + yield + finally: + torn_down = True + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with pytest.raises(RuntimeError, match="worker boom"): + handler({}, _make_lambda_context()) + assert torn_down + + def test_async_generator_resource_per_invocation(self) -> None: + """Each invocation builds and tears down its own resource instance. Tagging each + resource with a construction sequence number proves the instances are distinct + (open-1/close-1, then open-2/close-2) rather than one resource reopened.""" + events: list[str] = [] + counter = itertools.count(1) + deps = _make_test_deps() + + class FakeResource: + def __init__(self) -> None: + self.n = next(counter) + + async def __aenter__(self) -> FakeResource: + events.append(f"open-{self.n}") + return self + + async def __aexit__(self, *exc: Any) -> None: + events.append(f"close-{self.n}") + + async def configure(config: LambdaWorkerConfig): + config.worker_config["task_queue"] = "gen-queue" + async with FakeResource(): + yield + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + assert events == ["open-1", "close-1", "open-2", "close-2"] + + def test_async_configure_validates_task_queue_per_invocation(self) -> None: + deps = _make_test_deps() + deps.getenv = lambda _: None # type: ignore[assignment] + + async def configure(_config: LambdaWorkerConfig) -> None: + pass + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with pytest.raises(ValueError, match="task queue not configured"): + handler({}, _make_lambda_context()) + + def test_asynccontextmanager_decorated_configure_supported(self) -> None: + """A @asynccontextmanager-decorated configure is entered and exited per invocation, + the same as a bare async generator.""" + events: list[str] = [] + deps = _make_test_deps() + + @asynccontextmanager + async def configure(config: LambdaWorkerConfig): + events.append("setup") + config.worker_config["task_queue"] = "gen-queue" + yield + events.append("teardown") + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert events == ["setup", "teardown"] From 7b4e1b46164d6c6328e241b9d7855da75af5ce94 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:58:02 -0700 Subject: [PATCH 2/4] example adjustment --- temporalio/contrib/aws/lambda_worker/_run_worker.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index 52775cda5..a7bc9ab4e 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -157,9 +157,9 @@ def configure(config: LambdaWorkerConfig) -> None: configure, ) - Async generator configure, bracketing an ``aioboto3`` S3 client that backs the - external-storage data converter (opened before connect, closed after the worker - stops):: + Async generator configure, bracketing an ``aioboto3`` S3 client. The session + lives at module scope (it is not event-loop-bound and caches credentials across + warm invocations); only the loop-bound client is opened per invocation:: import aioboto3 import dataclasses @@ -167,10 +167,12 @@ def configure(config: LambdaWorkerConfig) -> None: from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client from temporalio.converter import DataConverter, ExternalStorage + session = aioboto3.Session() + async def configure(config: LambdaWorkerConfig): config.worker_config["task_queue"] = "my-task-queue" config.worker_config["workflows"] = [MyWorkflow] - async with aioboto3.Session().client("s3") as s3_client: + async with session.client("s3") as s3_client: driver = S3StorageDriver( client=new_aioboto3_client(s3_client), bucket="my-payloads", ) From 394bc41bf70de99b47a981517e2d26d346bea361 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:14:53 -0700 Subject: [PATCH 3/4] comment updates --- temporalio/contrib/aws/lambda_worker/_run_worker.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py index a7bc9ab4e..f384fb954 100644 --- a/temporalio/contrib/aws/lambda_worker/_run_worker.py +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -95,8 +95,8 @@ def run_worker( connects to the Temporal server, starts a worker with Lambda-tuned defaults, polls for tasks until the invocation deadline approaches, and then gracefully shuts down. - The *configure* callback is invoked **once per invocation**, inside that invocation's - event loop, before the client connects. It may be synchronous or asynchronous: + The *configure* callback is invoked **once per invocation** and may be synchronous or + asynchronous: * **Synchronous** ``def configure(config) -> None`` — runs per invocation. Use for static worker definition (task queue, registrations, option tuning) and resources @@ -108,7 +108,10 @@ def run_worker( equivalent ``@contextlib.asynccontextmanager``-decorated function) — entered per invocation. Statements before the single ``yield`` run before the client connects; the worker runs while the generator is suspended at the ``yield``; statements after - the ``yield`` run as teardown once the worker has stopped. This is the recommended + the ``yield`` run as teardown once the worker has stopped. Any ``shutdown_hooks`` + registered before the ``yield`` run after the worker stops but *before* the + post-``yield`` teardown, so this resource outlives the hooks (e.g. a telemetry + flush hook can still emit before the resource is closed). This is the recommended shape for event-loop-bound resources that must live for the duration of the invocation, such as an ``aioboto3`` S3 client backing the external-storage data converter (see the async example below). From 65855ca493c4854210dec47e84e6819ff567e4bf Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Tue, 16 Jun 2026 09:19:35 -0700 Subject: [PATCH 4/4] update changelog --- CHANGELOG.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c975cfec..0183a74a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,19 @@ to docs, or any other relevant information. ## [Unreleased] +### Changed + +- AWS Lambda worker `configure` parameter supports sync, async, and async + generator style functions. This callback is invoked on the asyncio event + loop. + +### Breaking Changes + +- AWS Lambda worker `configure` parameter has been changed to be invoked + per-invocation of the worker instead of only at startup. It is advised that + any shared, heavy-weight operations are performed outside of the callback + before `run_worker` is invoked. + ## [1.29.0] - 2026-06-17 ### Added