diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a398eefcf45..034b25ebf92 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,6 +3,7 @@ asm = "9.6" awaitility = "4.2.0" bcpkix = "1.70" blaze = "1.6.10" +brotli4j = "1.13.0" caffeine = "2.9.3" compile-testing = "0.21.0" @@ -165,6 +166,8 @@ bcpkix = { module = "org.bouncycastle:bcpkix-jdk15on", version.ref = "bcpkix" } blaze-persistence-core = { module = "com.blazebit:blaze-persistence-core-impl", version.ref = "blaze" } +brotli4j = { module = "com.aayushatharva.brotli4j:brotli4j", version.ref = "brotli4j" } + caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" } compile-testing = { module = "com.google.testing.compile:compile-testing", version.ref = "compile-testing" } diff --git a/http-server-netty/build.gradle b/http-server-netty/build.gradle index 2be633fec98..b7e29b4151c 100644 --- a/http-server-netty/build.gradle +++ b/http-server-netty/build.gradle @@ -32,6 +32,7 @@ dependencies { compileOnly libs.managed.kotlin.stdlib compileOnly libs.managed.netty.transport.native.unix.common compileOnly libs.managed.netty.incubator.codec.http3 + compileOnly libs.brotli4j testImplementation libs.jmh.core testAnnotationProcessor libs.jmh.generator.annprocess diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java index 47f24ebf789..008f99a57b0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpCompressionStrategy.java @@ -15,6 +15,7 @@ */ package io.micronaut.http.server.netty; +import io.netty.handler.codec.compression.StandardCompressionOptions; import io.netty.handler.codec.http.HttpResponse; /** @@ -36,6 +37,6 @@ public interface HttpCompressionStrategy { * @return The compression level (0-9) */ default int getCompressionLevel() { - return 6; + return StandardCompressionOptions.gzip().compressionLevel(); } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java index 30caf11207a..89b06e1487d 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java @@ -71,6 +71,11 @@ import io.netty.util.AsciiString; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLPeerUnverifiedException; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -84,10 +89,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.function.Supplier; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Helper class that manages the {@link ChannelPipeline} of incoming HTTP connections. @@ -597,9 +598,6 @@ private void insertMicronautHandlers() { channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert()); } - SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy()); - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor); - Optional webSocketUpgradeHandler = embeddedServices.getWebSocketUpgradeHandler(server); if (webSocketUpgradeHandler.isPresent()) { pipeline.addLast(NettyServerWebSocketUpgradeHandler.COMPRESSION_HANDLER, new WebSocketServerCompressionHandler()); @@ -621,7 +619,9 @@ private void insertMicronautHandlers() { if (server.getServerConfiguration().isDualProtocol() && server.getServerConfiguration().isHttpToHttpsRedirect() && sslHandler == null) { requestHandler = new HttpToHttpsRedirectHandler(routingInBoundHandler.conversionService, server.getServerConfiguration(), sslConfiguration, hostResolver); } - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_INBOUND, new PipeliningServerHandler(requestHandler)); + PipeliningServerHandler pipeliningServerHandler = new PipeliningServerHandler(requestHandler); + pipeliningServerHandler.setCompressionStrategy(embeddedServices.getHttpCompressionStrategy()); + pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_INBOUND, pipeliningServerHandler); } /** diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java deleted file mode 100644 index 8ea5b258ec4..00000000000 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/SmartHttpContentCompressor.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.http.server.netty; - -import io.micronaut.core.annotation.Internal; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.HttpContentCompressor; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpResponse; - -import java.util.List; - -/** - * An extension of {@link HttpContentCompressor} that skips encoding if the content type is not compressible or if - * the content is too small. - * - * @author James Kleeh - * @since 1.0 - */ -@Internal -public class SmartHttpContentCompressor extends HttpContentCompressor { - - private final HttpCompressionStrategy httpCompressionStrategy; - private boolean skipEncoding = false; - - /** - * Creates a SmartHttpContentCompressor with the given compression logic. - * - * @param httpCompressionStrategy The compression strategy - */ - SmartHttpContentCompressor(HttpCompressionStrategy httpCompressionStrategy) { - super(httpCompressionStrategy.getCompressionLevel()); - this.httpCompressionStrategy = httpCompressionStrategy; - } - - /** - * Determines if encoding should occur based on the response. - * - * @param response The response - * @return True if the content should not be compressed - */ - public boolean shouldSkip(HttpResponse response) { - return !httpCompressionStrategy.shouldCompress(response); - } - - @Override - protected void encode(ChannelHandlerContext ctx, HttpObject msg, List out) throws Exception { - if (msg instanceof HttpResponse res) { - skipEncoding = shouldSkip(res); - } - super.encode(ctx, msg, out); - } - - @Override - protected Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception { - if (skipEncoding) { - return null; - } - return super.beginEncode(headers, acceptEncoding); - } -} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java new file mode 100644 index 00000000000..eb8068cdf8c --- /dev/null +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Compressor.java @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.server.netty.handler; + +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.server.netty.HttpCompressionStrategy; +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.compression.Brotli; +import io.netty.handler.codec.compression.BrotliEncoder; +import io.netty.handler.codec.compression.BrotliOptions; +import io.netty.handler.codec.compression.DeflateOptions; +import io.netty.handler.codec.compression.GzipOptions; +import io.netty.handler.codec.compression.SnappyFrameEncoder; +import io.netty.handler.codec.compression.SnappyOptions; +import io.netty.handler.codec.compression.StandardCompressionOptions; +import io.netty.handler.codec.compression.ZlibCodecFactory; +import io.netty.handler.codec.compression.ZlibWrapper; +import io.netty.handler.codec.compression.Zstd; +import io.netty.handler.codec.compression.ZstdEncoder; +import io.netty.handler.codec.compression.ZstdOptions; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpVersion; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +final class Compressor { + private final HttpCompressionStrategy strategy; + private final BrotliOptions brotliOptions; + private final GzipOptions gzipOptions; + private final DeflateOptions deflateOptions; + private final ZstdOptions zstdOptions; + private final SnappyOptions snappyOptions; + + Compressor(HttpCompressionStrategy strategy) { + this.strategy = strategy; + // only use configured compression level for gzip and deflate, other algos have different semantics for the level + this.brotliOptions = Brotli.isAvailable() ? StandardCompressionOptions.brotli() : null; + GzipOptions stdGzip = StandardCompressionOptions.gzip(); + this.gzipOptions = StandardCompressionOptions.gzip(strategy.getCompressionLevel(), stdGzip.windowBits(), stdGzip.memLevel()); + DeflateOptions stdDeflate = StandardCompressionOptions.deflate(); + this.deflateOptions = StandardCompressionOptions.deflate(strategy.getCompressionLevel(), stdDeflate.windowBits(), stdDeflate.memLevel()); + this.zstdOptions = Zstd.isAvailable() ? StandardCompressionOptions.zstd() : null; + this.snappyOptions = StandardCompressionOptions.snappy(); + } + + @Nullable + ChannelHandler prepare(HttpRequest request, HttpResponse response) { + // from HttpContentEncoder: isPassthru + int code = response.status().code(); + if (code < 200 || code == 204 || code == 304 || + (request.method().equals(HttpMethod.HEAD) || (request.method().equals(HttpMethod.CONNECT) && code == 200)) || + response.protocolVersion() == HttpVersion.HTTP_1_0) { + return null; + } + // special case for FHR to keep behavior identical to HttpContentEncoder + if (response instanceof FullHttpResponse fhr && !fhr.content().isReadable()) { + return null; + } + if (!strategy.shouldCompress(response)) { + return null; + } + if (response.headers().contains(HttpHeaderNames.CONTENT_ENCODING)) { + // already encoded + return null; + } + List acceptEncoding = new ArrayList<>(); + for (String s : request.headers().getAll(HttpHeaderNames.ACCEPT_ENCODING)) { + acceptEncoding.addAll(Arrays.asList(s.split(","))); + } + Algorithm encoding = determineEncoding(acceptEncoding); + if (encoding == null) { + return null; + } + response.headers().add(HttpHeaderNames.CONTENT_ENCODING, encoding.contentEncoding); + return switch (encoding) { + case BR -> makeBrotliEncoder(); + case ZSTD -> new ZstdEncoder(zstdOptions.compressionLevel(), zstdOptions.blockSize(), zstdOptions.maxEncodeSize()); + case SNAPPY -> new SnappyFrameEncoder(); + case GZIP -> ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP, gzipOptions.compressionLevel(), gzipOptions.windowBits(), gzipOptions.memLevel()); + case DEFLATE -> ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB, deflateOptions.compressionLevel(), deflateOptions.windowBits(), deflateOptions.memLevel()); + }; + } + + private BrotliEncoder makeBrotliEncoder() { + return new BrotliEncoder(brotliOptions.parameters()); + } + + @SuppressWarnings("FloatingPointEquality") + private Algorithm determineEncoding(List acceptEncoding) { + // from HttpContentCompressor, slightly modified + float starQ = -1.0f; + float brQ = -1.0f; + float zstdQ = -1.0f; + float snappyQ = -1.0f; + float gzipQ = -1.0f; + float deflateQ = -1.0f; + for (String encoding : acceptEncoding) { + float q = 1.0f; + int equalsPos = encoding.indexOf('='); + if (equalsPos != -1) { + try { + q = Float.parseFloat(encoding.substring(equalsPos + 1)); + } catch (NumberFormatException e) { + // Ignore encoding + q = 0.0f; + } + } + if (encoding.contains("*")) { + starQ = q; + } else if (encoding.contains("br") && q > brQ) { + brQ = q; + } else if (encoding.contains("zstd") && q > zstdQ) { + zstdQ = q; + } else if (encoding.contains("snappy") && q > snappyQ) { + snappyQ = q; + } else if (encoding.contains("gzip") && q > gzipQ) { + gzipQ = q; + } else if (encoding.contains("deflate") && q > deflateQ) { + deflateQ = q; + } + } + if (brQ > 0.0f || zstdQ > 0.0f || snappyQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) { + if (brQ != -1.0f && brQ >= zstdQ && this.brotliOptions != null) { + return Algorithm.BR; + } else if (zstdQ != -1.0f && zstdQ >= snappyQ && this.zstdOptions != null) { + return Algorithm.ZSTD; + } else if (snappyQ != -1.0f && snappyQ >= gzipQ && this.snappyOptions != null) { + return Algorithm.SNAPPY; + } else if (gzipQ != -1.0f && gzipQ >= deflateQ && this.gzipOptions != null) { + return Algorithm.GZIP; + } else if (deflateQ != -1.0f && this.deflateOptions != null) { + return Algorithm.DEFLATE; + } + } + if (starQ > 0.0f) { + if (brQ == -1.0f && this.brotliOptions != null) { + return Algorithm.BR; + } + if (zstdQ == -1.0f && this.zstdOptions != null) { + return Algorithm.ZSTD; + } + if (snappyQ == -1.0f && this.snappyOptions != null) { + return Algorithm.SNAPPY; + } + if (gzipQ == -1.0f && this.gzipOptions != null) { + return Algorithm.GZIP; + } + if (deflateQ == -1.0f && this.deflateOptions != null) { + return Algorithm.DEFLATE; + } + } + return null; + } + + private enum Algorithm { + BR(HttpHeaderValues.BR), + ZSTD(HttpHeaderValues.ZSTD), + SNAPPY(HttpHeaderValues.SNAPPY), + GZIP(HttpHeaderValues.GZIP), + DEFLATE(HttpHeaderValues.DEFLATE); + + final CharSequence contentEncoding; + + Algorithm(CharSequence contentEncoding) { + this.contentEncoding = contentEncoding; + } + } +} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index d85aeade00a..50222dea4f0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -22,11 +22,13 @@ import io.micronaut.http.netty.stream.DelegateStreamedHttpRequest; import io.micronaut.http.netty.stream.EmptyHttpRequest; import io.micronaut.http.netty.stream.StreamedHttpResponse; +import io.micronaut.http.server.netty.HttpCompressionStrategy; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; @@ -39,6 +41,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -92,6 +95,8 @@ public final class PipeliningServerHandler extends ChannelInboundHandlerAdapter private final InboundHandler baseInboundHandler = new MessageInboundHandler(); private final OptimisticBufferingInboundHandler optimisticBufferingInboundHandler = new OptimisticBufferingInboundHandler(); + private Compressor compressor; + /** * Current handler for inbound messages. */ @@ -133,6 +138,10 @@ public PipeliningServerHandler(RequestHandler requestHandler) { this.requestHandler = requestHandler; } + public void setCompressionStrategy(HttpCompressionStrategy compressionStrategy) { + this.compressor = new Compressor(compressionStrategy); + } + public static boolean canHaveBody(HttpResponseStatus status) { // All 1xx (Informational), 204 (No Content), and 304 (Not Modified) // responses do not include a message body @@ -334,7 +343,7 @@ private final class MessageInboundHandler extends InboundHandler { @Override void read(Object message) { HttpRequest request = (HttpRequest) message; - OutboundAccess outboundAccess = new OutboundAccess(); + OutboundAccess outboundAccess = new OutboundAccess(request); outboundQueue.add(outboundAccess); HttpHeaders headers = request.headers(); @@ -695,6 +704,10 @@ void handleUpstreamError(Throwable cause) { * Class that allows writing the response for the request this object is associated with. */ public final class OutboundAccess implements NettyWriteContext { + /** + * The request that caused this response. This is used for compression decisions. + */ + private final HttpRequest request; /** * The handler that will perform the actual write operation. */ @@ -702,7 +715,8 @@ public final class OutboundAccess implements NettyWriteContext { private Object attachment = null; private boolean closeAfterWrite = false; - private OutboundAccess() { + private OutboundAccess(HttpRequest request) { + this.request = request; } @Override @@ -797,7 +811,9 @@ public void writeFull(FullHttpResponse response, boolean headResponse) { response.headers().remove(HttpHeaderNames.CONTENT_LENGTH); } preprocess(response); - write(new FullOutboundHandler(this, response)); + FullOutboundHandler oh = new FullOutboundHandler(this, response); + prepareCompression(response, oh); + write(oh); } @Override @@ -809,26 +825,120 @@ public void writeStreamed(HttpResponse response, Publisher content) response.headers().remove(HttpHeaderNames.TRANSFER_ENCODING); } preprocess(response); - content.subscribe(new StreamingOutboundHandler(this, response)); + StreamingOutboundHandler oh = new StreamingOutboundHandler(this, response); + prepareCompression(response, oh); + content.subscribe(oh); } @Override public void writeStream(HttpResponse response, InputStream stream, ExecutorService executorService) { preprocess(response); - write(new BlockingOutboundHandler(this, response, stream, executorService)); + BlockingOutboundHandler oh = new BlockingOutboundHandler(this, response, stream, executorService); + prepareCompression(response, oh); + write(oh); + } + + private void prepareCompression(HttpResponse response, OutboundHandler outboundHandler) { + if (compressor == null) { + return; + } + ChannelHandler compressionHandler = compressor.prepare(request, response); + if (compressionHandler != null) { + // if content-length and transfer-encoding are unset, we will close anyway. + // if this is a full response, there's special handling below in OutboundHandler + if (!(response instanceof FullHttpResponse) && response.headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { + response.headers().remove(HttpHeaderNames.CONTENT_LENGTH); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + } + outboundHandler.compressionChannel = new EmbeddedChannel( + ctx.channel().id(), + ctx.channel().metadata().hasDisconnect(), + ctx.channel().config(), + compressionHandler + ); + } } } - private abstract static class OutboundHandler { + private abstract class OutboundHandler { /** * {@link OutboundAccess} that created this handler, for metadata access. */ final OutboundAccess outboundAccess; + EmbeddedChannel compressionChannel; + private OutboundHandler(OutboundAccess outboundAccess) { this.outboundAccess = outboundAccess; } + protected final void writeCompressing(HttpContent content, @SuppressWarnings("SameParameterValue") boolean flush, boolean close) { + if (this.compressionChannel == null) { + write(content, flush, close); + } else { + // slow path + writeCompressing0(content, flush, close); + } + } + + private void writeCompressing0(HttpContent content, boolean flush, boolean close) { + EmbeddedChannel compressionChannel = this.compressionChannel; + if (content.content().isReadable()) { + compressionChannel.writeOutbound(content.content()); + } else { + content.content().release(); + } + boolean last = content instanceof LastHttpContent; + if (last) { + compressionChannel.finish(); + this.compressionChannel = null; + } + if (content instanceof HttpResponse hr) { + assert last; + + // fix content-length if necessary + if (hr.headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { + long newContentLength = 0; + for (Object outboundMessage : compressionChannel.outboundMessages()) { + newContentLength += ((ByteBuf) outboundMessage).readableBytes(); + } + hr.headers().set(HttpHeaderNames.CONTENT_LENGTH, newContentLength); + } + + // this can happen in FullHttpResponse, just send the full body. + write(new DefaultHttpResponse(hr.protocolVersion(), hr.status(), hr.headers()), false, false); + } + // this is a bit awkward. we go over all the compressed data, *except the last buffer*, and forward them with + // flush=false and close=false. only for the last buffer we use those flags. + ByteBuf nextToSend = null; + while (true) { + ByteBuf buf = compressionChannel.readOutbound(); + if (buf == null) { + break; + } + if (nextToSend != null) { + write(new DefaultHttpContent(nextToSend), false, false); + } + nextToSend = buf; + } + // send the last buffer with the flags. + if (nextToSend == null) { + if (last) { + HttpHeaders trailingHeaders = ((LastHttpContent) content).trailingHeaders(); + write(trailingHeaders.isEmpty() ? LastHttpContent.EMPTY_LAST_CONTENT : new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, trailingHeaders), flush, close); + } else if (flush || close) { + // not sure if this can actually happen, but we need to forward a flush/close + write(new DefaultHttpContent(Unpooled.EMPTY_BUFFER), flush, close); + } // else just don't send anything + } else { + if (last) { + write(new DefaultLastHttpContent(nextToSend, ((LastHttpContent) content).trailingHeaders()), flush, close); + } else { + write(new DefaultHttpContent(nextToSend), flush, close); + } + } + } + /** * Write some data to the channel. */ @@ -837,7 +947,13 @@ private OutboundHandler(OutboundAccess outboundAccess) { /** * Discard the remaining data. */ - abstract void discard(); + void discard() { + EmbeddedChannel compressionChannel = this.compressionChannel; + if (compressionChannel != null) { + compressionChannel.finishAndReleaseAll(); + this.compressionChannel = null; + } + } } /** @@ -867,6 +983,7 @@ void writeSome() { @Override void discard() { + super.discard(); if (next != null) { next.discard(); next = null; @@ -887,7 +1004,7 @@ private final class FullOutboundHandler extends OutboundHandler { @Override void writeSome() { - write(message, true, outboundAccess.closeAfterWrite); + writeCompressing(message, true, outboundAccess.closeAfterWrite); outboundHandler = null; requestHandler.responseWritten(outboundAccess.attachment); PipeliningServerHandler.this.writeSome(); @@ -895,6 +1012,7 @@ void writeSome() { @Override void discard() { + super.discard(); message.release(); outboundHandler = null; } @@ -962,7 +1080,7 @@ public void onNext(HttpContent httpContent) { if (httpContent instanceof LastHttpContent) { writtenLast = true; } - write(httpContent, true, false); + writeCompressing(httpContent, true, false); if (ctx.channel().isWritable()) { subscription.request(1); } @@ -1011,7 +1129,7 @@ public void onComplete() { } if (!writtenLast) { - write(LastHttpContent.EMPTY_LAST_CONTENT, true, outboundAccess.closeAfterWrite); + writeCompressing(LastHttpContent.EMPTY_LAST_CONTENT, true, outboundAccess.closeAfterWrite); } requestHandler.responseWritten(outboundAccess.attachment); PipeliningServerHandler.this.writeSome(); @@ -1020,6 +1138,7 @@ public void onComplete() { @Override void discard() { + super.discard(); // this is safe because: // - onComplete/onError cannot have been called yet, because otherwise outboundHandler // would be null and discard couldn't have been called @@ -1078,20 +1197,21 @@ void writeSome() { } if (msg == null) { // this.done == true inside the synchronized block - write(LastHttpContent.EMPTY_LAST_CONTENT, true, false); + writeCompressing(LastHttpContent.EMPTY_LAST_CONTENT, true, false); outboundHandler = null; requestHandler.responseWritten(outboundAccess.attachment); PipeliningServerHandler.this.writeSome(); break; } else { - write(new DefaultHttpContent(msg), true, false); + writeCompressing(new DefaultHttpContent(msg), true, false); } } while (ctx.channel().isWritable()); } @Override void discard() { + super.discard(); discard = true; if (worker == null) { worker = blockingExecutor.submit(this::work); diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy new file mode 100644 index 00000000000..a56eaa244f2 --- /dev/null +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/CompressionSpec.groovy @@ -0,0 +1,73 @@ +package io.micronaut.http.server.netty + +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Requires +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.runtime.server.EmbeddedServer +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufUtil +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelHandler +import io.netty.channel.embedded.EmbeddedChannel +import io.netty.handler.codec.compression.SnappyFrameDecoder +import io.netty.handler.codec.compression.ZlibCodecFactory +import io.netty.handler.codec.compression.ZlibWrapper +import io.netty.handler.codec.http.HttpHeaderValues +import spock.lang.Specification + +import java.util.concurrent.ThreadLocalRandom + +class CompressionSpec extends Specification { + def compression(ChannelHandler decompressor, CharSequence contentEncoding) { + given: + def ctx = ApplicationContext.run(['spec.name': 'CompressionSpec']) + def server = ctx.getBean(EmbeddedServer).start() + + byte[] uncompressed = new byte[1024] + ThreadLocalRandom.current().nextBytes(uncompressed) + ctx.getBean(Ctrl).data = uncompressed + + def connection = new URL("$server.URI/compress").openConnection() + connection.addRequestProperty("Accept-Encoding", contentEncoding.toString()) + + when: + byte[] compressed = connection.inputStream.readAllBytes() + def compChannel = new EmbeddedChannel(decompressor) + compChannel.writeInbound(Unpooled.copiedBuffer(compressed)) + compChannel.finish() + ByteBuf decompressed = Unpooled.buffer() + while (true) { + ByteBuf o = compChannel.readInbound() + if (o == null) { + break + } + decompressed.writeBytes(o) + o.release() + } + then: + ByteBufUtil.getBytes(decompressed) == uncompressed + + cleanup: + connection.inputStream.close() + server.stop() + ctx.close() + + where: + contentEncoding | decompressor + HttpHeaderValues.GZIP | ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP) + HttpHeaderValues.DEFLATE | ZlibCodecFactory.newZlibDecoder(ZlibWrapper.ZLIB) + HttpHeaderValues.SNAPPY | new SnappyFrameDecoder() + } + + @Requires(property = "spec.name", value = "CompressionSpec") + @Controller + static class Ctrl { + byte[] data + + @Get("/compress") + byte[] send() { + return data + } + } +} diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy deleted file mode 100644 index 019e7760143..00000000000 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/SmartHttpContentCompressorSpec.groovy +++ /dev/null @@ -1,37 +0,0 @@ -package io.micronaut.http.server.netty - -import io.netty.handler.codec.http.* -import spock.lang.Specification -import spock.lang.Unroll - -class SmartHttpContentCompressorSpec extends Specification { - - private static String compressible = "text/html" - private static String inCompressible = "image/png" - - @Unroll - void "test #type with #length"() { - expect: - HttpHeaders headers = new DefaultHttpHeaders() - if (type != null) { - headers.add(HttpHeaderNames.CONTENT_TYPE, type) - } - if (length != null) { - headers.add(HttpHeaderNames.CONTENT_LENGTH, length) - } - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, headers) - new SmartHttpContentCompressor(new DefaultHttpCompressionStrategy(1024, 6)).shouldSkip(response) == expected - - where: - type | length | expected - compressible | 1024 | false // compressible type and equal to 1k - compressible | 1023 | true // compressible type but smaller than 1k - compressible | null | false // compressible type but unknown size - compressible | 0 | true // compressible type no content - inCompressible | 1 | true // incompressible, always skip - inCompressible | 5000 | true // incompressible, always skip - inCompressible | null | true // incompressible, always skip - inCompressible | 0 | true // incompressible, always skip - null | null | true // if the content type is unknown, skip - } -}