Skip to content

Commit

Permalink
Compression support in PipeliningServerHandler (#10246)
Browse files Browse the repository at this point in the history
* Replace chunked write API
This PR replaces the writeChunked and writeFile APIs with a new writeStream API that takes an InputStream. This removes the need for the ChunkedWriteHandler.

Chunked writes were used for two purposes: Sending file regions and sending InputStreams. This has always complicated the HTTP pipeline somewhat as the pipeline had to deal with not just HttpContent objects but also ChunkedInput and FileRegion objects.

This PR replaces the machinery for InputStream writing with a more straight-forward solution that reads the data on the IO thread and then sends it down the channel.

Additionally, the file-specific APIs based on RandomAccessFile are removed. The body writer now just creates an InputStream for the file region in question and sends that. This removes support for zero-copy transfers, however that is a niche feature anyway because it doesn't work with TLS or HTTP/2. If someone wants a performant HTTP server, HTTP/2 takes priority over zero-copy so it makes little sense.

This PR may have small conflicts with #10131 as that PR changed the PipeliningServerHandler body handling a little bit. Otherwise this PR should have no visible impact on users.

* remove unused class

* remove unused class

* Fix request backpressure
PipeliningServerHandler was supposed to implement backpressure, but it turns out that auto read was still enabled and that the implementation didn't really work. This means that it would keep reading even if that means buffering data when the downstream can't keep up.

This PR disables auto read and fixes the read implementation in PipeliningServerHandler. In principle there should be no change to users, this just means that instead of buffering any incoming data internally, backpressure is now applied to the client.

This PR is based on #10138 but is separate for easier review. It also has conflicts with #10131.

* Implement decompression in PipeliningServerHandler
This patch implements the logic of HttpContentDecompressor in PipeliningServerHandler. This allows us to shrink the pipeline a little. The perf impact for uncompressed requests should basically be zero.

This builds on the changes in #10142.

* address review

* revert

* add DecompressionSpec

* Compression support in PipeliningServerHandler
Like #10155
  • Loading branch information
yawkat authored Dec 12, 2023
1 parent 0252f00 commit 2288541
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 128 deletions.
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ asm = "9.6"
awaitility = "4.2.0"
bcpkix = "1.70"
blaze = "1.6.10"
brotli4j = "1.13.0"
caffeine = "2.9.3"
compile-testing = "0.21.0"

Expand Down Expand Up @@ -165,6 +166,8 @@ bcpkix = { module = "org.bouncycastle:bcpkix-jdk15on", version.ref = "bcpkix" }

blaze-persistence-core = { module = "com.blazebit:blaze-persistence-core-impl", version.ref = "blaze" }

brotli4j = { module = "com.aayushatharva.brotli4j:brotli4j", version.ref = "brotli4j" }

caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" }

compile-testing = { module = "com.google.testing.compile:compile-testing", version.ref = "compile-testing" }
Expand Down
1 change: 1 addition & 0 deletions http-server-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
compileOnly libs.managed.kotlin.stdlib
compileOnly libs.managed.netty.transport.native.unix.common
compileOnly libs.managed.netty.incubator.codec.http3
compileOnly libs.brotli4j

testImplementation libs.jmh.core
testAnnotationProcessor libs.jmh.generator.annprocess
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.micronaut.http.server.netty;

import io.netty.handler.codec.compression.StandardCompressionOptions;
import io.netty.handler.codec.http.HttpResponse;

/**
Expand All @@ -36,6 +37,6 @@ public interface HttpCompressionStrategy {
* @return The compression level (0-9)
*/
default int getCompressionLevel() {
return 6;
return StandardCompressionOptions.gzip().compressionLevel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,6 @@ private void insertMicronautHandlers() {
channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert());
}

SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy());
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor);

Optional<NettyServerWebSocketUpgradeHandler> webSocketUpgradeHandler = embeddedServices.getWebSocketUpgradeHandler(server);
if (webSocketUpgradeHandler.isPresent()) {
pipeline.addLast(NettyServerWebSocketUpgradeHandler.COMPRESSION_HANDLER, new WebSocketServerCompressionHandler());
Expand All @@ -621,7 +618,9 @@ private void insertMicronautHandlers() {
if (server.getServerConfiguration().isDualProtocol() && server.getServerConfiguration().isHttpToHttpsRedirect() && sslHandler == null) {
requestHandler = new HttpToHttpsRedirectHandler(routingInBoundHandler.conversionService, server.getServerConfiguration(), sslConfiguration, hostResolver);
}
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_INBOUND, new PipeliningServerHandler(requestHandler));
PipeliningServerHandler pipeliningServerHandler = new PipeliningServerHandler(requestHandler);
pipeliningServerHandler.setCompressionStrategy(embeddedServices.getHttpCompressionStrategy());
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_INBOUND, pipeliningServerHandler);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.server.netty.handler;

