Skip to content

Commit 29e17d3

Browse files
committed
Add option for immediate execution in McpSyncServer
- The McpSyncServer wraps an async server. By default, reactive operations are scheduled on a bounded-elastic scheduler, to offload blocking work and prevent accidental blocking of non-blocking operations. - With the default behavior, there will be thead ops, even in a blocking context, which means thread-locals from the request thread will be lost. This is inconenvient for frameworks that store state in thread-locals. - This commit adds the ability to avoid offloading, when the user is sure they are executing code in a blocking environment. Work happens in the calling thread, and thread-locals are available throughout the execution.
1 parent c711f83 commit 29e17d3

File tree

7 files changed

+172
-36
lines changed

7 files changed

+172
-36
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024-2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*/
44

55
package io.modelcontextprotocol.server;
@@ -695,6 +695,8 @@ class SyncSpecification {
695695

696696
private Duration requestTimeout = Duration.ofSeconds(10); // Default timeout
697697

698+
private boolean immediateExecution = false;
699+
698700
private SyncSpecification(McpServerTransportProvider transportProvider) {
699701
Assert.notNull(transportProvider, "Transport provider must not be null");
700702
this.transportProvider = transportProvider;
@@ -1116,6 +1118,22 @@ public SyncSpecification objectMapper(ObjectMapper objectMapper) {
11161118
return this;
11171119
}
11181120

1121+
/**
1122+
* Enable on "immediate execution" of the operations on the underlying
1123+
* {@link McpAsyncServer}. Defaults to false, which does blocking code offloading
1124+
* to prevent accidental blocking of the non-blocking transport.
1125+
* <p>
1126+
* When setting to true, the underlying transport should NOT be a reactive,
1127+
* non-blocking implementation.
1128+
* @param immediateExecution When true, do not offload work asynchronously.
1129+
* @return This builder instance for method chaining.
1130+
*
1131+
*/
1132+
public SyncSpecification immediateExecution(boolean immediateExecution) {
1133+
this.immediateExecution = immediateExecution;
1134+
return this;
1135+
}
1136+
11191137
/**
11201138
* Builds a synchronous MCP server that provides blocking operations.
11211139
* @return A new instance of {@link McpSyncServer} configured with this builder's
@@ -1125,12 +1143,13 @@ public McpSyncServer build() {
11251143
McpServerFeatures.Sync syncFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities,
11261144
this.tools, this.resources, this.resourceTemplates, this.prompts, this.completions,
11271145
this.rootsChangeHandlers, this.instructions);
1128-
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures);
1146+
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures,
1147+
this.immediateExecution);
11291148
var mapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
11301149
var asyncServer = new McpAsyncServer(this.transportProvider, mapper, asyncFeatures, this.requestTimeout,
11311150
this.uriTemplateManagerFactory);
11321151

1133-
return new McpSyncServer(asyncServer);
1152+
return new McpSyncServer(asyncServer, this.immediateExecution);
11341153
}
11351154

11361155
}

mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024-2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*/
44

55
package io.modelcontextprotocol.server;
@@ -95,28 +95,30 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
9595
* blocking code offloading to prevent accidental blocking of the non-blocking
9696
* transport.
9797
* @param syncSpec a potentially blocking, synchronous specification.
98+
* @param immediateExecution when true, do not offload. Only use if you are sure
99+
* you are in a blocking context.
98100
* @return a specification which is protected from blocking calls specified by the
99101
* user.
100102
*/
101-
static Async fromSync(Sync syncSpec) {
103+
static Async fromSync(Sync syncSpec, boolean immediateExecution) {
102104
List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();
103105
for (var tool : syncSpec.tools()) {
104-
tools.add(AsyncToolSpecification.fromSync(tool));
106+
tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution));
105107
}
106108

107109
Map<String, AsyncResourceSpecification> resources = new HashMap<>();
108110
syncSpec.resources().forEach((key, resource) -> {
109-
resources.put(key, AsyncResourceSpecification.fromSync(resource));
111+
resources.put(key, AsyncResourceSpecification.fromSync(resource, immediateExecution));
110112
});
111113

112114
Map<String, AsyncPromptSpecification> prompts = new HashMap<>();
113115
syncSpec.prompts().forEach((key, prompt) -> {
114-
prompts.put(key, AsyncPromptSpecification.fromSync(prompt));
116+
prompts.put(key, AsyncPromptSpecification.fromSync(prompt, immediateExecution));
115117
});
116118

117119
Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new HashMap<>();
118120
syncSpec.completions().forEach((key, completion) -> {
119-
completions.put(key, AsyncCompletionSpecification.fromSync(completion));
121+
completions.put(key, AsyncCompletionSpecification.fromSync(completion, immediateExecution));
120122
});
121123

122124
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootChangeConsumers = new ArrayList<>();
@@ -239,15 +241,15 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
239241
public record AsyncToolSpecification(McpSchema.Tool tool,
240242
BiFunction<McpAsyncServerExchange, Map<String, Object>, Mono<McpSchema.CallToolResult>> call) {
241243

242-
static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
244+
static AsyncToolSpecification fromSync(SyncToolSpecification tool, boolean immediate) {
243245
// FIXME: This is temporary, proper validation should be implemented
244246
if (tool == null) {
245247
return null;
246248
}
247-
return new AsyncToolSpecification(tool.tool(),
248-
(exchange, map) -> Mono
249-
.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map))
250-
.subscribeOn(Schedulers.boundedElastic()));
249+
return new AsyncToolSpecification(tool.tool(), (exchange, map) -> {
250+
var toolResult = Mono.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map));
251+
return immediate ? toolResult : toolResult.subscribeOn(Schedulers.boundedElastic());
252+
});
251253
}
252254
}
253255

