Skip to content

Commit

Permalink
Implement decompression in PipeliningServerHandler
Browse files Browse the repository at this point in the history
This patch implements the logic of HttpContentDecompressor in PipeliningServerHandler. This allows us to shrink the pipeline a little. The perf impact for uncompressed requests should basically be zero.

This builds on the changes in #10142.
  • Loading branch information
yawkat committed Nov 23, 2023
1 parent 29fe47d commit 869fa76
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
Expand Down Expand Up @@ -72,11 +71,6 @@
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
Expand All @@ -90,6 +84,10 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Helper class that manages the {@link ChannelPipeline} of incoming HTTP connections.
Expand Down Expand Up @@ -601,7 +599,6 @@ private void insertMicronautHandlers() {

SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy());
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor);
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_DECOMPRESSOR, new HttpContentDecompressor());

Optional<NettyServerWebSocketUpgradeHandler> webSocketUpgradeHandler = embeddedServices.getWebSocketUpgradeHandler(server);
if (webSocketUpgradeHandler.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,22 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.compression.BrotliDecoder;
import io.netty.handler.codec.compression.SnappyFrameDecoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -46,15 +54,6 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
Expand All @@ -64,6 +63,14 @@
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

/**
* Netty handler that handles incoming {@link HttpRequest}s and forwards them to a
Expand Down Expand Up @@ -328,18 +335,81 @@ void read(Object message) {
HttpRequest request = (HttpRequest) message;
OutboundAccess outboundAccess = new OutboundAccess();
outboundQueue.add(outboundAccess);
if (request instanceof FullHttpRequest full) {
requestHandler.accept(ctx, full, outboundAccess);

HttpHeaders headers = request.headers();
String contentEncoding = getContentEncoding(headers);
EmbeddedChannel decompressionChannel;
if (contentEncoding == null) {
decompressionChannel = null;
} else if (HttpHeaderValues.GZIP.contentEqualsIgnoreCase(contentEncoding) ||
HttpHeaderValues.X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
decompressionChannel = new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
} else if (HttpHeaderValues.DEFLATE.contentEqualsIgnoreCase(contentEncoding) ||
HttpHeaderValues.X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
decompressionChannel = new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.ZLIB_OR_NONE));
} else if (Brotli.isAvailable() && HttpHeaderValues.BR.contentEqualsIgnoreCase(contentEncoding)) {
decompressionChannel = new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), new BrotliDecoder());
} else if (HttpHeaderValues.SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
decompressionChannel = new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), new SnappyFrameDecoder());
} else {
decompressionChannel = null;
}
if (decompressionChannel != null) {
headers.remove(HttpHeaderNames.CONTENT_LENGTH);
headers.remove(HttpHeaderNames.CONTENT_ENCODING);
headers.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
}

boolean full = request instanceof FullHttpRequest;
if (full && decompressionChannel == null) {
requestHandler.accept(ctx, request, outboundAccess);
} else if (!hasBody(request)) {
inboundHandler = droppingInboundHandler;
if (message instanceof HttpContent) {
inboundHandler.read(message);
}
if (decompressionChannel != null) {
decompressionChannel.finish();
}
requestHandler.accept(ctx, new EmptyHttpRequest(request), outboundAccess);
} else {
optimisticBufferingInboundHandler.init(request, outboundAccess);
inboundHandler = optimisticBufferingInboundHandler;
if (decompressionChannel == null) {
inboundHandler = optimisticBufferingInboundHandler;
} else {
inboundHandler = new DecompressingInboundHandler(decompressionChannel, optimisticBufferingInboundHandler);
}
if (full) {
inboundHandler.read(new DefaultLastHttpContent(((FullHttpRequest) request).content()));
}
}
}

private static String getContentEncoding(HttpHeaders headers) {
// from io.netty.handler.codec.http.HttpContentDecoder

// Determine the content encoding.
String contentEncoding = headers.get(HttpHeaderNames.CONTENT_ENCODING);
if (contentEncoding != null) {
contentEncoding = contentEncoding.trim();
} else {
String transferEncoding = headers.get(HttpHeaderNames.TRANSFER_ENCODING);
if (transferEncoding != null) {
int idx = transferEncoding.indexOf(",");
if (idx != -1) {
contentEncoding = transferEncoding.substring(0, idx).trim();
} else {
contentEncoding = transferEncoding.trim();
}
} else {
contentEncoding = null;
}
}
return contentEncoding;
}

@Override
Expand All @@ -360,7 +430,6 @@ private final class OptimisticBufferingInboundHandler extends InboundHandler {

void init(HttpRequest request, OutboundAccess outboundAccess) {
assert buffer.isEmpty();
assert !(request instanceof HttpContent);
this.request = request;
this.outboundAccess = outboundAccess;
}
Expand Down Expand Up @@ -429,7 +498,11 @@ private void devolveToStreaming() {
this.request = null;
this.outboundAccess = null;

inboundHandler = streamingInboundHandler;
if (inboundHandler == this) {
inboundHandler = streamingInboundHandler;
} else {
((DecompressingInboundHandler) inboundHandler).delegate = streamingInboundHandler;
}
Flux<HttpContent> flux = streamingInboundHandler.flux();
if (HttpUtil.is100ContinueExpected(request)) {
flux = flux.doOnSubscribe(s -> outboundAccess.writeContinue());
Expand Down Expand Up @@ -507,11 +580,7 @@ void closeIfNoSubscriber() {
}

if (sink.currentSubscriberCount() == 0) {
releaseQueue();
if (inboundHandler == this) {
inboundHandler = droppingInboundHandler;
refreshNeedMore();
}
cancelImpl();
}
}

Expand All @@ -532,14 +601,77 @@ private void cancel() {
return;
}

cancelImpl();
}

private void cancelImpl() {
if (inboundHandler == this) {
inboundHandler = droppingInboundHandler;
refreshNeedMore();
} else if (inboundHandler instanceof DecompressingInboundHandler dec && dec.delegate == this) {
dec.dispose();
inboundHandler = droppingInboundHandler;
refreshNeedMore();
}
releaseQueue();
}
}

private class DecompressingInboundHandler extends InboundHandler {
private final EmbeddedChannel channel;
private InboundHandler delegate;

public DecompressingInboundHandler(EmbeddedChannel channel, InboundHandler delegate) {
this.channel = channel;
this.delegate = delegate;
}

@Override
void read(Object message) {
ByteBuf compressed = ((HttpContent) message).content();
if (!compressed.isReadable()) {
delegate.read(message);
return;
}

channel.writeInbound(compressed);
boolean last = message instanceof LastHttpContent;
if (last) {
channel.finish();
}

while (true) {
ByteBuf decompressed = channel.readInbound();
if (decompressed == null) {
break;
}
if (!decompressed.isReadable()) {
decompressed.release();
continue;
}
delegate.read(new DefaultHttpContent(decompressed));
}

if (last) {
delegate.read(LastHttpContent.EMPTY_LAST_CONTENT);
}
}

void dispose() {
channel.finishAndReleaseAll();
}

@Override
void readComplete() {
delegate.readComplete();
}

@Override
void handleUpstreamError(Throwable cause) {
delegate.handleUpstreamError(cause);
}
}

/**
* Handler that drops all incoming content.
*/
Expand Down
Loading

0 comments on commit 869fa76

Please sign in to comment.