diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java index 8b2c1da5412..9384be40faf 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/AsyncServletOutputStreamWriter.java @@ -121,9 +121,15 @@ public boolean isFinestEnabled() { transportState.runOnTransportThread( () -> { transportState.complete(); - asyncContext.complete(); + // asyncContext.complete(); log.fine("call completed"); }); + // Jetty specific fix: When AsyncContext.complete() is called, Jetty sends a RST_STREAM with + // "cancel" error to the client, while other containers send "no error" in this case. Calling + // close() instead on the output stream still sends the RST_STREAM, but with "no error". Note + // that this does the opposite in at least Tomcat, so we're not going to upstream this change. + // See https://github.com/deephaven/deephaven-core/issues/6400 + outputStream.close(); }; this.isReady = () -> outputStream.isReady(); } diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java index 5be92f0989e..379ee668a20 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java @@ -143,7 +143,10 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers}); } - Long timeoutNanos = headers.get(TIMEOUT_KEY); + // Always ignore grpc-timeout at this time, as the servlet timeout isn't being reset + // when the output stream is closed. See https://github.com/deephaven/deephaven-core/issues/6400 + // for more information. + Long timeoutNanos = null; // headers.get(TIMEOUT_KEY); if (timeoutNanos == null) { timeoutNanos = 0L; } diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebOutputStream.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebOutputStream.java index 8ea70c19022..a2f70b7ac47 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebOutputStream.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebOutputStream.java @@ -17,6 +17,7 @@ import jakarta.servlet.WriteListener; import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; /** * Wraps the usual ServletOutputStream so as to allow downstream writers to use it according to the servlet spec, but @@ -24,13 +25,15 @@ */ public class GrpcWebOutputStream extends ServletOutputStream implements WriteListener { private final ServletOutputStream wrapped; + private final GrpcWebServletResponse grpcWebServletResponse; // Access to these are guarded by synchronized private Runnable waiting; private WriteListener writeListener; - public GrpcWebOutputStream(ServletOutputStream wrapped) { + public GrpcWebOutputStream(ServletOutputStream wrapped, GrpcWebServletResponse grpcWebServletResponse) { this.wrapped = wrapped; + this.grpcWebServletResponse = grpcWebServletResponse; } @Override @@ -97,7 +100,21 @@ public void flush() throws IOException { @Override public void close() throws IOException { - wrapped.close(); + // Since we're a grpc-web response, we must write trailers on our way out as part of close - but trailers + // for grpc-web are a data frame, not HTTP trailers. Call up to the response to write the trailer frame, + // then close the underlying stream. + AtomicReference exception = new AtomicReference<>(); + grpcWebServletResponse.writeTrailers(() -> { + try { + wrapped.close(); + } catch (IOException e) { + exception.set(e); + } + }); + IOException ex = exception.get(); + if (ex != null) { + throw ex; + } } @Override diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletRequest.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletRequest.java index 82dfde664ab..66a25d37a83 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletRequest.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletRequest.java @@ -58,6 +58,8 @@ public AsyncContext startAsync() throws IllegalStateException { public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) throws IllegalStateException { AsyncContext delegate = super.startAsync(servletRequest, servletResponse); + // Note that this anonymous class has no purpose while our workaround for + // https://github.com/deephaven/deephaven-core/issues/6400 is in place. return new DelegatingAsyncContext(delegate) { private void safelyComplete() { try { diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletResponse.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletResponse.java index 2306e3e20f5..68affbeb319 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletResponse.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletResponse.java @@ -60,7 +60,7 @@ public Supplier> getTrailerFields() { public synchronized GrpcWebOutputStream getOutputStream() throws IOException { if (outputStream == null) { // Provide our own output stream instance, so we can control/monitor the write listener - outputStream = new GrpcWebOutputStream(super.getOutputStream()); + outputStream = new GrpcWebOutputStream(super.getOutputStream(), this); } return outputStream; }