Skip to content

Add option for immediate execution in McpSyncServer #371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024-2024 the original author or authors.
* Copyright 2024-2025 the original author or authors.
*/

package io.modelcontextprotocol.server;
Expand Down Expand Up @@ -695,6 +695,8 @@ class SyncSpecification {

private Duration requestTimeout = Duration.ofSeconds(10); // Default timeout

private boolean immediateExecution = false;

private SyncSpecification(McpServerTransportProvider transportProvider) {
Assert.notNull(transportProvider, "Transport provider must not be null");
this.transportProvider = transportProvider;
Expand Down Expand Up @@ -1116,6 +1118,22 @@ public SyncSpecification objectMapper(ObjectMapper objectMapper) {
return this;
}

/**
* Enable on "immediate execution" of the operations on the underlying
* {@link McpAsyncServer}. Defaults to false, which does blocking code offloading
* to prevent accidental blocking of the non-blocking transport.
* <p>
* Do NOT set to true if the underlying transport is a non-blocking
* implementation.
* @param immediateExecution When true, do not offload work asynchronously.
* @return This builder instance for method chaining.
*
*/
public SyncSpecification immediateExecution(boolean immediateExecution) {
this.immediateExecution = immediateExecution;
return this;
}

/**
* Builds a synchronous MCP server that provides blocking operations.
* @return A new instance of {@link McpSyncServer} configured with this builder's
Expand All @@ -1125,12 +1143,13 @@ public McpSyncServer build() {
McpServerFeatures.Sync syncFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities,
this.tools, this.resources, this.resourceTemplates, this.prompts, this.completions,
this.rootsChangeHandlers, this.instructions);
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures);
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures,
this.immediateExecution);
var mapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
var asyncServer = new McpAsyncServer(this.transportProvider, mapper, asyncFeatures, this.requestTimeout,
this.uriTemplateManagerFactory);

return new McpSyncServer(asyncServer);
return new McpSyncServer(asyncServer, this.immediateExecution);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024-2024 the original author or authors.
* Copyright 2024-2025 the original author or authors.
*/

package io.modelcontextprotocol.server;
Expand Down Expand Up @@ -95,28 +95,30 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
* blocking code offloading to prevent accidental blocking of the non-blocking
* transport.
* @param syncSpec a potentially blocking, synchronous specification.
* @param immediateExecution when true, do not offload. Do NOT set to true when
* using a non-blocking transport.
* @return a specification which is protected from blocking calls specified by the
* user.
*/
static Async fromSync(Sync syncSpec) {
static Async fromSync(Sync syncSpec, boolean immediateExecution) {
List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();
for (var tool : syncSpec.tools()) {
tools.add(AsyncToolSpecification.fromSync(tool));
tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution));
}

Map<String, AsyncResourceSpecification> resources = new HashMap<>();
syncSpec.resources().forEach((key, resource) -> {
resources.put(key, AsyncResourceSpecification.fromSync(resource));
resources.put(key, AsyncResourceSpecification.fromSync(resource, immediateExecution));
});

Map<String, AsyncPromptSpecification> prompts = new HashMap<>();
syncSpec.prompts().forEach((key, prompt) -> {
prompts.put(key, AsyncPromptSpecification.fromSync(prompt));
prompts.put(key, AsyncPromptSpecification.fromSync(prompt, immediateExecution));
});

Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new HashMap<>();
syncSpec.completions().forEach((key, completion) -> {
completions.put(key, AsyncCompletionSpecification.fromSync(completion));
completions.put(key, AsyncCompletionSpecification.fromSync(completion, immediateExecution));
});

List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootChangeConsumers = new ArrayList<>();
Expand Down Expand Up @@ -239,15 +241,15 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
public record AsyncToolSpecification(McpSchema.Tool tool,
BiFunction<McpAsyncServerExchange, Map<String, Object>, Mono<McpSchema.CallToolResult>> call) {

static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
static AsyncToolSpecification fromSync(SyncToolSpecification tool, boolean immediate) {
// FIXME: This is temporary, proper validation should be implemented
if (tool == null) {
return null;
}
return new AsyncToolSpecification(tool.tool(),
(exchange, map) -> Mono
.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map))
.subscribeOn(Schedulers.boundedElastic()));
return new AsyncToolSpecification(tool.tool(), (exchange, map) -> {
var toolResult = Mono.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map));
return immediate ? toolResult : toolResult.subscribeOn(Schedulers.boundedElastic());
});
}
}

