Skip to content

Commit

Permalink
HBASE-28513 The StochasticLoadBalancer should support discrete evalua…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
Ray Mattingly committed Dec 14, 2024
1 parent 5f312dd commit e4e6bb5
Show file tree
Hide file tree
Showing 17 changed files with 1,747 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
* An efficient array based implementation similar to ClusterState for keeping the status of the
* cluster in terms of region assignment and distribution. LoadBalancers, such as
Expand Down Expand Up @@ -705,7 +708,7 @@ enum LocalityType {
RACK
}

public void doAction(BalanceAction action) {
public List<RegionPlan> doAction(BalanceAction action) {
switch (action.getType()) {
case NULL:
break;
Expand All @@ -715,30 +718,29 @@ public void doAction(BalanceAction action) {
AssignRegionAction ar = (AssignRegionAction) action;
regionsPerServer[ar.getServer()] =
addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
regionMoved(ar.getRegion(), -1, ar.getServer());
break;
return ImmutableList.of(regionMoved(ar.getRegion(), -1, ar.getServer()));
case MOVE_REGION:
assert action instanceof MoveRegionAction : action.getClass();
MoveRegionAction mra = (MoveRegionAction) action;
regionsPerServer[mra.getFromServer()] =
removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
regionsPerServer[mra.getToServer()] =
addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
break;
return ImmutableList
.of(regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()));
case SWAP_REGIONS:
assert action instanceof SwapRegionsAction : action.getClass();
SwapRegionsAction a = (SwapRegionsAction) action;
regionsPerServer[a.getFromServer()] =
replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
regionsPerServer[a.getToServer()] =
replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
return ImmutableList.of(regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()),
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()));
default:
throw new RuntimeException("Uknown action:" + action.getType());
throw new RuntimeException("Unknown action:" + action.getType());
}
return Collections.emptyList();
}

/**
Expand Down Expand Up @@ -822,7 +824,7 @@ void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
doAction(new AssignRegionAction(region, server));
}

void regionMoved(int region, int oldServer, int newServer) {
RegionPlan regionMoved(int region, int oldServer, int newServer) {
regionIndexToServerIndex[region] = newServer;
if (initialRegionIndexToServerIndex[region] == newServer) {
numMovedRegions--; // region moved back to original location
Expand Down Expand Up @@ -853,6 +855,11 @@ void regionMoved(int region, int oldServer, int newServer) {
updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
oldServer, newServer, primary, region);
}

// old server name can be null
ServerName oldServerName = oldServer == -1 ? null : servers[oldServer];

return new RegionPlan(regions[region], oldServerName, servers[newServer]);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;

/**
* Balancer conditionals supplement cost functions in the {@link StochasticLoadBalancer}. Cost
* functions are insufficient and difficult to work with when making discrete decisions; this is
* because they operate on a continuous scale, and each cost function's multiplier affects the
* relative importance of every other cost function. So it is difficult to meaningfully and clearly
* value many aspects of your region distribution via cost functions alone. Conditionals allow you
* to very clearly define discrete rules that your balancer would ideally follow. To clarify, a
* conditional violation will not block a region assignment because we would prefer to have uptime
* than have perfectly intentional balance. But conditionals allow you to, for example, define that
* a region's primary and secondary should not live on the same rack. Another example, conditionals
* make it easy to define that system tables will ideally be isolated on their own RegionServer.
*/
@InterfaceAudience.Private
public class BalancerConditionals {

private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionals.class);

protected static BalancerConditionals INSTANCE = new BalancerConditionals();
public static final String ISOLATE_SYSTEM_TABLES_KEY =
"hbase.master.balancer.stochastic.conditionals.isolateSystemTables";
public static final boolean ISOLATE_SYSTEM_TABLES_DEFAULT = false;

public static final String ISOLATE_META_TABLE_KEY =
"hbase.master.balancer.stochastic.conditionals.isolateMetaTable";
public static final boolean ISOLATE_META_TABLE_DEFAULT = false;

public static final String DISTRIBUTE_REPLICAS_CONDITIONALS_KEY =
"hbase.master.balancer.stochastic.conditionals.distributeReplicas";
public static final boolean DISTRIBUTE_REPLICAS_CONDITIONALS_DEFAULT = false;

public static final String ADDITIONAL_CONDITIONALS_KEY =
"hbase.master.balancer.stochastic.additionalConditionals";

// when this count is low, we'll be more likely to trigger a subsequent balancer run
private static final AtomicInteger BALANCE_COUNT_WITHOUT_IMPROVEMENTS = new AtomicInteger(0);
private static final int BALANCE_COUNT_WITHOUT_IMPROVEMENTS_CEILING = 10;

private Set<Class<? extends RegionPlanConditional>> conditionalClasses = Collections.emptySet();
private Set<RegionPlanConditional> conditionals = Collections.emptySet();
private Configuration conf;

