Skip to content

Commit 54be4d6

Browse files
felixweinbergerjingx8885surya-prakash-susarla
committed
Fix child process cleanup in stdio termination
When terminating MCP servers, child processes were being orphaned because only the parent process was killed. This caused resource leaks and prevented proper cleanup, especially with tools like npx that spawn child processes for the actual server implementation. This was happening on both POSIX and Windows systems; however, because of implementation details, resolving this is non-trivial and requires platform-specific handling of process trees: process groups (os.killpg) on POSIX, and Job Objects via the newly added pywin32 dependency on Windows. This addresses critical issues where MCP servers using process spawning tools would leave zombie processes running after client shutdown. resolves #850 resolves #729 Co-authored-by: jingx8885 <[email protected]> Co-authored-by: Surya Prakash Susarla <[email protected]>
1 parent c9b43bc commit 54be4d6

File tree

5 files changed

+511
-12
lines changed

5 files changed

+511
-12
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = [
3232
"pydantic-settings>=2.5.2",
3333
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
3434
"jsonschema>=4.20.0",
35+
"pywin32>=310; sys_platform == 'win32'",
3536
]
3637

3738
[project.optional-dependencies]
@@ -125,4 +126,6 @@ filterwarnings = [
125126
"ignore::DeprecationWarning:websockets",
126127
"ignore:websockets.server.WebSocketServerProtocol is deprecated:DeprecationWarning",
127128
"ignore:Returning str or bytes.*:DeprecationWarning:mcp.server.lowlevel",
129+
# pywin32 internal deprecation warning
130+
"ignore:getargs.*The 'u' format is deprecated:DeprecationWarning"
128131
]

src/mcp/client/stdio/__init__.py

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
import logging
12
import os
3+
import signal
24
import sys
35
from contextlib import asynccontextmanager
46
from pathlib import Path
57
from typing import Literal, TextIO
68

79
import anyio
810
import anyio.lowlevel
11+
from anyio.abc import Process
912
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1013
from anyio.streams.text import TextReceiveStream
1114
from pydantic import BaseModel, Field
@@ -14,10 +17,14 @@
1417
from mcp.shared.message import SessionMessage
1518

1619
from .win32 import (
20+
FallbackProcess,
1721
create_windows_process,
1822
get_windows_executable_command,
23+
terminate_windows_process_tree,
1924
)
2025