Expand Down Expand Up @@ -281,15 +283,16 @@ static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
public record AsyncResourceSpecification(McpSchema.Resource resource,
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {

static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (resource == null) {
return null;
}
return new AsyncResourceSpecification(resource.resource(),
(exchange, req) -> Mono
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req))
.subscribeOn(Schedulers.boundedElastic()));
return new AsyncResourceSpecification(resource.resource(), (exchange, req) -> {
var resourceResult = Mono
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req));
return immediateExecution ? resourceResult : resourceResult.subscribeOn(Schedulers.boundedElastic());
});
}
}

Expand Down Expand Up @@ -327,15 +330,16 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
public record AsyncPromptSpecification(McpSchema.Prompt prompt,
BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) {

static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt) {
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (prompt == null) {
return null;
}
return new AsyncPromptSpecification(prompt.prompt(),
(exchange, req) -> Mono
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req))
.subscribeOn(Schedulers.boundedElastic()));
return new AsyncPromptSpecification(prompt.prompt(), (exchange, req) -> {
var promptResult = Mono
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req));
return immediateExecution ? promptResult : promptResult.subscribeOn(Schedulers.boundedElastic());
});
}
}

Expand Down Expand Up @@ -366,14 +370,17 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
* @return an asynchronous wrapper of the provided sync specification, or
* {@code null} if input is null
*/
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion) {
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
boolean immediateExecution) {
if (completion == null) {
return null;
}
return new AsyncCompletionSpecification(completion.referenceKey(),
(exchange, request) -> Mono.fromCallable(
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request))
.subscribeOn(Schedulers.boundedElastic()));
return new AsyncCompletionSpecification(completion.referenceKey(), (exchange, request) -> {
var completionResult = Mono.fromCallable(
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request));
return immediateExecution ? completionResult
: completionResult.subscribeOn(Schedulers.boundedElastic());
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,37 @@ public class McpSyncServer {
*/
private final McpAsyncServer asyncServer;

private final boolean immediateExecution;

/**
* Creates a new synchronous server that wraps the provided async server.
* @param asyncServer The async server to wrap
*/
public McpSyncServer(McpAsyncServer asyncServer) {
this(asyncServer, false);
}

/**
* Creates a new synchronous server that wraps the provided async server.
* @param asyncServer The async server to wrap
* @param immediateExecution Tools, prompts, and resources handlers execute work
* without blocking code offloading. Do NOT set to true if the {@code asyncServer}'s
* transport is non-blocking.
*/
public McpSyncServer(McpAsyncServer asyncServer, boolean immediateExecution) {
Assert.notNull(asyncServer, "Async server must not be null");
this.asyncServer = asyncServer;
this.immediateExecution = immediateExecution;
}

/**
* Add a new tool handler.
* @param toolHandler The tool handler to add
*/
public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) {
this.asyncServer.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler)).block();
this.asyncServer
.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler, this.immediateExecution))
.block();
}

/**
Expand All @@ -84,7 +100,10 @@ public void removeTool(String toolName) {
* @param resourceHandler The resource handler to add
*/
public void addResource(McpServerFeatures.SyncResourceSpecification resourceHandler) {
this.asyncServer.addResource(McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler)).block();
this.asyncServer
.addResource(
McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler, this.immediateExecution))
.block();
}

