Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Dec 31, 2024
1 parent e5ab24c commit 8ac0c7a
Show file tree
Hide file tree
Showing 19 changed files with 125 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -778,11 +778,12 @@ public List<RegionPlan> doAction(BalanceAction action) {
List<RegionPlan> mbRegionPlans = new ArrayList<>();
for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
regionsPerServer[serverIndex] =
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
}
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
Set<Integer> regionsToAdd = mba.getServerToRegionsToRemove().get(serverIndex);
addRegions(regionsPerServer[serverIndex], regionsToAdd);
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
}
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
mbRegionPlans.add(regionMoved(moveRegionAction.getRegion(),
Expand Down Expand Up @@ -978,8 +979,8 @@ int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {

// If the newIndex is smaller than newSize, some regions were missing from the input array
if (newIndex != newSize) {
throw new IllegalStateException(
"Region indices mismatch: some regions in the removal set were not found in the regions array");
throw new IllegalStateException("Region indices mismatch: some regions in the removal "
+ "set were not found in the regions array");
}

return newRegions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand All @@ -47,7 +48,7 @@
* make it easy to define that system tables will ideally be isolated on their own RegionServer.
*/
@InterfaceAudience.Private
public final class BalancerConditionals {
final class BalancerConditionals implements Configurable {

private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionals.class);

Expand Down Expand Up @@ -109,40 +110,6 @@ void clearConditionalWeightCaches() {
.flatMap(Optional::stream).forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache);
}

void loadConf(Configuration conf) {
this.conf = conf;
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
ImmutableSet.builder();

boolean isolateSystemTables =
conf.getBoolean(ISOLATE_SYSTEM_TABLES_KEY, ISOLATE_SYSTEM_TABLES_DEFAULT);
if (isolateSystemTables) {
conditionalClasses.add(SystemTableIsolationConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

boolean distributeReplicas = conf.getBoolean(DISTRIBUTE_REPLICAS_CONDITIONALS_KEY,
DISTRIBUTE_REPLICAS_CONDITIONALS_DEFAULT);
if (distributeReplicas) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
continue;
}
conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
}
this.conditionalClasses = conditionalClasses.build();
loadClusterState(null);
}

void loadClusterState(BalancerClusterState cluster) {
conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, conf, cluster))
.filter(Objects::nonNull).collect(Collectors.toSet());
Expand Down Expand Up @@ -216,4 +183,44 @@ private RegionPlanConditional createConditional(Class<? extends RegionPlanCondit
private boolean isConditionalBalancingEnabled() {
return !conditionalClasses.isEmpty();
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
ImmutableSet.builder();

boolean isolateSystemTables =
conf.getBoolean(ISOLATE_SYSTEM_TABLES_KEY, ISOLATE_SYSTEM_TABLES_DEFAULT);
if (isolateSystemTables) {
conditionalClasses.add(SystemTableIsolationConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

boolean distributeReplicas = conf.getBoolean(DISTRIBUTE_REPLICAS_CONDITIONALS_KEY,
DISTRIBUTE_REPLICAS_CONDITIONALS_DEFAULT);
if (distributeReplicas) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
continue;
}
conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
}
this.conditionalClasses = conditionalClasses.build();
loadClusterState(null);
}

@Override
public Configuration getConf() {
return conf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* CandidateGenerator to distribute colocated replicas across different servers.
*/
@InterfaceAudience.Private
class DistributeReplicasCandidateGenerator extends RegionPlanConditionalCandidateGenerator {
final class DistributeReplicasCandidateGenerator extends RegionPlanConditionalCandidateGenerator {

static DistributeReplicasCandidateGenerator INSTANCE = new DistributeReplicasCandidateGenerator();

Expand Down Expand Up @@ -94,6 +94,7 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
if (isForced) {
return possibleAction;
} else if (willBeAccepted(cluster, possibleAction)) {
cluster.doAction(possibleAction); // Update cluster state to reflect move
moveRegionActions.add(possibleAction);
break;
}
Expand All @@ -112,7 +113,9 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}

if (!moveRegionActions.isEmpty()) {
return new MoveBatchAction(moveRegionActions);
MoveBatchAction batchAction = new MoveBatchAction(moveRegionActions);
undoBatchAction(cluster, batchAction); // Reset cluster state to before batch
return batchAction;
}
// If no colocated replicas are found, return NULL_ACTION
if (foundColocatedReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -109,8 +110,8 @@ private boolean checkViolation(ReplicaKey movingReplicaKey, Set<RegionInfo> dest

/**
* This is necessary because it would be too expensive to use
* {@link org.apache.hadoop.hbase.client.RegionReplicaUtil#isReplicasForSameRegion(RegionInfo, RegionInfo)}
* for every combo of regions.
* {@link RegionReplicaUtil#isReplicasForSameRegion(RegionInfo, RegionInfo)} for every combo of
* regions.
*/
static class ReplicaKey {
private final Pair<ByteArrayWrapper, ByteArrayWrapper> startAndStopKeys;
Expand All @@ -122,8 +123,12 @@ static class ReplicaKey {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ReplicaKey)) return false;
if (this == o) {
return true;
}
if (!(o instanceof ReplicaKey)) {
return false;
}
ReplicaKey other = (ReplicaKey) o;
return this.startAndStopKeys.equals(other.startAndStopKeys);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) {
/** Returns any candidate generator in random */
@Override
protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.values().stream().toList()
.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {
public final class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {

static MetaTableIsolationCandidateGenerator INSTANCE = new MetaTableIsolationCandidateGenerator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.hadoop.hbase.master.balancer;

import java.util.List;
import org.apache.curator.shaded.com.google.common.collect.HashMultimap;
import org.apache.curator.shaded.com.google.common.collect.Multimaps;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;

@InterfaceAudience.Private
public class MoveBatchAction extends BalanceAction {
private final List<MoveRegionAction> moveActions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RegionPlanConditional { // todo rename to BalancerConditional
public abstract class RegionPlanConditional {
private static final Logger LOG = LoggerFactory.getLogger(RegionPlanConditional.class);
private BalancerClusterState cluster;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.master.balancer;

import java.time.Duration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RegionPlanConditionalCandidateGenerator extends CandidateGenerator {

private static final Logger LOG =
Expand All @@ -46,6 +50,13 @@ boolean willBeAccepted(BalancerClusterState cluster, BalanceAction action) {
return !BalancerConditionals.INSTANCE.isViolating(cluster, action);
}

void undoBatchAction(BalancerClusterState cluster, MoveBatchAction batchAction) {
for (int i = batchAction.getMoveActions().size() - 1; i >= 0; i--) {
MoveRegionAction action = batchAction.getMoveActions().get(i);
cluster.doAction(action.undoAction());
}
}

void clearWeightCache() {
lastWeighedAt = -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
private Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators;

public enum GeneratorType {
RANDOM,
LOAD,
LOCALITY,
RACK,
SYSTEM_TABLE_ISOLATION,
META_TABLE_ISOLATION,
}

private final BalancerConditionals balancerConditionals = BalancerConditionals.INSTANCE;

/**
Expand Down Expand Up @@ -270,7 +261,7 @@ protected void loadConf(Configuration conf) {
rackLocalityCost = new RackLocalityCostFunction(conf);

// Order is important here. We need to construct conditionals to load candidate generators
balancerConditionals.loadConf(conf);
balancerConditionals.setConf(conf);
this.candidateGenerators = createCandidateGenerators();

regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
Expand Down Expand Up @@ -466,20 +457,25 @@ protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
}

double rand = ThreadLocalRandom.current().nextDouble();
Class<? extends CandidateGenerator> greatestWeightClazz = null;
CandidateGenerator greatestWeightGenerator = null;
double greatestWeight = 0;
for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
for (Map.Entry<Class<? extends CandidateGenerator>,
CandidateGenerator> entry : candidateGenerators.entrySet()) {
Class<? extends CandidateGenerator> clazz = entry.getKey();
double generatorWeight = weightsOfGenerators.get(clazz);
if (generatorWeight > greatestWeight) {
greatestWeight = generatorWeight;
greatestWeightClazz = clazz;
greatestWeightGenerator = entry.getValue();
}
if (rand <= generatorWeight) {
return candidateGenerators.get(clazz);
return entry.getValue();
}
}

return candidateGenerators.getOrDefault(greatestWeightClazz, new RandomCandidateGenerator());
if (greatestWeightGenerator != null) {
return greatestWeightGenerator;
}
return candidateGenerators.values().stream().findAny().orElseThrow();
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand All @@ -497,6 +493,7 @@ private long calculateMaxSteps(BalancerClusterState cluster) {
* approach the optimal state given enough steps.
*/
@Override
@SuppressWarnings("checkstyle:MethodLength")
protected List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
// On clusters with lots of HFileLinks or lots of reference files,
Expand Down Expand Up @@ -636,7 +633,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
StringBuilder logMessage = new StringBuilder("CandidateGenerator activity summary:\n");
generatorToStepCount.forEach((generator, count) -> {
long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
logMessage.append(String.format(" - %s: %d steps, %d approvals\n", generator.getSimpleName(),
logMessage.append(String.format(" - %s: %d steps, %d approvals%n", generator.getSimpleName(),
count, approvals));
});
// Log the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class SystemTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {
public final class SystemTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {

static SystemTableIsolationCandidateGenerator INSTANCE =
new SystemTableIsolationCandidateGenerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
if (isWeighing) {
return possibleMove;
}
cluster.doAction(possibleMove); // Update cluster state to reflect move
moves.add(possibleMove);
break;
}
Expand All @@ -86,7 +87,9 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
if (moves.isEmpty()) {
return BalanceAction.NULL_ACTION;
} else {
return new MoveBatchAction(moves);
MoveBatchAction batchAction = new MoveBatchAction(moves);
undoBatchAction(cluster, batchAction); // Reset cluster state to before batch action
return batchAction;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public final class CandidateGeneratorTestUtil {

private static final Logger LOG = LoggerFactory.getLogger(CandidateGeneratorTestUtil.class);

private CandidateGeneratorTestUtil() {
}

static void runBalancerToExhaustion(Configuration conf,
Map<ServerName, List<RegionInfo>> serverToRegions,
Set<Function<BalancerClusterState, Boolean>> expectations) {
Expand Down Expand Up @@ -144,9 +147,8 @@ static void printClusterDistribution(BalancerClusterState cluster, long actionsT
}

/**
* Partitions the given serverToRegions map by table, returning a structure of Map<TableName,
* Map<ServerName, List<RegionInfo>>>. The tables are derived from the RegionInfo objects found in
* serverToRegions.
* Partitions the given serverToRegions map by table The tables are derived from the RegionInfo
* objects found in serverToRegions.
* @param serverToRegions The map of servers to their assigned regions.
* @return A map of tables to their server-to-region assignments.
*/
Expand Down
Loading

0 comments on commit 8ac0c7a

Please sign in to comment.