From f9179b06f44bb201401e736a0cd0e59b3974e856 Mon Sep 17 00:00:00 2001 From: jonathan343 Date: Thu, 11 Jun 2026 23:19:10 -0400 Subject: [PATCH 1/2] feat: validate transport support for bidirectional streaming Most Python HTTP clients cannot read response data while the request body is still being written, so bidirectional event stream operations fail mid-stream with an opaque connection error. Transports must now opt in by setting SUPPORTS_DUPLEX_STREAMING to True, and RequestPipeline.duplex_stream fails fast with UnsupportedTransportError when the configured transport does not declare support. AWSCRTHTTPClient and MockHTTPClient declare support; AIOHTTPClient explicitly does not. --- .../codegen/HttpProtocolTestGenerator.java | 2 + .../codegen/generators/ConfigGenerator.java | 4 +- designs/http-interfaces.md | 8 + ...ment-318f09ab1c7e425e88c6e8ba970cb2fe.json | 4 + .../smithy-core/src/smithy_core/aio/client.py | 19 +- .../smithy_core/aio/interfaces/__init__.py | 8 + .../smithy-core/src/smithy_core/exceptions.py | 5 + .../smithy-core/tests/unit/aio/test_client.py | 226 ++++++++++++++++++ ...ment-91c7d1ab9ee64c0fbfc3dcb60f52c284.json | 4 + .../src/smithy_http/aio/aiohttp.py | 4 + .../smithy-http/src/smithy_http/aio/crt.py | 4 + .../src/smithy_http/testing/mockhttp.py | 5 + .../tests/unit/aio/test_aiohttp.py | 7 + .../smithy-http/tests/unit/aio/test_crt.py | 4 + .../tests/unit/testing/test_mockhttp.py | 6 + uv.lock | 2 +- 16 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json create mode 100644 packages/smithy-core/tests/unit/aio/test_client.py create mode 100644 packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json create mode 100644 packages/smithy-http/tests/unit/aio/test_aiohttp.py diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java index c432847bb..7e933fa9f 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java @@ -662,6 +662,7 @@ class $3L: ""\"An asynchronous HTTP client solely for testing purposes.""\" TIMEOUT_EXCEPTIONS = () + SUPPORTS_DUPLEX_STREAMING: bool = True def __init__(self, *, client_config: HTTPClientConfiguration | None = None): self._client_config = client_config @@ -677,6 +678,7 @@ class $4L: ""\"An asynchronous HTTP client solely for testing purposes.""\" TIMEOUT_EXCEPTIONS = () + SUPPORTS_DUPLEX_STREAMING: bool = True def __init__( self, diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java index de03d42d0..8738e07d6 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java @@ -147,7 +147,9 @@ private static List getProtocolProperties(GenerationContext cont .namespace("smithy_core.aio.interfaces", ".") .build()) .build()) - .documentation("The transport to use to send requests (e.g. an HTTP client)."); + .documentation("The transport to use to send requests (e.g. an HTTP client). " + + "Operations with bidirectional event streams require a transport that " + + "sets SUPPORTS_DUPLEX_STREAMING to True, such as AWSCRTHTTPClient."); if (context.applicationProtocol().isHttpProtocol()) { properties.addAll(HTTP_PROPERTIES); diff --git a/designs/http-interfaces.md b/designs/http-interfaces.md index 9a41f1268..108e81112 100644 --- a/designs/http-interfaces.md +++ b/designs/http-interfaces.md @@ -258,6 +258,14 @@ which takes a request and some configuration and asynchronously return a respons Having a minimal interface makes it much easier to implement these interfaces on top of a variety http libraries. +Clients that support duplex (bidirectional) event streaming, which in practice +requires HTTP/2, must declare it by setting the `SUPPORTS_DUPLEX_STREAMING` class +attribute to `True`. Clients are assumed not to support it otherwise, and duplex +stream operations invoked with such a client fail fast with an +`UnsupportedTransportError`. This surfaces the configuration problem before any +request is sent instead of letting the stream fail later with an opaque connection +error. + ```python @dataclass(kw_only=True) class HTTPRequestConfiguration: diff --git a/packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json b/packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json new file mode 100644 index 000000000..9bb9ab83f --- /dev/null +++ b/packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json @@ -0,0 +1,4 @@ +{ + "type": "enhancement", + "description": "Added `SUPPORTS_DUPLEX_STREAMING` to `ClientTransport` so transports can declare support for duplex (bidirectional) event streaming. `RequestPipeline.duplex_stream` now fails fast with an `UnsupportedTransportError` when the configured transport does not declare support." +} diff --git a/packages/smithy-core/src/smithy_core/aio/client.py b/packages/smithy-core/src/smithy_core/aio/client.py index 6060727b4..1cc97459d 100644 --- a/packages/smithy-core/src/smithy_core/aio/client.py +++ b/packages/smithy-core/src/smithy_core/aio/client.py @@ -12,7 +12,12 @@ from ..auth import AuthParams from ..deserializers import DeserializeableShape, ShapeDeserializer from ..endpoints import EndpointResolverParams -from ..exceptions import ClientTimeoutError, RetryError, SmithyError +from ..exceptions import ( + ClientTimeoutError, + RetryError, + SmithyError, + UnsupportedTransportError, +) from ..interceptors import ( InputContext, Interceptor, @@ -197,6 +202,18 @@ async def duplex_stream[ :param output_event_type: The event type to receive in the output stream. :param event_deserializer: The method used to deserialize events. """ + # The transport is assumed not to support duplex streaming unless it + # explicitly declares otherwise. + if not getattr(self.transport, "SUPPORTS_DUPLEX_STREAMING", False): + raise UnsupportedTransportError( + f"The configured transport ({type(self.transport).__name__}) does " + f"not support duplex (bidirectional) event streaming, which is " + f"required by the {call.operation.schema.id} operation. Use a " + f"transport that does, such as " + f"smithy_http.aio.crt.AWSCRTHTTPClient. Custom transports that " + f"support duplex streaming must set SUPPORTS_DUPLEX_STREAMING " + f"to True." + ) request_future = Future[RequestContext[I, TRequest]]() execute_task = asyncio.create_task(self._execute_request(call, request_future)) request_context = await request_future diff --git a/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py b/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py index 0c900a8cf..90d63d9ea 100644 --- a/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py +++ b/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py @@ -90,6 +90,14 @@ class ClientTransport[I: Request, O: Response](Protocol): TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] + SUPPORTS_DUPLEX_STREAMING: bool = False + """Whether this transport can read response data while the request body is still + being written (typically requires HTTP/2). + + Transports that support duplex (bidirectional) event streaming must explicitly set + this to True. Transports that don't declare it are assumed not to support it. + """ + async def send(self, request: I) -> O: """Send a request over the transport and receive the response.""" ... diff --git a/packages/smithy-core/src/smithy_core/exceptions.py b/packages/smithy-core/src/smithy_core/exceptions.py index 0a99976f9..80874152d 100644 --- a/packages/smithy-core/src/smithy_core/exceptions.py +++ b/packages/smithy-core/src/smithy_core/exceptions.py @@ -114,5 +114,10 @@ class UnsupportedStreamError(SmithyError): streams are not supported.""" +class UnsupportedTransportError(SmithyError): + """Indicates that an operation requires a transport capability that the configured + transport does not declare support for (e.g. duplex event streaming).""" + + class EndpointResolutionError(SmithyError): """Exception type for all exceptions raised by endpoint resolution.""" diff --git a/packages/smithy-core/tests/unit/aio/test_client.py b/packages/smithy-core/tests/unit/aio/test_client.py new file mode 100644 index 000000000..6cb08e2dc --- /dev/null +++ b/packages/smithy-core/tests/unit/aio/test_client.py @@ -0,0 +1,226 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Any, Self, cast + +import pytest +from smithy_core import URI +from smithy_core.aio.client import ClientCall, RequestPipeline +from smithy_core.aio.eventstream import DuplexEventStream, InputEventStream +from smithy_core.aio.interfaces import ClientProtocol, ClientTransport +from smithy_core.deserializers import ShapeDeserializer +from smithy_core.documents import TypeRegistry +from smithy_core.endpoints import EndpointResolverParams +from smithy_core.exceptions import UnsupportedTransportError +from smithy_core.interceptors import InterceptorChain +from smithy_core.schemas import APIOperation, Schema +from smithy_core.serializers import ShapeSerializer +from smithy_core.shapes import ShapeID, ShapeType +from smithy_core.traits import StreamingTrait +from smithy_core.types import TypedProperties + +_STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING) + +_EVENTS = Schema.collection( + id=ShapeID("com.example#Events"), + shape_type=ShapeType.UNION, + members={"message": {"target": _STRING}}, +) + +_INPUT_SCHEMA = Schema.collection( + id=ShapeID("com.example#StreamingInput"), + members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, +) + +_OUTPUT_SCHEMA = Schema.collection( + id=ShapeID("com.example#StreamingOutput"), + members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, +) + + +class _Input: + def serialize(self, serializer: ShapeSerializer) -> None: + pass + + +class _Output: + @classmethod + def deserialize(cls, deserializer: ShapeDeserializer) -> Self: + return cls() + + +class _Event: + def serialize(self, serializer: ShapeSerializer) -> None: + pass + + @classmethod + def deserialize(cls, deserializer: ShapeDeserializer) -> Self: + return cls() + + +_OPERATION = APIOperation( + input=_Input, + output=_Output, + schema=Schema( + id=ShapeID("com.example#StreamingOperation"), + shape_type=ShapeType.OPERATION, + ), + input_schema=_INPUT_SCHEMA, + output_schema=_OUTPUT_SCHEMA, + error_registry=TypeRegistry({}), + effective_auth_schemes=[], + error_schemas=[], +) + + +class _StubRequest: + def __init__(self) -> None: + self.destination = URI(host="example.com") + self.body = b"" + + async def consume_body_async(self) -> bytes: + return b"" + + def consume_body(self) -> bytes: + return b"" + + +class _StubResponse: + body = b"" + + async def consume_body_async(self) -> bytes: + return b"" + + def consume_body(self) -> bytes: + return b"" + + +class _StubEventPublisher: + async def send(self, event: Any) -> None: + pass + + async def close(self) -> None: + pass + + +class _StubEventReceiver: + async def receive(self) -> Any: + return None + + async def close(self) -> None: + pass + + +class _StubProtocol: + @property + def id(self) -> ShapeID: + return ShapeID("com.example#testProtocol") + + def serialize_request(self, **kwargs: Any) -> _StubRequest: + return _StubRequest() + + def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any: + return request + + async def deserialize_response(self, **kwargs: Any) -> _Output: + return _Output() + + def create_event_publisher(self, **kwargs: Any) -> _StubEventPublisher: + return _StubEventPublisher() + + def create_event_receiver(self, **kwargs: Any) -> _StubEventReceiver: + return _StubEventReceiver() + + +class _StubEndpoint: + def __init__(self) -> None: + self.uri = URI(host="example.com") + self.properties = TypedProperties() + + +class _StubEndpointResolver: + async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any: + return _StubEndpoint() + + +class _StubAuthResolver: + def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]: + return [] + + +class _UndeclaredTransport: + """A transport that does not declare whether it supports duplex streaming.""" + + TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = () + + async def send(self, request: Any) -> _StubResponse: + return _StubResponse() + + +class _NonDuplexTransport(_UndeclaredTransport): + SUPPORTS_DUPLEX_STREAMING = False + + +class _DuplexTransport(_UndeclaredTransport): + SUPPORTS_DUPLEX_STREAMING = True + + +def _pipeline(transport: object) -> RequestPipeline[Any, Any]: + # The stubs are intentionally structural (they don't subclass the + # protocols), so cast them to keep the type checker focused on the + # runtime behavior under test. + return RequestPipeline( + protocol=cast("ClientProtocol[Any, Any]", _StubProtocol()), + transport=cast("ClientTransport[Any, Any]", transport), + ) + + +def _client_call() -> ClientCall[Any, Any]: + return ClientCall( + input=_Input(), + operation=_OPERATION, + context=TypedProperties(), + interceptor=InterceptorChain([]), + auth_scheme_resolver=_StubAuthResolver(), + supported_auth_schemes={}, + endpoint_resolver=_StubEndpointResolver(), + retry_strategy=None, # type: ignore[arg-type] # unused for streaming input + ) + + +async def test_duplex_stream_raises_for_undeclared_transport() -> None: + pipeline = _pipeline(_UndeclaredTransport()) + + with pytest.raises(UnsupportedTransportError) as exc_info: + await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize) + + assert "_UndeclaredTransport" in str(exc_info.value) + assert "com.example#StreamingOperation" in str(exc_info.value) + + +async def test_duplex_stream_raises_for_non_duplex_transport() -> None: + pipeline = _pipeline(_NonDuplexTransport()) + + with pytest.raises(UnsupportedTransportError): + await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize) + + +async def test_duplex_stream_proceeds_for_duplex_transport() -> None: + pipeline = _pipeline(_DuplexTransport()) + + stream = await pipeline.duplex_stream( + _client_call(), _Event, _Event, _Event.deserialize + ) + + assert isinstance(stream, DuplexEventStream) + output, output_stream = await stream.await_output() + assert isinstance(output, _Output) + assert isinstance(output_stream, _StubEventReceiver) + + +async def test_input_stream_does_not_require_duplex_support() -> None: + pipeline = _pipeline(_NonDuplexTransport()) + + stream = await pipeline.input_stream(_client_call(), _Event) + + assert isinstance(stream, InputEventStream) + assert isinstance(await stream.await_output(), _Output) diff --git a/packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json b/packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json new file mode 100644 index 000000000..89b8706ff --- /dev/null +++ b/packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json @@ -0,0 +1,4 @@ +{ + "type": "enhancement", + "description": "Declared duplex (bidirectional) streaming support on `AWSCRTHTTPClient` via `SUPPORTS_DUPLEX_STREAMING`. `AIOHTTPClient` explicitly does not support it." +} diff --git a/packages/smithy-http/src/smithy_http/aio/aiohttp.py b/packages/smithy-http/src/smithy_http/aio/aiohttp.py index 5a330931d..5c11110f7 100644 --- a/packages/smithy-http/src/smithy_http/aio/aiohttp.py +++ b/packages/smithy-http/src/smithy_http/aio/aiohttp.py @@ -53,6 +53,10 @@ class AIOHTTPClient(HTTPClient): TIMEOUT_EXCEPTIONS = (TimeoutError,) + # aiohttp has no HTTP/2 support and this client fully buffers the response + # before returning, so it can never interleave request and response data. + SUPPORTS_DUPLEX_STREAMING = False + def __init__( self, *, diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 1cb5c34b5..12b4ce3e2 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -143,6 +143,10 @@ class AWSCRTHTTPClient(http_aio_interfaces.HTTPClient): TIMEOUT_EXCEPTIONS = (_CRTTimeoutError,) + # True duplex streaming additionally requires the connection to negotiate + # HTTP/2 via ALPN; over HTTP/1.1 the CRT falls back to a buffered body_stream. + SUPPORTS_DUPLEX_STREAMING = True + def __init__( self, eventloop: _AWSCRTEventLoop | None = None, diff --git a/packages/smithy-http/src/smithy_http/testing/mockhttp.py b/packages/smithy-http/src/smithy_http/testing/mockhttp.py index 95d1f758d..1bcc010e4 100644 --- a/packages/smithy-http/src/smithy_http/testing/mockhttp.py +++ b/packages/smithy-http/src/smithy_http/testing/mockhttp.py @@ -22,6 +22,11 @@ class MockHTTPClient(HTTPClient): TIMEOUT_EXCEPTIONS = (TimeoutError,) + # Declared so duplex (bidirectional) stream operations can be unit tested + # against this client. Queued responses are returned without consuming the + # request body, which is compatible with duplex streaming. + SUPPORTS_DUPLEX_STREAMING = True + def __init__( self, *, diff --git a/packages/smithy-http/tests/unit/aio/test_aiohttp.py b/packages/smithy-http/tests/unit/aio/test_aiohttp.py new file mode 100644 index 000000000..8e43414e6 --- /dev/null +++ b/packages/smithy-http/tests/unit/aio/test_aiohttp.py @@ -0,0 +1,7 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from smithy_http.aio.aiohttp import AIOHTTPClient + + +def test_does_not_support_duplex_streaming() -> None: + assert AIOHTTPClient.SUPPORTS_DUPLEX_STREAMING is False diff --git a/packages/smithy-http/tests/unit/aio/test_crt.py b/packages/smithy-http/tests/unit/aio/test_crt.py index 9718b3a2a..089529b3a 100644 --- a/packages/smithy-http/tests/unit/aio/test_crt.py +++ b/packages/smithy-http/tests/unit/aio/test_crt.py @@ -27,6 +27,10 @@ def test_deepcopy_client() -> None: deepcopy(client) +def test_supports_duplex_streaming() -> None: + assert AWSCRTHTTPClient.SUPPORTS_DUPLEX_STREAMING is True + + def test_client_marshal_request() -> None: """Test that HTTPRequest is correctly marshaled to CRT HttpRequest.""" client = AWSCRTHTTPClient() diff --git a/packages/smithy-http/tests/unit/testing/test_mockhttp.py b/packages/smithy-http/tests/unit/testing/test_mockhttp.py index 9efe68589..7784051db 100644 --- a/packages/smithy-http/tests/unit/testing/test_mockhttp.py +++ b/packages/smithy-http/tests/unit/testing/test_mockhttp.py @@ -5,6 +5,12 @@ from smithy_http.testing import MockHTTPClient, MockHTTPClientError, create_test_request +def test_supports_duplex_streaming(): + # Declared True so generated duplex stream operations can be unit tested + # against this client. + assert MockHTTPClient.SUPPORTS_DUPLEX_STREAMING is True + + async def test_default_response(): # Test error when no responses are queued mock_client = MockHTTPClient() diff --git a/uv.lock b/uv.lock index 90e8cea62..e90f42d74 100644 --- a/uv.lock +++ b/uv.lock @@ -753,7 +753,7 @@ awscrt = [ [package.metadata] requires-dist = [ - { name = "aiohttp", marker = "extra == 'aiohttp'", specifier = ">=3.14.0,<4.0" }, + { name = "aiohttp", marker = "extra == 'aiohttp'", specifier = ">=3.11.12,<4.0" }, { name = "awscrt", marker = "extra == 'awscrt'", specifier = "~=0.32.0" }, { name = "smithy-core", editable = "packages/smithy-core" }, { name = "yarl", marker = "extra == 'aiohttp'" }, From 456627a6544abcfad2e2fe653ce88b46acd35aec Mon Sep 17 00:00:00 2001 From: jonathan343 Date: Wed, 17 Jun 2026 22:31:23 -0400 Subject: [PATCH 2/2] extract request pipeline test harness into shared module --- .../tests/unit/aio/_pipeline_harness.py | 207 ++++++++++++++++ .../smithy-core/tests/unit/aio/test_client.py | 223 +++--------------- 2 files changed, 238 insertions(+), 192 deletions(-) create mode 100644 packages/smithy-core/tests/unit/aio/_pipeline_harness.py diff --git a/packages/smithy-core/tests/unit/aio/_pipeline_harness.py b/packages/smithy-core/tests/unit/aio/_pipeline_harness.py new file mode 100644 index 000000000..4091710e5 --- /dev/null +++ b/packages/smithy-core/tests/unit/aio/_pipeline_harness.py @@ -0,0 +1,207 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from dataclasses import dataclass +from typing import Any, Self, cast + +from smithy_core import URI +from smithy_core.aio.client import ClientCall, RequestPipeline +from smithy_core.aio.interfaces import ClientProtocol, ClientTransport +from smithy_core.deserializers import ShapeDeserializer +from smithy_core.documents import TypeRegistry +from smithy_core.endpoints import EndpointResolverParams +from smithy_core.interceptors import InterceptorChain +from smithy_core.schemas import APIOperation, Schema +from smithy_core.serializers import ShapeSerializer +from smithy_core.shapes import ShapeID, ShapeType +from smithy_core.traits import StreamingTrait +from smithy_core.types import TypedProperties + +_STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING) + +_EVENTS = Schema.collection( + id=ShapeID("com.example#Events"), + shape_type=ShapeType.UNION, + members={"message": {"target": _STRING}}, +) + +_INPUT_SCHEMA = Schema.collection( + id=ShapeID("com.example#StreamingInput"), + members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, +) + +_OUTPUT_SCHEMA = Schema.collection( + id=ShapeID("com.example#StreamingOutput"), + members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, +) + + +class StubInput: + def serialize(self, serializer: ShapeSerializer) -> None: + pass + + +class StubOutput: + @classmethod + def deserialize(cls, deserializer: ShapeDeserializer) -> Self: + return cls() + + +class StubEvent: + def serialize(self, serializer: ShapeSerializer) -> None: + pass + + @classmethod + def deserialize(cls, deserializer: ShapeDeserializer) -> Self: + return cls() + + +OPERATION = APIOperation( + input=StubInput, + output=StubOutput, + schema=Schema( + id=ShapeID("com.example#StreamingOperation"), + shape_type=ShapeType.OPERATION, + ), + input_schema=_INPUT_SCHEMA, + output_schema=_OUTPUT_SCHEMA, + error_registry=TypeRegistry({}), + effective_auth_schemes=[], + error_schemas=[], +) + + +class StubRequest: + def __init__(self) -> None: + self.destination = URI(host="example.com") + self.body = b"" + + async def consume_body_async(self) -> bytes: + return b"" + + def consume_body(self) -> bytes: + return b"" + + +class StubResponse: + body = b"" + + async def consume_body_async(self) -> bytes: + return b"" + + def consume_body(self) -> bytes: + return b"" + + +class StubEventPublisher: + async def send(self, event: Any) -> None: + pass + + async def close(self) -> None: + pass + + +class StubEventReceiver: + async def receive(self) -> Any: + return None + + async def close(self) -> None: + pass + + +class StubProtocol: + def __init__(self) -> None: + self.serialize_request_calls = 0 + self.set_service_endpoint_calls = 0 + self.deserialize_response_calls = 0 + self.create_event_publisher_calls = 0 + self.create_event_receiver_calls = 0 + + @property + def id(self) -> ShapeID: + return ShapeID("com.example#testProtocol") + + def serialize_request(self, **kwargs: Any) -> StubRequest: + self.serialize_request_calls += 1 + return StubRequest() + + def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any: + self.set_service_endpoint_calls += 1 + return request + + async def deserialize_response(self, **kwargs: Any) -> StubOutput: + self.deserialize_response_calls += 1 + return StubOutput() + + def create_event_publisher(self, **kwargs: Any) -> StubEventPublisher: + self.create_event_publisher_calls += 1 + return StubEventPublisher() + + def create_event_receiver(self, **kwargs: Any) -> StubEventReceiver: + self.create_event_receiver_calls += 1 + return StubEventReceiver() + + +class StubEndpoint: + def __init__(self) -> None: + self.uri = URI(host="example.com") + self.properties = TypedProperties() + + +class StubEndpointResolver: + async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any: + return StubEndpoint() + + +class StubAuthResolver: + def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]: + return [] + + +class UndeclaredTransport: + """A transport that does not declare whether it supports duplex streaming.""" + + TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = () + + def __init__(self) -> None: + self.send_calls = 0 + + async def send(self, request: Any) -> StubResponse: + self.send_calls += 1 + return StubResponse() + + +class NonDuplexTransport(UndeclaredTransport): + SUPPORTS_DUPLEX_STREAMING = False + + +class DuplexTransport(UndeclaredTransport): + SUPPORTS_DUPLEX_STREAMING = True + + +@dataclass +class PipelineHarness: + protocol: StubProtocol + transport: UndeclaredTransport + pipeline: RequestPipeline[Any, Any] + + +def pipeline_harness(transport: UndeclaredTransport) -> PipelineHarness: + protocol = StubProtocol() + pipeline = RequestPipeline( + protocol=cast("ClientProtocol[Any, Any]", protocol), + transport=cast("ClientTransport[Any, Any]", transport), + ) + return PipelineHarness(protocol=protocol, transport=transport, pipeline=pipeline) + + +def client_call() -> ClientCall[Any, Any]: + return ClientCall( + input=StubInput(), + operation=OPERATION, + context=TypedProperties(), + interceptor=InterceptorChain([]), + auth_scheme_resolver=StubAuthResolver(), + supported_auth_schemes={}, + endpoint_resolver=StubEndpointResolver(), + retry_strategy=None, # type: ignore[arg-type] # unused for streaming input + ) diff --git a/packages/smithy-core/tests/unit/aio/test_client.py b/packages/smithy-core/tests/unit/aio/test_client.py index 6cb08e2dc..e97e613e5 100644 --- a/packages/smithy-core/tests/unit/aio/test_client.py +++ b/packages/smithy-core/tests/unit/aio/test_client.py @@ -1,226 +1,65 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from typing import Any, Self, cast import pytest -from smithy_core import URI -from smithy_core.aio.client import ClientCall, RequestPipeline from smithy_core.aio.eventstream import DuplexEventStream, InputEventStream -from smithy_core.aio.interfaces import ClientProtocol, ClientTransport -from smithy_core.deserializers import ShapeDeserializer -from smithy_core.documents import TypeRegistry -from smithy_core.endpoints import EndpointResolverParams from smithy_core.exceptions import UnsupportedTransportError -from smithy_core.interceptors import InterceptorChain -from smithy_core.schemas import APIOperation, Schema -from smithy_core.serializers import ShapeSerializer -from smithy_core.shapes import ShapeID, ShapeType -from smithy_core.traits import StreamingTrait -from smithy_core.types import TypedProperties - -_STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING) - -_EVENTS = Schema.collection( - id=ShapeID("com.example#Events"), - shape_type=ShapeType.UNION, - members={"message": {"target": _STRING}}, -) - -_INPUT_SCHEMA = Schema.collection( - id=ShapeID("com.example#StreamingInput"), - members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, -) - -_OUTPUT_SCHEMA = Schema.collection( - id=ShapeID("com.example#StreamingOutput"), - members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, -) - - -class _Input: - def serialize(self, serializer: ShapeSerializer) -> None: - pass - -class _Output: - @classmethod - def deserialize(cls, deserializer: ShapeDeserializer) -> Self: - return cls() - - -class _Event: - def serialize(self, serializer: ShapeSerializer) -> None: - pass - - @classmethod - def deserialize(cls, deserializer: ShapeDeserializer) -> Self: - return cls() - - -_OPERATION = APIOperation( - input=_Input, - output=_Output, - schema=Schema( - id=ShapeID("com.example#StreamingOperation"), - shape_type=ShapeType.OPERATION, - ), - input_schema=_INPUT_SCHEMA, - output_schema=_OUTPUT_SCHEMA, - error_registry=TypeRegistry({}), - effective_auth_schemes=[], - error_schemas=[], +from ._pipeline_harness import ( + DuplexTransport, + NonDuplexTransport, + StubEvent, + StubEventReceiver, + StubOutput, + UndeclaredTransport, + client_call, + pipeline_harness, ) -class _StubRequest: - def __init__(self) -> None: - self.destination = URI(host="example.com") - self.body = b"" - - async def consume_body_async(self) -> bytes: - return b"" - - def consume_body(self) -> bytes: - return b"" - - -class _StubResponse: - body = b"" - - async def consume_body_async(self) -> bytes: - return b"" - - def consume_body(self) -> bytes: - return b"" - - -class _StubEventPublisher: - async def send(self, event: Any) -> None: - pass - - async def close(self) -> None: - pass - - -class _StubEventReceiver: - async def receive(self) -> Any: - return None - - async def close(self) -> None: - pass - - -class _StubProtocol: - @property - def id(self) -> ShapeID: - return ShapeID("com.example#testProtocol") - - def serialize_request(self, **kwargs: Any) -> _StubRequest: - return _StubRequest() - - def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any: - return request - - async def deserialize_response(self, **kwargs: Any) -> _Output: - return _Output() - - def create_event_publisher(self, **kwargs: Any) -> _StubEventPublisher: - return _StubEventPublisher() - - def create_event_receiver(self, **kwargs: Any) -> _StubEventReceiver: - return _StubEventReceiver() - - -class _StubEndpoint: - def __init__(self) -> None: - self.uri = URI(host="example.com") - self.properties = TypedProperties() - - -class _StubEndpointResolver: - async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any: - return _StubEndpoint() - - -class _StubAuthResolver: - def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]: - return [] - - -class _UndeclaredTransport: - """A transport that does not declare whether it supports duplex streaming.""" - - TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = () - - async def send(self, request: Any) -> _StubResponse: - return _StubResponse() - - -class _NonDuplexTransport(_UndeclaredTransport): - SUPPORTS_DUPLEX_STREAMING = False - - -class _DuplexTransport(_UndeclaredTransport): - SUPPORTS_DUPLEX_STREAMING = True - - -def _pipeline(transport: object) -> RequestPipeline[Any, Any]: - # The stubs are intentionally structural (they don't subclass the - # protocols), so cast them to keep the type checker focused on the - # runtime behavior under test. - return RequestPipeline( - protocol=cast("ClientProtocol[Any, Any]", _StubProtocol()), - transport=cast("ClientTransport[Any, Any]", transport), - ) - - -def _client_call() -> ClientCall[Any, Any]: - return ClientCall( - input=_Input(), - operation=_OPERATION, - context=TypedProperties(), - interceptor=InterceptorChain([]), - auth_scheme_resolver=_StubAuthResolver(), - supported_auth_schemes={}, - endpoint_resolver=_StubEndpointResolver(), - retry_strategy=None, # type: ignore[arg-type] # unused for streaming input - ) - - async def test_duplex_stream_raises_for_undeclared_transport() -> None: - pipeline = _pipeline(_UndeclaredTransport()) + harness = pipeline_harness(UndeclaredTransport()) with pytest.raises(UnsupportedTransportError) as exc_info: - await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize) + await harness.pipeline.duplex_stream( + client_call(), StubEvent, StubEvent, StubEvent.deserialize + ) - assert "_UndeclaredTransport" in str(exc_info.value) + assert "UndeclaredTransport" in str(exc_info.value) assert "com.example#StreamingOperation" in str(exc_info.value) + assert harness.protocol.serialize_request_calls == 0 + assert harness.transport.send_calls == 0 async def test_duplex_stream_raises_for_non_duplex_transport() -> None: - pipeline = _pipeline(_NonDuplexTransport()) + harness = pipeline_harness(NonDuplexTransport()) with pytest.raises(UnsupportedTransportError): - await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize) + await harness.pipeline.duplex_stream( + client_call(), StubEvent, StubEvent, StubEvent.deserialize + ) + + assert harness.protocol.serialize_request_calls == 0 + assert harness.transport.send_calls == 0 async def test_duplex_stream_proceeds_for_duplex_transport() -> None: - pipeline = _pipeline(_DuplexTransport()) + harness = pipeline_harness(DuplexTransport()) - stream = await pipeline.duplex_stream( - _client_call(), _Event, _Event, _Event.deserialize + stream = await harness.pipeline.duplex_stream( + client_call(), StubEvent, StubEvent, StubEvent.deserialize ) assert isinstance(stream, DuplexEventStream) output, output_stream = await stream.await_output() - assert isinstance(output, _Output) - assert isinstance(output_stream, _StubEventReceiver) + assert isinstance(output, StubOutput) + assert isinstance(output_stream, StubEventReceiver) async def test_input_stream_does_not_require_duplex_support() -> None: - pipeline = _pipeline(_NonDuplexTransport()) + harness = pipeline_harness(NonDuplexTransport()) - stream = await pipeline.input_stream(_client_call(), _Event) + stream = await harness.pipeline.input_stream(client_call(), StubEvent) assert isinstance(stream, InputEventStream) - assert isinstance(await stream.await_output(), _Output) + assert isinstance(await stream.await_output(), StubOutput)