Add switch and optimize UT
huangzhaobo99 committed Jan 30, 2024
1 parent c21d0d7 commit 42aa99c
Showing 4 changed files with 45 additions and 40 deletions.
DFSConfigKeys.java
@@ -1603,6 +1603,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_PIPELINE_SLOWNODE_ENABLED_DEFAULT = false;
   public static final String DFS_PIPELINE_CONGESTION_RATIO = "dfs.pipeline.congestion.ratio";
   public static final double DFS_PIPELINE_CONGESTION_RATIO_DEFAULT = 1.5;
+  public static final String DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY =
+      "dfs.datanode.read.blockid.counts.metric.enabled";
+  public static final boolean DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_DEFAULT = false;
 
   // Key Provider Cache Expiry
   public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
DataNode.java
@@ -467,6 +467,7 @@ public static InetSocketAddress createSocketAddr(String target) {
   private static final int NUM_CORES = Runtime.getRuntime()
       .availableProcessors();
   private final double congestionRatio;
+  private final boolean readBlockIdCountsEnabled;
   private DiskBalancer diskBalancer;
   private DataSetLockManager dataSetLockManager;

@@ -494,7 +495,7 @@ private static Tracer createTracer(Configuration conf) {
   private DataTransferThrottler ecReconstuctReadThrottler;
   private DataTransferThrottler ecReconstuctWriteThrottler;
 
-  private AtomicLongMap<String> readBlockIdCounts;
+  private final AtomicLongMap<String> readBlockIdCounts;
 
   /**
    * Creates a dummy DataNode for testing purpose.
@@ -525,6 +526,10 @@ private static Tracer createTracer(Configuration conf) {
         DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT);
     this.congestionRatio = congestionRationTmp > 0 ?
         congestionRationTmp : DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT;
+    this.readBlockIdCountsEnabled =
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY,
+            DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_DEFAULT);
+    readBlockIdCounts = readBlockIdCountsEnabled ? AtomicLongMap.create() : null;
   }
 
   /**
@@ -628,7 +633,10 @@ public Map<String, Long> load(String key) {
         DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT);
     this.congestionRatio = congestionRationTmp > 0 ?
         congestionRationTmp : DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT;
-    readBlockIdCounts = AtomicLongMap.create();
+    this.readBlockIdCountsEnabled =
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY,
+            DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_DEFAULT);
+    this.readBlockIdCounts = readBlockIdCountsEnabled ? AtomicLongMap.create() : null;
   }
 
   @Override // ReconfigurableBase
@@ -4393,16 +4401,20 @@ public BlockPoolManager getBlockPoolManager() {
   }
 
   public void incrReadBlockIdCounts(String blockId) {
-    readBlockIdCounts.incrementAndGet(blockId);
+    if (readBlockIdCountsEnabled) {
+      readBlockIdCounts.incrementAndGet(blockId);
+    }
   }
 
   public void decrReadBlockIdCounts(String blockId) {
-    readBlockIdCounts.decrementAndGet(blockId);
-    readBlockIdCounts.removeIfZero(blockId);
+    if (readBlockIdCountsEnabled) {
+      readBlockIdCounts.decrementAndGet(blockId);
+      readBlockIdCounts.removeIfZero(blockId);
+    }
   }
 
   @Override // DataNodeMXBean
   public Map<String, Long> getReadBlockIdCounts() {
-    return readBlockIdCounts.asMap();
+    return readBlockIdCountsEnabled ? readBlockIdCounts.asMap() : null;
   }
 }
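Taken together, the DataNode hunks follow one pattern: the map is allocated only when the switch is on, and every accessor checks the flag before touching it. Since incrReadBlockIdCounts runs when a block read starts and decrReadBlockIdCounts plus removeIfZero run when it ends, the map holds the number of currently active reads per block ID rather than a cumulative total. A minimal standalone sketch of that pattern, using plain Guava's AtomicLongMap (Hadoop ships a shaded copy under org.apache.hadoop.thirdparty); the ReadCountTracker class and blk_1001 key are invented for illustration:

import java.util.Map;
import com.google.common.util.concurrent.AtomicLongMap;

public class ReadCountTracker {
  private final boolean enabled;
  // Stays null when the switch is off, so disabled nodes pay no memory cost.
  private final AtomicLongMap<String> counts;

  public ReadCountTracker(boolean enabled) {
    this.enabled = enabled;
    this.counts = enabled ? AtomicLongMap.create() : null;
  }

  public void onReadStart(String blockId) {
    if (enabled) {
      counts.incrementAndGet(blockId);
    }
  }

  public void onReadEnd(String blockId) {
    if (enabled) {
      counts.decrementAndGet(blockId);
      counts.removeIfZero(blockId); // drop entries with no active readers
    }
  }

  public Map<String, Long> snapshot() {
    return enabled ? counts.asMap() : null;
  }

  public static void main(String[] args) {
    ReadCountTracker tracker = new ReadCountTracker(true);
    tracker.onReadStart("blk_1001");
    tracker.onReadStart("blk_1001");
    System.out.println(tracker.snapshot()); // {blk_1001=2}
    tracker.onReadEnd("blk_1001");
    tracker.onReadEnd("blk_1001");
    System.out.println(tracker.snapshot()); // {} -- entry removed at zero
  }
}

Returning null from the getter when disabled mirrors getReadBlockIdCounts above, so consumers of the MXBean attribute must tolerate a null result.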
hdfs-default.xml
@@ -6582,4 +6582,11 @@
     Enables observer reads for clients. This should only be enabled when clients are using routers.
   </description>
 </property>
+<property>
+  <name>dfs.datanode.read.blockid.counts.metric.enabled</name>
+  <value>false</value>
+  <description>
+    Whether the DataNode tracks read counts per block ID and exposes them
+    through the DataNodeInfo MXBean. Defaults to false.
+  </description>
+</property>
 </configuration>
TestDataNodeMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
@@ -34,6 +35,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -83,6 +85,8 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
+
 @NotThreadSafe
 public class TestDataNodeMetrics {
   private static final Logger LOG =
@@ -828,9 +832,13 @@ public Boolean get() {
 
   @Test
   public void testReadBlockIdCounts() throws Exception {
-    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build()) {
+    HdfsConfiguration config = new HdfsConfiguration();
+    config.setBoolean(DFS_DATANODE_READ_BLOCKID_COUNTS_METRIC_ENABLED_KEY, true);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).build()) {
       cluster.waitActive();
       FileSystem fs = cluster.getFileSystem();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
 
       final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
       final ObjectName mxBeanName = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
@@ -843,40 +851,15 @@ public void testReadBlockIdCounts() throws Exception {
         output.hsync();
       }
 
-      // Test ReadBlockIdCounts Metric
-      ExecutorService executorService = Executors.newFixedThreadPool(100);
-      for (int i = 0; i < 500; i++) {
-        executorService.execute(() -> {
-          StringBuilder res = new StringBuilder();
-          try (FSDataInputStream in = fs.open(path);) {
-            byte[] buffer = new byte[1024];
-            int read;
-            while ((read = in.read(buffer)) > 0) {
-              res.append(new String(buffer, 0, read));
-            }
-            assertEquals("ReadBlockIdCountsTest", res.toString());
-          } catch (IOException e) {
-            LOG.warn("Test ReadBlockIdCounts Read IO error", e);
-          }
-        });
-      }
-
-      GenericTestUtils.waitFor(() -> {
-        try {
-          Object readBlockIdCountsObj = mBeanServer.getAttribute(mxBeanName, "ReadBlockIdCounts");
-          Map<String, Long> readBlockIdCountsMap =
-              (HashMap) JSON.parse(JSON.toString(readBlockIdCountsObj));
-          for (Map.Entry<String, Long> entry : readBlockIdCountsMap.entrySet()) {
-            return !entry.getKey().isEmpty();
-          }
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-        return false;
-      }, 100, 10000);
-
-      // mock client exception, directly interrupt the read request
-      executorService.shutdownNow();
+      StringBuilder res = new StringBuilder();
+      try (FSDataInputStream in = fs.open(path);) {
+        byte[] buffer = new byte[1024];
+        int read;
+        while ((read = in.read(buffer)) > 0) {
+          res.append(new String(buffer, 0, read));
+        }
+        assertEquals("ReadBlockIdCountsTest", res.toString());
+      }
 
       GenericTestUtils.waitFor(() -> {
         try {
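The test's JMX lookup is also how an external monitor would consume the metric. A compact sketch of just that query; it assumes it runs inside a JVM hosting a DataNode (as the MiniDFSCluster test does), while a remote monitor would instead use a JMXConnector or scrape the DataNode's HTTP /jmx endpoint. The ReadBlockIdCountsProbe class name is made up for illustration:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ReadBlockIdCountsProbe {
  public static void main(String[] args) throws Exception {
    // Same MBean name and attribute the test queries.
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    ObjectName name = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
    Object counts = server.getAttribute(name, "ReadBlockIdCounts");
    // A map of blockId -> active read count, or null when
    // dfs.datanode.read.blockid.counts.metric.enabled is false.
    System.out.println(counts);
  }
}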
