From ec519c484ea9ed17cefc320822359d9aa4677444 Mon Sep 17 00:00:00 2001 From: yawkat Date: Mon, 20 Nov 2023 15:56:39 +0100 Subject: [PATCH 1/5] Replace chunked write API This PR replaces the writeChunked and writeFile APIs with a new writeStream API that takes an InputStream. This removes the need for the ChunkedWriteHandler. Chunked writes were used for two purposes: Sending file regions and sending InputStreams. This has always complicated the HTTP pipeline somewhat as the pipeline had to deal with not just HttpContent objects but also ChunkedInput and FileRegion objects. This PR replaces the machinery for InputStream writing with a more straight-forward solution that reads the data on the IO thread and then sends it down the channel. Additionally, the file-specific APIs based on RandomAccessFile are removed. The body writer now just creates an InputStream for the file region in question and sends that. This removes support for zero-copy transfers, however that is a niche feature anyway because it doesn't work with TLS or HTTP/2. If someone wants a performant HTTP server, HTTP/2 takes priority over zero-copy so it makes little sense. This PR may have small conflicts with #10131 as that PR changed the PipeliningServerHandler body handling a little bit. Otherwise this PR should have no visible impact on users. --- .../http/netty/body/NettyWriteContext.java | 23 +- .../server/netty/HttpPipelineBuilder.java | 11 +- .../netty/body/InputStreamBodyWriter.java | 11 +- .../netty/body/StreamFileBodyWriter.java | 13 +- .../netty/body/SystemFileBodyWriter.java | 131 ++++++++-- .../handler/PipeliningServerHandler.java | 247 ++++++++++-------- .../accesslog/HttpAccessLogHandler.java | 9 +- .../netty/stream/StreamPressureSpec.groovy | 101 +++++++ 8 files changed, 373 insertions(+), 173 deletions(-) create mode 100644 http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java index 1493a528788..f3b81043a57 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java @@ -20,12 +20,12 @@ import io.micronaut.core.annotation.NonNull; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponse; import org.reactivestreams.Publisher; -import java.io.RandomAccessFile; +import java.io.InputStream; +import java.util.concurrent.ExecutorService; /** * This interface is used to write the different kinds of netty responses. @@ -68,20 +68,11 @@ default void writeFull(@NonNull FullHttpResponse response) { void writeStreamed(@NonNull HttpResponse response, @NonNull Publisher content); /** - * Write a response with a {@link HttpChunkedInput} body. + * Write a response with a body that is a blocking stream. * - * @param response The response. Must not be a {@link FullHttpResponse} - * @param chunkedInput The response body + * @param response The response. Must not be a {@link FullHttpResponse} + * @param stream The stream to read from + * @param executorService The executor for IO operations */ - void writeChunked(@NonNull HttpResponse response, @NonNull HttpChunkedInput chunkedInput); - - /** - * Write a response with a body that is a section of a {@link RandomAccessFile}. - * - * @param response The response. Must not be a {@link FullHttpResponse} - * @param randomAccessFile File to read from - * @param position Start position - * @param contentLength Length of the section to send - */ - void writeFile(@NonNull HttpResponse response, @NonNull RandomAccessFile randomAccessFile, long position, long contentLength); + void writeStream(@NonNull HttpResponse response, @NonNull InputStream stream, @NonNull ExecutorService executorService); } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java index eb472a7325e..fe6fd2e9a20 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java @@ -61,7 +61,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandshakeCompletionEvent; -import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import io.netty.incubator.codec.http3.Http3; import io.netty.incubator.codec.http3.Http3FrameToHttpObjectCodec; @@ -587,23 +586,20 @@ private void insertHttp2DownstreamHandlers() { registerMicronautChannelHandlers(); - insertMicronautHandlers(false); + insertMicronautHandlers(); } /** * Insert the handlers that manage the micronaut message handling, e.g. conversion between micronaut requests * and netty requests, and routing. */ - private void insertMicronautHandlers(boolean zeroCopySupported) { + private void insertMicronautHandlers() { channel.attr(STREAM_PIPELINE_ATTRIBUTE.get()).set(this); if (sslHandler != null) { channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert()); } SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy()); - if (zeroCopySupported) { - channel.attr(PipeliningServerHandler.ZERO_COPY_PREDICATE.get()).set(contentCompressor); - } pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor); pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_DECOMPRESSOR, new HttpContentDecompressor()); @@ -619,7 +615,6 @@ private void insertMicronautHandlers(boolean zeroCopySupported) { ) ); } - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK, new ChunkedWriteHandler()); // todo: move to PipeliningServerHandler RequestHandler requestHandler = routingInBoundHandler; if (webSocketUpgradeHandler.isPresent()) { @@ -645,7 +640,7 @@ private void insertHttp1DownstreamHandlers() { registerMicronautChannelHandlers(); pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_KEEP_ALIVE, new HttpServerKeepAliveHandler()); - insertMicronautHandlers(sslHandler == null); + insertMicronautHandlers(); } /** diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java index 37d12372161..3ed2d76d223 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java @@ -27,13 +27,14 @@ import io.micronaut.http.netty.body.NettyBodyWriter; import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; +import io.micronaut.scheduling.TaskExecutors; import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; -import io.netty.handler.stream.ChunkedStream; +import jakarta.inject.Named; import jakarta.inject.Singleton; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.ExecutorService; /** * Body writer for {@link InputStream}s. @@ -45,9 +46,11 @@ @Experimental @Singleton public final class InputStreamBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter { + private final ExecutorService executorService; - InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService executorService) { super(configuration); + this.executorService = executorService; } @Override @@ -59,7 +62,7 @@ public void writeTo(HttpRequest request, MutableHttpResponse out nettyResponse.getNettyHeaders() ); // can be null if the stream was closed - nettyContext.writeChunked(finalResponse, new HttpChunkedInput(new ChunkedStream(object))); + nettyContext.writeStream(finalResponse, object, executorService); } else { throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + outgoingResponse); } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java index d5d0b27f780..4ec84d7fdf0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java @@ -28,16 +28,17 @@ import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; import io.micronaut.http.server.types.files.StreamedFile; +import io.micronaut.scheduling.TaskExecutors; import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.stream.ChunkedStream; +import jakarta.inject.Named; import jakarta.inject.Singleton; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.ExecutorService; /** * Body writer for {@link StreamedFile}s. @@ -49,8 +50,11 @@ @Experimental @Internal public final class StreamFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter { - StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + private final ExecutorService ioExecutor; + + StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { super(configuration); + this.ioExecutor = ioExecutor; } @Override @@ -72,8 +76,7 @@ public void writeTo(HttpRequest request, MutableHttpResponse ou nettyHeaders ); InputStream inputStream = object.getInputStream(); - HttpChunkedInput chunkedInput = new HttpChunkedInput(new ChunkedStream(inputStream)); - nettyContext.writeChunked(finalResponse, chunkedInput); + nettyContext.writeStream(finalResponse, inputStream, ioExecutor); } } else { diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java index 1c90b09a826..34871654eff 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java @@ -33,15 +33,23 @@ import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; import io.micronaut.http.server.types.files.SystemFile; +import io.micronaut.scheduling.TaskExecutors; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; +import jakarta.inject.Named; import jakarta.inject.Singleton; +import org.jetbrains.annotations.NotNull; +import java.io.EOFException; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.RandomAccessFile; +import java.util.concurrent.ExecutorService; import static io.micronaut.http.HttpHeaders.CONTENT_RANGE; @@ -57,8 +65,11 @@ public final class SystemFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter { private static final String UNIT_BYTES = "bytes"; - public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + private final ExecutorService ioExecutor; + + public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { super(configuration); + this.ioExecutor = ioExecutor; } @Override @@ -113,34 +124,22 @@ public void writeTo(HttpRequest request, MutableHttpResponse resp // Write the request data final DefaultHttpResponse finalResponse = new DefaultHttpResponse(nettyResponse.getNettyHttpVersion(), nettyResponse.getNettyHttpStatus(), nettyResponse.getNettyHeaders()); - writeFile(systemFile, nettyContext, position, contentLength, finalResponse); + + File file = systemFile.getFile(); + InputStream is; + try { + is = new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new MessageBodyException("Could not find file", e); + } + + nettyContext.writeStream(finalResponse, new RangeInputStream(is, position, contentLength), ioExecutor); } } else { throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + response); } } - private static void writeFile(SystemFile systemFile, NettyWriteContext context, long position, long contentLength, DefaultHttpResponse finalResponse) { - // Write the content. - File file = systemFile.getFile(); - RandomAccessFile randomAccessFile = open(file); - - context.writeFile( - finalResponse, - randomAccessFile, - position, - contentLength - ); - } - - private static RandomAccessFile open(File file) { - try { - return new RandomAccessFile(file, "r"); - } catch (FileNotFoundException e) { - throw new MessageBodyException("Could not find file", e); - } - } - @Nullable private static IntRange parseRangeHeader(String value, long contentLength) { int equalsIdx = value.indexOf('='); @@ -175,4 +174,90 @@ private static class IntRange { } } + private static class RafInputStream extends InputStream { + private final RandomAccessFile raf; + + RafInputStream(RandomAccessFile raf) { + this.raf = raf; + } + + @Override + public int read() throws IOException { + return raf.read(); + } + + @Override + public int read(@NotNull byte[] b, int off, int len) throws IOException { + return raf.read(b, off, len); + } + } + + private static final class RangeInputStream extends InputStream { + private final InputStream delegate; + private final long toSkip; + private long remainingLength; + private boolean skipped = false; + private boolean skipSuccess = false; + + private RangeInputStream(InputStream delegate, long toSkip, long length) { + this.delegate = delegate; + this.toSkip = toSkip; + this.remainingLength = length; + + if (toSkip == 0) { + skipped = true; + skipSuccess = true; + } + } + + private boolean doSkip() throws IOException { + if (!skipped) { + skipped = true; + try { + delegate.skipNBytes(toSkip); + skipSuccess = true; + } catch (EOFException ignored) { + } + } + return skipSuccess; + } + + @Override + public int read() throws IOException { + if (!doSkip()) { + return -1; + } + if (remainingLength <= 0) { + return -1; + } + int read = delegate.read(); + if (read != -1) { + remainingLength--; + } + return read; + } + + @Override + public int read(@NotNull byte[] b, int off, int len) throws IOException { + if (!doSkip()) { + return -1; + } + if (remainingLength <= 0) { + return -1; + } + if (len > remainingLength) { + len = (int) remainingLength; + } + int n = delegate.read(b, off, len); + if (n != -1) { + remainingLength -= n; + } + return n; + } + + @Override + public void close() throws IOException { + delegate.close(); + } + } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index d2121ce377d..535f1d9cacd 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -18,13 +18,10 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.util.SupplierUtil; -import io.micronaut.http.exceptions.MessageBodyException; import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.netty.stream.DelegateStreamedHttpRequest; import io.micronaut.http.netty.stream.EmptyHttpRequest; import io.micronaut.http.netty.stream.StreamedHttpResponse; -import io.micronaut.http.server.netty.SmartHttpContentCompressor; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -32,11 +29,11 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpChunkedInput; @@ -49,15 +46,8 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.stream.ChunkedFile; -import io.netty.handler.stream.ChunkedInput; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.ResourceLeakDetector; -import io.netty.util.ResourceLeakDetectorFactory; -import io.netty.util.ResourceLeakTracker; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -67,15 +57,15 @@ import reactor.core.publisher.Sinks; import reactor.util.concurrent.Queues; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; +import java.io.InputStream; +import java.io.InterruptedIOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Queue; -import java.util.function.Supplier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * Netty handler that handles incoming {@link HttpRequest}s and forwards them to a @@ -86,9 +76,6 @@ */ @Internal public final class PipeliningServerHandler extends ChannelInboundHandlerAdapter { - public static final Supplier> ZERO_COPY_PREDICATE = - SupplierUtil.memoized(() -> AttributeKey.newInstance("zero-copy-predicate")); - private static final int LENGTH_8K = 8192; private static final Logger LOG = LoggerFactory.getLogger(PipeliningServerHandler.class); @@ -673,39 +660,10 @@ public void writeStreamed(HttpResponse response, Publisher content) content.subscribe(new StreamingOutboundHandler(this, response)); } - /** - * Write a response with a special body - * ({@link io.netty.handler.codec.http.HttpChunkedInput}, - * {@link io.micronaut.http.server.types.files.SystemFile}). - * - * @param response The response to write - */ - private void writeStreamed(CustomResponse response) { - preprocess(response.response()); - write(new ChunkedOutboundHandler(this, response)); - } - - @Override - public void writeChunked(HttpResponse response, HttpChunkedInput chunkedInput) { - writeStreamed(new CustomResponse(response, chunkedInput, false)); - } - @Override - public void writeFile(HttpResponse response, RandomAccessFile randomAccessFile, long position, long contentLength) { - SmartHttpContentCompressor predicate = ctx.channel().attr(ZERO_COPY_PREDICATE.get()).get(); - if (predicate != null && predicate.shouldSkip(response)) { - // SSL not enabled - can use zero-copy file transfer. - writeStreamed(new CustomResponse(response, new TrackedDefaultFileRegion(randomAccessFile.getChannel(), position, contentLength), true)); - } else { - // SSL enabled - cannot use zero-copy file transfer. - try { - // HttpChunkedInput will write the end marker (LastHttpContent) for us. - final HttpChunkedInput chunkedInput = new HttpChunkedInput(new TrackedChunkedFile(randomAccessFile, position, contentLength, LENGTH_8K)); - writeStreamed(new CustomResponse(response, chunkedInput, false)); - } catch (IOException e) { - throw new MessageBodyException("Could not read file", e); - } - } + public void writeStream(HttpResponse response, InputStream stream, ExecutorService executorService) { + preprocess(response); + write(new BlockingOutboundHandler(this, response, stream, executorService)); } } @@ -912,89 +870,154 @@ void discard() { } } - /** - * Handler that writes a files etc. - */ - private final class ChunkedOutboundHandler extends OutboundHandler { - private final CustomResponse message; - - ChunkedOutboundHandler(OutboundAccess outboundAccess, CustomResponse message) { + private final class BlockingOutboundHandler extends OutboundHandler { + private static final int QUEUE_SIZE = 2; + + private final HttpResponse response; + private final InputStream stream; + private final ExecutorService blockingExecutor; + + private final Queue queue = new ArrayDeque<>(QUEUE_SIZE); + private Future worker = null; + private boolean workerReady = false; + private boolean discard = false; + private boolean done = false; + private boolean producerWaiting = false; + private boolean consumerWaiting = false; + + BlockingOutboundHandler( + OutboundAccess outboundAccess, + HttpResponse response, + InputStream stream, + ExecutorService blockingExecutor) { super(outboundAccess); - this.message = message; + this.response = response; + this.stream = stream; + this.blockingExecutor = blockingExecutor; } @Override void writeSome() { - boolean responseIsLast = message.body() == null && !message.needLast(); - write(message.response(), responseIsLast, responseIsLast && outboundAccess.closeAfterWrite); - if (message.body() != null) { - boolean bodyIsLast = !message.needLast(); - write(message.body(), bodyIsLast, bodyIsLast && outboundAccess.closeAfterWrite); - } - if (message.needLast()) { - write(LastHttpContent.EMPTY_LAST_CONTENT, true, outboundAccess.closeAfterWrite); + if (worker == null) { + write(response, false, false); + worker = blockingExecutor.submit(this::work); } - outboundHandler = null; - requestHandler.responseWritten(outboundAccess.attachment); - PipeliningServerHandler.this.writeSome(); + do { + ByteBuf msg; + synchronized (this) { + if (producerWaiting) { + producerWaiting = false; + notifyAll(); + } + msg = queue.poll(); + if (msg == null && !this.done) { + consumerWaiting = true; + break; + } + } + if (msg == null) { + // this.done == true inside the synchronized block + write(LastHttpContent.EMPTY_LAST_CONTENT, true, false); + + outboundHandler = null; + requestHandler.responseWritten(outboundAccess.attachment); + PipeliningServerHandler.this.writeSome(); + break; + } else { + write(new DefaultHttpContent(msg), true, false); + } + } while (ctx.channel().isWritable()); } @Override void discard() { - ReferenceCountUtil.release(message.response()); - if (message.body() instanceof ChunkedInput ci) { - try { - ci.close(); - } catch (Exception e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Failed to close ChunkedInput", e); - } + discard = true; + if (worker == null) { + worker = blockingExecutor.submit(this::work); + } else { + synchronized (this) { + if (workerReady) { + worker.cancel(true); + // in case the worker was already done, drain buffers + drain(); + } // else worker is still setting up and will see the discard flag in due time } - } else if (message.body() instanceof FileRegion fr) { - fr.release(); } - outboundHandler = null; } - } - private static class TrackedDefaultFileRegion extends DefaultFileRegion { - //to avoid initializing Netty at build time - private static final Supplier> LEAK_DETECTOR = SupplierUtil.memoized(() -> - ResourceLeakDetectorFactory.instance().newResourceLeakDetector(TrackedDefaultFileRegion.class)); - - private final ResourceLeakTracker tracker; - - public TrackedDefaultFileRegion(FileChannel fileChannel, long position, long count) { - super(fileChannel, position, count); - this.tracker = LEAK_DETECTOR.get().track(this); - } + private void work() { + ByteBuf buf = null; + try (InputStream stream = this.stream) { + synchronized (this) { + this.workerReady = true; + if (this.discard) { + // don't read + return; + } + } + while (true) { + buf = ctx.alloc().heapBuffer(LENGTH_8K); + int n = buf.writeBytes(stream, LENGTH_8K); + synchronized (this) { + if (n == -1) { + done = true; + wakeConsumer(); + break; + } + while (queue.size() >= QUEUE_SIZE && !discard) { + producerWaiting = true; + wait(); + } + if (discard) { + break; + } + queue.add(buf); + // buf is now owned by the queue + buf = null; + + wakeConsumer(); + } + } + } catch (InterruptedException | InterruptedIOException ignored) { + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + LOG.warn("InputStream threw an error during read. This error cannot be forwarded to the client. Please make sure any errors are thrown by the controller instead.", e); + } + } finally { + // if we failed to add a buffer to the queue, release it + if (buf != null) { + buf.release(); + } + synchronized (this) { + done = true; - @Override - protected void deallocate() { - super.deallocate(); - if (tracker != null) { - tracker.close(this); + if (discard) { + drain(); + } + } } } - } - - private static class TrackedChunkedFile extends ChunkedFile { - //to avoid initializing Netty at build time - private static final Supplier> LEAK_DETECTOR = SupplierUtil.memoized(() -> - ResourceLeakDetectorFactory.instance().newResourceLeakDetector(TrackedChunkedFile.class)); - private final ResourceLeakTracker tracker; + private void wakeConsumer() { + assert Thread.holdsLock(this); - public TrackedChunkedFile(RandomAccessFile file, long offset, long length, int chunkSize) throws IOException { - super(file, offset, length, chunkSize); - this.tracker = LEAK_DETECTOR.get().track(this); + if (!discard && consumerWaiting) { + consumerWaiting = false; + ctx.executor().execute(PipeliningServerHandler.this::writeSome); + } } - @Override - public void close() throws Exception { - super.close(); - if (tracker != null) { - tracker.close(this); + private void drain() { + assert Thread.holdsLock(this); + + ByteBuf buf; + while (true) { + buf = queue.poll(); + if (buf != null) { + buf.release(); + } else { + break; + } } } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java index 5c84f273165..1a1252f9173 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java @@ -151,10 +151,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - private void log(ChannelHandlerContext ctx, Object msg, ChannelPromise promise, AccessLog accessLog) { + private void log(ChannelHandlerContext ctx, Object msg, ChannelPromise promise, AccessLog accessLog, AccessLogHolder accessLogHolder) { ctx.write(msg, promise.unvoid()).addListener(future -> { if (future.isSuccess()) { accessLog.log(logger); + accessLogHolder.logForReuse = accessLog; } }); } @@ -171,7 +172,7 @@ private void processWriteEvent(ChannelHandlerContext ctx, Object msg, ChannelPro } if (msg instanceof LastHttpContent content) { accessLogger.onLastResponseWrite(content.content().readableBytes()); - log(ctx, msg, promise, accessLogger); + log(ctx, msg, promise, accessLogger, accessLogHolder); return; } else if (msg instanceof ByteBufHolder holder) { accessLogger.onResponseWrite(holder.content().readableBytes()); @@ -224,9 +225,7 @@ void excludeRequest() { @Nullable AccessLog getLogForResponse(boolean finishResponse) { if (finishResponse) { - AccessLog accessLog = liveLogs.poll(); - logForReuse = accessLog; - return accessLog; + return liveLogs.poll(); } else { return liveLogs.peek(); } diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy new file mode 100644 index 00000000000..6df809d4800 --- /dev/null +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy @@ -0,0 +1,101 @@ +package io.micronaut.http.server.netty.stream + +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Requires +import io.micronaut.core.io.buffer.ByteBuffer +import io.micronaut.http.HttpRequest +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.http.client.HttpClient +import io.micronaut.http.client.StreamingHttpClient +import io.micronaut.runtime.server.EmbeddedServer +import reactor.core.publisher.Flux +import spock.lang.Specification + +import java.util.concurrent.ThreadLocalRandom + +class StreamPressureSpec extends Specification { + def 'producer pressure'() { + given: + def data = new byte[1024 * 1024 * 4] + ThreadLocalRandom.current().nextBytes(data) + + def ctx = ApplicationContext.run(['spec.name': 'StreamPressureSpec']) + ctx.getBean(MyController).stream = new ByteArrayInputStream(data) + + def server = ctx.getBean(EmbeddedServer) + server.start() + def client = ctx.createBean(HttpClient, server.URI).toBlocking() + + expect: + client.retrieve("/stream-pressure", byte[]) == data + + cleanup: + server.stop() + client.close() + ctx.close() + } + + def 'consumer pressure'() { + given: + def ctx = ApplicationContext.run(['spec.name': 'StreamPressureSpec']) + + byte[] data = new byte[1024 * 1024] + ThreadLocalRandom.current().nextBytes(data) + def serverStream = new PipedOutputStream() + ctx.getBean(MyController).stream = new PipedInputStream(serverStream) + + def clientOStream = new PipedOutputStream() + def clientIStream = new PipedInputStream(clientOStream) + + def server = ctx.getBean(EmbeddedServer) + server.start() + def client = ctx.createBean(StreamingHttpClient, server.URI) + + when: + Flux.from(client.dataStream(HttpRequest.GET("/stream-pressure"))).subscribe { + clientOStream.write(it.toByteArray()) + } + serverStream.write(data) + serverStream.flush() + then: + clientIStream.readNBytes(data.length) == data + + when: + serverStream.write(data) + serverStream.flush() + then: + clientIStream.readNBytes(data.length) == data + + cleanup: + serverStream.close() + clientIStream.close() + server.stop() + client.close() + ctx.close() + } + + private byte[] read(Iterator> itr, int n) { + byte[] out = new byte[n] + int off = 0 + while (n > 0) { + def buf = itr.next() + def chunkN = buf.readableBytes() + buf.read(out, off, chunkN) + off += chunkN + n -= chunkN + } + return out + } + + @Requires(property = "spec.name", value = "StreamPressureSpec") + @Controller + static class MyController { + InputStream stream + + @Get("/stream-pressure") + InputStream get() { + return this.stream + } + } +} From 590f9872fbe4e9b96d8664c33d45cbcbc78d2786 Mon Sep 17 00:00:00 2001 From: yawkat Date: Mon, 20 Nov 2023 17:33:55 +0100 Subject: [PATCH 2/5] remove unused class --- .../netty/body/SystemFileBodyWriter.java | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java index 34871654eff..fcaa64bff59 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java @@ -48,7 +48,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.RandomAccessFile; import java.util.concurrent.ExecutorService; import static io.micronaut.http.HttpHeaders.CONTENT_RANGE; @@ -174,24 +173,6 @@ private static class IntRange { } } - private static class RafInputStream extends InputStream { - private final RandomAccessFile raf; - - RafInputStream(RandomAccessFile raf) { - this.raf = raf; - } - - @Override - public int read() throws IOException { - return raf.read(); - } - - @Override - public int read(@NotNull byte[] b, int off, int len) throws IOException { - return raf.read(b, off, len); - } - } - private static final class RangeInputStream extends InputStream { private final InputStream delegate; private final long toSkip; From f7e0f25461aaffa91a6496cd08ffa1d050ef1a18 Mon Sep 17 00:00:00 2001 From: yawkat Date: Tue, 21 Nov 2023 08:12:26 +0100 Subject: [PATCH 3/5] remove unused class --- .../handler/PipeliningServerHandler.java | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index 535f1d9cacd..20e33d055d3 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -30,13 +30,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; -import io.netty.channel.FileRegion; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; @@ -306,24 +304,6 @@ void readComplete() { } } - /** - * Wrapper class for a netty response with a special body type, like - * {@link HttpChunkedInput} or - * {@link FileRegion}. - * - * @param response The response - * @param body The body, or {@code null} if there is no body - * @param needLast Whether to finish the response with a - * {@link LastHttpContent} - */ - private record CustomResponse(HttpResponse response, @Nullable Object body, boolean needLast) { - CustomResponse { - if (response instanceof FullHttpResponse) { - throw new IllegalArgumentException("Response must not be a FullHttpResponse to send a special body"); - } - } - } - /** * Base {@link InboundHandler} that handles {@link HttpRequest}s and then determines how to * deal with the body. From 29fe47db2ef8c13df40aa9a52f34b6316bf550cd Mon Sep 17 00:00:00 2001 From: yawkat Date: Tue, 21 Nov 2023 12:04:40 +0100 Subject: [PATCH 4/5] Fix request backpressure PipeliningServerHandler was supposed to implement backpressure, but it turns out that auto read was still enabled and that the implementation didn't really work. This means that it would keep reading even if that means buffering data when the downstream can't keep up. This PR disables auto read and fixes the read implementation in PipeliningServerHandler. In principle there should be no change to users, this just means that instead of buffering any incoming data internally, backpressure is now applied to the client. This PR is based on #10138 but is separate for easier review. It also has conflicts with #10131. --- .../handler/PipeliningServerHandler.java | 75 ++++++++++++++----- .../NettyServerWebSocketUpgradeHandler.java | 2 + .../http/server/netty/EmbeddedTestUtil.groovy | 5 +- .../PipeliningServerHandlerSpec.groovy | 63 ++++++++++++++++ 4 files changed, 126 insertions(+), 19 deletions(-) diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index 20e33d055d3..c1e11a35cbd 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -105,9 +105,9 @@ public final class PipeliningServerHandler extends ChannelInboundHandlerAdapter */ private boolean reading = false; /** - * {@code true} iff we want to read more data. + * {@code true} iff {@code ctx.read()} has been called already. */ - private boolean moreRequested = false; + private boolean readCalled = false; /** * {@code true} iff this handler has been removed. */ @@ -151,16 +151,18 @@ private static boolean hasBody(HttpRequest request) { } /** - * Set whether we need more input, i.e. another call to {@link #channelRead}. This is usally a - * {@link ChannelHandlerContext#read()} call, but it's coalesced until - * {@link #channelReadComplete}. - * - * @param needMore {@code true} iff we need more input + * Call {@code ctx.read()} if necessary. */ - private void setNeedMore(boolean needMore) { - boolean oldMoreRequested = moreRequested; - moreRequested = needMore; - if (!oldMoreRequested && !reading && needMore) { + private void refreshNeedMore() { + // if readCalled is true, ctx.read() is already called and we haven't seen the associated readComplete yet. + + // needMore is false if there is downstream backpressure. + + // requestHandler itself (i.e. non-streaming request processing) does not have + // backpressure. For this, check whether there is a request that has been fully read but + // has no response yet. If there is, apply backpressure. + if (!readCalled && outboundQueue.size() <= 1 && inboundHandler.needMore()) { + readCalled = true; ctx.read(); } } @@ -168,6 +170,9 @@ private void setNeedMore(boolean needMore) { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; + // we take control of reading now. + ctx.channel().config().setAutoRead(false); + refreshNeedMore(); } @Override @@ -195,13 +200,13 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { inboundHandler.readComplete(); reading = false; + // only unset readCalled now. This ensures no read call is done before channelReadComplete + readCalled = false; if (flushPending) { ctx.flush(); flushPending = false; } - if (moreRequested) { - ctx.read(); - } + refreshNeedMore(); } @Override @@ -267,6 +272,7 @@ private void writeSome() { if (next != null && next.handler != null) { outboundQueue.poll(); outboundHandler = next.handler; + refreshNeedMore(); } else { return; } @@ -286,7 +292,15 @@ private void writeSome() { /** * An inbound handler is responsible for all incoming messages. */ - private abstract static class InboundHandler { + private abstract class InboundHandler { + /** + * @return {@code true} iff this handler can process more data. This is usually {@code true}, + * except for streaming requests when there is downstream backpressure. + */ + boolean needMore() { + return true; + } + /** * @see #channelRead */ @@ -448,7 +462,6 @@ void read(Object message) { sink.tryEmitComplete(); inboundHandler = baseInboundHandler; } - setNeedMore(requested > 0); } @Override @@ -459,6 +472,11 @@ void handleUpstreamError(Throwable cause) { } } + @Override + boolean needMore() { + return requested > 0; + } + private void request(long n) { EventLoop eventLoop = ctx.channel().eventLoop(); if (!eventLoop.inEventLoop()) { @@ -472,20 +490,27 @@ private void request(long n) { newRequested = Long.MAX_VALUE; } requested = newRequested; - setNeedMore(newRequested > 0); + refreshNeedMore(); } Flux flux() { return sink.asFlux() .doOnRequest(this::request) - .doOnCancel(this::releaseQueue); + .doOnCancel(this::cancel); } void closeIfNoSubscriber() { + EventLoop eventLoop = ctx.channel().eventLoop(); + if (!eventLoop.inEventLoop()) { + eventLoop.execute(this::closeIfNoSubscriber); + return; + } + if (sink.currentSubscriberCount() == 0) { releaseQueue(); if (inboundHandler == this) { inboundHandler = droppingInboundHandler; + refreshNeedMore(); } } } @@ -499,6 +524,20 @@ private void releaseQueue() { c.release(); } } + + private void cancel() { + EventLoop eventLoop = ctx.channel().eventLoop(); + if (!eventLoop.inEventLoop()) { + eventLoop.execute(this::cancel); + return; + } + + if (inboundHandler == this) { + inboundHandler = droppingInboundHandler; + refreshNeedMore(); + } + releaseQueue(); + } } /** diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/websocket/NettyServerWebSocketUpgradeHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/websocket/NettyServerWebSocketUpgradeHandler.java index 75fbdccc5aa..43c1bb14eaf 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/websocket/NettyServerWebSocketUpgradeHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/websocket/NettyServerWebSocketUpgradeHandler.java @@ -204,6 +204,8 @@ private void writeResponse(ChannelHandlerContext ctx, NettyHttpRequest msg, b } catch (NoSuchElementException ignored) { } + // websocket needs auto read for now + ctx.channel().config().setAutoRead(true); } catch (Throwable e) { if (LOG.isErrorEnabled()) { LOG.error("Error opening WebSocket: " + e.getMessage(), e); diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy index 8e0365cfb85..6e297193950 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy @@ -23,7 +23,10 @@ class EmbeddedTestUtil { static void connect(EmbeddedChannel server, EmbeddedChannel client) { new ConnectionDirection(server, client).register() - new ConnectionDirection(client, server).register() + def csDir = new ConnectionDirection(client, server) + csDir.register() + // PipeliningServerHandler fires a read() before this method is called, so we don't see it. + csDir.readPending = true } private static class ConnectionDirection { diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/PipeliningServerHandlerSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/PipeliningServerHandlerSpec.groovy index c67836a7be4..a74762359f7 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/PipeliningServerHandlerSpec.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/PipeliningServerHandlerSpec.groovy @@ -21,6 +21,8 @@ import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpResponse import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpVersion +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import spock.lang.Issue @@ -290,6 +292,67 @@ class PipeliningServerHandlerSpec extends Specification { completeOnCancel << [true, false] } + def 'read backpressure for streaming requests'() { + given: + def mon = new MonitorHandler() + Subscription subscription = null + def ch = new EmbeddedChannel(mon, new PipeliningServerHandler(new RequestHandler() { + @Override + void accept(ChannelHandlerContext ctx, HttpRequest request, PipeliningServerHandler.OutboundAccess outboundAccess) { + ((StreamedHttpRequest) request).subscribe(new Subscriber() { + @Override + void onSubscribe(Subscription s) { + subscription = s + } + + @Override + void onNext(HttpContent httpContent) { + httpContent.release() + } + + @Override + void onError(Throwable t) { + t.printStackTrace() + } + + @Override + void onComplete() { + outboundAccess.writeFull(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT)) + } + }) + } + + @Override + void handleUnboundError(Throwable cause) { + cause.printStackTrace() + } + })) + + expect: + mon.read == 1 + mon.flush == 0 + + when: + def req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/") + req.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) + ch.writeInbound(req) + then: + // no read call until request + mon.read == 1 + + when: + subscription.request(1) + then: + mon.read == 2 + + when: + ch.writeInbound(new DefaultLastHttpContent(Unpooled.wrappedBuffer("foo".getBytes(StandardCharsets.UTF_8)))) + then: + // read call for the next request + mon.read == 3 + ch.checkException() + } + static class MonitorHandler extends ChannelOutboundHandlerAdapter { int flush = 0 int read = 0 From 4a663baaa155381a243b01df5cc41315937bb80a Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 8 Dec 2023 12:35:43 +0100 Subject: [PATCH 5/5] fix test --- .../io/micronaut/http/server/netty/EmbeddedTestUtil.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy index 6e297193950..ce35b101e69 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/EmbeddedTestUtil.groovy @@ -43,7 +43,7 @@ class EmbeddedTestUtil { } private void forwardLater(Object msg) { - if (readPending || dest.config().isAutoRead()) { + if (readPending || dest.config().isAutoRead() || msg == FLUSH) { dest.eventLoop().execute(() -> forwardNow(msg)) readPending = false } else {