import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.server.netty.HttpCompressionStrategy;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.compression.BrotliEncoder;
import io.netty.handler.codec.compression.BrotliOptions;
import io.netty.handler.codec.compression.DeflateOptions;
import io.netty.handler.codec.compression.GzipOptions;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.compression.SnappyOptions;
import io.netty.handler.codec.compression.StandardCompressionOptions;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.compression.Zstd;
import io.netty.handler.codec.compression.ZstdEncoder;
import io.netty.handler.codec.compression.ZstdOptions;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpVersion;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class Compressor {
private final HttpCompressionStrategy strategy;
private final BrotliOptions brotliOptions;
private final GzipOptions gzipOptions;
private final DeflateOptions deflateOptions;
private final ZstdOptions zstdOptions;
private final SnappyOptions snappyOptions;

Compressor(HttpCompressionStrategy strategy) {
this.strategy = strategy;
// only use configured compression level for gzip and deflate, other algos have different semantics for the level
this.brotliOptions = Brotli.isAvailable() ? StandardCompressionOptions.brotli() : null;
GzipOptions stdGzip = StandardCompressionOptions.gzip();
this.gzipOptions = StandardCompressionOptions.gzip(strategy.getCompressionLevel(), stdGzip.windowBits(), stdGzip.memLevel());
DeflateOptions stdDeflate = StandardCompressionOptions.deflate();
this.deflateOptions = StandardCompressionOptions.deflate(strategy.getCompressionLevel(), stdDeflate.windowBits(), stdDeflate.memLevel());
this.zstdOptions = Zstd.isAvailable() ? StandardCompressionOptions.zstd() : null;
this.snappyOptions = StandardCompressionOptions.snappy();
}

@Nullable
ChannelHandler prepare(HttpRequest request, HttpResponse response) {
// from HttpContentEncoder: isPassthru
int code = response.status().code();
if (code < 200 || code == 204 || code == 304 ||
(request.method().equals(HttpMethod.HEAD) || (request.method().equals(HttpMethod.CONNECT) && code == 200)) ||
response.protocolVersion() == HttpVersion.HTTP_1_0) {
return null;
}
// special case for FHR to keep behavior identical to HttpContentEncoder
if (response instanceof FullHttpResponse fhr && !fhr.content().isReadable()) {
return null;
}
if (!strategy.shouldCompress(response)) {
return null;
}
if (response.headers().contains(HttpHeaderNames.CONTENT_ENCODING)) {
// already encoded
return null;
}
List<String> acceptEncoding = new ArrayList<>();
for (String s : request.headers().getAll(HttpHeaderNames.ACCEPT_ENCODING)) {
acceptEncoding.addAll(Arrays.asList(s.split(",")));
}
Algorithm encoding = determineEncoding(acceptEncoding);
if (encoding == null) {
return null;
}
response.headers().add(HttpHeaderNames.CONTENT_ENCODING, encoding.contentEncoding);
return switch (encoding) {
case BR -> makeBrotliEncoder();
case ZSTD -> new ZstdEncoder(zstdOptions.compressionLevel(), zstdOptions.blockSize(), zstdOptions.maxEncodeSize());
case SNAPPY -> new SnappyFrameEncoder();
case GZIP -> ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP, gzipOptions.compressionLevel(), gzipOptions.windowBits(), gzipOptions.memLevel());
case DEFLATE -> ZlibCodecFactory.newZlibEncoder(ZlibWrapper.ZLIB, deflateOptions.compressionLevel(), deflateOptions.windowBits(), deflateOptions.memLevel());
};
}

private BrotliEncoder makeBrotliEncoder() {
return new BrotliEncoder(brotliOptions.parameters());
}

@SuppressWarnings("FloatingPointEquality")
private Algorithm determineEncoding(List<String> acceptEncoding) {
// from HttpContentCompressor, slightly modified
float starQ = -1.0f;
float brQ = -1.0f;
float zstdQ = -1.0f;
float snappyQ = -1.0f;
float gzipQ = -1.0f;
float deflateQ = -1.0f;
for (String encoding : acceptEncoding) {
float q = 1.0f;
int equalsPos = encoding.indexOf('=');
if (equalsPos != -1) {
try {
q = Float.parseFloat(encoding.substring(equalsPos + 1));
} catch (NumberFormatException e) {
// Ignore encoding
q = 0.0f;
}
}
if (encoding.contains("*")) {
starQ = q;
} else if (encoding.contains("br") && q > brQ) {
brQ = q;
} else if (encoding.contains("zstd") && q > zstdQ) {
zstdQ = q;
} else if (encoding.contains("snappy") && q > snappyQ) {
snappyQ = q;
} else if (encoding.contains("gzip") && q > gzipQ) {
gzipQ = q;
} else if (encoding.contains("deflate") && q > deflateQ) {
deflateQ = q;
}
}
if (brQ > 0.0f || zstdQ > 0.0f || snappyQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) {
if (brQ != -1.0f && brQ >= zstdQ && this.brotliOptions != null) {
return Algorithm.BR;
} else if (zstdQ != -1.0f && zstdQ >= snappyQ && this.zstdOptions != null) {
return Algorithm.ZSTD;
} else if (snappyQ != -1.0f && snappyQ >= gzipQ && this.snappyOptions != null) {
return Algorithm.SNAPPY;
} else if (gzipQ != -1.0f && gzipQ >= deflateQ && this.gzipOptions != null) {
return Algorithm.GZIP;
} else if (deflateQ != -1.0f && this.deflateOptions != null) {
return Algorithm.DEFLATE;
}
}
if (starQ > 0.0f) {
if (brQ == -1.0f && this.brotliOptions != null) {
return Algorithm.BR;
}
if (zstdQ == -1.0f && this.zstdOptions != null) {
return Algorithm.ZSTD;
}
if (snappyQ == -1.0f && this.snappyOptions != null) {
return Algorithm.SNAPPY;
}
if (gzipQ == -1.0f && this.gzipOptions != null) {
return Algorithm.GZIP;
}
if (deflateQ == -1.0f && this.deflateOptions != null) {
return Algorithm.DEFLATE;
}
}
return null;
}

private enum Algorithm {
BR(HttpHeaderValues.BR),
ZSTD(HttpHeaderValues.ZSTD),
SNAPPY(HttpHeaderValues.SNAPPY),
GZIP(HttpHeaderValues.GZIP),
DEFLATE(HttpHeaderValues.DEFLATE);

final CharSequence contentEncoding;

Algorithm(CharSequence contentEncoding) {
this.contentEncoding = contentEncoding;
}
}
}
Loading

0 comments on commit 2288541

Please sign in to comment.