diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index e6045170..53b59cb3 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -354,7 +354,7 @@ private Flux extractError(ClientResponse response, Str if (responseException.getStatusCode().isSameCodeAs(HttpStatus.BAD_REQUEST)) { return Mono.error(new McpTransportSessionNotFoundException(sessionRepresentation, toPropagate)); } - return Mono.empty(); + return Mono.error(toPropagate); }).flux(); } diff --git a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java index 43d6f40f..b7a9e4a0 100644 --- a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import org.springframework.context.annotation.Bean; @@ -96,9 +97,11 @@ public void before() { @AfterEach public void after() { + reactor.netty.http.HttpResources.disposeLoopsAndConnections(); if (mcpServerTransportProvider != null) { mcpServerTransportProvider.closeGracefully().block(); } + Schedulers.shutdownNow(); if (tomcatServer.appContext() != null) { tomcatServer.appContext().close(); } @@ -779,6 +782,33 @@ void testToolCallSuccess() { mcpServer.close(); } + @Test + void testThrowingToolCallIsCaughtBeforeTimeout() { + McpSyncServer mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> { + // We trigger a timeout on blocking read, raising an exception + Mono.never().block(Duration.ofSeconds(1)); + return null; + })) + .build(); + + try (var mcpClient = clientBuilder.requestTimeout(Duration.ofMillis(6666)).build()) { + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // We expect the tool call to fail immediately with the exception raised by + // the offending tool + // instead of getting back a timeout. + assertThatExceptionOfType(McpError.class) + .isThrownBy(() -> mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()))) + .withMessageContaining("Timeout on blocking read"); + } + + mcpServer.close(); + } + @Test void testToolListChangeHandlingSuccess() { diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index ab48fc0f..271f3823 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -421,13 +421,17 @@ public Mono sendMessage(JSONRPCMessage message) { } return this.serializeMessage(message) - .flatMap(body -> sendHttpPost(messageEndpointUri, body)) - .doOnNext(response -> { + .flatMap(body -> sendHttpPost(messageEndpointUri, body).handle((response, sink) -> { if (response.statusCode() != 200 && response.statusCode() != 201 && response.statusCode() != 202 && response.statusCode() != 206) { - logger.error("Error sending message: {}", response.statusCode()); + sink.error(new RuntimeException( + "Sending message failed with a non-OK HTTP code: " + response.statusCode())); } - }) + else { + sink.next(response); + sink.complete(); + } + })) .doOnError(error -> { if (!isClosing) { logger.error("Error sending message: {}", error.getMessage());