private BalancerConditionals() {
}

protected boolean isTableIsolationEnabled() {
return conditionalClasses.contains(SystemTableIsolationConditional.class)
|| conditionalClasses.contains(MetaTableIsolationConditional.class);
}

protected boolean shouldRunBalancer() {
return BALANCE_COUNT_WITHOUT_IMPROVEMENTS.get() < BALANCE_COUNT_WITHOUT_IMPROVEMENTS_CEILING;
}

protected int getConsecutiveBalancesWithoutImprovement() {
return BALANCE_COUNT_WITHOUT_IMPROVEMENTS.get();
}

protected void incConsecutiveBalancesWithoutImprovement() {
if (BALANCE_COUNT_WITHOUT_IMPROVEMENTS.get() == Integer.MAX_VALUE) {
return;
}
this.BALANCE_COUNT_WITHOUT_IMPROVEMENTS.getAndIncrement();
LOG.trace("Set balanceCountWithoutImprovements={}",
this.BALANCE_COUNT_WITHOUT_IMPROVEMENTS.get());
}

protected void resetConsecutiveBalancesWithoutImprovement() {
this.BALANCE_COUNT_WITHOUT_IMPROVEMENTS.set(0);
LOG.trace("Set balanceCountWithoutImprovements=0");
}

protected Set<Class<? extends RegionPlanConditional>> getConditionalClasses() {
return Set.copyOf(conditionalClasses);
}

protected boolean shouldSkipSloppyServerEvaluation() {
return conditionals.stream()
.anyMatch(conditional -> conditional instanceof SystemTableIsolationConditional
|| conditional instanceof MetaTableIsolationConditional);
}

protected void loadConf(Configuration conf) {
this.conf = conf;
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
ImmutableSet.builder();

boolean isolateSystemTables =
conf.getBoolean(ISOLATE_SYSTEM_TABLES_KEY, ISOLATE_SYSTEM_TABLES_DEFAULT);
if (isolateSystemTables) {
conditionalClasses.add(SystemTableIsolationConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

boolean distributeReplicas = conf.getBoolean(DISTRIBUTE_REPLICAS_CONDITIONALS_KEY,
DISTRIBUTE_REPLICAS_CONDITIONALS_DEFAULT);
if (distributeReplicas) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName());
continue;
}
conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class));
}
this.conditionalClasses = conditionalClasses.build();
}

protected void loadClusterState(BalancerClusterState cluster) {
conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, conf, cluster))
.collect(Collectors.toSet());
}

protected int getConditionalViolationChange(List<RegionPlan> regionPlans) {
if (conditionals.isEmpty()) {
incConsecutiveBalancesWithoutImprovement();
return 0;
}
int conditionalViolationChange = 0;
for (RegionPlan regionPlan : regionPlans) {
conditionalViolationChange += getConditionalViolationChange(conditionals, regionPlan);
}
return conditionalViolationChange;
}

private static int getConditionalViolationChange(Set<RegionPlanConditional> conditionals,
RegionPlan regionPlan) {
RegionPlan inverseRegionPlan = new RegionPlan(regionPlan.getRegionInfo(),
regionPlan.getDestination(), regionPlan.getSource());
int currentConditionalViolationCount =
getConditionalViolationCount(conditionals, inverseRegionPlan);
int newConditionalViolationCount = getConditionalViolationCount(conditionals, regionPlan);
int violationChange = newConditionalViolationCount - currentConditionalViolationCount;
if (violationChange < 0) {
LOG.trace("Should move region {}_{} from {} to {}", regionPlan.getRegionName(),
regionPlan.getRegionInfo().getReplicaId(), regionPlan.getSource().getServerName(),
regionPlan.getDestination().getServerName());
}
return violationChange;
}

private static int getConditionalViolationCount(Set<RegionPlanConditional> conditionals,
RegionPlan regionPlan) {
int regionPlanConditionalViolationCount = 0;
for (RegionPlanConditional regionPlanConditional : conditionals) {
if (regionPlanConditional.isViolating(regionPlan)) {
regionPlanConditionalViolationCount++;
}
}
return regionPlanConditionalViolationCount;
}

private RegionPlanConditional createConditional(Class<? extends RegionPlanConditional> clazz,
Configuration conf, BalancerClusterState cluster) {
try {
Constructor<? extends RegionPlanConditional> ctor =
clazz.getDeclaredConstructor(Configuration.class, BalancerClusterState.class);
return ReflectionUtils.instantiate(clazz.getName(), ctor, conf, cluster);
} catch (NoSuchMethodException e) {
LOG.warn(
"Cannot find constructor with Configuration and BalancerClusterState parameters for class '{}': {}",
clazz.getName(), e.getMessage());
}
return null;
}
}
Loading

0 comments on commit e4e6bb5

Please sign in to comment.