Skip to content

Commit

Permalink
HADOOP-19098. Vector IO: Specify and validate ranges consistently. #6604
Browse files Browse the repository at this point in the history


Clarifies behaviour of VectorIO methods with contract tests as well as
specification.

* Add precondition range checks to all implementations
* Identify and fix bug where direct buffer reads was broken
  (HADOOP-19101; this surfaced in ABFS contract tests)
* Logging in VectoredReadUtils.
* TestVectoredReadUtils verifies validation logic.
* FileRangeImpl toString() improvements
* CombinedFileRange tracks bytes in range which are wanted;
   toString() output logs this.

HDFS
* Add test TestHDFSContractVectoredRead

ABFS
* Add test ITestAbfsFileSystemContractVectoredRead

S3A
* checks for vector IO being stopped in all iterative
  vector operations, including draining
* maps read() returning -1 to failure
* passes in file length to validation
* Error reporting to only completeExceptionally() those ranges
  which had not yet read data in.
* Improved logging.

readVectored()
* made synchronized. This is only for the invocation;
  the actual async retrieves are unsynchronized.
* closes input stream on invocation
* switches to random IO, so avoids keeping any long-lived connection around.

+ AbstractSTestS3AHugeFiles enhancements.
+ ADDENDUM: test fix in ITestS3AContractVectoredRead

Contains: HADOOP-19101. Vectored Read into off-heap buffer broken in fallback
implementation

Contributed by Steve Loughran

Change-Id: Ia4ed71864c595f175c275aad83a2ff5741693432
  • Loading branch information
steveloughran committed Apr 3, 2024
1 parent b4f9d8e commit 87fb977
Show file tree
Hide file tree
Showing 24 changed files with 1,830 additions and 940 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.IntFunction;
Expand All @@ -52,9 +53,9 @@
import org.apache.hadoop.util.Progressable;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;

/****************************************************************
* Abstract Checksumed FileSystem.
Expand Down Expand Up @@ -425,41 +426,31 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
}

/**
* Validates range parameters.
* In case of CheckSum FS, we already have calculated
* fileLength so failing fast here.
* @param ranges requested ranges.
* @param fileLength length of file.
* @throws EOFException end of file exception.
* Vectored read.
* If the file has no checksums: delegate to the underlying stream.
* If the file is checksummed: calculate the checksum ranges as
* well as the data ranges, read both, and validate the checksums
* as well as returning the data.
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @throws IOException
*/
private void validateRangeRequest(List<? extends FileRange> ranges,
final long fileLength) throws EOFException {
for (FileRange range : ranges) {
VectoredReadUtils.validateRangeRequest(range);
if (range.getOffset() + range.getLength() > fileLength) {
final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
range.getOffset(), range.getLength(), file);
LOG.warn(errMsg);
throw new EOFException(errMsg);
}
}
}

@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
final long length = getFileLength();
validateRangeRequest(ranges, length);

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
datas.readVectored(ranges, allocate);
return;
}
final long length = getFileLength();
final List<? extends FileRange> sorted = validateAndSortRanges(ranges,
Optional.of(length));
int minSeek = minSeekForVectorReads();
int maxSize = maxReadSizeForVectorReads();
List<CombinedFileRange> dataRanges =
VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
VectoredReadUtils.mergeSortedRanges(sorted, bytesPerSum,
minSeek, maxReadSizeForVectorReads());
// While merging the ranges above, they are rounded up based on the value of bytesPerSum
// which leads to some ranges crossing the EOF thus they need to be fixed else it will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ default int maxReadSizeForVectorReads() {
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @throws IOException any IOE.
* @throws IllegalArgumentException if the any of ranges are invalid, or they overlap.
*/
default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
Expand Down Expand Up @@ -319,10 +319,11 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {

List<? extends FileRange> sortedRanges = Arrays.asList(sortRanges(ranges));
// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
Optional.empty());
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
VectoredReadUtils.validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
Expand Down
Loading

0 comments on commit 87fb977

Please sign in to comment.