Skip to content

Commit

Permalink
Ensure replica generators are used when appropriate. Speed up many balancer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Jan 4, 2025
1 parent 46b30be commit 1cd2c34
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
Expand All @@ -41,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Supplier;
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
Expand Down Expand Up @@ -126,6 +129,14 @@ class BalancerClusterState {
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

/**
 * Supplies the server indices (the values of {@code serversToIndex}) in a random order.
 * The shuffled list is memoized with a 5 second expiration, so every caller within that window
 * observes the same ordering and receives the same list instance. Because that instance is
 * shared between callers, it is exposed as an unmodifiable view so that no caller can reorder
 * or resize it for the others.
 */
private Supplier<List<Integer>> shuffledServerIndicesSupplier =
  Suppliers.memoizeWithExpiration(() -> {
    // Copy first: shuffling must not touch the collection backing serversToIndex.
    List<Integer> shuffledServerIndices = new ArrayList<>(serversToIndex.values());
    Collections.shuffle(shuffledServerIndices);
    return Collections.unmodifiableList(shuffledServerIndices);
  }, 5, TimeUnit.SECONDS);

static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
Expand Down Expand Up @@ -782,7 +793,7 @@ public List<RegionPlan> doAction(BalanceAction action) {
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
}
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
Set<Integer> regionsToAdd = mba.getServerToRegionsToRemove().get(serverIndex);
Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
}
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
Expand Down Expand Up @@ -1100,6 +1111,10 @@ void setNumMovedRegions(int numMovedRegions) {
this.numMovedRegions = numMovedRegions;
}

/**
 * Returns the server indices in the randomized order currently held by
 * {@code shuffledServerIndicesSupplier}. That supplier is memoized with a 5 second expiration,
 * so repeated calls inside the window yield the same ordering.
 * @return the list produced by the shuffled server indices supplier
 */
List<Integer> getShuffledServerIndices() {
  final List<Integer> currentShuffle = this.shuffledServerIndicesSupplier.get();
  return currentShuffle;
}

