Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix request backpressure #10142

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import io.micronaut.core.annotation.NonNull;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.reactivestreams.Publisher;

import java.io.RandomAccessFile;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* This interface is used to write the different kinds of netty responses.
Expand Down Expand Up @@ -68,20 +68,11 @@ default void writeFull(@NonNull FullHttpResponse response) {
void writeStreamed(@NonNull HttpResponse response, @NonNull Publisher<HttpContent> content);

/**
* Write a response with a {@link HttpChunkedInput} body.
* Write a response with a body that is a blocking stream.
*
* @param response The response. <b>Must not</b> be a {@link FullHttpResponse}
* @param chunkedInput The response body
* @param response The response. <b>Must not</b> be a {@link FullHttpResponse}
* @param stream The stream to read from
* @param executorService The executor for IO operations
*/
void writeChunked(@NonNull HttpResponse response, @NonNull HttpChunkedInput chunkedInput);

/**
* Write a response with a body that is a section of a {@link RandomAccessFile}.
*
* @param response The response. <b>Must not</b> be a {@link FullHttpResponse}
* @param randomAccessFile File to read from
* @param position Start position
* @param contentLength Length of the section to send
*/
void writeFile(@NonNull HttpResponse response, @NonNull RandomAccessFile randomAccessFile, long position, long contentLength);
void writeStream(@NonNull HttpResponse response, @NonNull InputStream stream, @NonNull ExecutorService executorService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3FrameToHttpObjectCodec;
Expand Down Expand Up @@ -587,23 +586,20 @@ private void insertHttp2DownstreamHandlers() {

registerMicronautChannelHandlers();

insertMicronautHandlers(false);
insertMicronautHandlers();
}

/**
* Insert the handlers that manage the micronaut message handling, e.g. conversion between micronaut requests
* and netty requests, and routing.
*/
private void insertMicronautHandlers(boolean zeroCopySupported) {
private void insertMicronautHandlers() {
channel.attr(STREAM_PIPELINE_ATTRIBUTE.get()).set(this);
if (sslHandler != null) {
channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert());
}

SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy());
if (zeroCopySupported) {
channel.attr(PipeliningServerHandler.ZERO_COPY_PREDICATE.get()).set(contentCompressor);
}
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor);
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_DECOMPRESSOR, new HttpContentDecompressor());

Expand All @@ -619,7 +615,6 @@ private void insertMicronautHandlers(boolean zeroCopySupported) {
)
);
}
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK, new ChunkedWriteHandler()); // todo: move to PipeliningServerHandler

