Skip to content

Commit

Permalink
HDFS-17294. Reconfigure the scheduling cycle of the slowPeerCollectorDaemon thread. (apache#6366)
Browse files Browse the repository at this point in the history

Signed-off-by: Takanobu Asanuma <[email protected]>
  • Loading branch information
huangzhaobo99 authored Dec 19, 2023
1 parent 62cc673 commit 95ea31f
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public class DatanodeManager {
private SlowPeerTracker slowPeerTracker;
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
private Daemon slowPeerCollectorDaemon;
private final long slowPeerCollectionInterval;
private volatile long slowPeerCollectionInterval;
private volatile int maxSlowPeerReportNodes;

@Nullable
Expand Down Expand Up @@ -408,7 +408,7 @@ public void run() {
LOG.info("Slow peers collection thread start.");
}

public void stopSlowPeerCollector() {
private void stopSlowPeerCollector() {
LOG.info("Slow peers collection thread shutdown");
if (slowPeerCollectorDaemon == null) {
return;
Expand All @@ -424,6 +424,17 @@ public void stopSlowPeerCollector() {
}
}

/**
 * Stops the running slow peer collector, stores the new collection interval
 * and, if the slow peer tracker is enabled, starts the collector again.
 *
 * @param interval new value for {@code slowPeerCollectionInterval}
 *                 (presumably milliseconds; confirm against the caller)
 * @throws NullPointerException if {@code slowPeerCollectorDaemon} or
 *         {@code slowPeerTracker} is null
 */
public void restartSlowPeerCollector(long interval) {
  // A restart is only supported when a collector daemon already exists.
  Preconditions.checkNotNull(slowPeerCollectorDaemon,
      "slowPeerCollectorDaemon thread is null, not support restart");
  stopSlowPeerCollector();

  Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
  slowPeerCollectionInterval = interval;

  final boolean trackerEnabled = slowPeerTracker.isSlowPeerTrackerEnabled();
  if (!trackerEnabled) {
    // Interval is recorded, but the collector stays stopped.
    return;
  }
  startSlowPeerCollector();
}

private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) {
long staleInterval = conf.getLong(
Expand Down Expand Up @@ -2289,4 +2300,9 @@ public void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
public boolean isSlowPeerCollectorInitialized() {
  // NOTE(review): despite the method name, the result is true while the
  // daemon reference is null and false once a daemon has been assigned.
  final boolean daemonAbsent = (slowPeerCollectorDaemon == null);
  return daemonAbsent;
}

/**
 * Exposes the current slow peer collection interval so tests can verify
 * the outcome of a reconfiguration.
 *
 * @return the current value of {@code slowPeerCollectionInterval}
 */
@VisibleForTesting
public long getSlowPeerCollectionInterval() {
  return this.slowPeerCollectionInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT;
Expand Down Expand Up @@ -380,7 +382,8 @@ public enum OperationCategory {
IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY));
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY));

private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
Expand Down Expand Up @@ -2374,7 +2377,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) || (property.equals(
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) || (property.equals(
DFS_DATANODE_PEER_STATS_ENABLED_KEY)) || property.equals(
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY)) {
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY) || property.equals(
DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY)) {
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
} else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
Expand Down Expand Up @@ -2673,6 +2677,24 @@ String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
datanodeManager.setMaxSlowPeersToReport(maxSlowPeersToReport);
break;
}
case DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY: {
  if (newVal == null) {
    // No new value supplied: fall back to whatever the current configuration
    // holds for this key, or to the compiled-in default if it is unset.
    long defaultInterval =
        getConf().getTimeDuration(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
            DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
    datanodeManager.restartSlowPeerCollector(defaultInterval);
    result = Long.toString(defaultInterval);
  } else {
    // Parse the supplied duration string (e.g. "10m") into milliseconds.
    // The first argument is the property name used to label diagnostics, so
    // it must be the key rather than the default value string.
    long newInterval =
        getConf().getTimeDurationHelper(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
            newVal, TimeUnit.MILLISECONDS);
    datanodeManager.restartSlowPeerCollector(newInterval);
    // Report the value as the operator wrote it, not the converted number.
    result = newVal;
  }
  break;
}
default: {
throw new IllegalArgumentException(
"Unexpected property " + property + " in reconfigureSlowNodesParameters");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.junit.Assert.*;

Expand Down Expand Up @@ -822,6 +823,45 @@ public void testReconfigureFSNamesystemLockMetricsParameters()
}
}

/**
 * Verifies reconfiguration of the slow peer collect interval:
 * it is rejected while no collector daemon exists, rejected for an
 * unparseable value, applied for a valid duration, and resolved from the
 * current configuration when the new value is null.
 */
@Test
public void testReconfigureSlowPeerCollectInterval() throws Exception {
  final NameNode nameNode = cluster.getNameNode();
  final DatanodeManager datanodeManager =
      nameNode.namesystem.getBlockManager().getDatanodeManager();

  assertFalse("SlowNode tracker is already enabled. It should be disabled by default",
      datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
  // Note: this accessor returns true while the collector daemon is still null.
  assertTrue(datanodeManager.isSlowPeerCollectorInitialized());

  // With no collector daemon present, a restart must be refused.
  try {
    nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "10m");
    // Without this, the check would pass vacuously if nothing were thrown.
    fail("Expected NullPointerException because the slow peer collector daemon is null");
  } catch (NullPointerException e) {
    assertEquals("slowPeerCollectorDaemon thread is null, not support restart", e.getMessage());
  }

  nameNode.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "True");
  assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
      datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
  assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
  // The interval is still the 30 minute default at this point.
  assertEquals(1800000, datanodeManager.getSlowPeerCollectionInterval());

  // A value that cannot be parsed as a duration must be rejected.
  try {
    nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "non-numeric");
    // Without this, the check would pass vacuously if nothing were thrown.
    fail("Expected ReconfigurationException for a non-numeric interval");
  } catch (ReconfigurationException e) {
    assertEquals("Could not change property dfs.namenode.slowpeer.collect.interval from "
        + "'30m' to 'non-numeric'", e.getMessage());
  }

  nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "10m");
  assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
  assertEquals(600000, datanodeManager.getSlowPeerCollectionInterval());

  nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, null);
  assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
  // set to the value of the current system
  assertEquals(600000, datanodeManager.getSlowPeerCollectionInterval());
}

@After
public void shutDown() throws IOException {
if (cluster != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(28, outs.size());
assertEquals(29, outs.size());
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
Expand Down

0 comments on commit 95ea31f

Please sign in to comment.