@@ -281,15 +283,16 @@ static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
281283
public record AsyncResourceSpecification(McpSchema.Resource resource,
282284
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {
283285

284-
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
286+
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) {
285287
// FIXME: This is temporary, proper validation should be implemented
286288
if (resource == null) {
287289
return null;
288290
}
289-
return new AsyncResourceSpecification(resource.resource(),
290-
(exchange, req) -> Mono
291-
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req))
292-
.subscribeOn(Schedulers.boundedElastic()));
291+
return new AsyncResourceSpecification(resource.resource(), (exchange, req) -> {
292+
var resourceResult = Mono
293+
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req));
294+
return immediateExecution ? resourceResult : resourceResult.subscribeOn(Schedulers.boundedElastic());
295+
});
293296
}
294297
}
295298

@@ -327,15 +330,16 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
327330
public record AsyncPromptSpecification(McpSchema.Prompt prompt,
328331
BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) {
329332

330-
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt) {
333+
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
331334
// FIXME: This is temporary, proper validation should be implemented
332335
if (prompt == null) {
333336
return null;
334337
}
335-
return new AsyncPromptSpecification(prompt.prompt(),
336-
(exchange, req) -> Mono
337-
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req))
338-
.subscribeOn(Schedulers.boundedElastic()));
338+
return new AsyncPromptSpecification(prompt.prompt(), (exchange, req) -> {
339+
var promptResult = Mono
340+
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req));
341+
return immediateExecution ? promptResult : promptResult.subscribeOn(Schedulers.boundedElastic());
342+
});
339343
}
340344
}
341345

@@ -366,14 +370,17 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
366370
* @return an asynchronous wrapper of the provided sync specification, or
367371
* {@code null} if input is null
368372
*/
369-
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion) {
373+
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
374+
boolean immediateExecution) {
370375
if (completion == null) {
371376
return null;
372377
}
373-
return new AsyncCompletionSpecification(completion.referenceKey(),
374-
(exchange, request) -> Mono.fromCallable(
375-
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request))
376-
.subscribeOn(Schedulers.boundedElastic()));
378+
return new AsyncCompletionSpecification(completion.referenceKey(), (exchange, request) -> {
379+
var completionResult = Mono.fromCallable(
380+
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request));
381+
return immediateExecution ? completionResult
382+
: completionResult.subscribeOn(Schedulers.boundedElastic());
383+
});
377384
}
378385
}
379386

mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,36 @@ public class McpSyncServer {
5454
*/
5555
private final McpAsyncServer asyncServer;
5656

57+
private final boolean immediateExecution;
58+
5759
/**
5860
* Creates a new synchronous server that wraps the provided async server.
5961
* @param asyncServer The async server to wrap
6062
*/
6163
public McpSyncServer(McpAsyncServer asyncServer) {
64+
this(asyncServer, false);
65+
}
66+
67+
/**
68+
* Creates a new synchronous server that wraps the provided async server.
69+
* @param asyncServer The async server to wrap
70+
* @param immediateExecution Tools, prompts, and resources handlers execute work
71+
* without blocking code offloading.
72+
*/
73+
public McpSyncServer(McpAsyncServer asyncServer, boolean immediateExecution) {
6274
Assert.notNull(asyncServer, "Async server must not be null");
6375
this.asyncServer = asyncServer;
76+
this.immediateExecution = immediateExecution;
6477
}
6578

6679
/**
6780
* Add a new tool handler.
6881
* @param toolHandler The tool handler to add
6982
*/
7083
public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) {
71-
this.asyncServer.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler)).block();
84+
this.asyncServer
85+
.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler, this.immediateExecution))
86+
.block();
7287
}
7388