/**
Expand All @@ -100,7 +119,10 @@ public void removeResource(String resourceUri) {
* @param promptSpecification The prompt specification to add
*/
public void addPrompt(McpServerFeatures.SyncPromptSpecification promptSpecification) {
this.asyncServer.addPrompt(McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification)).block();
this.asyncServer
.addPrompt(
McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification, this.immediateExecution))
.block();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 - 2024 the original author or authors.
* Copyright 2024 - 2025 the original author or authors.
*/
package io.modelcontextprotocol.server.transport;

Expand Down Expand Up @@ -37,7 +37,6 @@
import org.apache.catalina.startup.Tomcat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand All @@ -46,6 +45,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.InstanceOfAssertFactories.type;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -728,6 +728,9 @@ void testToolCallSuccess() {
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
assertThat(McpTestServletFilter.getThreadLocalValue())
.as("blocking code exectuion should be offloaded")
.isNull();
// perform a blocking call to a remote service
String response = RestClient.create()
.get()
Expand Down Expand Up @@ -758,6 +761,37 @@ void testToolCallSuccess() {
mcpServer.close();
}

@Test
void testToolCallImmediateExecution() {
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
var threadLocalValue = McpTestServletFilter.getThreadLocalValue();
return CallToolResult.builder()
.addTextContent(threadLocalValue != null ? threadLocalValue : "<unset>")
.build();
});

var mcpServer = McpServer.sync(mcpServerTransportProvider)
.capabilities(ServerCapabilities.builder().tools(true).build())
.tools(tool1)
.immediateExecution(true)
.build();

try (var mcpClient = clientBuilder.build()) {
mcpClient.initialize();

CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));

assertThat(response).isNotNull();
assertThat(response.content()).first()
.asInstanceOf(type(McpSchema.TextContent.class))
.extracting(McpSchema.TextContent::text)
.isEqualTo(McpTestServletFilter.THREAD_LOCAL_VALUE);
}

mcpServer.close();
}

@Test
void testToolListChangeHandlingSuccess() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 - 2025 the original author or authors.
*/

package io.modelcontextprotocol.server.transport;

import java.io.IOException;

import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;

/**
* Simple {@link Filter} which sets a value in a thread local. Used to verify whether MCP
* executions happen on the thread processing the request or are offloaded.
*
* @author Daniel Garnier-Moiroux
*/
public class McpTestServletFilter implements Filter {

public static final String THREAD_LOCAL_VALUE = McpTestServletFilter.class.getName();

private static final ThreadLocal<String> holder = new ThreadLocal<>();

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
holder.set(THREAD_LOCAL_VALUE);
try {
filterChain.doFilter(servletRequest, servletResponse);
}
finally {
holder.remove();
}
}

public static String getThreadLocalValue() {
return holder.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import jakarta.servlet.Servlet;
import org.apache.catalina.Context;
import org.apache.catalina.startup.Tomcat;
import org.apache.tomcat.util.descriptor.web.FilterDef;
import org.apache.tomcat.util.descriptor.web.FilterMap;

/**
* @author Christian Tzolov
* @author Daniel Garnier-Moiroux
*/
public class TomcatTestUtil {

Expand All @@ -39,6 +42,16 @@ public static Tomcat createTomcatServer(String contextPath, int port, Servlet se
context.addChild(wrapper);
context.addServletMappingDecoded("/*", "mcpServlet");

var filterDef = new FilterDef();
filterDef.setFilterClass(McpTestServletFilter.class.getName());
filterDef.setFilterName(McpTestServletFilter.class.getSimpleName());
context.addFilterDef(filterDef);

var filterMap = new FilterMap();
filterMap.setFilterName(McpTestServletFilter.class.getSimpleName());
filterMap.addURLPattern("/*");
context.addFilterMap(filterMap);

var connector = tomcat.getConnector();
connector.setAsyncTimeout(3000);

Expand Down
Loading