Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
78adb72
backport: copy tests/interaction/ verbatim from main (phase 0)
maxisbey May 29, 2026
680b736
backport: phase-1.5 mechanical types sweep (snake→camel kwargs, type=…
maxisbey May 29, 2026
92f2bb4
backport: harness H1 — _helpers.py: local ReadStream/WriteStream alia…
maxisbey May 29, 2026
a507771
backport: harness H2 — _connect.py imports/annotations + conftest mar…
maxisbey May 29, 2026
f7daf85
backport: harness H3 — connect_in_memory, build_sse_app, connect_over…
maxisbey May 29, 2026
7a936a2
backport: harness H4 — build_streamable_http_app (no-auth), connect_o…
maxisbey May 29, 2026
e55b40e
backport: harness H5 — build_streamable_http_app auth branch + mounte…
maxisbey May 29, 2026
7d9881d
backport: harness H-auth-1 — connect_with_oauth body (hand-assembled …
maxisbey May 29, 2026
b603626
backport: phase-4 wave 1 (proof) — 25/25 pass across lowlevel/mcpserv…
maxisbey May 29, 2026
1211e79
backport: phase-4 wave 2 + S1 — 51 pass / 1 deferred
maxisbey May 29, 2026
7251516
backport: phase-4 waves 3+4 + S2-S9 (Workflow, 43 agents) — ~174 pass…
maxisbey May 29, 2026
6c7cbcc
backport: phase-5 cleanup — apply pending deferrals, test_coverage green
maxisbey May 29, 2026
c59a57d
backport: phase-5 fixup — port roots:list-changed (audit-1), scope wa…
maxisbey May 29, 2026
1f981e5
backport: scrub local-path reference from conftest comment; clarify p…
maxisbey May 29, 2026
361bb9d
backport: coverage — recognize 'lax no cover'; pragma harness scaffol…
maxisbey May 29, 2026
04980e3
backport: coverage — raise dev-dep floors for tests/interaction (pyte…
maxisbey May 29, 2026
52a79c4
backport: coverage — reset sse-starlette's module-global exit Event b…
maxisbey May 29, 2026
67b3b2c
backport: coverage — lax-no-cover post-unwind asserts on 3.11+lowest-…
maxisbey May 29, 2026
2ca7f4c
Merge remote-tracking branch 'origin/v1.x' into maxisbey/v1x-interact…
maxisbey May 29, 2026
03fdaed
tests: expect SSE session-entry cleanup on disconnect
maxisbey May 29, 2026
a3234b6
[v1.x] Backport transport test deflakes from main (tests-only) (#2837)
maxisbey Jun 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ required-version = ">=0.9.5"
[dependency-groups]
dev = [
"pyright>=1.1.400",
"pytest>=8.3.4",
"pytest>=8.4.0",
"ruff>=0.8.5",
"trio>=0.26.2",
"pytest-flakefinder>=1.1.0",
Expand Down Expand Up @@ -95,6 +95,9 @@ packages = ["src/mcp"]
[tool.pyright]
typeCheckingMode = "strict"
include = ["src/mcp", "tests", "examples/servers", "examples/snippets"]
# tests/interaction is backported from `main` and uses v1's runtime API; strict-mode type-checking
# of the suite is tracked separately from this tests-only backport.
exclude = ["tests/interaction"]
venvPath = "."
venv = ".venv"
# The FastAPI style of using decorators in tests gives a `reportUnusedFunction` error.
Expand Down Expand Up @@ -153,6 +156,11 @@ members = ["examples/clients/*", "examples/servers/*", "examples/snippets"]
[tool.uv.sources]
mcp = { workspace = true }

[tool.inline-snapshot]
# `snapshot(x)` becomes the identity (plain `==`); regenerate with `--inline-snapshot=fix`.
default-flags = ["disable"]
format-command = "ruff format --stdin-filename {filename}"

[tool.pytest.ini_options]
log_cli = true
xfail_strict = true
Expand Down Expand Up @@ -205,6 +213,7 @@ ignore_errors = true
precision = 2
exclude_lines = [
"pragma: no cover",
"pragma: lax no cover",
"if TYPE_CHECKING:",
"@overload",
"raise NotImplementedError",
Expand Down
314 changes: 153 additions & 161 deletions tests/client/test_http_unicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,80 @@
(server→client and client→server) using the streamable HTTP transport.
"""

import multiprocessing
import socket
from collections.abc import Generator
import gc
from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager
from typing import Any

import httpx
import pytest
from sse_starlette.sse import AppStatus
from starlette.applications import Starlette
from starlette.routing import Mount

import mcp.types as types
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from tests.test_helpers import wait_for_server
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import CallToolResult, TextContent, Tool
from tests.interaction.transports import StreamingASGITransport

# The in-process app is mounted at this origin purely so URLs are well-formed; nothing listens here.
BASE_URL = "http://127.0.0.1:8000"

# v1's streamable-HTTP server transport leaks a handful of anyio memory streams on teardown when
# run in process; the old subprocess harness never observed them. The interaction suite registers
# the same two scoped filters globally from tests/interaction/conftest.py (see the comment there),
# but they only take effect when that package's conftest is loaded; these markers keep the tests
# themselves passing in isolated runs. Markers are item-scoped, so the autouse
# `_collect_leaked_streams` fixture below garbage-collects each test's leaks inside its own
# teardown, where these filters apply; without it, leaks GC'd at session cleanup escape the
# scoped ignores. The filters are scoped to anyio's MemoryObject*Stream leak signature so an
# unrelated leak still fails the suite.
pytestmark = [
pytest.mark.filterwarnings("ignore:.*MemoryObject(Send|Receive)Stream:pytest.PytestUnraisableExceptionWarning"),
pytest.mark.filterwarnings("ignore:.*MemoryObject(Send|Receive)Stream:ResourceWarning"),
]


@pytest.fixture(autouse=True)
def _collect_leaked_streams() -> Iterator[None]:
"""Garbage-collect each test's leaked memory streams inside its own teardown.

The filterwarnings marks above only apply while a test in this file is the
active warning context. The leaked streams sit in reference cycles, so without
a forced collection their deallocator warnings fire wherever the garbage
collector happens to run next: during an unrelated test (failing it, since the
global ``filterwarnings = ["error"]`` has no ignore there) or at pytest's
session-unconfigure unraisable sweep (exit code 1 after all tests passed when
running without xdist, e.g. ``-n 0`` for ``--pdb`` debugging).
"""
yield
gc.collect()


@pytest.fixture(autouse=True)
def _reset_sse_starlette_exit_event() -> Iterator[None]:
"""Reset sse-starlette's module-global exit Event around each test.

sse-starlette <3.0 (allowed by this branch's dependency floor; CI's lowest-direct leg
installs it) stores an `anyio.Event` on the `AppStatus` class the first time an
`EventSourceResponse` runs; that Event is bound to the test's event loop and breaks every
subsequent in-process SSE response (and `json_response=False` below means every request
in this module is served as one). sse-starlette 3.x switched to a ContextVar and has no
such attribute. Resetting on both sides of the test keeps this module immune to a stale
Event left behind by an earlier test on the same worker as well as cleaning up after its
own. This mirrors the autouse fixtures in tests/shared/test_sse.py and
tests/interaction/conftest.py.
"""
if hasattr(AppStatus, "should_exit_event"): # pragma: no branch
# setattr keeps pyright happy: the locked sse-starlette 3.x has no such attribute.
setattr(AppStatus, "should_exit_event", None) # pragma: lax no cover
yield
if hasattr(AppStatus, "should_exit_event"): # pragma: no branch
setattr(AppStatus, "should_exit_event", None) # pragma: lax no cover


# Test constants with various Unicode characters
UNICODE_TEST_STRINGS = {
Expand All @@ -35,28 +100,12 @@
}


def run_unicode_server(port: int) -> None: # pragma: no cover
"""Run the Unicode test server in a separate process."""
# Import inside the function since this runs in a separate process
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any

import uvicorn
from starlette.applications import Starlette
from starlette.routing import Mount

import mcp.types as types
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import TextContent, Tool

# Need to recreate the server setup in this process
server = Server(name="unicode_test_server")
def make_unicode_server() -> Server[object, object]:
"""The Unicode echo server: tool and prompt contents that exercise non-ASCII round trips."""
server: Server[object, object] = Server(name="unicode_test_server")

@server.list_tools()
async def list_tools() -> list[Tool]:
"""List tools with Unicode descriptions."""
async def handle_list_tools() -> list[Tool]:
return [
Tool(
name="echo_unicode",
Expand All @@ -72,22 +121,12 @@ async def list_tools() -> list[Tool]:
]

@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any] | None) -> list[TextContent]:
"""Handle tool calls with Unicode content."""
if name == "echo_unicode":
text = arguments.get("text", "") if arguments else ""
return [
TextContent(
type="text",
text=f"Echo: {text}",
)
]
else:
raise ValueError(f"Unknown tool: {name}")
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> CallToolResult:
assert name == "echo_unicode"
return CallToolResult(content=[TextContent(type="text", text=f"Echo: {arguments['text']}")])

@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
"""List prompts with Unicode names and descriptions."""
async def handle_list_prompts() -> list[types.Prompt]:
return [
types.Prompt(
name="unicode_prompt",
Expand All @@ -97,137 +136,90 @@ async def list_prompts() -> list[types.Prompt]:
]

@server.get_prompt()
async def get_prompt(name: str, arguments: dict[str, Any] | None) -> types.GetPromptResult:
"""Get a prompt with Unicode content."""
if name == "unicode_prompt":
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text="Hello世界🌍Привет안녕مرحباשלום",
),
)
]
)
raise ValueError(f"Unknown prompt: {name}")

# Create the session manager
session_manager = StreamableHTTPSessionManager(
app=server,
json_response=False, # Use SSE for testing
)

@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
async with session_manager.run():
yield

# Create an ASGI application
app = Starlette(
debug=True,
routes=[
Mount("/mcp", app=session_manager.handle_request),
],
lifespan=lifespan,
)

# Run the server
config = uvicorn.Config(
app=app,
host="127.0.0.1",
port=port,
log_level="error",
)
uvicorn_server = uvicorn.Server(config)
uvicorn_server.run()


@pytest.fixture
def unicode_server_port() -> int:
"""Find an available port for the Unicode test server."""
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]


@pytest.fixture
def running_unicode_server(unicode_server_port: int) -> Generator[str, None, None]:
"""Start a Unicode test server in a separate process."""
proc = multiprocessing.Process(target=run_unicode_server, kwargs={"port": unicode_server_port}, daemon=True)
proc.start()

# Wait for server to be ready
wait_for_server(unicode_server_port)

try:
yield f"http://127.0.0.1:{unicode_server_port}"
finally:
# Clean up - try graceful termination first
proc.terminate()
proc.join(timeout=2)
if proc.is_alive(): # pragma: no cover
proc.kill()
proc.join(timeout=1)
async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
assert name == "unicode_prompt"
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(type="text", text="Hello世界🌍Привет안녕مرحباשלום"),
)
]
)

return server


@asynccontextmanager
async def unicode_session() -> AsyncIterator[ClientSession]:
"""Yield an initialized ClientSession speaking streamable HTTP (SSE responses) to the
Unicode test server, entirely in process."""
# SSE response mode, so Unicode rides the SSE event encoding rather than a plain JSON body.
session_manager = StreamableHTTPSessionManager(app=make_unicode_server(), json_response=False)
app = Starlette(routes=[Mount("/mcp", app=session_manager.handle_request)])

async with (
session_manager.run(),
# follow_redirects matches the SDK's own client factory; Starlette's Mount 307-redirects
# the bare /mcp path to /mcp/.
httpx.AsyncClient(
transport=StreamingASGITransport(app), base_url=BASE_URL, follow_redirects=True
) as http_client,
streamable_http_client(f"{BASE_URL}/mcp", http_client=http_client) as (
read_stream,
write_stream,
_get_session_id,
),
ClientSession(read_stream, write_stream) as session,
):
await session.initialize()
yield session


@pytest.mark.anyio
async def test_streamable_http_client_unicode_tool_call(running_unicode_server: str) -> None:
async def test_streamable_http_client_unicode_tool_call() -> None:
"""Test that Unicode text is correctly handled in tool calls via streamable HTTP."""
base_url = running_unicode_server
endpoint_url = f"{base_url}/mcp"

async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

# Test 1: List tools (server→client Unicode in descriptions)
tools = await session.list_tools()
assert len(tools.tools) == 1
async with unicode_session() as session:
# Test 1: List tools (server→client Unicode in descriptions)
tools = await session.list_tools()
assert len(tools.tools) == 1

# Check Unicode in tool descriptions
echo_tool = tools.tools[0]
assert echo_tool.name == "echo_unicode"
assert echo_tool.description is not None
assert "🔤" in echo_tool.description
assert "👋" in echo_tool.description
# Check Unicode in tool descriptions
echo_tool = tools.tools[0]
assert echo_tool.name == "echo_unicode"
assert echo_tool.description is not None
assert "🔤" in echo_tool.description
assert "👋" in echo_tool.description

# Test 2: Send Unicode text in tool call (client→server→client)
for test_name, test_string in UNICODE_TEST_STRINGS.items():
result = await session.call_tool("echo_unicode", arguments={"text": test_string})
# Test 2: Send Unicode text in tool call (client→server→client)
for test_name, test_string in UNICODE_TEST_STRINGS.items():
result = await session.call_tool("echo_unicode", arguments={"text": test_string})

# Verify server correctly received and echoed back Unicode
assert len(result.content) == 1
content = result.content[0]
assert content.type == "text"
assert f"Echo: {test_string}" == content.text, f"Failed for {test_name}"
# Verify server correctly received and echoed back Unicode
assert len(result.content) == 1
content = result.content[0]
assert content.type == "text"
assert f"Echo: {test_string}" == content.text, f"Failed for {test_name}"


@pytest.mark.anyio
async def test_streamable_http_client_unicode_prompts(running_unicode_server: str) -> None:
async def test_streamable_http_client_unicode_prompts() -> None:
"""Test that Unicode text is correctly handled in prompts via streamable HTTP."""
base_url = running_unicode_server
endpoint_url = f"{base_url}/mcp"

async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

# Test 1: List prompts (server→client Unicode in descriptions)
prompts = await session.list_prompts()
assert len(prompts.prompts) == 1

prompt = prompts.prompts[0]
assert prompt.name == "unicode_prompt"
assert prompt.description is not None
assert "Слой хранилища, где располагаются" in prompt.description

# Test 2: Get prompt with Unicode content (server→client)
result = await session.get_prompt("unicode_prompt", arguments={})
assert len(result.messages) == 1

message = result.messages[0]
assert message.role == "user"
assert message.content.type == "text"
assert message.content.text == "Hello世界🌍Привет안녕مرحباשלום"
async with unicode_session() as session:
# Test 1: List prompts (server→client Unicode in descriptions)
prompts = await session.list_prompts()
assert len(prompts.prompts) == 1

prompt = prompts.prompts[0]
assert prompt.name == "unicode_prompt"
assert prompt.description is not None
assert "Слой хранилища, где располагаются" in prompt.description

# Test 2: Get prompt with Unicode content (server→client)
result = await session.get_prompt("unicode_prompt", arguments={})
assert len(result.messages) == 1

message = result.messages[0]
assert message.role == "user"
assert message.content.type == "text"
assert message.content.text == "Hello世界🌍Привет안녕مرحباשלום"
Loading
Loading