RequestHandler requestHandler = routingInBoundHandler;
if (webSocketUpgradeHandler.isPresent()) {
Expand All @@ -645,7 +640,7 @@ private void insertHttp1DownstreamHandlers() {
registerMicronautChannelHandlers();
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_KEEP_ALIVE, new HttpServerKeepAliveHandler());

insertMicronautHandlers(sslHandler == null);
insertMicronautHandlers();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import io.micronaut.http.netty.body.NettyBodyWriter;
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.stream.ChunkedStream;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

/**
* Body writer for {@link InputStream}s.
Expand All @@ -45,9 +46,11 @@
@Experimental
@Singleton
public final class InputStreamBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter<InputStream> {
private final ExecutorService executorService;

InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) {
InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService executorService) {
super(configuration);
this.executorService = executorService;
}

@Override
Expand All @@ -59,7 +62,7 @@ public void writeTo(HttpRequest<?> request, MutableHttpResponse<InputStream> out
nettyResponse.getNettyHeaders()
);
// can be null if the stream was closed
nettyContext.writeChunked(finalResponse, new HttpChunkedInput(new ChunkedStream(object)));
nettyContext.writeStream(finalResponse, object, executorService);
} else {
throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + outgoingResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.http.server.types.files.StreamedFile;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.stream.ChunkedStream;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

/**
* Body writer for {@link StreamedFile}s.
Expand All @@ -49,8 +50,11 @@
@Experimental
@Internal
public final class StreamFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter<StreamedFile> {
StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) {
private final ExecutorService ioExecutor;

StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) {
super(configuration);
this.ioExecutor = ioExecutor;
}

@Override
Expand All @@ -72,8 +76,7 @@ public void writeTo(HttpRequest<?> request, MutableHttpResponse<StreamedFile> ou
nettyHeaders
);
InputStream inputStream = object.getInputStream();
HttpChunkedInput chunkedInput = new HttpChunkedInput(new ChunkedStream(inputStream));
nettyContext.writeChunked(finalResponse, chunkedInput);
nettyContext.writeStream(finalResponse, inputStream, ioExecutor);
}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,22 @@
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.http.server.types.files.SystemFile;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jetbrains.annotations.NotNull;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;

import static io.micronaut.http.HttpHeaders.CONTENT_RANGE;

Expand All @@ -57,8 +64,11 @@
public final class SystemFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter<SystemFile> {
private static final String UNIT_BYTES = "bytes";

public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) {
private final ExecutorService ioExecutor;

public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) {
super(configuration);
this.ioExecutor = ioExecutor;
}

@Override
Expand Down Expand Up @@ -113,34 +123,22 @@ public void writeTo(HttpRequest<?> request, MutableHttpResponse<SystemFile> resp

// Write the request data
final DefaultHttpResponse finalResponse = new DefaultHttpResponse(nettyResponse.getNettyHttpVersion(), nettyResponse.getNettyHttpStatus(), nettyResponse.getNettyHeaders());
writeFile(systemFile, nettyContext, position, contentLength, finalResponse);

File file = systemFile.getFile();
InputStream is;
try {
is = new FileInputStream(file);
} catch (FileNotFoundException e) {
throw new MessageBodyException("Could not find file", e);
}

nettyContext.writeStream(finalResponse, new RangeInputStream(is, position, contentLength), ioExecutor);
}
} else {
throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + response);
}
}

private static void writeFile(SystemFile systemFile, NettyWriteContext context, long position, long contentLength, DefaultHttpResponse finalResponse) {
// Write the content.
File file = systemFile.getFile();
RandomAccessFile randomAccessFile = open(file);

context.writeFile(
finalResponse,
randomAccessFile,
position,
contentLength
);
}

private static RandomAccessFile open(File file) {
try {
return new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
throw new MessageBodyException("Could not find file", e);
}
}

@Nullable
private static IntRange parseRangeHeader(String value, long contentLength) {
int equalsIdx = value.indexOf('=');
Expand Down Expand Up @@ -175,4 +173,72 @@ private static class IntRange {
}
}

private static final class RangeInputStream extends InputStream {
private final InputStream delegate;
private final long toSkip;
private long remainingLength;
private boolean skipped = false;
private boolean skipSuccess = false;

private RangeInputStream(InputStream delegate, long toSkip, long length) {
this.delegate = delegate;
this.toSkip = toSkip;
this.remainingLength = length;

if (toSkip == 0) {
skipped = true;
skipSuccess = true;
}
}

private boolean doSkip() throws IOException {
if (!skipped) {
skipped = true;
try {
delegate.skipNBytes(toSkip);
skipSuccess = true;
} catch (EOFException ignored) {
}
}
return skipSuccess;
}

@Override
public int read() throws IOException {
if (!doSkip()) {
return -1;
}
if (remainingLength <= 0) {
return -1;
}
int read = delegate.read();
if (read != -1) {
remainingLength--;
}
return read;
}

@Override
public int read(@NotNull byte[] b, int off, int len) throws IOException {
if (!doSkip()) {
return -1;
}
if (remainingLength <= 0) {
return -1;
}
if (len > remainingLength) {
len = (int) remainingLength;
}
int n = delegate.read(b, off, len);
if (n != -1) {
remainingLength -= n;
}
return n;
}

@Override
public void close() throws IOException {
delegate.close();
}
}
}
Loading