-
Notifications
You must be signed in to change notification settings - Fork 30
Validate Transport Support For Bidirectional Streaming #717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| { | ||
| "type": "enhancement", | ||
| "description": "Added `SUPPORTS_DUPLEX_STREAMING` to `ClientTransport` so transports can declare support for duplex (bidirectional) event streaming. `RequestPipeline.duplex_stream` now fails fast with an `UnsupportedTransportError` when the configured transport does not declare support." | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| from dataclasses import dataclass | ||
| from typing import Any, Self, cast | ||
|
|
||
| from smithy_core import URI | ||
| from smithy_core.aio.client import ClientCall, RequestPipeline | ||
| from smithy_core.aio.interfaces import ClientProtocol, ClientTransport | ||
| from smithy_core.deserializers import ShapeDeserializer | ||
| from smithy_core.documents import TypeRegistry | ||
| from smithy_core.endpoints import EndpointResolverParams | ||
| from smithy_core.interceptors import InterceptorChain | ||
| from smithy_core.schemas import APIOperation, Schema | ||
| from smithy_core.serializers import ShapeSerializer | ||
| from smithy_core.shapes import ShapeID, ShapeType | ||
| from smithy_core.traits import StreamingTrait | ||
| from smithy_core.types import TypedProperties | ||
|
|
||
| _STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING) | ||
|
|
||
| _EVENTS = Schema.collection( | ||
| id=ShapeID("com.example#Events"), | ||
| shape_type=ShapeType.UNION, | ||
| members={"message": {"target": _STRING}}, | ||
| ) | ||
|
|
||
| _INPUT_SCHEMA = Schema.collection( | ||
| id=ShapeID("com.example#StreamingInput"), | ||
| members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, | ||
| ) | ||
|
|
||
| _OUTPUT_SCHEMA = Schema.collection( | ||
| id=ShapeID("com.example#StreamingOutput"), | ||
| members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, | ||
| ) | ||
|
|
||
|
|
||
| class StubInput: | ||
| def serialize(self, serializer: ShapeSerializer) -> None: | ||
| pass | ||
|
|
||
|
|
||
| class StubOutput: | ||
| @classmethod | ||
| def deserialize(cls, deserializer: ShapeDeserializer) -> Self: | ||
| return cls() | ||
|
|
||
|
|
||
| class StubEvent: | ||
| def serialize(self, serializer: ShapeSerializer) -> None: | ||
| pass | ||
|
|
||
| @classmethod | ||
| def deserialize(cls, deserializer: ShapeDeserializer) -> Self: | ||
| return cls() | ||
|
|
||
|
|
||
| OPERATION = APIOperation( | ||
| input=StubInput, | ||
| output=StubOutput, | ||
| schema=Schema( | ||
| id=ShapeID("com.example#StreamingOperation"), | ||
| shape_type=ShapeType.OPERATION, | ||
| ), | ||
| input_schema=_INPUT_SCHEMA, | ||
| output_schema=_OUTPUT_SCHEMA, | ||
| error_registry=TypeRegistry({}), | ||
| effective_auth_schemes=[], | ||
| error_schemas=[], | ||
| ) | ||
|
|
||
|
|
||
| class StubRequest: | ||
| def __init__(self) -> None: | ||
| self.destination = URI(host="example.com") | ||
| self.body = b"" | ||
|
|
||
| async def consume_body_async(self) -> bytes: | ||
| return b"" | ||
|
|
||
| def consume_body(self) -> bytes: | ||
| return b"" | ||
|
|
||
|
|
||
| class StubResponse: | ||
| body = b"" | ||
|
|
||
| async def consume_body_async(self) -> bytes: | ||
| return b"" | ||
|
|
||
| def consume_body(self) -> bytes: | ||
| return b"" | ||
|
|
||
|
|
||
| class StubEventPublisher: | ||
| async def send(self, event: Any) -> None: | ||
| pass | ||
|
|
||
| async def close(self) -> None: | ||
| pass | ||
|
|
||
|
|
||
| class StubEventReceiver: | ||
| async def receive(self) -> Any: | ||
| return None | ||
|
|
||
| async def close(self) -> None: | ||
| pass | ||
|
|
||
|
|
||
| class StubProtocol: | ||
| def __init__(self) -> None: | ||
| self.serialize_request_calls = 0 | ||
| self.set_service_endpoint_calls = 0 | ||
| self.deserialize_response_calls = 0 | ||
| self.create_event_publisher_calls = 0 | ||
| self.create_event_receiver_calls = 0 | ||
|
|
||
| @property | ||
| def id(self) -> ShapeID: | ||
| return ShapeID("com.example#testProtocol") | ||
|
|
||
| def serialize_request(self, **kwargs: Any) -> StubRequest: | ||
| self.serialize_request_calls += 1 | ||
| return StubRequest() | ||
|
|
||
| def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any: | ||
| self.set_service_endpoint_calls += 1 | ||
| return request | ||
|
|
||
| async def deserialize_response(self, **kwargs: Any) -> StubOutput: | ||
| self.deserialize_response_calls += 1 | ||
| return StubOutput() | ||
|
|
||
| def create_event_publisher(self, **kwargs: Any) -> StubEventPublisher: | ||
| self.create_event_publisher_calls += 1 | ||
| return StubEventPublisher() | ||
|
|
||
| def create_event_receiver(self, **kwargs: Any) -> StubEventReceiver: | ||
| self.create_event_receiver_calls += 1 | ||
| return StubEventReceiver() | ||
|
|
||
|
|
||
| class StubEndpoint: | ||
| def __init__(self) -> None: | ||
| self.uri = URI(host="example.com") | ||
| self.properties = TypedProperties() | ||
|
|
||
|
|
||
| class StubEndpointResolver: | ||
| async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any: | ||
| return StubEndpoint() | ||
|
|
||
|
|
||
| class StubAuthResolver: | ||
| def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]: | ||
| return [] | ||
|
|
||
|
|
||
| class UndeclaredTransport: | ||
| """A transport that does not declare whether it supports duplex streaming.""" | ||
|
|
||
| TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = () | ||
|
|
||
| def __init__(self) -> None: | ||
| self.send_calls = 0 | ||
|
|
||
| async def send(self, request: Any) -> StubResponse: | ||
| self.send_calls += 1 | ||
| return StubResponse() | ||
|
|
||
|
|
||
| class NonDuplexTransport(UndeclaredTransport): | ||
| SUPPORTS_DUPLEX_STREAMING = False | ||
|
|
||
|
|
||
| class DuplexTransport(UndeclaredTransport): | ||
| SUPPORTS_DUPLEX_STREAMING = True | ||
|
|
||
|
|
||
| @dataclass | ||
| class PipelineHarness: | ||
| protocol: StubProtocol | ||
| transport: UndeclaredTransport | ||
| pipeline: RequestPipeline[Any, Any] | ||
|
|
||
|
|
||
| def pipeline_harness(transport: UndeclaredTransport) -> PipelineHarness: | ||
| protocol = StubProtocol() | ||
| pipeline = RequestPipeline( | ||
| protocol=cast("ClientProtocol[Any, Any]", protocol), | ||
| transport=cast("ClientTransport[Any, Any]", transport), | ||
| ) | ||
| return PipelineHarness(protocol=protocol, transport=transport, pipeline=pipeline) | ||
|
|
||
|
|
||
| def client_call() -> ClientCall[Any, Any]: | ||
| return ClientCall( | ||
| input=StubInput(), | ||
| operation=OPERATION, | ||
| context=TypedProperties(), | ||
| interceptor=InterceptorChain([]), | ||
| auth_scheme_resolver=StubAuthResolver(), | ||
| supported_auth_schemes={}, | ||
| endpoint_resolver=StubEndpointResolver(), | ||
| retry_strategy=None, # type: ignore[arg-type] # unused for streaming input | ||
| ) |
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Non-blocking: We should definitely look at making a shared testing harness to wire up these mocks rather than needing to create them all inline. I can see cases where we need to test pipeline behavior in the future and will need to recreate this all over again. Doesn't block this PR but something we should consider soon.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in 456627a |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import pytest | ||
| from smithy_core.aio.eventstream import DuplexEventStream, InputEventStream | ||
| from smithy_core.exceptions import UnsupportedTransportError | ||
|
|
||
| from ._pipeline_harness import ( | ||
| DuplexTransport, | ||
| NonDuplexTransport, | ||
| StubEvent, | ||
| StubEventReceiver, | ||
| StubOutput, | ||
| UndeclaredTransport, | ||
| client_call, | ||
| pipeline_harness, | ||
| ) | ||
|
|
||
|
|
||
| async def test_duplex_stream_raises_for_undeclared_transport() -> None: | ||
| harness = pipeline_harness(UndeclaredTransport()) | ||
|
|
||
| with pytest.raises(UnsupportedTransportError) as exc_info: | ||
| await harness.pipeline.duplex_stream( | ||
| client_call(), StubEvent, StubEvent, StubEvent.deserialize | ||
| ) | ||
|
|
||
| assert "UndeclaredTransport" in str(exc_info.value) | ||
| assert "com.example#StreamingOperation" in str(exc_info.value) | ||
| assert harness.protocol.serialize_request_calls == 0 | ||
| assert harness.transport.send_calls == 0 | ||
|
|
||
|
|
||
| async def test_duplex_stream_raises_for_non_duplex_transport() -> None: | ||
| harness = pipeline_harness(NonDuplexTransport()) | ||
|
|
||
| with pytest.raises(UnsupportedTransportError): | ||
| await harness.pipeline.duplex_stream( | ||
| client_call(), StubEvent, StubEvent, StubEvent.deserialize | ||
| ) | ||
|
|
||
| assert harness.protocol.serialize_request_calls == 0 | ||
| assert harness.transport.send_calls == 0 | ||
|
|
||
|
|
||
| async def test_duplex_stream_proceeds_for_duplex_transport() -> None: | ||
| harness = pipeline_harness(DuplexTransport()) | ||
|
|
||
| stream = await harness.pipeline.duplex_stream( | ||
| client_call(), StubEvent, StubEvent, StubEvent.deserialize | ||
| ) | ||
|
|
||
| assert isinstance(stream, DuplexEventStream) | ||
| output, output_stream = await stream.await_output() | ||
| assert isinstance(output, StubOutput) | ||
| assert isinstance(output_stream, StubEventReceiver) | ||
|
|
||
|
|
||
| async def test_input_stream_does_not_require_duplex_support() -> None: | ||
| harness = pipeline_harness(NonDuplexTransport()) | ||
|
|
||
| stream = await harness.pipeline.input_stream(client_call(), StubEvent) | ||
|
|
||
| assert isinstance(stream, InputEventStream) | ||
| assert isinstance(await stream.await_output(), StubOutput) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| { | ||
| "type": "enhancement", | ||
| "description": "Declared duplex (bidirectional) streaming support on `AWSCRTHTTPClient` via `SUPPORTS_DUPLEX_STREAMING`. `AIOHTTPClient` explicitly does not support it." | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is interesting - it's kind of a blind "set this attribute". Is there a reason we aren't just adding this as a default on the HTTPClient class? Or ifwe don't want to tie it to HTTP, the ClientTransport protocol?
Adding a default makes more sense to me than doing a getattr call. What do you think?
See the python docs for an example of how to do this (look at the value key in the second example)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR does add the default on
ClientTransport. I kept thegetattr(..., False)intentionally becauseClientTransportis a structuralProtocol, so Python does not enforce that custom transports actually inherit the protocol or have the new attribute at runtime. Existing transports may satisfy the old transport shape but not defineSUPPORTS_DUPLEX_STREAMING. In that case, missing should mean unsupported, and we still want to raiseUnsupportedTransportErrorrather thanAttributeError.