Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Jan 2, 2025
1 parent e5ab24c commit b9c9c5a
Show file tree
Hide file tree
Showing 22 changed files with 145 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ enum Type {
ASSIGN_REGION,
MOVE_REGION,
SWAP_REGIONS,
ISOLATE_TABLE,
MOVE_BATCH,
NULL,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,11 +778,12 @@ public List<RegionPlan> doAction(BalanceAction action) {
List<RegionPlan> mbRegionPlans = new ArrayList<>();
for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
regionsPerServer[serverIndex] =
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
}
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
Set<Integer> regionsToAdd = mba.getServerToRegionsToRemove().get(serverIndex);
addRegions(regionsPerServer[serverIndex], regionsToAdd);
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
}
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
mbRegionPlans.add(regionMoved(moveRegionAction.getRegion(),
Expand Down Expand Up @@ -978,8 +979,8 @@ int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {

// If the newIndex is smaller than newSize, some regions were missing from the input array
if (newIndex != newSize) {
throw new IllegalStateException(
"Region indices mismatch: some regions in the removal set were not found in the regions array");
throw new IllegalStateException("Region indices mismatch: some regions in the removal "
+ "set were not found in the regions array");
}

return newRegions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand All @@ -47,7 +48,7 @@
* make it easy to define that system tables will ideally be isolated on their own RegionServer.
*/
@InterfaceAudience.Private
public final class BalancerConditionals {
final class BalancerConditionals implements Configurable {

private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionals.class);

Expand Down Expand Up @@ -109,40 +110,6 @@ void clearConditionalWeightCaches() {
.flatMap(Optional::stream).forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache);
}

void loadConf(Configuration conf) {
this.conf = conf;
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
ImmutableSet.builder();

boolean isolateSystemTables =
conf.getBoolean(ISOLATE_SYSTEM_TABLES_KEY, ISOLATE_SYSTEM_TABLES_DEFAULT);
if (isolateSystemTables) {
conditionalClasses.add(SystemTableIsolationConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

boolean distributeReplicas = conf.getBoolean(DISTRIBUTE_REPLICAS_CONDITIONALS_KEY,
DISTRIBUTE_REPLICAS_CONDITIONALS_DEFAULT);
if (distributeReplicas) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
continue;
}
conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
}
this.conditionalClasses = conditionalClasses.build();
loadClusterState(null);
}

void loadClusterState(BalancerClusterState cluster) {
conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, conf, cluster))
.filter(Objects::nonNull).collect(Collectors.toSet());
Expand Down Expand Up @@ -185,7 +152,7 @@ boolean isViolating(BalancerClusterState cluster, BalanceAction action) {
return false;
}

boolean isViolating(RegionPlan regionPlan) {
private boolean isViolating(RegionPlan regionPlan) {
for (RegionPlanConditional conditional : conditionals) {
if (conditional.isViolating(regionPlan)) {
return true;
Expand Down Expand Up @@ -216,4 +183,44 @@ private RegionPlanConditional createConditional(Class<? extends RegionPlanCondit
private boolean isConditionalBalancingEnabled() {
return !conditionalClasses.isEmpty();
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
ImmutableSet.builder();

boolean isolateSystemTables =
conf.getBoolean(ISOLATE_SYSTEM_TABLES_KEY, ISOLATE_SYSTEM_TABLES_DEFAULT);
if (isolateSystemTables) {
conditionalClasses.add(SystemTableIsolationConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

boolean distributeReplicas = conf.getBoolean(DISTRIBUTE_REPLICAS_CONDITIONALS_KEY,
DISTRIBUTE_REPLICAS_CONDITIONALS_DEFAULT);
if (distributeReplicas) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
continue;
}
conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
}
this.conditionalClasses = conditionalClasses.build();
loadClusterState(null);
}

@Override
public Configuration getConf() {
return conf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public synchronized void loadConf(Configuration configuration) {
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
new HashMap<>(2);
candidateGenerators.put(CacheAwareCandidateGenerator.class,
candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
new CacheAwareSkewnessCandidateGenerator());
candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
return candidateGenerators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* CandidateGenerator to distribute colocated replicas across different servers.
*/
@InterfaceAudience.Private
class DistributeReplicasCandidateGenerator extends RegionPlanConditionalCandidateGenerator {
final class DistributeReplicasCandidateGenerator extends RegionPlanConditionalCandidateGenerator {

static DistributeReplicasCandidateGenerator INSTANCE = new DistributeReplicasCandidateGenerator();

Expand Down Expand Up @@ -94,6 +94,7 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
if (isForced) {
return possibleAction;
} else if (willBeAccepted(cluster, possibleAction)) {
cluster.doAction(possibleAction); // Update cluster state to reflect move
moveRegionActions.add(possibleAction);
break;
}
Expand All @@ -112,7 +113,9 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}

if (!moveRegionActions.isEmpty()) {
return new MoveBatchAction(moveRegionActions);
MoveBatchAction batchAction = new MoveBatchAction(moveRegionActions);
undoBatchAction(cluster, batchAction); // Reset cluster state to before batch
return batchAction;
}
// If no colocated replicas are found, return NULL_ACTION
if (foundColocatedReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -85,21 +86,28 @@ Optional<RegionPlanConditionalCandidateGenerator> getCandidateGenerator() {

@Override
boolean isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> serverRegions) {
return checkViolation(getReplicaKey(regionPlan.getRegionInfo()), serverRegions);
return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()),
serverRegions);
}

@Override
boolean isViolatingHost(RegionPlan regionPlan, Set<RegionInfo> hostRegions) {
return checkViolation(getReplicaKey(regionPlan.getRegionInfo()), hostRegions);
return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()),
hostRegions);
}

@Override
boolean isViolatingRack(RegionPlan regionPlan, Set<RegionInfo> rackRegions) {
return checkViolation(getReplicaKey(regionPlan.getRegionInfo()), rackRegions);
return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()),
rackRegions);
}

private boolean checkViolation(ReplicaKey movingReplicaKey, Set<RegionInfo> destinationRegions) {
private boolean checkViolation(RegionInfo movingRegion, ReplicaKey movingReplicaKey,
Set<RegionInfo> destinationRegions) {
for (RegionInfo regionInfo : destinationRegions) {
if (regionInfo.equals(movingRegion)) {
continue;
}
if (getReplicaKey(regionInfo).equals(movingReplicaKey)) {
return true;
}
Expand All @@ -109,8 +117,8 @@ private boolean checkViolation(ReplicaKey movingReplicaKey, Set<RegionInfo> dest

/**
* This is necessary because it would be too expensive to use
* {@link org.apache.hadoop.hbase.client.RegionReplicaUtil#isReplicasForSameRegion(RegionInfo, RegionInfo)}
* for every combo of regions.
* {@link RegionReplicaUtil#isReplicasForSameRegion(RegionInfo, RegionInfo)} for every combo of
* regions.
*/
static class ReplicaKey {
private final Pair<ByteArrayWrapper, ByteArrayWrapper> startAndStopKeys;
Expand All @@ -122,8 +130,12 @@ static class ReplicaKey {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ReplicaKey)) return false;
if (this == o) {
return true;
}
if (!(o instanceof ReplicaKey)) {
return false;
}
ReplicaKey other = (ReplicaKey) o;
return this.startAndStopKeys.equals(other.startAndStopKeys);
}
Expand All @@ -143,8 +155,12 @@ static class ByteArrayWrapper {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ByteArrayWrapper)) return false;
if (this == o) {
return true;
}
if (!(o instanceof ByteArrayWrapper)) {
return false;
}
ByteArrayWrapper other = (ByteArrayWrapper) o;
return Arrays.equals(this.bytes, other.bytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) {
/** Returns any candidate generator in random */
@Override
protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.values().stream().toList()
.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {
public final class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {

static MetaTableIsolationCandidateGenerator INSTANCE = new MetaTableIsolationCandidateGenerator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.hadoop.hbase.master.balancer;

import java.util.List;
import org.apache.curator.shaded.com.google.common.collect.HashMultimap;
import org.apache.curator.shaded.com.google.common.collect.Multimaps;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;

@InterfaceAudience.Private
public class MoveBatchAction extends BalanceAction {
private final List<MoveRegionAction> moveActions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RegionPlanConditional { // todo rename to BalancerConditional
public abstract class RegionPlanConditional {
private static final Logger LOG = LoggerFactory.getLogger(RegionPlanConditional.class);
private BalancerClusterState cluster;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.master.balancer;

import java.time.Duration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RegionPlanConditionalCandidateGenerator extends CandidateGenerator {

private static final Logger LOG =
Expand All @@ -46,6 +50,13 @@ boolean willBeAccepted(BalancerClusterState cluster, BalanceAction action) {
return !BalancerConditionals.INSTANCE.isViolating(cluster, action);
}

void undoBatchAction(BalancerClusterState cluster, MoveBatchAction batchAction) {
for (int i = batchAction.getMoveActions().size() - 1; i >= 0; i--) {
MoveRegionAction action = batchAction.getMoveActions().get(i);
cluster.doAction(action.undoAction());
}
}

void clearWeightCache() {
lastWeighedAt = -1;
}
Expand Down
Loading

0 comments on commit b9c9c5a

Please sign in to comment.