7489
/**
@@ -84,7 +99,10 @@ public void removeTool(String toolName) {
8499
* @param resourceHandler The resource handler to add
85100
*/
86101
public void addResource(McpServerFeatures.SyncResourceSpecification resourceHandler) {
87-
this.asyncServer.addResource(McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler)).block();
102+
this.asyncServer
103+
.addResource(
104+
McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler, this.immediateExecution))
105+
.block();
88106
}
89107

90108
/**
@@ -100,7 +118,10 @@ public void removeResource(String resourceUri) {
100118
* @param promptSpecification The prompt specification to add
101119
*/
102120
public void addPrompt(McpServerFeatures.SyncPromptSpecification promptSpecification) {
103-
this.asyncServer.addPrompt(McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification)).block();
121+
this.asyncServer
122+
.addPrompt(
123+
McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification, this.immediateExecution))
124+
.block();
104125
}
105126

106127
/**

mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ void testRemoveTool() {
155155
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
156156
}
157157

158+
// TODO: call tool? -> verify thread local still present
159+
158160
@Test
159161
void testRemoveNonexistentTool() {
160162
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())

mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 - 2024 the original author or authors.
2+
* Copyright 2024 - 2025 the original author or authors.
33
*/
44
package io.modelcontextprotocol.server.transport;
55

@@ -37,7 +37,6 @@
3737
import org.apache.catalina.startup.Tomcat;
3838
import org.junit.jupiter.api.AfterEach;
3939
import org.junit.jupiter.api.BeforeEach;
40-
import org.junit.jupiter.api.Disabled;
4140
import org.junit.jupiter.api.Test;
4241
import reactor.core.publisher.Mono;
4342
import reactor.test.StepVerifier;
@@ -46,6 +45,7 @@
4645

4746
import static org.assertj.core.api.Assertions.assertThat;
4847
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
48+
import static org.assertj.core.api.InstanceOfAssertFactories.type;
4949
import static org.awaitility.Awaitility.await;
5050
import static org.mockito.Mockito.mock;
5151

@@ -728,6 +728,8 @@ void testToolCallSuccess() {
728728
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
729729
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
730730
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
731+
assertThat(TestFilter.getThreadLocalValue()).as("blocking code exectuion should be offloaded")
732+
.isNull();
731733
// perform a blocking call to a remote service
732734
String response = RestClient.create()
733735
.get()
@@ -758,6 +760,37 @@ void testToolCallSuccess() {
758760
mcpServer.close();
759761
}
760762

763+
@Test
764+
void testToolCallImmediateExecution() {
765+
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
766+
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
767+
var threadLocalValue = TestFilter.getThreadLocalValue();
768+
return CallToolResult.builder()
769+
.addTextContent(threadLocalValue != null ? threadLocalValue : "<unset>")
770+
.build();
771+
});
772+
773+
var mcpServer = McpServer.sync(mcpServerTransportProvider)
774+
.capabilities(ServerCapabilities.builder().tools(true).build())
775+
.tools(tool1)
776+
.immediateExecution(true)
777+
.build();
778+
779+
try (var mcpClient = clientBuilder.build()) {
780+
mcpClient.initialize();
781+
782+
CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
783+
784+
assertThat(response).isNotNull();
785+
assertThat(response.content()).first()
786+
.asInstanceOf(type(McpSchema.TextContent.class))
787+
.extracting(McpSchema.TextContent::text)
788+
.isEqualTo(TestFilter.THREAD_LOCAL_VALUE);
789+
}
790+
791+
mcpServer.close();
792+
}
793+
761794
@Test
762795
void testToolListChangeHandlingSuccess() {
763796

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2025 - 2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server.transport;
6+
7+
import jakarta.servlet.Filter;
8+
import jakarta.servlet.FilterChain;
9+
import jakarta.servlet.ServletException;
10+
import jakarta.servlet.ServletRequest;
11+
import jakarta.servlet.ServletResponse;
12+
import java.io.IOException;
13+
14+
/**
15+
* Simple {@link Filter} which sets a value in a thread local. Used to verify that MCP
16+
* executions happen on the thread processing the request.
17+
*
18+
* @author Daniel Garnier-Moiroux
19+
*/
20+
public class TestFilter implements Filter {
21+
22+
public static final String THREAD_LOCAL_VALUE = TestFilter.class.getName();
23+
24+
private static final ThreadLocal<String> holder = new ThreadLocal<>();
25+
26+
@Override
27+
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
28+
throws IOException, ServletException {
29+
holder.set(THREAD_LOCAL_VALUE);
30+
try {
31+
filterChain.doFilter(servletRequest, servletResponse);
32+
}
33+
finally {
34+
holder.remove();
35+
}
36+
}
37+
38+
public static String getThreadLocalValue() {
39+
return holder.get();
40+
}
41+
42+
}

0 commit comments

Comments
 (0)