Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ class $3L:
""\"An asynchronous HTTP client solely for testing purposes.""\"
TIMEOUT_EXCEPTIONS = ()
SUPPORTS_DUPLEX_STREAMING: bool = True
def __init__(self, *, client_config: HTTPClientConfiguration | None = None):
self._client_config = client_config
Expand All @@ -677,6 +678,7 @@ class $4L:
""\"An asynchronous HTTP client solely for testing purposes.""\"
TIMEOUT_EXCEPTIONS = ()
SUPPORTS_DUPLEX_STREAMING: bool = True
def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ private static List<ConfigProperty> getProtocolProperties(GenerationContext cont
.namespace("smithy_core.aio.interfaces", ".")
.build())
.build())
.documentation("The transport to use to send requests (e.g. an HTTP client).");
.documentation("The transport to use to send requests (e.g. an HTTP client). "
+ "Operations with bidirectional event streams require a transport that "
+ "sets SUPPORTS_DUPLEX_STREAMING to True, such as AWSCRTHTTPClient.");

if (context.applicationProtocol().isHttpProtocol()) {
properties.addAll(HTTP_PROPERTIES);
Expand Down
8 changes: 8 additions & 0 deletions designs/http-interfaces.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ which takes a request and some configuration and asynchronously return a respons
Having a minimal interface makes it much easier to implement these interfaces on top of
a variety http libraries.

Clients that support duplex (bidirectional) event streaming, which in practice
requires HTTP/2, must declare it by setting the `SUPPORTS_DUPLEX_STREAMING` class
attribute to `True`. Clients are assumed not to support it otherwise, and duplex
stream operations invoked with such a client fail fast with an
`UnsupportedTransportError`. This surfaces the configuration problem before any
request is sent instead of letting the stream fail later with an opaque connection
error.

```python
@dataclass(kw_only=True)
class HTTPRequestConfiguration:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "enhancement",
"description": "Added `SUPPORTS_DUPLEX_STREAMING` to `ClientTransport` so transports can declare support for duplex (bidirectional) event streaming. `RequestPipeline.duplex_stream` now fails fast with an `UnsupportedTransportError` when the configured transport does not declare support."
}
19 changes: 18 additions & 1 deletion packages/smithy-core/src/smithy_core/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from ..auth import AuthParams
from ..deserializers import DeserializeableShape, ShapeDeserializer
from ..endpoints import EndpointResolverParams
from ..exceptions import ClientTimeoutError, RetryError, SmithyError
from ..exceptions import (
ClientTimeoutError,
RetryError,
SmithyError,
UnsupportedTransportError,
)
from ..interceptors import (
InputContext,
Interceptor,
Expand Down Expand Up @@ -197,6 +202,18 @@ async def duplex_stream[
:param output_event_type: The event type to receive in the output stream.
:param event_deserializer: The method used to deserialize events.
"""
# The transport is assumed not to support duplex streaming unless it
# explicitly declares otherwise.
if not getattr(self.transport, "SUPPORTS_DUPLEX_STREAMING", False):

@SamRemis SamRemis Jun 17, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting - it's kind of a blind "set this attribute". Is there a reason we aren't just adding this as a default on the HTTPClient class? Or ifwe don't want to tie it to HTTP, the ClientTransport protocol?

Adding a default makes more sense to me than doing a getattr call. What do you think?

See the python docs for an example of how to do this (look at the value key in the second example)

@jonathan343 jonathan343 Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR does add the default on ClientTransport. I kept the getattr(..., False) intentionally because ClientTransport is a structural Protocol, so Python does not enforce that custom transports actually inherit the protocol or have the new attribute at runtime. Existing transports may satisfy the old transport shape but not define SUPPORTS_DUPLEX_STREAMING. In that case, missing should mean unsupported, and we still want to raise UnsupportedTransportError rather than AttributeError.

raise UnsupportedTransportError(
f"The configured transport ({type(self.transport).__name__}) does "
f"not support duplex (bidirectional) event streaming, which is "
f"required by the {call.operation.schema.id} operation. Use a "
f"transport that does, such as "
f"smithy_http.aio.crt.AWSCRTHTTPClient. Custom transports that "
f"support duplex streaming must set SUPPORTS_DUPLEX_STREAMING "
f"to True."
)
request_future = Future[RequestContext[I, TRequest]]()
execute_task = asyncio.create_task(self._execute_request(call, request_future))
request_context = await request_future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ class ClientTransport[I: Request, O: Response](Protocol):

TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...]

SUPPORTS_DUPLEX_STREAMING: bool = False
"""Whether this transport can read response data while the request body is still
being written (typically requires HTTP/2).
Transports that support duplex (bidirectional) event streaming must explicitly set
this to True. Transports that don't declare it are assumed not to support it.
"""

async def send(self, request: I) -> O:
"""Send a request over the transport and receive the response."""
...
Expand Down
5 changes: 5 additions & 0 deletions packages/smithy-core/src/smithy_core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,10 @@ class UnsupportedStreamError(SmithyError):
streams are not supported."""


class UnsupportedTransportError(SmithyError):
"""Indicates that an operation requires a transport capability that the configured
transport does not declare support for (e.g. duplex event streaming)."""


class EndpointResolutionError(SmithyError):
"""Exception type for all exceptions raised by endpoint resolution."""
207 changes: 207 additions & 0 deletions packages/smithy-core/tests/unit/aio/_pipeline_harness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from typing import Any, Self, cast

from smithy_core import URI
from smithy_core.aio.client import ClientCall, RequestPipeline
from smithy_core.aio.interfaces import ClientProtocol, ClientTransport
from smithy_core.deserializers import ShapeDeserializer
from smithy_core.documents import TypeRegistry
from smithy_core.endpoints import EndpointResolverParams
from smithy_core.interceptors import InterceptorChain
from smithy_core.schemas import APIOperation, Schema
from smithy_core.serializers import ShapeSerializer
from smithy_core.shapes import ShapeID, ShapeType
from smithy_core.traits import StreamingTrait
from smithy_core.types import TypedProperties

_STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING)

_EVENTS = Schema.collection(
id=ShapeID("com.example#Events"),
shape_type=ShapeType.UNION,
members={"message": {"target": _STRING}},
)

_INPUT_SCHEMA = Schema.collection(
id=ShapeID("com.example#StreamingInput"),
members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}},
)

_OUTPUT_SCHEMA = Schema.collection(
id=ShapeID("com.example#StreamingOutput"),
members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}},
)


class StubInput:
def serialize(self, serializer: ShapeSerializer) -> None:
pass


class StubOutput:
@classmethod
def deserialize(cls, deserializer: ShapeDeserializer) -> Self:
return cls()


class StubEvent:
def serialize(self, serializer: ShapeSerializer) -> None:
pass

@classmethod
def deserialize(cls, deserializer: ShapeDeserializer) -> Self:
return cls()


OPERATION = APIOperation(
input=StubInput,
output=StubOutput,
schema=Schema(
id=ShapeID("com.example#StreamingOperation"),
shape_type=ShapeType.OPERATION,
),
input_schema=_INPUT_SCHEMA,
output_schema=_OUTPUT_SCHEMA,
error_registry=TypeRegistry({}),
effective_auth_schemes=[],
error_schemas=[],
)


class StubRequest:
def __init__(self) -> None:
self.destination = URI(host="example.com")
self.body = b""

async def consume_body_async(self) -> bytes:
return b""

def consume_body(self) -> bytes:
return b""


class StubResponse:
body = b""

async def consume_body_async(self) -> bytes:
return b""

def consume_body(self) -> bytes:
return b""


class StubEventPublisher:
async def send(self, event: Any) -> None:
pass

async def close(self) -> None:
pass


class StubEventReceiver:
async def receive(self) -> Any:
return None

async def close(self) -> None:
pass


class StubProtocol:
def __init__(self) -> None:
self.serialize_request_calls = 0
self.set_service_endpoint_calls = 0
self.deserialize_response_calls = 0
self.create_event_publisher_calls = 0
self.create_event_receiver_calls = 0

@property
def id(self) -> ShapeID:
return ShapeID("com.example#testProtocol")

def serialize_request(self, **kwargs: Any) -> StubRequest:
self.serialize_request_calls += 1
return StubRequest()

def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any:
self.set_service_endpoint_calls += 1
return request

async def deserialize_response(self, **kwargs: Any) -> StubOutput:
self.deserialize_response_calls += 1
return StubOutput()

def create_event_publisher(self, **kwargs: Any) -> StubEventPublisher:
self.create_event_publisher_calls += 1
return StubEventPublisher()

def create_event_receiver(self, **kwargs: Any) -> StubEventReceiver:
self.create_event_receiver_calls += 1
return StubEventReceiver()


class StubEndpoint:
def __init__(self) -> None:
self.uri = URI(host="example.com")
self.properties = TypedProperties()


class StubEndpointResolver:
async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any:
return StubEndpoint()


class StubAuthResolver:
def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]:
return []


class UndeclaredTransport:
"""A transport that does not declare whether it supports duplex streaming."""

TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = ()

def __init__(self) -> None:
self.send_calls = 0

async def send(self, request: Any) -> StubResponse:
self.send_calls += 1
return StubResponse()


class NonDuplexTransport(UndeclaredTransport):
SUPPORTS_DUPLEX_STREAMING = False


class DuplexTransport(UndeclaredTransport):
SUPPORTS_DUPLEX_STREAMING = True


@dataclass
class PipelineHarness:
protocol: StubProtocol
transport: UndeclaredTransport
pipeline: RequestPipeline[Any, Any]


def pipeline_harness(transport: UndeclaredTransport) -> PipelineHarness:
protocol = StubProtocol()
pipeline = RequestPipeline(
protocol=cast("ClientProtocol[Any, Any]", protocol),
transport=cast("ClientTransport[Any, Any]", transport),
)
return PipelineHarness(protocol=protocol, transport=transport, pipeline=pipeline)


def client_call() -> ClientCall[Any, Any]:
return ClientCall(
input=StubInput(),
operation=OPERATION,
context=TypedProperties(),
interceptor=InterceptorChain([]),
auth_scheme_resolver=StubAuthResolver(),
supported_auth_schemes={},
endpoint_resolver=StubEndpointResolver(),
retry_strategy=None, # type: ignore[arg-type] # unused for streaming input
)
65 changes: 65 additions & 0 deletions packages/smithy-core/tests/unit/aio/test_client.py

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: We should definitely look at making a shared testing harness to wire up these mocks rather than needing to create them all inline. I can see cases where we need to test pipeline behavior in the future and will need to recreate this all over again. Doesn't block this PR but something we should consider soon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 456627a

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import pytest
from smithy_core.aio.eventstream import DuplexEventStream, InputEventStream
from smithy_core.exceptions import UnsupportedTransportError

from ._pipeline_harness import (
DuplexTransport,
NonDuplexTransport,
StubEvent,
StubEventReceiver,
StubOutput,
UndeclaredTransport,
client_call,
pipeline_harness,
)


async def test_duplex_stream_raises_for_undeclared_transport() -> None:
harness = pipeline_harness(UndeclaredTransport())

with pytest.raises(UnsupportedTransportError) as exc_info:
await harness.pipeline.duplex_stream(
client_call(), StubEvent, StubEvent, StubEvent.deserialize
)

assert "UndeclaredTransport" in str(exc_info.value)
assert "com.example#StreamingOperation" in str(exc_info.value)
assert harness.protocol.serialize_request_calls == 0
assert harness.transport.send_calls == 0


async def test_duplex_stream_raises_for_non_duplex_transport() -> None:
harness = pipeline_harness(NonDuplexTransport())

with pytest.raises(UnsupportedTransportError):
await harness.pipeline.duplex_stream(
client_call(), StubEvent, StubEvent, StubEvent.deserialize
)

assert harness.protocol.serialize_request_calls == 0
assert harness.transport.send_calls == 0


async def test_duplex_stream_proceeds_for_duplex_transport() -> None:
harness = pipeline_harness(DuplexTransport())

stream = await harness.pipeline.duplex_stream(
client_call(), StubEvent, StubEvent, StubEvent.deserialize
)

assert isinstance(stream, DuplexEventStream)
output, output_stream = await stream.await_output()
assert isinstance(output, StubOutput)
assert isinstance(output_stream, StubEventReceiver)


async def test_input_stream_does_not_require_duplex_support() -> None:
harness = pipeline_harness(NonDuplexTransport())

stream = await harness.pipeline.input_stream(client_call(), StubEvent)

assert isinstance(stream, InputEventStream)
assert isinstance(await stream.await_output(), StubOutput)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "enhancement",
"description": "Declared duplex (bidirectional) streaming support on `AWSCRTHTTPClient` via `SUPPORTS_DUPLEX_STREAMING`. `AIOHTTPClient` explicitly does not support it."
}
4 changes: 4 additions & 0 deletions packages/smithy-http/src/smithy_http/aio/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class AIOHTTPClient(HTTPClient):

TIMEOUT_EXCEPTIONS = (TimeoutError,)

# aiohttp has no HTTP/2 support and this client fully buffers the response
# before returning, so it can never interleave request and response data.
SUPPORTS_DUPLEX_STREAMING = False

def __init__(
self,
*,
Expand Down
Loading
Loading