-
Notifications
You must be signed in to change notification settings - Fork 475
Adding StreamableHttpServerTransportProvider class and unit tests #290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Adding StreamableHttpServerTransportProvider class and unit tests #290
Conversation
*/ | ||
public class StreamableHttpSseStream { | ||
|
||
private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meant to change this to unicast. Will address.
|
||
private String mcpEndpoint; | ||
|
||
private Supplier<String> sessionIdProvider = () -> UUID.randomUUID().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should change the value here to DEFAULT_SESSION_ID_PROVIDER
. Will address.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey 👋 Thanks for a comprehensive PR! I did my first round focusing on the main themes. Happy to offer guidance to cover the essential aspects (simple/stateful servers, multiple streams per session, lifecycle) if you'd like to push this forward.
|
||
return Flux.fromIterable(sessions.values()) | ||
.flatMap(session -> session.sendNotification(method, params) | ||
.doOnError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should provide a handler so users can provide a callback here.
|
||
@Override | ||
public Mono<Void> closeGracefully() { | ||
isClosing.set(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be lazy, consider using Mono.defer()
.
isClosing.set(true); | ||
logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()); | ||
|
||
return Flux.fromIterable(sessions.values()).flatMap(McpServerSession::closeGracefully).then(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errors should be handled and onErrorComplete should be added for each inner chain
* @throws IOException If an I/O error occurs | ||
*/ | ||
@Override | ||
protected void doGet(HttpServletRequest request, HttpServletResponse response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There needs to be a configuration that allows session-less servers.
Enumeration<String> headerNames = request.getHeaderNames(); | ||
while (headerNames.hasMoreElements()) { | ||
String headerName = headerNames.nextElement(); | ||
logger.debug("Header: {} = {}", headerName, request.getHeader(headerName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This log line could be aggregated into one with the debug log for receiving a request so only one line would be appended to the log.
} | ||
|
||
// Handle the message | ||
session.handle(message).block(); // Block for servlet compatibility |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the headers be written first? If we call into the handler first, it should attempt to respond, which might trigger flushing the response before headers are set.
|
||
// For requests that expect responses, we need to set up an SSE stream | ||
if (message instanceof McpSchema.JSONRPCRequest && acceptsEventStream) { | ||
// Set up SSE connection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic should be encapsulated into how the session decides to go about it. If it decides it's a simple response, no SSE should be started.
AsyncContext asyncContext = request.startAsync(); | ||
asyncContext.setTimeout(0); // No timeout | ||
|
||
StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the spec is being slightly violated IMO. We should aim to respect the MAY keyword to the best of our ability. The responses should go on a dedicated SSE stream if a stream is to be used, not the one opened initially with GET. The free hanging GET stream is meant for notifications or requests from the server. The SSE events that deal with the originating request should be sent over the stream associated with this request.
Here's some explanation from the specification on POST:
If the server initiates an SSE stream:
...
The server MAY send JSON-RPC requests and notifications before sending a JSON-RPC response.
These messages SHOULD relate to the originating client request.
These requests and notifications MAY be [batched](https://www.jsonrpc.org/specification#batch).
The server SHOULD NOT close the SSE stream before sending a JSON-RPC response
per each received JSON-RPC request, unless the [session](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#session-management) expires
and for GET:
If the server initiates an SSE stream:
The server MAY send JSON-RPC requests and notifications on the stream.
These requests and notifications MAY be [batched](https://www.jsonrpc.org/specification#batch).
These messages SHOULD be unrelated to any concurrently-running JSON-RPC request from the client.
// Create or get SSE stream for this session | ||
StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId); | ||
if (lastEventId != null) { | ||
sseStream.replayEventsAfter(lastEventId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it, in case of resumption, once the final response is streamed, the SSE stream should be closed.
}).subscribe(); | ||
} | ||
else if (message instanceof McpSchema.JSONRPCRequest) { | ||
// Client doesn't accept SSE, we'll return a regular JSON response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be possible. It's up to the server to decide, the clients are obliged to support SSE.
String lastEventId = request.getHeader(LAST_EVENT_ID_HEADER); | ||
|
||
// Create or get SSE stream for this session | ||
StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resumption needs to happen on a brand new SSE stream, otherwise the client is unable to distinguish between different streams.
/** | ||
* Implementation of McpServerTransport for Streamable HTTP sessions. | ||
*/ | ||
private class StreamableHttpServerTransport implements McpServerTransport { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am guessing for Streamable HTTP we need a new abstraction, a McpStreamableServerTransport
that has a different set of methods allowing opening a new stream and staying with the McpServerTransport
in case of simple (sessionless) servers.
Thank you very much for all of the input @chemicL! I will begin making changes accordingly this afternoon. |
Still missing:
Motivation and Context
Trying to reach spec parity with TS and Python for Java. Will continue working on other aspects of this.
How Has This Been Tested?
Unit tests and integ tests using a partially-complete sHTTP client transport class, but I didn't include the integ test file as the client isn't complete.
Breaking Changes
N/A
Types of changes
Checklist
Additional context
This is part of a larger commit seen here: #289