From 228854173a004d135e9ea28e677898557891c590 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Tue, 12 Dec 2023 08:05:05 +0100 Subject: [PATCH] Compression support in PipeliningServerHandler (#10246) * Replace chunked write API This PR replaces the writeChunked and writeFile APIs with a new writeStream API that takes an InputStream. This removes the need for the ChunkedWriteHandler. Chunked writes were used for two purposes: Sending file regions and sending InputStreams. This has always complicated the HTTP pipeline somewhat as the pipeline had to deal with not just HttpContent objects but also ChunkedInput and FileRegion objects. This PR replaces the machinery for InputStream writing with a more straight-forward solution that reads the data on the IO thread and then sends it down the channel. Additionally, the file-specific APIs based on RandomAccessFile are removed. The body writer now just creates an InputStream for the file region in question and sends that. This removes support for zero-copy transfers, however that is a niche feature anyway because it doesn't work with TLS or HTTP/2. If someone wants a performant HTTP server, HTTP/2 takes priority over zero-copy so it makes little sense. This PR may have small conflicts with #10131 as that PR changed the PipeliningServerHandler body handling a little bit. Otherwise this PR should have no visible impact on users. * remove unused class * remove unused class * Fix request backpressure PipeliningServerHandler was supposed to implement backpressure, but it turns out that auto read was still enabled and that the implementation didn't really work. This means that it would keep reading even if that means buffering data when the downstream can't keep up. This PR disables auto read and fixes the read implementation in PipeliningServerHandler. In principle there should be no change to users, this just means that instead of buffering any incoming data internally, backpressure is now applied to the client. This PR is based on #10138 but is separate for easier review. It also has conflicts with #10131. * Implement decompression in PipeliningServerHandler This patch implements the logic of HttpContentDecompressor in PipeliningServerHandler. This allows us to shrink the pipeline a little. The perf impact for uncompressed requests should basically be zero. This builds on the changes in #10142. * address review * revert * add DecompressionSpec * Compression support in PipeliningServerHandler Like #10155 --- gradle/libs.versions.toml | 3 + http-server-netty/build.gradle | 1 + .../server/netty/HttpCompressionStrategy.java | 3 +- .../server/netty/HttpPipelineBuilder.java | 7 +- .../netty/SmartHttpContentCompressor.java | 74 ------- .../http/server/netty/handler/Compressor.java | 188 ++++++++++++++++++ .../handler/PipeliningServerHandler.java | 144 ++++++++++++-- .../http/server/netty/CompressionSpec.groovy | 73 +++++++ .../SmartHttpContentCompressorSpec.groovy | 37 ---- 9 files changed, 402 insertions(+), 128 deletions(-) delete mode 100644 http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java create mode 100644 http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java create mode 100644 http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy delete mode 100644 http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a398eefcf45..034b25ebf92 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,6 +3,7 @@ asm = "9.6" awaitility = "4.2.0" bcpkix = "1.70" blaze = "1.6.10" +brotli4j = "1.13.0" caffeine = "2.9.3" compile-testing = "0.21.0" @@ -165,6 +166,8 @@ bcpkix = { module = "org.bouncycastle:bcpkix-jdk15on", version.ref = "bcpkix" } blaze-persistence-core = { module = "com.blazebit:blaze-persistence-core-impl", version.ref = "blaze" } +brotli4j = { module = "com.aayushatharva.brotli4j:brotli4j", version.ref = "brotli4j" } + caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" } compile-testing = { module = "com.google.testing.compile:compile-testing", version.ref = "compile-testing" } diff --git a/http-server-netty/build.gradle b/http-server-netty/build.gradle index 2be633fec98..b7e29b4151c 100644 --- a/http-server-netty/build.gradle +++ b/http-server-netty/build.gradle @@ -32,6 +32,7 @@ dependencies { compileOnly libs.managed.kotlin.stdlib compileOnly libs.managed.netty.transport.native.unix.common compileOnly libs.managed.netty.incubator.codec.http3 + compileOnly libs.brotli4j testImplementation libs.jmh.core testAnnotationProcessor libs.jmh.generator.annprocess diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java index 47f24ebf789..008f99a57b0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java @@ -15,6 +15,7 @@ */ package io.micronaut.http.server.netty; +import io.netty.handler.codec.compression.StandardCompressionOptions; import io.netty.handler.codec.http.HttpResponse; /** @@ -36,6 +37,6 @@ public interface HttpCompressionStrategy { * @return The compression level (0-9) */ default int getCompressionLevel() { - return 6; + return StandardCompressionOptions.gzip().compressionLevel(); } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java index 30caf11207a..7857f030a31 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java @@ -597,9 +597,6 @@ private void insertMicronautHandlers() { channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert()); } - SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy()); - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor); - Optional webSocketUpgradeHandler = embeddedServices.getWebSocketUpgradeHandler(server); if (webSocketUpgradeHandler.isPresent()) { pipeline.addLast(NettyServerWebSocketUpgradeHandler.COMPRESSION_HANDLER, new WebSocketServerCompressionHandler()); @@ -621,7 +618,9 @@ private void insertMicronautHandlers() { if (server.getServerConfiguration().isDualProtocol() && server.getServerConfiguration().isHttpToHttpsRedirect() && sslHandler == null) { requestHandler = new HttpToHttpsRedirectHandler(routingInBoundHandler.conversionService, server.getServerConfiguration(), sslConfiguration, hostResolver); } - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_INBOUND, new PipeliningServerHandler(requestHandler)); + PipeliningServerHandler pipeliningServerHandler = new PipeliningServerHandler(requestHandler); + pipeliningServerHandler.setCompressionStrategy(embeddedServices.getHttpCompressionStrategy()); + pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_INBOUND, pipeliningServerHandler); } /** diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java deleted file mode 100644 index 8ea5b258ec4..00000000000 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.http.server.netty; - -import io.micronaut.core.annotation.Internal; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.HttpContentCompressor; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpResponse; - -import java.util.List; - -/** - * An extension of {@link HttpContentCompressor} that skips encoding if the content type is not compressible or if - * the content is too small. - * - * @author James Kleeh - * @since 1.0 - */ -@Internal -public class SmartHttpContentCompressor extends HttpContentCompressor { - - private final HttpCompressionStrategy httpCompressionStrategy; - private boolean skipEncoding = false; - - /** - * Creates a SmartHttpContentCompressor with the given compression logic. - * - * @param httpCompressionStrategy The compression strategy - */ - SmartHttpContentCompressor(HttpCompressionStrategy httpCompressionStrategy) { - super(httpCompressionStrategy.getCompressionLevel()); - this.httpCompressionStrategy = httpCompressionStrategy; - } - - /** - * Determines if encoding should occur based on the response. - * - * @param response The response - * @return True if the content should not be compressed - */ - public boolean shouldSkip(HttpResponse response) { - return !httpCompressionStrategy.shouldCompress(response); - } - - @Override - protected void encode(ChannelHandlerContext ctx, HttpObject msg, List out) throws Exception { - if (msg instanceof HttpResponse res) { - skipEncoding = shouldSkip(res); - } - super.encode(ctx, msg, out); - } - - @Override - protected Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception { - if (skipEncoding) { - return null; - } - return super.beginEncode(headers, acceptEncoding); - } -} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java new file mode 100644 index 00000000000..eb8068cdf8c --- /dev/null +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.server.netty.handler; + +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.server.netty.HttpCompressionStrategy; +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.compression.Brotli; +import io.netty.handler.codec.compression.BrotliEncoder; +import io.netty.handler.codec.compression.BrotliOptions; +import io.netty.handler.codec.compression.DeflateOptions; +import io.netty.handler.codec.compression.GzipOptions; +import io.netty.handler.codec.compression.SnappyFrameEncoder; +import io.netty.handler.codec.compression.SnappyOptions; +import io.netty.handler.codec.compression.StandardCompressionOptions; +import io.netty.handler.codec.compression.ZlibCodecFactory; +import io.netty.handler.codec.compression.ZlibWrapper; +import io.netty.handler.codec.compression.Zstd; +import io.netty.handler.codec.compression.ZstdEncoder; +import io.netty.handler.codec.compression.ZstdOptions; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpVersion; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +final class Compressor { + private final HttpCompressionStrategy strategy; + private final BrotliOptions brotliOptions; + private final GzipOptions gzipOptions; + private final DeflateOptions deflateOptions; + private final ZstdOptions zstdOptions; + private final SnappyOptions snappyOptions; + + Compressor(HttpCompressionStrategy strategy) { + this.strategy = strategy; + // only use configured compression level for gzip and deflate, other algos have different semantics for the level + this.brotliOptions = Brotli.isAvailable() ? StandardCompressionOptions.brotli() : null; + GzipOptions stdGzip = StandardCompressionOptions.gzip(); + this.gzipOptions = StandardCompressionOptions.gzip(strategy.getCompressionLevel(), stdGzip.windowBits(), stdGzip.memLevel()); + DeflateOptions stdDeflate = StandardCompressionOptions.deflate(); + this.deflateOptions = StandardCompressionOptions.deflate(strategy.getCompressionLevel(), stdDeflate.windowBits(), stdDeflate.memLevel()); + this.zstdOptions = Zstd.isAvailable() ? StandardCompressionOptions.zstd() : null; + this.snappyOptions = StandardCompressionOptions.snappy(); + } + + @Nullable + ChannelHandler prepare(HttpRequest request, HttpResponse response) { + // from HttpContentEncoder: isPassthru + int code = response.status().code(); + if (code < 200 || code == 204 || code == 304 || + (request.method().equals(HttpMethod.HEAD) || (request.method().equals(HttpMethod.CONNECT) && code == 200)) || + response.protocolVersion() == HttpVersion.HTTP_1_0) { + return null; + } + // special case for FHR to keep behavior identical to HttpContentEncoder + if (response instanceof FullHttpResponse fhr && !fhr.content().isReadable()) { + return null; + } + if (!strategy.shouldCompress(response)) { + return null; + } + if (response.headers().contains(HttpHeaderNames.CONTENT_ENCODING)) { + // already encoded + return null; + } + List acceptEncoding = new ArrayList<>(); + for (String s : request.headers().getAll(HttpHeaderNames.ACCEPT_ENCODING)) { + acceptEncoding.addAll(Arrays.asList(s.split(","))); + } + Algorithm encoding = determineEncoding(acceptEncoding); + if (encoding == null) { + return null; + } + response.headers().add(HttpHeaderNames.CONTENT_ENCODING, encoding.contentEncoding); + return switch (encoding) { + case BR -> makeBrotliEncoder(); + case ZSTD -> new ZstdEncoder(zstdOptions.compressionLevel(), zstdOptions.blockSize(), zstdOptions.maxEncodeSize()); + case SNAPPY -> new SnappyFrameEncoder(); + case GZIP -> ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP, gzipOptions.compressionLevel(), gzipOptions.windowBits(), gzipOptions.memLevel()); + case DEFLATE -> ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB, deflateOptions.compressionLevel(), deflateOptions.windowBits(), deflateOptions.memLevel()); + }; + } + + private BrotliEncoder makeBrotliEncoder() { + return new BrotliEncoder(brotliOptions.parameters()); + } + + @SuppressWarnings("FloatingPointEquality") + private Algorithm determineEncoding(List acceptEncoding) { + // from HttpContentCompressor, slightly modified + float starQ = -1.0f; + float brQ = -1.0f; + float zstdQ = -1.0f; + float snappyQ = -1.0f; + float gzipQ = -1.0f; + float deflateQ = -1.0f; + for (String encoding : acceptEncoding) { + float q = 1.0f; + int equalsPos = encoding.indexOf('='); + if (equalsPos != -1) { + try { + q = Float.parseFloat(encoding.substring(equalsPos + 1)); + } catch (NumberFormatException e) { + // Ignore encoding + q = 0.0f; + } + } + if (encoding.contains("*")) { + starQ = q; + } else if (encoding.contains("br") && q > brQ) { + brQ = q; + } else if (encoding.contains("zstd") && q > zstdQ) { + zstdQ = q; + } else if (encoding.contains("snappy") && q > snappyQ) { + snappyQ = q; + } else if (encoding.contains("gzip") && q > gzipQ) { + gzipQ = q; + } else if (encoding.contains("deflate") && q > deflateQ) { + deflateQ = q; + } + } + if (brQ > 0.0f || zstdQ > 0.0f || snappyQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) { + if (brQ != -1.0f && brQ >= zstdQ && this.brotliOptions != null) { + return Algorithm.BR; + } else if (zstdQ != -1.0f && zstdQ >= snappyQ && this.zstdOptions != null) { + return Algorithm.ZSTD; + } else if (snappyQ != -1.0f && snappyQ >= gzipQ && this.snappyOptions != null) { + return Algorithm.SNAPPY; + } else if (gzipQ != -1.0f && gzipQ >= deflateQ && this.gzipOptions != null) { + return Algorithm.GZIP; + } else if (deflateQ != -1.0f && this.deflateOptions != null) { + return Algorithm.DEFLATE; + } + } + if (starQ > 0.0f) { + if (brQ == -1.0f && this.brotliOptions != null) { + return Algorithm.BR; + } + if (zstdQ == -1.0f && this.zstdOptions != null) { + return Algorithm.ZSTD; + } + if (snappyQ == -1.0f && this.snappyOptions != null) { + return Algorithm.SNAPPY; + } + if (gzipQ == -1.0f && this.gzipOptions != null) { + return Algorithm.GZIP; + } + if (deflateQ == -1.0f && this.deflateOptions != null) { + return Algorithm.DEFLATE; + } + } + return null; + } + + private enum Algorithm { + BR(HttpHeaderValues.BR), + ZSTD(HttpHeaderValues.ZSTD), + SNAPPY(HttpHeaderValues.SNAPPY), + GZIP(HttpHeaderValues.GZIP), + DEFLATE(HttpHeaderValues.DEFLATE); + + final CharSequence contentEncoding; + + Algorithm(CharSequence contentEncoding) { + this.contentEncoding = contentEncoding; + } + } +} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index d85aeade00a..50222dea4f0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -22,11 +22,13 @@ import io.micronaut.http.netty.stream.DelegateStreamedHttpRequest; import io.micronaut.http.netty.stream.EmptyHttpRequest; import io.micronaut.http.netty.stream.StreamedHttpResponse; +import io.micronaut.http.server.netty.HttpCompressionStrategy; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; @@ -39,6 +41,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -92,6 +95,8 @@ public final class PipeliningServerHandler extends ChannelInboundHandlerAdapter private final InboundHandler baseInboundHandler = new MessageInboundHandler(); private final OptimisticBufferingInboundHandler optimisticBufferingInboundHandler = new OptimisticBufferingInboundHandler(); + private Compressor compressor; + /** * Current handler for inbound messages. */ @@ -133,6 +138,10 @@ public PipeliningServerHandler(RequestHandler requestHandler) { this.requestHandler = requestHandler; } + public void setCompressionStrategy(HttpCompressionStrategy compressionStrategy) { + this.compressor = new Compressor(compressionStrategy); + } + public static boolean canHaveBody(HttpResponseStatus status) { // All 1xx (Informational), 204 (No Content), and 304 (Not Modified) // responses do not include a message body @@ -334,7 +343,7 @@ private final class MessageInboundHandler extends InboundHandler { @Override void read(Object message) { HttpRequest request = (HttpRequest) message; - OutboundAccess outboundAccess = new OutboundAccess(); + OutboundAccess outboundAccess = new OutboundAccess(request); outboundQueue.add(outboundAccess); HttpHeaders headers = request.headers(); @@ -695,6 +704,10 @@ void handleUpstreamError(Throwable cause) { * Class that allows writing the response for the request this object is associated with. */ public final class OutboundAccess implements NettyWriteContext { + /** + * The request that caused this response. This is used for compression decisions. + */ + private final HttpRequest request; /** * The handler that will perform the actual write operation. */ @@ -702,7 +715,8 @@ public final class OutboundAccess implements NettyWriteContext { private Object attachment = null; private boolean closeAfterWrite = false; - private OutboundAccess() { + private OutboundAccess(HttpRequest request) { + this.request = request; } @Override @@ -797,7 +811,9 @@ public void writeFull(FullHttpResponse response, boolean headResponse) { response.headers().remove(HttpHeaderNames.CONTENT_LENGTH); } preprocess(response); - write(new FullOutboundHandler(this, response)); + FullOutboundHandler oh = new FullOutboundHandler(this, response); + prepareCompression(response, oh); + write(oh); } @Override @@ -809,26 +825,120 @@ public void writeStreamed(HttpResponse response, Publisher content) response.headers().remove(HttpHeaderNames.TRANSFER_ENCODING); } preprocess(response); - content.subscribe(new StreamingOutboundHandler(this, response)); + StreamingOutboundHandler oh = new StreamingOutboundHandler(this, response); + prepareCompression(response, oh); + content.subscribe(oh); } @Override public void writeStream(HttpResponse response, InputStream stream, ExecutorService executorService) { preprocess(response); - write(new BlockingOutboundHandler(this, response, stream, executorService)); + BlockingOutboundHandler oh = new BlockingOutboundHandler(this, response, stream, executorService); + prepareCompression(response, oh); + write(oh); + } + + private void prepareCompression(HttpResponse response, OutboundHandler outboundHandler) { + if (compressor == null) { + return; + } + ChannelHandler compressionHandler = compressor.prepare(request, response); + if (compressionHandler != null) { + // if content-length and transfer-encoding are unset, we will close anyway. + // if this is a full response, there's special handling below in OutboundHandler + if (!(response instanceof FullHttpResponse) && response.headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { + response.headers().remove(HttpHeaderNames.CONTENT_LENGTH); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + } + outboundHandler.compressionChannel = new EmbeddedChannel( + ctx.channel().id(), + ctx.channel().metadata().hasDisconnect(), + ctx.channel().config(), + compressionHandler + ); + } } } - private abstract static class OutboundHandler { + private abstract class OutboundHandler { /** * {@link OutboundAccess} that created this handler, for metadata access. */ final OutboundAccess outboundAccess; + EmbeddedChannel compressionChannel; + private OutboundHandler(OutboundAccess outboundAccess) { this.outboundAccess = outboundAccess; } + protected final void writeCompressing(HttpContent content, @SuppressWarnings("SameParameterValue") boolean flush, boolean close) { + if (this.compressionChannel == null) { + write(content, flush, close); + } else { + // slow path + writeCompressing0(content, flush, close); + } + } + + private void writeCompressing0(HttpContent content, boolean flush, boolean close) { + EmbeddedChannel compressionChannel = this.compressionChannel; + if (content.content().isReadable()) { + compressionChannel.writeOutbound(content.content()); + } else { + content.content().release(); + } + boolean last = content instanceof LastHttpContent; + if (last) { + compressionChannel.finish(); + this.compressionChannel = null; + } + if (content instanceof HttpResponse hr) { + assert last; + + // fix content-length if necessary + if (hr.headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { + long newContentLength = 0; + for (Object outboundMessage : compressionChannel.outboundMessages()) { + newContentLength += ((ByteBuf) outboundMessage).readableBytes(); + } + hr.headers().set(HttpHeaderNames.CONTENT_LENGTH, newContentLength); + } + + // this can happen in FullHttpResponse, just send the full body. + write(new DefaultHttpResponse(hr.protocolVersion(), hr.status(), hr.headers()), false, false); + } + // this is a bit awkward. we go over all the compressed data, *except the last buffer*, and forward them with + // flush=false and close=false. only for the last buffer we use those flags. + ByteBuf nextToSend = null; + while (true) { + ByteBuf buf = compressionChannel.readOutbound(); + if (buf == null) { + break; + } + if (nextToSend != null) { + write(new DefaultHttpContent(nextToSend), false, false); + } + nextToSend = buf; + } + // send the last buffer with the flags. + if (nextToSend == null) { + if (last) { + HttpHeaders trailingHeaders = ((LastHttpContent) content).trailingHeaders(); + write(trailingHeaders.isEmpty() ? LastHttpContent.EMPTY_LAST_CONTENT : new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, trailingHeaders), flush, close); + } else if (flush || close) { + // not sure if this can actually happen, but we need to forward a flush/close + write(new DefaultHttpContent(Unpooled.EMPTY_BUFFER), flush, close); + } // else just don't send anything + } else { + if (last) { + write(new DefaultLastHttpContent(nextToSend, ((LastHttpContent) content).trailingHeaders()), flush, close); + } else { + write(new DefaultHttpContent(nextToSend), flush, close); + } + } + } + /** * Write some data to the channel. */ @@ -837,7 +947,13 @@ private OutboundHandler(OutboundAccess outboundAccess) { /** * Discard the remaining data. */ - abstract void discard(); + void discard() { + EmbeddedChannel compressionChannel = this.compressionChannel; + if (compressionChannel != null) { + compressionChannel.finishAndReleaseAll(); + this.compressionChannel = null; + } + } } /** @@ -867,6 +983,7 @@ void writeSome() { @Override void discard() { + super.discard(); if (next != null) { next.discard(); next = null; @@ -887,7 +1004,7 @@ private final class FullOutboundHandler extends OutboundHandler { @Override void writeSome() { - write(message, true, outboundAccess.closeAfterWrite); + writeCompressing(message, true, outboundAccess.closeAfterWrite); outboundHandler = null; requestHandler.responseWritten(outboundAccess.attachment); PipeliningServerHandler.this.writeSome(); @@ -895,6 +1012,7 @@ void writeSome() { @Override void discard() { + super.discard(); message.release(); outboundHandler = null; } @@ -962,7 +1080,7 @@ public void onNext(HttpContent httpContent) { if (httpContent instanceof LastHttpContent) { writtenLast = true; } - write(httpContent, true, false); + writeCompressing(httpContent, true, false); if (ctx.channel().isWritable()) { subscription.request(1); } @@ -1011,7 +1129,7 @@ public void onComplete() { } if (!writtenLast) { - write(LastHttpContent.EMPTY_LAST_CONTENT, true, outboundAccess.closeAfterWrite); + writeCompressing(LastHttpContent.EMPTY_LAST_CONTENT, true, outboundAccess.closeAfterWrite); } requestHandler.responseWritten(outboundAccess.attachment); PipeliningServerHandler.this.writeSome(); @@ -1020,6 +1138,7 @@ public void onComplete() { @Override void discard() { + super.discard(); // this is safe because: // - onComplete/onError cannot have been called yet, because otherwise outboundHandler // would be null and discard couldn't have been called @@ -1078,20 +1197,21 @@ void writeSome() { } if (msg == null) { // this.done == true inside the synchronized block - write(LastHttpContent.EMPTY_LAST_CONTENT, true, false); + writeCompressing(LastHttpContent.EMPTY_LAST_CONTENT, true, false); outboundHandler = null; requestHandler.responseWritten(outboundAccess.attachment); PipeliningServerHandler.this.writeSome(); break; } else { - write(new DefaultHttpContent(msg), true, false); + writeCompressing(new DefaultHttpContent(msg), true, false); } } while (ctx.channel().isWritable()); } @Override void discard() { + super.discard(); discard = true; if (worker == null) { worker = blockingExecutor.submit(this::work); diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy new file mode 100644 index 00000000000..a56eaa244f2 --- /dev/null +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy @@ -0,0 +1,73 @@ +package io.micronaut.http.server.netty + +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Requires +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.runtime.server.EmbeddedServer +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufUtil +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelHandler +import io.netty.channel.embedded.EmbeddedChannel +import io.netty.handler.codec.compression.SnappyFrameDecoder +import io.netty.handler.codec.compression.ZlibCodecFactory +import io.netty.handler.codec.compression.ZlibWrapper +import io.netty.handler.codec.http.HttpHeaderValues +import spock.lang.Specification + +import java.util.concurrent.ThreadLocalRandom + +class CompressionSpec extends Specification { + def compression(ChannelHandler decompressor, CharSequence contentEncoding) { + given: + def ctx = ApplicationContext.run(['spec.name': 'CompressionSpec']) + def server = ctx.getBean(EmbeddedServer).start() + + byte[] uncompressed = new byte[1024] + ThreadLocalRandom.current().nextBytes(uncompressed) + ctx.getBean(Ctrl).data = uncompressed + + def connection = new URL("$server.URI/compress").openConnection() + connection.addRequestProperty("Accept-Encoding", contentEncoding.toString()) + + when: + byte[] compressed = connection.inputStream.readAllBytes() + def compChannel = new EmbeddedChannel(decompressor) + compChannel.writeInbound(Unpooled.copiedBuffer(compressed)) + compChannel.finish() + ByteBuf decompressed = Unpooled.buffer() + while (true) { + ByteBuf o = compChannel.readInbound() + if (o == null) { + break + } + decompressed.writeBytes(o) + o.release() + } + then: + ByteBufUtil.getBytes(decompressed) == uncompressed + + cleanup: + connection.inputStream.close() + server.stop() + ctx.close() + + where: + contentEncoding | decompressor + HttpHeaderValues.GZIP | ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP) + HttpHeaderValues.DEFLATE | ZlibCodecFactory.newZlibDecoder(ZlibWrapper.ZLIB) + HttpHeaderValues.SNAPPY | new SnappyFrameDecoder() + } + + @Requires(property = "spec.name", value = "CompressionSpec") + @Controller + static class Ctrl { + byte[] data + + @Get("/compress") + byte[] send() { + return data + } + } +} diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy deleted file mode 100644 index 019e7760143..00000000000 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy +++ /dev/null @@ -1,37 +0,0 @@ -package io.micronaut.http.server.netty - -import io.netty.handler.codec.http.* -import spock.lang.Specification -import spock.lang.Unroll - -class SmartHttpContentCompressorSpec extends Specification { - - private static String compressible = "text/html" - private static String inCompressible = "image/png" - - @Unroll - void "test #type with #length"() { - expect: - HttpHeaders headers = new DefaultHttpHeaders() - if (type != null) { - headers.add(HttpHeaderNames.CONTENT_TYPE, type) - } - if (length != null) { - headers.add(HttpHeaderNames.CONTENT_LENGTH, length) - } - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, headers) - new SmartHttpContentCompressor(new DefaultHttpCompressionStrategy(1024, 6)).shouldSkip(response) == expected - - where: - type | length | expected - compressible | 1024 | false // compressible type and equal to 1k - compressible | 1023 | true // compressible type but smaller than 1k - compressible | null | false // compressible type but unknown size - compressible | 0 | true // compressible type no content - inCompressible | 1 | true // incompressible, always skip - inCompressible | 5000 | true // incompressible, always skip - inCompressible | null | true // incompressible, always skip - inCompressible | 0 | true // incompressible, always skip - null | null | true // if the content type is unknown, skip - } -}