From cc6f05f5edc79189eaa3c0bce002670044e55d4b Mon Sep 17 00:00:00 2001
From: "Haiyang.Hu"
Date: Wed, 10 Apr 2024 15:26:51 +0800
Subject: [PATCH] HDFS-17455. Modify patch based on comments

---
 .../apache/hadoop/hdfs/DFSInputStream.java    |  5 ++-
 .../hadoop/hdfs/TestDFSInputStream.java       | 36 ++++++++-----------
 2 files changed, 17 insertions(+), 24 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index da7978d48fa1c..90bf1baade73e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -520,9 +520,8 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
         // Update the LastLocatedBlock, if offset is for last block.
         if (offset >= locatedBlocks.getFileLength()) {
           setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
-          // Here locatedBlocks has been updated, need to check offset again.
-          // If offset to the portion of the last block, will return the last block,
-          // otherwise the block containing the specified offset needs to be searched again.
+          // After updating locatedBlocks, the block to which the offset belongs
+          // should be searched again, as in {@link DFSInputStream#getBlockAt(long)}.
           if (offset >= locatedBlocks.getFileLength()) {
             return locatedBlocks.getLastLocatedBlock();
           } else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index d3932198a940a..b2b18237a752a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -31,18 +29,15 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -50,6 +45,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@@ -305,32 +301,29 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
   public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
       IOException, InterruptedException, TimeoutException {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64 * 1024);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 516);
     DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    FSDataOutputStream out = null;
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
       cluster.waitActive();
       DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create a file which only contains one UC (under construction) block.
       String file = "/testfile";
       Path path = new Path(file);
-      long fileLen = 1024 * 64;
-      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
-      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
-          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
-          fs.getDefaultBlockSize(path), null);
-      int bufferLen = 1024;
+      out = fs.create(path, (short) 3);
+      int bufferLen = 5120;
       byte[] toWrite = new byte[bufferLen];
       Random rb = new Random(0);
-      long bytesToWrite = fileLen;
-      while (bytesToWrite > 0) {
-        rb.nextBytes(toWrite);
-        int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
-        out.write(toWrite, 0, bytesToWriteNext);
-        bytesToWrite -= bytesToWriteNext;
-      }
+      rb.nextBytes(toWrite);
+      out.write(toWrite, 0, bufferLen);
 
+      // Wait until the file reports exactly one block location.
       GenericTestUtils.waitFor(() -> {
         try {
-          return fs.getFileBlockLocations(path, 0, fileLen).length == 1;
+          return fs.getFileBlockLocations(path, 0, bufferLen).length == 1;
         } catch (IOException e) {
           return false;
         }
@@ -356,13 +349,14 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
           false, null)) {
         int bufLen = 1024;
         byte[] buf = new byte[bufLen];
-        //Seek the offset to 1024.
+        // Seek to offset 1024, which should be within the range (0, fileSize).
         in.seek(1024);
         int read = in.read(buf, 0, bufLen);
         assertEquals(1024, read);
       }
     } finally {
       DFSClientFaultInjector.set(oldFaultInjector);
+      IOUtils.closeStream(out);
     }
   }
 }