Skip to content

Webhooks #777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mcp/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ async def call_tool(
arguments: dict[str, Any] | None = None,
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
webhooks: list[types.Webhook] | None = None,
) -> types.CallToolResult:
"""Send a tools/call request with optional progress callback support."""

Expand All @@ -281,6 +282,7 @@ async def call_tool(
params=types.CallToolRequestParams(
name=name,
arguments=arguments,
webhooks=webhooks,
),
)
),
Expand Down
24 changes: 17 additions & 7 deletions src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class Settings(BaseSettings, Generic[LifespanResultT]):
stateless_http: bool = (
False # If True, uses true stateless mode (new transport per request)
)
webhooks_supported: bool = False

# resource settings
warn_on_duplicate_resources: bool = True
Expand Down Expand Up @@ -150,11 +151,10 @@ def __init__(
self._mcp_server = MCPServer(
name=name or "FastMCP",
instructions=instructions,
lifespan=(
lifespan_wrapper(self, self.settings.lifespan)
if self.settings.lifespan
else default_lifespan
),
lifespan=lifespan_wrapper(self, self.settings.lifespan)
if self.settings.lifespan
else default_lifespan,
webhooks_supported=self.settings.webhooks_supported,
)
self._tool_manager = ToolManager(
tools=tools, warn_on_duplicate_tools=self.settings.warn_on_duplicate_tools
Expand All @@ -165,6 +165,7 @@ def __init__(
self._prompt_manager = PromptManager(
warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts
)

if (self.settings.auth is not None) != (auth_server_provider is not None):
# TODO: after we support separate authorization servers (see
# https://github.com/modelcontextprotocol/modelcontextprotocol/pull/284)
Expand Down Expand Up @@ -272,11 +273,17 @@ def get_context(self) -> Context[ServerSession, object]:
return Context(request_context=request_context, fastmcp=self)

async def call_tool(
self, name: str, arguments: dict[str, Any]
self,
name: str,
arguments: dict[str, Any],
) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
"""Call a tool by name with arguments."""
context = self.get_context()
result = await self._tool_manager.call_tool(name, arguments, context=context)
result = await self._tool_manager.call_tool(
name,
arguments,
context=context,
)
converted_result = _convert_to_content(result)
return converted_result

Expand Down Expand Up @@ -777,6 +784,8 @@ def streamable_http_app(self) -> Starlette:
event_store=self._event_store,
json_response=self.settings.json_response,
stateless=self.settings.stateless_http, # Use the stateless setting
webhooks_supported=self.settings.webhooks_supported,
# Use the webhooks supported setting
)

# Create the ASGI handler
Expand Down Expand Up @@ -929,6 +938,7 @@ def my_tool(x: int, ctx: Context) -> str:

_request_context: RequestContext[ServerSessionT, LifespanContextT] | None
_fastmcp: FastMCP | None
has_webhook: bool = False

def __init__(
self,
Expand Down
6 changes: 6 additions & 0 deletions src/mcp/server/fastmcp/tools/tool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,10 @@ async def call_tool(
if not tool:
raise ToolError(f"Unknown tool: {name}")

if context is not None:
try:
context.has_webhook = context.request_context.has_webhook
except Exception:
logger.debug("Request context is not available.")

return await tool.run(arguments, context=context)
7 changes: 6 additions & 1 deletion src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def __init__(
lifespan: Callable[
[Server[LifespanResultT]], AbstractAsyncContextManager[LifespanResultT]
] = lifespan,
webhooks_supported: bool = False,
):
self.name = name
self.version = version
Expand All @@ -144,6 +145,7 @@ def __init__(
}
self.notification_handlers: dict[type, Callable[..., Awaitable[None]]] = {}
self.notification_options = NotificationOptions()
self.webhooks_supported = webhooks_supported
logger.debug(f"Initializing server '{name}'")

def create_initialization_options(
Expand Down Expand Up @@ -199,7 +201,8 @@ def get_capabilities(
# Set tool capabilities if handler exists
if types.ListToolsRequest in self.request_handlers:
tools_capability = types.ToolsCapability(
listChanged=notification_options.tools_changed
listChanged=notification_options.tools_changed,
webhooksSupported=self.webhooks_supported,
)

# Set logging capabilities if handler exists
Expand Down Expand Up @@ -409,6 +412,8 @@ def decorator(

async def handler(req: types.CallToolRequest):
try:
if req.params.webhooks is not None and len(req.params.webhooks) > 0:
self.request_context.has_webhook = True
results = await func(req.params.name, (req.params.arguments or {}))
return types.ServerResult(
types.CallToolResult(content=list(results), isError=False)
Expand Down
180 changes: 179 additions & 1 deletion src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
responses, with streaming support for long-running operations.
"""

import asyncio
import json
import logging
import re
Expand All @@ -24,6 +25,7 @@
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

from mcp.shared._httpx_utils import create_mcp_http_client
from mcp.shared.message import ServerMessageMetadata, SessionMessage
from mcp.types import (
INTERNAL_ERROR,
Expand All @@ -36,6 +38,7 @@
JSONRPCRequest,
JSONRPCResponse,
RequestId,
Webhook,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -136,6 +139,7 @@ def __init__(
self,
mcp_session_id: str | None,
is_json_response_enabled: bool = False,
is_webhooks_supported: bool = False,
event_store: EventStore | None = None,
) -> None:
"""
Expand All @@ -146,6 +150,10 @@ def __init__(
Must contain only visible ASCII characters (0x21-0x7E).
is_json_response_enabled: If True, return JSON responses for requests
instead of SSE streams. Default is False.
is_webhooks_supported: If True and if webhooks are provided in
tools/call request, the client will receive an Accepted
HTTP response and the CallTool response will be sent to
the webhook. Default is False.
event_store: Event store for resumability support. If provided,
resumability will be enabled, allowing clients to
reconnect and resume messages.
Expand All @@ -162,6 +170,7 @@ def __init__(

self.mcp_session_id = mcp_session_id
self.is_json_response_enabled = is_json_response_enabled
self.is_webhooks_supported = is_webhooks_supported
self._event_store = event_store
self._request_streams: dict[
RequestId,
Expand Down Expand Up @@ -410,9 +419,45 @@ async def _handle_post_request(
](0)
request_stream_reader = self._request_streams[request_id][1]

session_message = SessionMessage(message)
webhooks = self._get_webhooks(session_message.message)
if webhooks is not None:
if self.is_webhooks_supported:
result = {
"content": [
{
"type": "text",
"text": "Response will be forwarded to the webhook.",
}
],
"isError": False,
}
response = self._create_json_response(
JSONRPCMessage(
root=JSONRPCResponse(
jsonrpc="2.0", id=message.root.id, result=result
)
),
HTTPStatus.OK,
)
asyncio.create_task(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: we should start the this task before responding with a 200

self._send_response_to_webhooks(
request_id, session_message, webhooks, request_stream_reader
)
)
else:
logger.exception("Webhooks not supported error")
err = "Webhooks not supported"
response = self._create_error_response(
f"Validation error: {err}",
HTTPStatus.BAD_REQUEST,
INVALID_PARAMS,
)
await response(scope, receive, send)
return

if self.is_json_response_enabled:
# Process the message
session_message = SessionMessage(message)
await writer.send(session_message)
try:
# Process messages from the request-specific stream
Expand Down Expand Up @@ -531,6 +576,126 @@ async def sse_writer():
await writer.send(Exception(err))
return

async def _send_response_to_webhooks(
self,
request_id: str,
session_message: SessionMessage,
webhooks: list[Webhook],
request_stream_reader: MemoryObjectReceiveStream[EventMessage],
):
writer = self._read_stream_writer
if writer is None:
raise ValueError(
"No read stream writer available. Ensure connect() is called first."
)
await writer.send(session_message)

try:
response_message = JSONRPCError(
jsonrpc="2.0",
id="server-error", # We don't have a request ID for general errors
error=ErrorData(
code=INTERNAL_ERROR,
message="Error processing request: No response received",
),
)

if self.is_json_response_enabled:
# Process messages from the request-specific stream
# We need to collect all messages until we get a response
async for event_message in request_stream_reader:
# If it's a response, this is what we're waiting for
if isinstance(
event_message.message.root, JSONRPCResponse | JSONRPCError
):
response_message = event_message.message
break
# For notifications and request, keep waiting
else:
logger.debug(f"received: {event_message.message.root.method}")

await self._send_message_to_webhooks(webhooks, response_message)
else:
# Send each event on the request stream as a separate message
async for event_message in request_stream_reader:
event_data = self._create_event_data(event_message)
await self._send_message_to_webhooks(webhooks, event_data)

# If response, remove from pending streams and close
if isinstance(
event_message.message.root,
JSONRPCResponse | JSONRPCError,
):
break

except Exception as e:
logger.exception(f"Error sending response to webhooks: {e}")

finally:
await self._clean_up_memory_streams(request_id)

async def _send_message_to_webhooks(
self,
webhooks: list[Webhook],
message: JSONRPCMessage | JSONRPCError | dict[str, str],
):
for webhook in webhooks:
headers = {"Content-Type": CONTENT_TYPE_JSON}
# Add authorization headers
if webhook.authentication and webhook.authentication.credentials:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: how much of the auth logic is duplicate from what is there for Streamable HTTP? Can auth stuff be abstracted into a helper layer if it is same?

if webhook.authentication.strategy == "bearer":
headers["Authorization"] = (
f"Bearer {webhook.authentication.credentials}"
)
elif webhook.authentication.strategy == "apiKey":
headers["X-API-Key"] = webhook.authentication.credentials
elif webhook.authentication.strategy == "basic":
try:
# Try to parse as JSON
creds_dict = json.loads(webhook.authentication.credentials)
if "username" in creds_dict and "password" in creds_dict:
# Create basic auth header from username and password
import base64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: generally it is not a good idea to do imports in the runtime code like this as things that would fail at server startup may not fail until a request comes in that hits this path. Better to import this up top directly imo despite the fact that it might lead to more deps. btw is base64 part of the standard python libs?


auth_string = (
f"{creds_dict['username']}:{creds_dict['password']}"
)
credentials = base64.b64encode(
auth_string.encode()
).decode()
headers["Authorization"] = f"Basic {credentials}"
except Exception:
# Not JSON, use as-is
headers["Authorization"] = (
f"Basic {webhook.authentication.credentials}"
)
elif (
webhook.authentication.strategy == "customHeader"
and webhook.authentication.credentials
):
try:
custom_headers = json.loads(webhook.authentication.credentials)
headers.update(custom_headers)
except Exception as e:
logger.exception(f"Error setting custom headers: {e}")

async with create_mcp_http_client(headers=headers) as client:
try:
if isinstance(message, JSONRPCMessage | JSONRPCError):
await client.post(
webhook.url,
json=message.model_dump_json(
by_alias=True, exclude_none=True
),
)
else:
await client.post(webhook.url, json=message)

except Exception as e:
logger.exception(
f"Error sending response to webhook {webhook.url}: {e}"
)

async def _handle_get_request(self, request: Request, send: Send) -> None:
"""
Handle GET request to establish SSE.
Expand Down Expand Up @@ -651,6 +816,19 @@ async def _handle_delete_request(self, request: Request, send: Send) -> None:
)
await response(request.scope, request.receive, send)

def _get_webhooks(self, message: JSONRPCMessage) -> list[Webhook] | None:
"""Return webhooks if the request is a call tool request with webhooks."""
if (
isinstance(message.root, JSONRPCRequest)
and message.root.method == "tools/call"
and message.root.params is not None
and "webhooks" in message.root.params
and message.root.params["webhooks"] is not None
and len(message.root.params["webhooks"]) > 0
):
return [Webhook(**webhook) for webhook in message.root.params["webhooks"]]
return None

async def _terminate_session(self) -> None:
"""Terminate the current session, closing all streams.

Expand Down
Loading
Loading