Skip to content

Commit 703d363

Browse files
committed
McpSyncClient: introduce McpTransportContext
- McpSyncClient should be considered thread-agnostic, and therefore consumers cannot rely on thread locals to propagate "context", e.g. pass down the Servlet request reference in a server context. - This PR introduces a mechanism for populating an McpTransportContext before executing client operations, and reworks the HTTP request customizers to leverage that McpTransportContext. - This introduces a breaking change to the Sync/Async request customizers.
1 parent 95ba8e7 commit 703d363

15 files changed

+325
-76
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@
1111
import java.util.Map;
1212
import java.util.function.Consumer;
1313
import java.util.function.Function;
14+
import java.util.function.Supplier;
1415

16+
import io.modelcontextprotocol.server.McpTransportContext;
1517
import io.modelcontextprotocol.spec.McpClientTransport;
1618
import io.modelcontextprotocol.spec.McpSchema;
17-
import io.modelcontextprotocol.spec.McpTransport;
1819
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
1920
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
2021
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
2122
import io.modelcontextprotocol.spec.McpSchema.ElicitRequest;
2223
import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
2324
import io.modelcontextprotocol.spec.McpSchema.Implementation;
2425
import io.modelcontextprotocol.spec.McpSchema.Root;
26+
import io.modelcontextprotocol.spec.McpTransport;
2527
import io.modelcontextprotocol.util.Assert;
2628
import reactor.core.publisher.Mono;
2729