26+
logger = logging.getLogger("client.stdio")
27+
2128
# Environment variables to inherit by default
2229
DEFAULT_INHERITED_ENV_VARS = (
2330
[
@@ -184,7 +191,7 @@ async def stdin_writer():
184191
await process.wait()
185192
except TimeoutError:
186193
# If process doesn't terminate in time, force kill it
187-
process.kill()
194+
await _terminate_process_tree(process)
188195
except ProcessLookupError:
189196
# Process already exited, which is fine
190197
pass
@@ -219,11 +226,65 @@ async def _create_platform_compatible_process(
219226
):
220227
"""
221228
Creates a subprocess in a platform-compatible way.
222-
Returns a process handle.
229+
230+
Unix: Creates process in a new session/process group for killpg support
231+
Windows: Creates process in a Job Object for reliable child termination
223232
"""
224233
if sys.platform == "win32":
225234
process = await create_windows_process(command, args, env, errlog, cwd)
226235
else:
227-
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
236+
process = await anyio.open_process(
237+
[command, *args],
238+
env=env,
239+
stderr=errlog,
240+
cwd=cwd,
241+
start_new_session=True,
242+
)
228243

229244
return process
245+
246+
247+
async def _terminate_process_tree(process: Process | FallbackProcess, timeout: float = 2.0) -> None:
    """
    Terminate a process and all its children using platform-specific methods.

    Unix: Sends SIGTERM to the child's process group with os.killpg(), polls
    until the group disappears, and sends SIGKILL if the group is still
    present after `timeout` seconds. If the group cannot be signalled, falls
    back to terminating (then killing) only the process itself.
    Windows: Delegates to terminate_windows_process_tree().

    Args:
        process: Handle of the process whose tree should be terminated.
        timeout: Seconds to wait for a graceful exit before escalating.
    """
    if sys.platform == "win32":
        await terminate_windows_process_tree(process, timeout)
    else:
        pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None)
        if not pid:
            return

        try:
            pgid = os.getpgid(pid)
            if pgid == os.getpgrp():
                # The child shares our own process group (it was not started
                # in a separate session). Signalling the group would also hit
                # the current process, so route to the single-process fallback.
                raise PermissionError(f"process group {pgid} is shared with the current process")

            os.killpg(pgid, signal.SIGTERM)

            deadline = anyio.current_time() + timeout
            while anyio.current_time() < deadline:
                # Signal 0 only checks for existence; it raises
                # ProcessLookupError once the process group is gone.
                os.killpg(pgid, 0)
                await anyio.sleep(0.1)

            # Still alive after the grace period: force kill the whole group.
            os.killpg(pgid, signal.SIGKILL)

        except ProcessLookupError:
            # The process (group) has already exited, which is fine. There is
            # nothing left to terminate, so do not enter the fallback path.
            return
        except OSError as e:
            logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate")
            try:
                process.terminate()
                with anyio.fail_after(timeout):
                    await process.wait()
            except Exception as term_error:
                logger.warning(f"Process termination failed for PID {pid}: {term_error}, attempting force kill")
                try:
                    process.kill()
                except Exception as kill_error:
                    logger.error(f"Failed to kill process {pid}: {kill_error}")

src/mcp/client/stdio/win32.py

Lines changed: 117 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Windows-specific functionality for stdio client operations.
33
"""
44

5+
import logging
56
import shutil
67
import subprocess
78
import sys
@@ -13,6 +14,23 @@
1314
from anyio.abc import Process
1415
from anyio.streams.file import FileReadStream, FileWriteStream
1516

17+
logger = logging.getLogger("client.stdio.win32")
18+
19+
# Windows-specific imports for Job Objects
20+
if sys.platform == "win32":
21+
import pywintypes
22+
import win32api
23+
import win32con
24+
import win32job
25+
else:
26+
# Placeholders so these names exist (as None) on non-Windows platforms
27+
win32api = None
28+
win32con = None
29+
win32job = None
30+
pywintypes = None
31+
32+
JobHandle = int
33+
1634

1735
def get_windows_executable_command(command: str) -> str:
1836
"""
@@ -103,6 +121,11 @@ def kill(self) -> None:
103121
"""Kill the subprocess immediately (alias for terminate)."""
104122
self.terminate()
105123

124+
@property
def pid(self) -> int:
    """Expose the process ID of the wrapped popen object."""
    wrapped = self.popen
    return wrapped.pid
128+
106129

107130
# ------------------------
108131
# Updated function
@@ -117,13 +140,16 @@ async def create_windows_process(
117140
cwd: Path | str | None = None,
118141
) -> Process | FallbackProcess:
119142
"""
120-
Creates a subprocess in a Windows-compatible way.
143+
Creates a subprocess in a Windows-compatible way with Job Object support.
121144
122145
Attempt to use anyio's open_process for async subprocess creation.
123146
In some cases this will throw NotImplementedError on Windows, e.g.
124147
when using the SelectorEventLoop which does not support async subprocesses.
125148
In that case, we fall back to using subprocess.Popen.
126149
150+
The process is automatically added to a Job Object to ensure all child
151+
processes are terminated when the parent is terminated.
152+
127153
Args:
128154
command (str): The executable to run
129155
args (list[str]): List of command line arguments
@@ -132,8 +158,11 @@ async def create_windows_process(
132158
cwd (Path | str | None): Working directory for the subprocess
133159
134160
Returns:
135-
FallbackProcess: Async-compatible subprocess with stdin and stdout streams
161+
Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams
136162
"""
163+
job = _create_job_object()
164+
process = None
165+
137166
try:
138167
# First try using anyio with Windows-specific flags to hide console window
139168
process = await anyio.open_process(
@@ -146,10 +175,9 @@ async def create_windows_process(
146175
stderr=errlog,
147176
cwd=cwd,
148177
)
149-
return process
150178
except NotImplementedError:
151-
# Windows often doesn't support async subprocess creation, use fallback
152-
return await _create_windows_fallback_process(command, args, env, errlog, cwd)
179+
# If Windows doesn't support async subprocess creation, use fallback
180+
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
153181
except Exception:
154182
# Try again without creation flags
155183
process = await anyio.open_process(
@@ -158,7 +186,9 @@ async def create_windows_process(
158186
stderr=errlog,
159187
cwd=cwd,
160188
)
161-
return process
189+
190+
_maybe_assign_process_to_job(process, job)
191+
return process
162192

163193

164194
async def _create_windows_fallback_process(
@@ -185,8 +215,6 @@ async def _create_windows_fallback_process(
185215
bufsize=0, # Unbuffered output
186216
creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
187217
)
188-
return FallbackProcess(popen_obj)
189-
190218
except Exception:
191219
# If creationflags failed, fallback without them
192220
popen_obj = subprocess.Popen(
@@ -198,4 +226,84 @@ async def _create_windows_fallback_process(
198226
cwd=cwd,
199227
bufsize=0,
200228
)
201-
return FallbackProcess(popen_obj)
229+
process = FallbackProcess(popen_obj)
230+
return process
231+
232+
233+
def _create_job_object() -> int | None:
234+
"""
235+
Create a Windows Job Object configured to terminate all processes when closed.
236+
"""
237+
if sys.platform != "win32" or not win32job:
238+
return None
239+
240+
try:
241+
job = win32job.CreateJobObject(None, "")
242+
extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation)
243+
244+
extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
245+
win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info)
246+
return job
247+
except Exception as e:
248+
logger.warning(f"Failed to create Job Object for process tree management: {e}")
249+
return None
250+
251+
252+
def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None:
    """
    Try to assign a process to a job object.

    On success the job handle is stored on the process as `_job_object`,
    which is the attribute terminate_windows_process_tree() looks up. If
    assignment fails for any reason, a warning is logged and the job handle
    is closed. Failures are logged rather than raised.

    Args:
        process: The freshly created process to place in the job.
        job: Job Object handle, or None/falsy to do nothing.
    """
    if not job:
        return

    if sys.platform != "win32" or not win32api or not win32con or not win32job:
        return

    # Read the pid once, defensively, so the failure log below cannot itself
    # raise if the process object is missing the attribute.
    pid = getattr(process, "pid", None)

    try:
        process_handle = win32api.OpenProcess(win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE, False, pid)
        if not process_handle:
            raise Exception("Failed to open process handle")

        try:
            win32job.AssignProcessToJobObject(job, process_handle)
            process._job_object = job
        finally:
            # The per-process handle is only needed for the assignment call.
            win32api.CloseHandle(process_handle)
    except Exception as e:
        logger.warning(f"Failed to assign process {pid} to Job Object: {e}")
        try:
            win32api.CloseHandle(job)
        except Exception as close_error:
            # Cleanup is best-effort; never let it abort process creation.
            logger.debug("Failed to close Job Object handle: %s", close_error)
279+
280+
281+
async def terminate_windows_process_tree(process: Process | FallbackProcess, timeout: float = 2.0) -> None:
    """
    Terminate a process and all its children on Windows.

    If the process has an associated job object (the `_job_object`
    attribute), the job is terminated and its handle closed. The process
    itself is then always terminated as well. All steps are best-effort:
    failures are logged at debug level and never raised.

    Args:
        process: The process whose tree should be terminated.
        timeout: Currently unused; accepted so callers can pass the same
            arguments as for the POSIX termination path.
    """
    if sys.platform != "win32":
        return

    job = getattr(process, "_job_object", None)
    if job and win32job:
        try:
            win32job.TerminateJobObject(job, 1)
        except Exception as job_error:
            # Job might already be terminated
            logger.debug("TerminateJobObject failed: %s", job_error)
        finally:
            if win32api:
                try:
                    win32api.CloseHandle(job)
                except Exception as close_error:
                    logger.debug("Failed to close Job Object handle: %s", close_error)

    # Always try to terminate the process itself as well
    try:
        process.terminate()
    except Exception as term_error:
        # Typically the process has already exited.
        logger.debug("process.terminate() failed: %s", term_error)

0 commit comments

Comments
 (0)