diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f92a2ad56581b9..57a09aaf820421 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1603,6 +1603,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_PIPELINE_SLOWNODE_ENABLED_DEFAULT = false;
   public static final String DFS_PIPELINE_CONGESTION_RATIO = "dfs.pipeline.congestion.ratio";
   public static final double DFS_PIPELINE_CONGESTION_RATIO_DEFAULT = 1.5;
+  public static final String DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY =
+      "dfs.datanode.read.blockid.counts.metric.enabled";
+  public static final boolean DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_DEFAULT = false;
 
   // Key Provider Cache Expiry
   public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e4d20d2f2f57b7..8652096f300bec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -467,6 +467,7 @@ public static InetSocketAddress createSocketAddr(String target) {
   private static final int NUM_CORES = Runtime.getRuntime()
       .availableProcessors();
   private final double congestionRatio;
+  private final boolean readBlockIdCountsEnabled;
   private DiskBalancer diskBalancer;
   private DataSetLockManager dataSetLockManager;
 
@@ -494,7 +495,7 @@ private static Tracer createTracer(Configuration conf) {
   private DataTransferThrottler ecReconstuctReadThrottler;
   private DataTransferThrottler ecReconstuctWriteThrottler;
 
-  private AtomicLongMap<String> readBlockIdCounts;
+  private final AtomicLongMap<String> readBlockIdCounts;
 
   /**
    * Creates a dummy DataNode for testing purpose.
@@ -525,6 +526,10 @@ private static Tracer createTracer(Configuration conf) {
         DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT);
     this.congestionRatio = congestionRationTmp > 0 ?
         congestionRationTmp : DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT;
+    this.readBlockIdCountsEnabled =
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY,
+            DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_DEFAULT);
+    readBlockIdCounts = readBlockIdCountsEnabled ? AtomicLongMap.create() : null;
   }
 
   /**
@@ -628,7 +633,10 @@ public Map load(String key) {
         DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT);
     this.congestionRatio = congestionRationTmp > 0 ?
         congestionRationTmp : DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT;
-    readBlockIdCounts = AtomicLongMap.create();
+    this.readBlockIdCountsEnabled =
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY,
+            DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_DEFAULT);
+    this.readBlockIdCounts = readBlockIdCountsEnabled ? AtomicLongMap.create() : null;
   }
 
   @Override // ReconfigurableBase
@@ -4393,16 +4401,20 @@ public BlockPoolManager getBlockPoolManager() {
   }
 
   public void incrReadBlockIdCounts(String blockId) {
-    readBlockIdCounts.incrementAndGet(blockId);
+    if (readBlockIdCountsEnabled) {
+      readBlockIdCounts.incrementAndGet(blockId);
+    }
   }
 
   public void decrReadBlockIdCounts(String blockId) {
-    readBlockIdCounts.decrementAndGet(blockId);
-    readBlockIdCounts.removeIfZero(blockId);
+    if (readBlockIdCountsEnabled) {
+      readBlockIdCounts.decrementAndGet(blockId);
+      readBlockIdCounts.removeIfZero(blockId);
+    }
   }
 
   @Override // DataNodeMXBean
   public Map<String, Long> getReadBlockIdCounts() {
-    return readBlockIdCounts.asMap();
+    return readBlockIdCountsEnabled ? readBlockIdCounts.asMap() : null;
   }
 }
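The guarded pattern above leaves readBlockIdCounts null whenever the flag is off, so every access goes through a readBlockIdCountsEnabled check. Below is a minimal standalone sketch of the same counting idiom, using the unshaded Guava AtomicLongMap (Hadoop itself uses the shaded org.apache.hadoop.thirdparty variant); the InFlightReadTracker class and trackedRead helper are illustrative names, not part of this change:

    import com.google.common.util.concurrent.AtomicLongMap;
    import java.util.Map;

    public class InFlightReadTracker {
      // Per-block count of reads currently in progress.
      private final AtomicLongMap<String> counts = AtomicLongMap.create();

      // Brackets a block read with increment/decrement, mirroring how DataNode
      // calls incrReadBlockIdCounts before and decrReadBlockIdCounts after a read.
      public void trackedRead(String blockId, Runnable readBody) {
        counts.incrementAndGet(blockId);
        try {
          readBody.run();
        } finally {
          counts.decrementAndGet(blockId);
          counts.removeIfZero(blockId); // drop idle blocks so the map stays small
        }
      }

      // Snapshot for reporting, analogous to getReadBlockIdCounts().
      public Map<String, Long> snapshot() {
        return counts.asMap();
      }
    }

The removeIfZero call is what keeps the map bounded by the number of blocks with in-flight readers rather than by the number of blocks ever read.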
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 174f7242bfbcf9..e54874bb11e66d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6582,4 +6582,11 @@
     Enables observer reads for clients. This should only be enabled when clients are using routers.
   </description>
 </property>
+<property>
+  <name>dfs.datanode.read.blockid.counts.metric.enabled</name>
+  <value>false</value>
+  <description>
+    If true, the DataNode tracks the number of in-progress read requests per block ID and exposes the counts via the DataNodeInfo MXBean. Defaults to false.
+  </description>
+</property>
 </configuration>
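With the property set to true, the counts surface as the ReadBlockIdCounts attribute of the Hadoop:service=DataNode,name=DataNodeInfo MBean, which the test below queries in-process. A sketch of polling the same attribute from a remote JMX client; the datanode-host:9999 endpoint is an assumption, and the DataNode JVM must be started with JMX remote access enabled:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ReadBlockIdCountsProbe {
      public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; adjust host/port to the DataNode's JMX settings.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://datanode-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection conn = connector.getMBeanServerConnection();
          ObjectName name = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
          // getReadBlockIdCounts() returns null when
          // dfs.datanode.read.blockid.counts.metric.enabled is false.
          Object counts = conn.getAttribute(name, "ReadBlockIdCounts");
          System.out.println("ReadBlockIdCounts = " + counts);
        }
      }
    }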
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 2ee4e33a82b34e..66c17ac7796d13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
@@ -34,6 +35,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -83,6 +85,8 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
+
 @NotThreadSafe
 public class TestDataNodeMetrics {
   private static final Logger LOG =
@@ -828,9 +832,13 @@ public Boolean get() {
 
   @Test
   public void testReadBlockIdCounts() throws Exception {
-    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build()) {
+    HdfsConfiguration config = new HdfsConfiguration();
+    config.setBoolean(DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY, true);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).build()) {
       cluster.waitActive();
       FileSystem fs = cluster.getFileSystem();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
       final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
       final ObjectName mxBeanName =
           new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
@@ -843,40 +851,15 @@ public void testReadBlockIdCounts() throws Exception {
         output.hsync();
       }
 
-      // Test ReadBlockIdCounts Metric
-      ExecutorService executorService = Executors.newFixedThreadPool(100);
-      for (int i = 0; i < 500; i++) {
-        executorService.execute(() -> {
-          StringBuilder res = new StringBuilder();
-          try (FSDataInputStream in = fs.open(path);) {
-            byte[] buffer = new byte[1024];
-            int read;
-            while ((read = in.read(buffer)) > 0) {
-              res.append(new String(buffer, 0, read));
-            }
-            assertEquals("ReadBlockIdCountsTest", res.toString());
-          } catch (IOException e) {
-            LOG.warn("Test ReadBlockIdCounts Read IO error", e);
-          }
-        });
-      }
-
-      GenericTestUtils.waitFor(() -> {
-        try {
-          Object readBlockIdCountsObj = mBeanServer.getAttribute(mxBeanName, "ReadBlockIdCounts");
-          Map<String, Long> readBlockIdCountsMap =
-              (HashMap<String, Long>) JSON.parse(JSON.toString(readBlockIdCountsObj));
-          for (Map.Entry<String, Long> entry : readBlockIdCountsMap.entrySet()) {
-            return !entry.getKey().isEmpty();
-          }
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-        return false;
-      }, 100, 10000);
-
-      // mock client exception, directly interrupt the read request
-      executorService.shutdownNow();
+      StringBuilder res = new StringBuilder();
+      try (FSDataInputStream in = fs.open(path)) {
+        byte[] buffer = new byte[1024];
+        int read;
+        while ((read = in.read(buffer)) > 0) {
+          res.append(new String(buffer, 0, read));
+        }
+        assertEquals("ReadBlockIdCountsTest", res.toString());
+      }
 
       GenericTestUtils.waitFor(() -> {
         try {