@@ -183,6 +185,8 @@ class SyncSpec {
183185

184186
private Function<ElicitRequest, ElicitResult> elicitationHandler;
185187

188+
private Supplier<McpTransportContext> contextProvider = McpTransportContext.EMPTY::copy;
189+
186190
private SyncSpec(McpClientTransport transport) {
187191
Assert.notNull(transport, "Transport must not be null");
188192
this.transport = transport;
@@ -409,6 +413,18 @@ public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>>
409413
return this;
410414
}
411415

416+
/**
417+
* Add a provider of {@link McpTransportContext}, providing a context before
418+
* calling any client operation. This allows to extract thread-locals and hand
419+
* them over to the underlying transport.
420+
* @param contextProvider A supplier to create a context
421+
* @return This builder for method chaining
422+
*/
423+
public SyncSpec transportContextProvider(Supplier<McpTransportContext> contextProvider) {
424+
this.contextProvider = contextProvider;
425+
return this;
426+
}
427+
412428
/**
413429
* Create an instance of {@link McpSyncClient} with the provided configurations or
414430
* sensible defaults.
@@ -423,7 +439,8 @@ public McpSyncClient build() {
423439
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
424440

425441
return new McpSyncClient(
426-
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
442+
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures),
443+
this.contextProvider);
427444
}
428445

429446
}

mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package io.modelcontextprotocol.client;
66

77
import java.time.Duration;
8+
import java.util.function.Supplier;
89

910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

13+
import io.modelcontextprotocol.server.McpTransportContext;
1214
import io.modelcontextprotocol.spec.McpSchema;
1315
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
1416
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
@@ -63,14 +65,20 @@ public class McpSyncClient implements AutoCloseable {
6365

6466
private final McpAsyncClient delegate;
6567

68+
private final Supplier<McpTransportContext> contextProvider;
69+
6670
/**
6771
* Create a new McpSyncClient with the given delegate.
6872
* @param delegate the asynchronous kernel on top of which this synchronous client
6973
* provides a blocking API.
74+
* @param contextProvider the supplier of context before calling any non-blocking
75+
* operation on underlying delegate
7076
*/
71-
McpSyncClient(McpAsyncClient delegate) {
77+
McpSyncClient(McpAsyncClient delegate, Supplier<McpTransportContext> contextProvider) {
7278
Assert.notNull(delegate, "The delegate can not be null");
79+
Assert.notNull(contextProvider, "The contextProvider can not be null");
7380
this.delegate = delegate;
81+
this.contextProvider = contextProvider;
7482
}
7583

7684
/**
@@ -177,36 +185,43 @@ public boolean closeGracefully() {
177185
public McpSchema.InitializeResult initialize() {
178186
// TODO: block takes no argument here as we assume the async client is
179187
// configured with a requestTimeout at all times
180-
return this.delegate.initialize().block();
188+
var context = this.contextProvider.get();
189+
return this.delegate.initialize().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
181190
}
182191

183192
/**
184193
* Send a roots/list_changed notification.
185194
*/
186195
public void rootsListChangedNotification() {
187-
this.delegate.rootsListChangedNotification().block();
196+
var context = this.contextProvider.get();
197+
this.delegate.rootsListChangedNotification()
198+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
199+
.block();
188200
}
189201

190202
/**
191203
* Add a roots dynamically.
192204
*/
193205
public void addRoot(McpSchema.Root root) {
194-
this.delegate.addRoot(root).block();
206+
var context = this.contextProvider.get();
207+
this.delegate.addRoot(root).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
195208
}
196209

197210
/**
198211
* Remove a root dynamically.
199212
*/
200213
public void removeRoot(String rootUri) {
201-
this.delegate.removeRoot(rootUri).block();
214+
var context = this.contextProvider.get();
215+
this.delegate.removeRoot(rootUri).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
202216
}
203217

204218
/**
205219
* Send a synchronous ping request.
206220
* @return
207221
*/
208222
public Object ping() {
209-
return this.delegate.ping().block();
223+
var context = this.contextProvider.get();
224+
return this.delegate.ping().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
210225
}
211226

212227
// --------------------------
@@ -224,7 +239,11 @@ public Object ping() {
224239
* Boolean indicating if the execution failed (true) or succeeded (false/absent)
225240
*/
226241
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
227-
return this.delegate.callTool(callToolRequest).block();
242+
var context = this.contextProvider.get();
243+
return this.delegate.callTool(callToolRequest)
244+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
245+
.block();
246+
228247
}
229248

230249
/**
@@ -234,7 +253,8 @@ public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolReque
234253
* pagination if more tools are available
235254
*/
236255
public McpSchema.ListToolsResult listTools() {
237-
return this.delegate.listTools().block();
256+
var context = this.contextProvider.get();
257+
return this.delegate.listTools().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
238258
}
239259

240260
/**
@@ -245,7 +265,9 @@ public McpSchema.ListToolsResult listTools() {
245265
* pagination if more tools are available
246266
*/
247267
public McpSchema.ListToolsResult listTools(String cursor) {
248-
return this.delegate.listTools(cursor).block();
268+
var context = this.contextProvider.get();
269+
return this.delegate.listTools(cursor).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
270+
249271
}
250272

251273
// --------------------------
@@ -257,7 +279,9 @@ public McpSchema.ListToolsResult listTools(String cursor) {
257279
* @return The list of all resources result
258280
*/
259281
public McpSchema.ListResourcesResult listResources() {
260-
return this.delegate.listResources().block();
282+
var context = this.contextProvider.get();
283+
return this.delegate.listResources().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
284+
261285
}
262286

263287
/**
@@ -266,7 +290,11 @@ public McpSchema.ListResourcesResult listResources() {
266290
* @return The list of resources result
267291
*/
268292
public McpSchema.ListResourcesResult listResources(String cursor) {
269-
return this.delegate.listResources(cursor).block();
293+
var context = this.contextProvider.get();
294+
return this.delegate.listResources(cursor)
295+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
296+
.block();
297+
270298
}
271299

272300
/**
@@ -275,7 +303,11 @@ public McpSchema.ListResourcesResult listResources(String cursor) {
275303
* @return the resource content.
276304
*/
277305
public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
278-
return this.delegate.readResource(resource).block();
306+
var context = this.contextProvider.get();
307+
return this.delegate.readResource(resource)
308+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
309+
.block();
310+
279311
}
280312

281313
/**
@@ -284,15 +316,23 @@ public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
284316
* @return the resource content.
285317
*/
286318
public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
287-
return this.delegate.readResource(readResourceRequest).block();
319+
var context = this.contextProvider.get();
320+
return this.delegate.readResource(readResourceRequest)
321+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
322+
.block();
323+
288324
}
289325

290326
/**
291327
* Retrieves the list of all resource templates provided by the server.
292328
* @return The list of all resource templates result.
293329
*/
294330
public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
295-
return this.delegate.listResourceTemplates().block();
331+
var context = this.contextProvider.get();
332+
return this.delegate.listResourceTemplates()
333+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
334+
.block();
335+
296336
}
297337

298338
/**
@@ -304,7 +344,11 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
304344
* @return The list of resource templates result.
305345
*/
306346
public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
307-
return this.delegate.listResourceTemplates(cursor).block();
347+
var context = this.contextProvider.get();
348+
return this.delegate.listResourceTemplates(cursor)
349+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
350+
.block();
351+
308352
}
309353

