diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index ba58da7321..987a3ca486 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -1,4 +1,5 @@ import errno +import gc import os import shutil import sys @@ -8,9 +9,16 @@ import anyio import pytest +from anyio.abc import Process from mcp.client.session import ClientSession -from mcp.client.stdio import StdioServerParameters, _create_platform_compatible_process, stdio_client +from mcp.client.stdio import ( + StdioServerParameters, + _create_platform_compatible_process, + _terminate_process_tree, + stdio_client, +) +from mcp.os.win32.utilities import FallbackProcess from mcp.shared.exceptions import McpError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse @@ -219,6 +227,79 @@ def sigint_handler(signum, frame): raise +async def _wait_for_first_write(path: str) -> None: + """Poll until the file at *path* exists and has grown beyond its initial empty state. + + The marker files below are created empty before the writer is spawned, so any + growth proves the writing process booted and reached its write loop. Polling + replaces fixed startup sleeps, which flake on loaded machines where interpreter + startup can exceed any fixed window. Bounded so a writer that never starts + fails the test instead of hanging it. + """ + with anyio.fail_after(15): + while not os.path.exists(path) or os.path.getsize(path) == 0: + await anyio.sleep(0.05) + + +async def _wait_for_writes_to_stop(path: str) -> None: + """Poll until the file at *path* stops growing. + + Returns once the size is unchanged across three successive 0.3 second gaps + (each three times the writers' 0.1 second write interval), so a writer that + is merely starved of CPU for a single gap is not mistaken for a terminated + one. Any observed growth resets the consecutive-stable counter. The sentinel + forces at least one non-stable iteration before counting starts. If the file + never stops growing, the timeout fails the test: a writer that survives + _terminate_process_tree is a genuine cleanup failure that must not be masked. + """ + last_size = -1 + stable_pairs = 0 + with anyio.fail_after(15): + while True: + current_size = os.path.getsize(path) + if current_size == last_size: + stable_pairs += 1 + else: + stable_pairs = 0 + last_size = current_size + if stable_pairs == 3: + return + await anyio.sleep(0.3) + + +async def _dispose_process(proc: Process | FallbackProcess) -> None: + """Reap a dead process and close its pipe streams inside the test that spawned it. + + Without this, the subprocess transports stay referenced by the per-test event + loop, become garbage only after that loop closes, and their GC-time + ResourceWarnings fire during a later test on the same worker (on Windows + proactor the warning can itself die in __repr__ on a closed pipe). An in-test + gc.collect() cannot catch that, so the process is reaped and closed + deterministically here. Draining stdout to EOF guarantees the event loop has + observed the pipe closure (anyio's reader aclose alone does not close the + underlying transport), which lets asyncio close the subprocess transport + before the test returns. + + Precondition: the WHOLE process tree must already be confirmed dead. wait() + tolerates an already-exited process and returns promptly, but on the Windows + fallback path it runs popen.wait in a thread and is effectively uncancellable, + and stdout only reaches EOF once every tree member that inherited the pipe + handle is gone. The timeout fails the test rather than hanging it if that + precondition is ever violated. + """ + with anyio.fail_after(15): + await proc.wait() + assert proc.stdin is not None + await proc.stdin.aclose() + assert proc.stdout is not None + while True: + try: + await proc.stdout.receive() + except anyio.EndOfStream: + break + await proc.stdout.aclose() + + class TestChildProcessCleanup: """ Tests for child process cleanup functionality using _terminate_process_tree. @@ -259,84 +340,71 @@ async def test_basic_child_process_cleanup(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: parent_marker = f.name - try: - # Parent script that spawns a child process - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Mark that parent started - with open({escape_path_for_python(parent_marker)}, 'w') as f: - f.write('parent started\\n') - - # Child script that writes continuously - child_script = f''' - import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"{time.time()}") - f.flush() - time.sleep(0.1) - ''' - - # Start the child process - child = subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent just sleeps + # Parent script that spawns a child process + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import os + + # Mark that parent started + with open({escape_path_for_python(parent_marker)}, 'w') as f: + f.write('parent started\\n') + + # Child script that writes continuously + child_script = f''' + import time + with open({escape_path_for_python(marker_file)}, 'a') as f: while True: + f.write(f"{time.time()}") + f.flush() time.sleep(0.1) - """ - ) + ''' - print("\nStarting child process termination test...") + # Start the child process + child = subprocess.Popen([sys.executable, '-c', child_script]) - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Parent just sleeps + while True: + time.sleep(0.1) + """ + ) - # Wait for processes to start - await anyio.sleep(0.5) + # Start the parent process + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + tree_killed = False - # Verify parent started - assert os.path.exists(parent_marker), "Parent process didn't start" + try: + # Wait for the parent to start and the child to reach its write loop + await _wait_for_first_write(parent_marker) + assert os.path.getsize(parent_marker) > 0, "Parent process didn't start" - # Verify child is writing - if os.path.exists(marker_file): # pragma: no branch - initial_size = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size_after_wait = os.path.getsize(marker_file) - assert size_after_wait > initial_size, "Child process should be writing" - print(f"Child is writing (file grew from {initial_size} to {size_after_wait} bytes)") + await _wait_for_first_write(marker_file) + assert os.path.getsize(marker_file) > 0, "Child process should be writing" # Terminate using our function - print("Terminating process and children...") - from mcp.client.stdio import _terminate_process_tree - await _terminate_process_tree(proc) + tree_killed = True - # Verify processes stopped - await anyio.sleep(0.5) - if os.path.exists(marker_file): # pragma: no branch - size_after_cleanup = os.path.getsize(marker_file) - await anyio.sleep(0.5) - final_size = os.path.getsize(marker_file) - - print(f"After cleanup: file size {size_after_cleanup} -> {final_size}") - assert final_size == size_after_cleanup, ( - f"Child process still running! File grew by {final_size - size_after_cleanup} bytes" - ) - - print("SUCCESS: Child process was properly terminated") + # Verify the child stopped writing; a survivor times out and fails the test + await _wait_for_writes_to_stop(marker_file) + # Tree is dead: reap and close the process so nothing leaks into later tests + await _dispose_process(proc) finally: + if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight + await _terminate_process_tree(proc) + await _dispose_process(proc) # Clean up files for f in [marker_file, parent_marker]: try: os.unlink(f) except OSError: # pragma: no cover pass + # Collect subprocess transports now, while this test's warning filters + # are active, so GC-time ResourceWarnings cannot hit a later test + gc.collect() @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") @@ -353,88 +421,83 @@ async def test_nested_process_tree(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f3: grandchild_file = f3.name - try: - # Simple nested process tree test - # We create parent -> child -> grandchild, each writing to a file - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Child will spawn grandchild and write to child file - child_script = f'''import subprocess - import sys - import time - - # Grandchild just writes to file - grandchild_script = \"\"\"import time - with open({escape_path_for_python(grandchild_file)}, 'a') as f: - while True: - f.write(f"gc {{time.time()}}") - f.flush() - time.sleep(0.1)\"\"\" - - # Spawn grandchild - subprocess.Popen([sys.executable, '-c', grandchild_script]) - - # Child writes to its file - with open({escape_path_for_python(child_file)}, 'a') as f: - while True: - f.write(f"c {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Spawn child process - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent writes to its file - with open({escape_path_for_python(parent_file)}, 'a') as f: - while True: - f.write(f"p {time.time()}") - f.flush() - time.sleep(0.1) - """ - ) + # Simple nested process tree test + # We create parent -> child -> grandchild, each writing to a file + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import os + + # Child will spawn grandchild and write to child file + child_script = f'''import subprocess + import sys + import time + + # Grandchild just writes to file + grandchild_script = \"\"\"import time + with open({escape_path_for_python(grandchild_file)}, 'a') as f: + while True: + f.write(f"gc {{time.time()}}") + f.flush() + time.sleep(0.1)\"\"\" - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Spawn grandchild + subprocess.Popen([sys.executable, '-c', grandchild_script]) - # Let all processes start - await anyio.sleep(1.0) + # Child writes to its file + with open({escape_path_for_python(child_file)}, 'a') as f: + while True: + f.write(f"c {time.time()}") + f.flush() + time.sleep(0.1)''' - # Verify all are writing - for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]: - if os.path.exists(file_path): # pragma: no branch - initial_size = os.path.getsize(file_path) - await anyio.sleep(0.3) - new_size = os.path.getsize(file_path) - assert new_size > initial_size, f"{name} process should be writing" + # Spawn child process + subprocess.Popen([sys.executable, '-c', child_script]) - # Terminate the whole tree - from mcp.client.stdio import _terminate_process_tree + # Parent writes to its file + with open({escape_path_for_python(parent_file)}, 'a') as f: + while True: + f.write(f"p {time.time()}") + f.flush() + time.sleep(0.1) + """ + ) - await _terminate_process_tree(proc) + # Start the parent process + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + tree_killed = False - # Verify all stopped - await anyio.sleep(0.5) + try: + # Wait for every level of the tree to reach its write loop for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]: - if os.path.exists(file_path): # pragma: no branch - size1 = os.path.getsize(file_path) - await anyio.sleep(0.3) - size2 = os.path.getsize(file_path) - assert size1 == size2, f"{name} still writing after cleanup!" + await _wait_for_first_write(file_path) + assert os.path.getsize(file_path) > 0, f"{name} process should be writing" + + # Terminate the whole tree + await _terminate_process_tree(proc) + tree_killed = True - print("SUCCESS: All processes in tree terminated") + # Verify every level stopped writing; a survivor times out and fails the test + for file_path in (parent_file, child_file, grandchild_file): + await _wait_for_writes_to_stop(file_path) + # Tree is dead: reap and close the process so nothing leaks into later tests + await _dispose_process(proc) finally: + if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight + await _terminate_process_tree(proc) + await _dispose_process(proc) # Clean up all marker files for f in [parent_file, child_file, grandchild_file]: try: os.unlink(f) except OSError: # pragma: no cover pass + # Collect subprocess transports now, while this test's warning filters + # are active, so GC-time ResourceWarnings cannot hit a later test + gc.collect() @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") @@ -448,72 +511,67 @@ async def test_early_parent_exit(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: marker_file = f.name - try: - # Parent that spawns child and waits briefly - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import signal - - # Child that continues running - child_script = f'''import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"child {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Start child in same process group - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent waits a bit then exits on SIGTERM - def handle_term(sig, frame): - sys.exit(0) - - signal.signal(signal.SIGTERM, handle_term) - - # Wait + # Parent that spawns child and waits briefly + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import signal + + # Child that continues running + child_script = f'''import time + with open({escape_path_for_python(marker_file)}, 'a') as f: while True: - time.sleep(0.1) - """ - ) + f.write(f"child {time.time()}") + f.flush() + time.sleep(0.1)''' - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Start child in same process group + subprocess.Popen([sys.executable, '-c', child_script]) - # Let child start writing - await anyio.sleep(0.5) + # Parent waits a bit then exits on SIGTERM + def handle_term(sig, frame): + sys.exit(0) - # Verify child is writing - if os.path.exists(marker_file): # pragma: no cover - size1 = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size2 = os.path.getsize(marker_file) - assert size2 > size1, "Child should be writing" + signal.signal(signal.SIGTERM, handle_term) - # Terminate - this will kill the process group even if parent exits first - from mcp.client.stdio import _terminate_process_tree + # Wait + while True: + time.sleep(0.1) + """ + ) - await _terminate_process_tree(proc) + # Start the parent process + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + tree_killed = False + + try: + # Wait for the child to reach its write loop + await _wait_for_first_write(marker_file) + assert os.path.getsize(marker_file) > 0, "Child should be writing" - # Verify child stopped - await anyio.sleep(0.5) - if os.path.exists(marker_file): # pragma: no branch - size3 = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size4 = os.path.getsize(marker_file) - assert size3 == size4, "Child should be terminated" + # Terminate - this will kill the process group even if parent exits first + await _terminate_process_tree(proc) + tree_killed = True - print("SUCCESS: Child terminated even with parent exit during cleanup") + # Verify the child stopped writing; a survivor times out and fails the test + await _wait_for_writes_to_stop(marker_file) + # Tree is dead: reap and close the process so nothing leaks into later tests + await _dispose_process(proc) finally: + if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight + await _terminate_process_tree(proc) + await _dispose_process(proc) # Clean up marker file try: os.unlink(marker_file) except OSError: # pragma: no cover pass + # Collect subprocess transports now, while this test's warning filters + # are active, so GC-time ResourceWarnings cannot hit a later test + gc.collect() @pytest.mark.anyio