@Override
public String toString() {
StringBuilder desc = new StringBuilder("Cluster={servers=[");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import static java.util.Collections.shuffle;
import static org.apache.hadoop.hbase.master.balancer.DistributeReplicasConditional.getReplicaKey;

import java.util.ArrayList;
Expand Down Expand Up @@ -57,18 +56,10 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing

BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing,
boolean isForced) {
// Shuffle server indices to add some randomness to the moves
// Should we cache these shuffled servers? Doesn't seem necessary at the moment
List<Integer> shuffledServerIndices = new ArrayList<>(cluster.numServers);
for (int i = 0; i < cluster.servers.length; i++) {
shuffledServerIndices.add(i);
}
shuffle(shuffledServerIndices);

// Iterate through each server to find colocated replicas
// Iterate through shuffled servers to find colocated replicas
boolean foundColocatedReplicas = false;
List<MoveRegionAction> moveRegionActions = new ArrayList<>();
for (int sourceIndex : shuffledServerIndices) {
for (int sourceIndex : cluster.getShuffledServerIndices()) {
int[] serverRegions = cluster.regionsPerServer[sourceIndex];
Set<DistributeReplicasConditional.ReplicaKey> replicaKeys =
new HashSet<>(serverRegions.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,25 @@ private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {

// Check host
regionReplicaHostCostFunction.prepare(c);
boolean colocatedAtHost =
(Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON);
double hostCost = Math.abs(regionReplicaHostCostFunction.cost());
boolean colocatedAtHost = hostCost > CostFunction.COST_EPSILON;
if (colocatedAtHost) {
return true;
}
LOG.trace("No host colocation detected with host cost={}", hostCost);

// Check rack
regionReplicaRackCostFunction.prepare(c);
return (Math.abs(regionReplicaRackCostFunction.cost()) > CostFunction.COST_EPSILON);
double rackCost = Math.abs(regionReplicaRackCostFunction.cost());
boolean colocatedAtRack =
(Math.abs(regionReplicaRackCostFunction.cost()) > CostFunction.COST_EPSILON);
if (colocatedAtRack) {
return true;
}
LOG.trace("No rack colocation detected with rack cost={}", rackCost);

return DistributeReplicasCandidateGenerator.INSTANCE.generateCandidate(c, true)
!= BalanceAction.NULL_ACTION;
}

private String getBalanceReason(double total, double sumMultiplier) {
Expand Down Expand Up @@ -467,13 +477,21 @@ protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
}
}

double rand = ThreadLocalRandom.current().nextDouble();
if (
areSomeRegionReplicasColocated(cluster)
&& candidateGenerators.containsKey(RegionReplicaCandidateGenerator.class)
!balancerConditionals.isReplicaDistributionEnabled()
&& areSomeRegionReplicasColocated(cluster)
) {
// If we aren't using conditional replica distribution, then we should prioritize the
// region replica cost functions' candidate generator
return candidateGenerators.get(RegionReplicaCandidateGenerator.class);
// If we aren't using conditional replica distribution, then we should ensure that
// the region replica cost functions' candidate generators are used often
if (rand < 0.25 && candidateGenerators.containsKey(RegionReplicaCandidateGenerator.class)) {
return candidateGenerators.get(RegionReplicaCandidateGenerator.class);
}
if (
rand < 0.5 && candidateGenerators.containsKey(RegionReplicaRackCandidateGenerator.class)
) {
return candidateGenerators.get(RegionReplicaRackCandidateGenerator.class);
}
}

List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
Expand All @@ -500,7 +518,6 @@ protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
}

// Generate a random number and pick the first generator whose partial sum is >= rand
double rand = ThreadLocalRandom.current().nextDouble();
for (int i = 0; i < partialSums.size(); i++) {
if (rand <= partialSums.get(i)) {
return candidateGenerators.get(generatorClasses.get(i));
Expand Down Expand Up @@ -639,7 +656,16 @@ protected List<RegionPlan> balanceTable(TableName tableName,
}

// Change state and evaluate costs
cluster.doAction(action);
try {
cluster.doAction(action);
} catch (IllegalStateException | ArrayIndexOutOfBoundsException e) {
LOG.warn(
"Generator {} produced invalid action! "
+ "Debug your candidate generator as this is likely a bug, "
+ "and may cause a balancer deadlock. {}",
generator.getClass().getSimpleName(), action, e);
continue;
}
updateCostsAndWeightsWithAction(cluster, action);
newCost = computeCost(cluster, currentCost);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}

List<MoveRegionAction> moves = new ArrayList<>();
for (int serverIdx = 0; serverIdx < cluster.numServers; serverIdx++) {
for (int serverIdx : cluster.getShuffledServerIndices()) {
boolean hasRegionsToIsolate = false;
Set<Integer> regionsToMove = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ protected List<ServerAndLoad> convertToList(final Map<ServerName, List<RegionInf
}

protected String printMock(List<ServerAndLoad> balancedCluster) {
if (balancedCluster == null) {
return "null";
}
NavigableSet<ServerAndLoad> sorted = new TreeSet<>(balancedCluster);
ServerAndLoad[] arr = sorted.toArray(new ServerAndLoad[sorted.size()]);
StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer
*/
@Test
public void testBalanceCluster() throws Exception {
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testLargeCluster() {
int numRegionsPerServer = 80; // all servers except one
int numTables = 100;
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30_000);
loadBalancer.onConfigurationChange(conf);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public void testMidCluster() {
int numRegionsPerServer = 60; // all servers except one
int replication = 1;
int numTables = 40;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}

@Test
Expand All @@ -50,7 +53,9 @@ public void testMidCluster2() {
int numTables = 400;
// a large number of regions means we may not always reach the best balance in one run
boolean assertFullyBalanced = false;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables,
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
assertFullyBalanced, false);
}

Expand All @@ -61,7 +66,10 @@ public void testMidCluster3() {
int numRegionsPerServer = 9; // all servers except one
int replication = 1;
int numTables = 110;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
// TODO(eclark): Make sure that the tables are well distributed.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ public void testRegionReplicasOnSmallCluster() {
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 80; // all regions are mostly balanced
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}

private static class ForTestRackManagerOne extends RackManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public class TestStochasticLoadBalancerRegionReplicaHighReplication
@Test
public void testRegionReplicasOnMidClusterHighReplication() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
int numNodes = 40;
int numRegions = 6 * numNodes;
int replication = 40; // 40 replicas per region, one for each server
int numRegionsPerServer = 5;
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ public void testRegionReplicasOnLargeCluster() {
// ignore these two cost functions to allow us to make any move that helps other functions.
conf.setFloat("hbase.master.balancer.stochastic.moveCost", 0f);
conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0f);
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
int numNodes = 1000;
int numRegions = 20 * numNodes; // 20 * replication regions per RS
int numRegionsPerServer = 19; // all servers except one
int numTables = 100;
int replication = 3;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ public class TestStochasticLoadBalancerRegionReplicaMidCluster extends Stochasti

@Test
public void testRegionReplicasOnMidCluster() {
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
int numNodes = 200;
int numRegions = 40 * 200;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 30; // all regions are mostly balanced
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ public class TestStochasticLoadBalancerRegionReplicaReplicationGreaterThanNumNod

@Test
public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
int numNodes = 40;
int numRegions = 6 * 50;
int replication = 50; // 50 replicas per region, more than numNodes
int numRegionsPerServer = 6;
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class TestStochasticLoadBalancerRegionReplicaSameHosts extends Stochastic
@Test
public void testRegionReplicationOnMidClusterSameHosts() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
int numHosts = 30;
int numRegions = 30 * 30;
Expand All @@ -62,6 +62,6 @@ public void testRegionReplicationOnMidClusterSameHosts() {
}
}

testWithCluster(newServerMap, null, true, true);
testWithClusterWithIteration(newServerMap, null, true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public void testSmallCluster() {
int numRegionsPerServer = 40; // all servers except one
int replication = 1;
int numTables = 10;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}

Expand All @@ -48,6 +50,8 @@ public void testSmallCluster2() {
int numRegionsPerServer = 40; // all servers except one
int replication = 1;
int numTables = 10;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}

Expand All @@ -58,6 +62,8 @@ public void testSmallCluster3() {
int numRegionsPerServer = 1; // all servers except one
int replication = 1;
int numTables = 10;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10_000);
loadBalancer.onConfigurationChange(conf);
// fails because of max moves
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false,
false);
Expand Down

0 comments on commit 1cd2c34

Please sign in to comment.