310354
/**
@@ -317,7 +361,11 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor
317361
* subscribe to.
318362
*/
319363
public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
320-
this.delegate.subscribeResource(subscribeRequest).block();
364+
var context = this.contextProvider.get();
365+
this.delegate.subscribeResource(subscribeRequest)
366+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
367+
.block();
368+
321369
}
322370

323371
/**
@@ -326,7 +374,11 @@ public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
326374
* to unsubscribe from.
327375
*/
328376
public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
329-
this.delegate.unsubscribeResource(unsubscribeRequest).block();
377+
var context = this.contextProvider.get();
378+
this.delegate.unsubscribeResource(unsubscribeRequest)
379+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
380+
.block();
381+
330382
}
331383

332384
// --------------------------
@@ -338,7 +390,8 @@ public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest)
338390
* @return The list of all prompts result.
339391
*/
340392
public ListPromptsResult listPrompts() {
341-
return this.delegate.listPrompts().block();
393+
var context = this.contextProvider.get();
394+
return this.delegate.listPrompts().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
342395
}
343396

344397
/**
@@ -347,19 +400,29 @@ public ListPromptsResult listPrompts() {
347400
* @return The list of prompts result.
348401
*/
349402
public ListPromptsResult listPrompts(String cursor) {
350-
return this.delegate.listPrompts(cursor).block();
403+
var context = this.contextProvider.get();
404+
return this.delegate.listPrompts(cursor).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
405+
351406
}
352407

353408
public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
354-
return this.delegate.getPrompt(getPromptRequest).block();
409+
var context = this.contextProvider.get();
410+
return this.delegate.getPrompt(getPromptRequest)
411+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
412+
.block();
413+
355414
}
356415

357416
/**
358417
* Client can set the minimum logging level it wants to receive from the server.
359418
* @param loggingLevel the min logging level
360419
*/
361420
public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
362-
this.delegate.setLoggingLevel(loggingLevel).block();
421+
var context = this.contextProvider.get();
422+
this.delegate.setLoggingLevel(loggingLevel)
423+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
424+
.block();
425+
363426
}
364427

365428
/**
@@ -369,7 +432,11 @@ public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
369432
* @return the completion result containing suggested values.
370433
*/
371434
public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) {
372-
return this.delegate.completeCompletion(completeRequest).block();
435+
var context = this.contextProvider.get();
436+
return this.delegate.completeCompletion(completeRequest)
437+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
438+
.block();
439+
373440
}
374441

375442
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpRequestCustomizer;
2626
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpRequestCustomizer;
2727
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
28+
import io.modelcontextprotocol.server.McpTransportContext;
2829
import io.modelcontextprotocol.spec.McpClientTransport;
2930
import io.modelcontextprotocol.spec.McpSchema;
30-
import io.modelcontextprotocol.spec.ProtocolVersions;
3131
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
3232
import io.modelcontextprotocol.spec.McpTransportException;
33+
import io.modelcontextprotocol.spec.ProtocolVersions;
3334
import io.modelcontextprotocol.util.Assert;
3435
import io.modelcontextprotocol.util.Utils;
3536
import reactor.core.Disposable;
@@ -410,14 +411,15 @@ public HttpClientSseClientTransport build() {
410411
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
411412
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
412413

413-
return Mono.defer(() -> {
414+
return Mono.deferContextual(ctx -> {
414415
var builder = requestBuilder.copy()
415416
.uri(uri)
416417
.header("Accept", "text/event-stream")
417418
.header("Cache-Control", "no-cache")
418419
.header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
419420
.GET();
420-
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null));
421+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
422+
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext.copy()));
421423
}).flatMap(requestBuilder -> Mono.create(sink -> {
422424
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
423425
.sendAsync(requestBuilder.build(),
@@ -538,13 +540,15 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {
538540

539541
private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final String body) {
540542
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
541-
return Mono.defer(() -> {
543+
return Mono.deferContextual(ctx -> {
542544
var builder = this.requestBuilder.copy()
543545
.uri(requestUri)
544546
.header("Content-Type", "application/json")
545547
.header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
546548
.POST(HttpRequest.BodyPublishers.ofString(body));
547-
return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body));
549+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
550+
return Mono
551+
.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body, transportContext.copy()));
548552
}).flatMap(customizedBuilder -> {
549553
var request = customizedBuilder.build();
550554
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));

0 commit comments

Comments
 (0)