Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/mcp/client/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,24 @@ async def stdout_reader():
message = types.jsonrpc_message_adapter.validate_json(line, by_name=False)
except Exception as exc: # pragma: no cover
logger.exception("Failed to parse JSONRPC message from server")
await read_stream_writer.send(exc)
try:
await read_stream_writer.send(exc)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
return
Comment on lines +156 to +159
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try:
await read_stream_writer.send(exc)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
return
await read_stream_writer.send(exc)

continue

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: lax no cover
try:
await read_stream_writer.send(session_message)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# The caller exited the stdio_client context while the
# subprocess was still writing; the reader stream has been
# closed (ClosedResourceError) or the closure raced our
# in-flight send (BrokenResourceError). Either way there
# is nowhere to deliver this message, so shut down
# cleanly. Fixes #1960.
return
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover
Comment on lines +163 to +173
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try:
await read_stream_writer.send(session_message)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# The caller exited the stdio_client context while the
# subprocess was still writing; the reader stream has been
# closed (ClosedResourceError) or the closure raced our
# in-flight send (BrokenResourceError). Either way there
# is nowhere to deliver this message, so shut down
# cleanly. Fixes #1960.
return
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover
await read_stream_writer.send(session_message)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# BrokenResourceError: read_stream (receiver) was closed during cleanup while
# send() was blocked waiting for a consumer.

await anyio.lowlevel.checkpoint()

async def stdin_writer():
Expand Down
39 changes: 39 additions & 0 deletions tests/client/test_stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,45 @@ async def test_stdio_context_manager_exiting():
pass


@pytest.mark.anyio
async def test_stdio_client_exits_cleanly_while_server_still_writing():
"""Regression test for #1960.

Exiting the ``stdio_client`` context while the subprocess is still writing to
stdout used to surface ``anyio.BrokenResourceError`` through the task group
(as an ``ExceptionGroup``). The ``finally`` block closes
``read_stream_writer`` while the background ``stdout_reader`` task is
mid-``send``.

The fix makes ``stdout_reader`` catch both ``ClosedResourceError`` and
``BrokenResourceError`` and return cleanly, so exiting the context is a
no-op no matter what the subprocess is doing.
"""
# A server that emits a large burst of valid JSON-RPC notifications without
# ever reading stdin. When we exit the context below, the subprocess is
# still in the middle of that burst, which is the exact shape of the race.
noisy_script = textwrap.dedent(
"""
import sys
for i in range(1000):
sys.stdout.write(
'{"jsonrpc":"2.0","method":"notifications/message",'
'"params":{"level":"info","data":"line ' + str(i) + '"}}\\n'
)
sys.stdout.flush()
"""
)

server_params = StdioServerParameters(command=sys.executable, args=["-c", noisy_script])

# The ``async with`` must complete without an ``ExceptionGroup`` /
# ``BrokenResourceError`` propagating. ``anyio.fail_after`` prevents a
# regression from hanging CI.
with anyio.fail_after(5.0):
async with stdio_client(server_params) as (_, _):
pass
Comment on lines +41 to +76
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async def test_stdio_client_exits_cleanly_while_server_still_writing():
"""Regression test for #1960.
Exiting the ``stdio_client`` context while the subprocess is still writing to
stdout used to surface ``anyio.BrokenResourceError`` through the task group
(as an ``ExceptionGroup``). The ``finally`` block closes
``read_stream_writer`` while the background ``stdout_reader`` task is
mid-``send``.
The fix makes ``stdout_reader`` catch both ``ClosedResourceError`` and
``BrokenResourceError`` and return cleanly, so exiting the context is a
no-op no matter what the subprocess is doing.
"""
# A server that emits a large burst of valid JSON-RPC notifications without
# ever reading stdin. When we exit the context below, the subprocess is
# still in the middle of that burst, which is the exact shape of the race.
noisy_script = textwrap.dedent(
"""
import sys
for i in range(1000):
sys.stdout.write(
'{"jsonrpc":"2.0","method":"notifications/message",'
'"params":{"level":"info","data":"line ' + str(i) + '"}}\\n'
)
sys.stdout.flush()
"""
)
server_params = StdioServerParameters(command=sys.executable, args=["-c", noisy_script])
# The ``async with`` must complete without an ``ExceptionGroup`` /
# ``BrokenResourceError`` propagating. ``anyio.fail_after`` prevents a
# regression from hanging CI.
with anyio.fail_after(5.0):
async with stdio_client(server_params) as (_, _):
pass
async def test_stdio_client_exits_cleanly_with_unread_server_output():
"""Exiting the stdio_client context while stdout_reader is blocked in send() must
not raise. Cleanup closes read_stream (the receiver) first, which wakes the
blocked send() with BrokenResourceError; stdout_reader should swallow it.
"""
script = textwrap.dedent(
"""
import sys
sys.stdout.write('{"jsonrpc":"2.0","method":"notifications/message","params":{}}\\n')
sys.stdout.flush()
sys.stdin.read()
"""
)
server_params = StdioServerParameters(command=sys.executable, args=["-c", script])
with anyio.fail_after(5.0):
async with stdio_client(server_params) as (read_stream, _):
# Wait until stdout_reader is parked inside send() on the zero-buffer
# stream. statistics() makes this a positive check rather than a sleep.
while read_stream.statistics().tasks_waiting_send == 0:
await anyio.sleep(0)



@pytest.mark.anyio
@pytest.mark.skipif(tee is None, reason="could not find tee command")
async def test_stdio_client():
Expand Down
Loading