From 9938ae94025cac8374bf5d9d1e3984967b5b782e Mon Sep 17 00:00:00 2001 From: BashNetCorp Date: Wed, 15 Apr 2026 10:11:02 -0400 Subject: [PATCH] fix(stdio): handle BrokenResourceError in stdout_reader race `stdio_client` has a race: `read_stream_writer.aclose()` in the context's `finally` block can close the receiver while the background `stdout_reader` task is mid-`send`. anyio then raises `BrokenResourceError`, which the outer `except` does not cover (`ClosedResourceError` is the sibling class, raised on already-closed streams, not streams closed during an in-flight send). The exception propagates through the task group as `ExceptionGroup` and fails every caller that exits the context while the subprocess is still writing to stdout. Wrap both `read_stream_writer.send(...)` sites in `try`/`except (ClosedResourceError, BrokenResourceError): return` so `stdout_reader` shuts down cleanly, and widen the outer `except` to the same union for defense in depth. No API changes. Adds `test_stdio_client_exits_cleanly_while_server_still_writing`: spawns a subprocess that emits a burst of JSONRPC notifications, exits the `stdio_client` context immediately, and asserts no exception propagates. Fails before the fix (ExceptionGroup / BrokenResourceError), passes after. Github-Issue:#1960 --- src/mcp/client/stdio.py | 18 +++++++++++++++--- tests/client/test_stdio.py | 39 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/stdio.py b/src/mcp/client/stdio.py index 902dc8576..38dc645c0 100644 --- a/src/mcp/client/stdio.py +++ b/src/mcp/client/stdio.py @@ -153,12 +153,24 @@ async def stdout_reader(): message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) except Exception as exc: # pragma: no cover logger.exception("Failed to parse JSONRPC message from server") - await read_stream_writer.send(exc) + try: + await read_stream_writer.send(exc) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + return continue session_message = SessionMessage(message) - await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: lax no cover + try: + await read_stream_writer.send(session_message) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + # The caller exited the stdio_client context while the + # subprocess was still writing; the reader stream has been + # closed (ClosedResourceError) or the closure raced our + # in-flight send (BrokenResourceError). Either way there + # is nowhere to deliver this message, so shut down + # cleanly. Fixes #1960. + return + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover await anyio.lowlevel.checkpoint() async def stdin_writer(): diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 06e2cba4b..9e87aac22 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -37,6 +37,45 @@ async def test_stdio_context_manager_exiting(): pass +@pytest.mark.anyio +async def test_stdio_client_exits_cleanly_while_server_still_writing(): + """Regression test for #1960. + + Exiting the ``stdio_client`` context while the subprocess is still writing to + stdout used to surface ``anyio.BrokenResourceError`` through the task group + (as an ``ExceptionGroup``). The ``finally`` block closes + ``read_stream_writer`` while the background ``stdout_reader`` task is + mid-``send``. + + The fix makes ``stdout_reader`` catch both ``ClosedResourceError`` and + ``BrokenResourceError`` and return cleanly, so exiting the context is a + no-op no matter what the subprocess is doing. + """ + # A server that emits a large burst of valid JSON-RPC notifications without + # ever reading stdin. When we exit the context below, the subprocess is + # still in the middle of that burst, which is the exact shape of the race. + noisy_script = textwrap.dedent( + """ + import sys + for i in range(1000): + sys.stdout.write( + '{"jsonrpc":"2.0","method":"notifications/message",' + '"params":{"level":"info","data":"line ' + str(i) + '"}}\\n' + ) + sys.stdout.flush() + """ + ) + + server_params = StdioServerParameters(command=sys.executable, args=["-c", noisy_script]) + + # The ``async with`` must complete without an ``ExceptionGroup`` / + # ``BrokenResourceError`` propagating. ``anyio.fail_after`` prevents a + # regression from hanging CI. + with anyio.fail_after(5.0): + async with stdio_client(server_params) as (_, _): + pass + + @pytest.mark.anyio @pytest.mark.skipif(tee is None, reason="could not find tee command") async def test_stdio_client():