Skip to content

Commit

Permalink
fix: Close Jetty h2 streams with RST_STREAM and no error code (#6441)
Browse files Browse the repository at this point in the history
This is a Jetty-specific workaround to avoid irritating the Python gRPC
client into failing calls that had already half-closed successfully.

Since we're now using ServletOutputStream.close() in place of
AsyncContext.complete(), we need to apply the same wrapping for trailers
to close() - that is, when the stream is clsoed, we can't rely on the
servlet container sending our trailers because grpc-web trailers are
actually a DATA frame which must be explicitly written.

See #6400
Fixes #5996
Reapplies #6401
Backport #6424
Backport #6478
  • Loading branch information
niloc132 authored Dec 12, 2024
1 parent e67510f commit a002463
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,15 @@ public boolean isFinestEnabled() {
transportState.runOnTransportThread(
() -> {
transportState.complete();
asyncContext.complete();
// asyncContext.complete();
log.fine("call completed");
});
// Jetty specific fix: When AsyncContext.complete() is called, Jetty sends a RST_STREAM with
// "cancel" error to the client, while other containers send "no error" in this case. Calling
// close() instead on the output stream still sends the RST_STREAM, but with "no error". Note
// that this does the opposite in at least Tomcat, so we're not going to upstream this change.
// See https://github.com/deephaven/deephaven-core/issues/6400
outputStream.close();
};
this.isReady = () -> outputStream.isReady();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx
logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers});
}

Long timeoutNanos = headers.get(TIMEOUT_KEY);
// Always ignore grpc-timeout at this time, as the servlet timeout isn't being reset
// when the output stream is closed. See https://github.com/deephaven/deephaven-core/issues/6400
// for more information.
Long timeoutNanos = null; // headers.get(TIMEOUT_KEY);
if (timeoutNanos == null) {
timeoutNanos = 0L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@
import jakarta.servlet.WriteListener;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Wraps the usual ServletOutputStream so as to allow downstream writers to use it according to the servlet spec, but
* still make it easy to write trailers as a payload instead of using HTTP trailers at the end of a stream.
*/
public class GrpcWebOutputStream extends ServletOutputStream implements WriteListener {
private final ServletOutputStream wrapped;
private final GrpcWebServletResponse grpcWebServletResponse;

// Access to these are guarded by synchronized
private Runnable waiting;
private WriteListener writeListener;

public GrpcWebOutputStream(ServletOutputStream wrapped) {
public GrpcWebOutputStream(ServletOutputStream wrapped, GrpcWebServletResponse grpcWebServletResponse) {
this.wrapped = wrapped;
this.grpcWebServletResponse = grpcWebServletResponse;
}

@Override
Expand Down Expand Up @@ -97,7 +100,21 @@ public void flush() throws IOException {

@Override
public void close() throws IOException {
wrapped.close();
// Since we're a grpc-web response, we must write trailers on our way out as part of close - but trailers
// for grpc-web are a data frame, not HTTP trailers. Call up to the response to write the trailer frame,
// then close the underlying stream.
AtomicReference<IOException> exception = new AtomicReference<>();
grpcWebServletResponse.writeTrailers(() -> {
try {
wrapped.close();
} catch (IOException e) {
exception.set(e);
}
});
IOException ex = exception.get();
if (ex != null) {
throw ex;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public AsyncContext startAsync() throws IllegalStateException {
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
throws IllegalStateException {
AsyncContext delegate = super.startAsync(servletRequest, servletResponse);
// Note that this anonymous class has no purpose while our workaround for
// https://github.com/deephaven/deephaven-core/issues/6400 is in place.
return new DelegatingAsyncContext(delegate) {
private void safelyComplete() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Supplier<Map<String, String>> getTrailerFields() {
public synchronized GrpcWebOutputStream getOutputStream() throws IOException {
if (outputStream == null) {
// Provide our own output stream instance, so we can control/monitor the write listener
outputStream = new GrpcWebOutputStream(super.getOutputStream());
outputStream = new GrpcWebOutputStream(super.getOutputStream(), this);
}
return outputStream;
}
Expand Down

0 comments on commit a002463

Please sign in to comment.