Skip to content

Commit

Permalink
Way more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Dec 13, 2024
1 parent aafaa1c commit e1283f4
Show file tree
Hide file tree
Showing 15 changed files with 730 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.RegionPlan;
Expand Down Expand Up @@ -62,16 +63,46 @@ public class BalancerConditionals {
public static final String ADDITIONAL_CONDITIONALS_KEY =
"hbase.master.balancer.stochastic.additionalConditionals";

// when this count is low, we'll be more likely to trigger a subsequent balancer run
private static final AtomicInteger BALANCE_COUNT_WITHOUT_IMPROVEMENTS = new AtomicInteger(0);
private static final int BALANCE_COUNT_WITHOUT_IMPROVEMENTS_CEILING = 10;

private Set<Class<? extends RegionPlanConditional>> conditionalClasses = Collections.emptySet();
private Set<RegionPlanConditional> conditionals = Collections.emptySet();

private int lastViolationCount = 0;
private Configuration conf;

private BalancerConditionals() {
}

protected int getLastViolationCount() {
return lastViolationCount;
/**
 * Reports whether a table isolation conditional is among the configured conditional classes.
 * @return {@code true} if {@code conditionalClasses} contains
 *         {@code SystemTableIsolationConditional} or {@code MetaTableIsolationConditional}
 */
protected boolean isTableIsolationEnabled() {
  if (conditionalClasses.contains(SystemTableIsolationConditional.class)) {
    return true;
  }
  return conditionalClasses.contains(MetaTableIsolationConditional.class);
}

/**
 * Compares the shared count of balancer runs without improvement against its ceiling.
 * @return {@code true} while the count is strictly below
 *         {@code BALANCE_COUNT_WITHOUT_IMPROVEMENTS_CEILING}
 */
protected boolean shouldRunBalancer() {
  int runsWithoutImprovement = BALANCE_COUNT_WITHOUT_IMPROVEMENTS.get();
  return runsWithoutImprovement < BALANCE_COUNT_WITHOUT_IMPROVEMENTS_CEILING;
}

/**
 * Reads the shared count of balancer runs without improvement.
 * @return the current value of {@code BALANCE_COUNT_WITHOUT_IMPROVEMENTS}
 */
protected int getConsecutiveBalancesWithoutImprovement() {
  final int currentCount = BALANCE_COUNT_WITHOUT_IMPROVEMENTS.get();
  return currentCount;
}

/**
 * Increments the shared count of balancer runs without improvement, saturating at
 * {@link Integer#MAX_VALUE}.
 * <p>
 * The saturation check and the increment happen in a single atomic update, so concurrent callers
 * cannot both pass the check and wrap the counter to a negative value. The traced value is the
 * one produced by this call rather than a later re-read of the counter. Nothing is logged when
 * the counter was already saturated.
 */
protected void incConsecutiveBalancesWithoutImprovement() {
  // The counter is static; reference it directly rather than through "this".
  int previous = BALANCE_COUNT_WITHOUT_IMPROVEMENTS
    .getAndUpdate(count -> count == Integer.MAX_VALUE ? count : count + 1);
  if (previous == Integer.MAX_VALUE) {
    return;
  }
  LOG.trace("Set balanceCountWithoutImprovements={}", previous + 1);
}

/**
 * Sets the shared count of balancer runs without improvement back to zero and traces the reset.
 */
protected void resetConsecutiveBalancesWithoutImprovement() {
  // Static counter: referenced by name, not through the instance.
  BALANCE_COUNT_WITHOUT_IMPROVEMENTS.set(0);
  LOG.trace("Set balanceCountWithoutImprovements=0");
}

/**
 * Returns an unmodifiable copy of the configured conditional classes, so callers cannot alter
 * this instance's set through the returned value.
 * @return a copy of {@code conditionalClasses} produced by {@link Set#copyOf}
 */
protected Set<Class<? extends RegionPlanConditional>> getConditionalClasses() {
  final Set<Class<? extends RegionPlanConditional>> snapshot = Set.copyOf(conditionalClasses);
  return snapshot;
}

protected boolean shouldSkipSloppyServerEvaluation() {
Expand All @@ -81,6 +112,7 @@ protected boolean shouldSkipSloppyServerEvaluation() {
}

protected void loadConf(Configuration conf) {
this.conf = conf;
ImmutableSet.Builder<Class<? extends RegionPlanConditional>> conditionalClasses =
ImmutableSet.builder();

Expand Down Expand Up @@ -112,20 +144,21 @@ protected void loadConf(Configuration conf) {
this.conditionalClasses = conditionalClasses.build();
}

protected void loadConditionals(BalancerClusterState cluster) {
conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, cluster))
protected void loadClusterState(BalancerClusterState cluster) {
conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, conf, cluster))
.collect(Collectors.toSet());
}

protected int getConditionalViolationChange(List<RegionPlan> regionPlans) {
if (conditionals.isEmpty()) {
lastViolationCount = 0;
incConsecutiveBalancesWithoutImprovement();
return 0;
}
int violations = regionPlans.stream()
.mapToInt(regionPlan -> getConditionalViolationChange(conditionals, regionPlan)).sum();
lastViolationCount = violations;
return violations;
int conditionalViolationChange = 0;
for (RegionPlan regionPlan : regionPlans) {
conditionalViolationChange += getConditionalViolationChange(conditionals, regionPlan);
}
return conditionalViolationChange;
}

private static int getConditionalViolationChange(Set<RegionPlanConditional> conditionals,
Expand All @@ -135,7 +168,13 @@ private static int getConditionalViolationChange(Set<RegionPlanConditional> cond
int currentConditionalViolationCount =
getConditionalViolationCount(conditionals, inverseRegionPlan);
int newConditionalViolationCount = getConditionalViolationCount(conditionals, regionPlan);
return newConditionalViolationCount - currentConditionalViolationCount;
int violationChange = newConditionalViolationCount - currentConditionalViolationCount;
if (violationChange < 0) {
LOG.trace("Should move region {}_{} from {} to {}", regionPlan.getRegionName(),
regionPlan.getRegionInfo().getReplicaId(), regionPlan.getSource().getServerName(),
regionPlan.getDestination().getServerName());
}
return violationChange;
}

private static int getConditionalViolationCount(Set<RegionPlanConditional> conditionals,
Expand All @@ -150,13 +189,14 @@ private static int getConditionalViolationCount(Set<RegionPlanConditional> condi
}

private RegionPlanConditional createConditional(Class<? extends RegionPlanConditional> clazz,
BalancerClusterState cluster) {
Configuration conf, BalancerClusterState cluster) {
try {
Constructor<? extends RegionPlanConditional> ctor =
clazz.getDeclaredConstructor(BalancerClusterState.class);
return ReflectionUtils.instantiate(clazz.getName(), ctor, cluster);
clazz.getDeclaredConstructor(Configuration.class, BalancerClusterState.class);
return ReflectionUtils.instantiate(clazz.getName(), ctor, conf, cluster);
} catch (NoSuchMethodException e) {
LOG.warn("Cannot find constructor with BalancerClusterState parameter for class '{}': {}",
LOG.warn(
"Cannot find constructor with Configuration and BalancerClusterState parameters for class '{}': {}",
clazz.getName(), e.getMessage());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -28,12 +31,19 @@
*/
public class DistributeReplicasConditional extends RegionPlanConditional {

/**
* Local mini cluster tests can only run on one server/rack by design. If enabled, this will
* pretend that localhost RegionServer threads are actually running on separate hosts/racks. This
* should only be used in unit tests.
*/
public static boolean TEST_MODE_ENABLED = false;

private static final Logger LOG = LoggerFactory.getLogger(DistributeReplicasConditional.class);

private final BalancerClusterState cluster;

public DistributeReplicasConditional(BalancerClusterState cluster) {
super(cluster);
public DistributeReplicasConditional(Configuration conf, BalancerClusterState cluster) {
super(conf, cluster);
this.cluster = cluster;
}

Expand Down Expand Up @@ -63,15 +73,17 @@ boolean isViolating(RegionPlan regionPlan) {
}

if (
checkViolation(destinationServerIndex, cluster.serversPerHost, cluster.serverIndexToHostIndex,
cluster.regionsPerServer, primaryRegionIndex, cluster.regionIndexToPrimaryIndex, "host")
checkViolation(cluster.regions, regionPlan.getRegionInfo(), destinationServerIndex,
cluster.serversPerHost, cluster.serverIndexToHostIndex, cluster.regionsPerServer,
primaryRegionIndex, "host")
) {
return true;
}

if (
checkViolation(destinationServerIndex, cluster.serversPerRack, cluster.serverIndexToRackIndex,
cluster.regionsPerServer, primaryRegionIndex, cluster.regionIndexToPrimaryIndex, "rack")
checkViolation(cluster.regions, regionPlan.getRegionInfo(), destinationServerIndex,
cluster.serversPerRack, cluster.serverIndexToRackIndex, cluster.regionsPerServer,
primaryRegionIndex, "rack")
) {
return true;
}
Expand All @@ -89,23 +101,61 @@ boolean isViolating(RegionPlan regionPlan) {
* @param locationType Type of location being checked ("host" or "rack").
* @return True if a violation is found, false otherwise.
*/
static boolean checkViolation(int destinationServerIndex, int[][] serversPerLocation,
int[] serverToLocationIndex, int[][] regionsPerServer, int primaryRegionIndex,
int[] regionIndexToPrimaryIndex, String locationType) {
if (serversPerLocation == null || serversPerLocation.length <= 1) {
LOG.debug("{} violation check skipped: serversPerLocation is null or has <= 1 location",
locationType);
static boolean checkViolation(RegionInfo[] regions, RegionInfo regionToBeMoved,
int destinationServerIndex, int[][] serversPerLocation, int[] serverToLocationIndex,
int[][] regionsPerServer, int primaryRegionIndex, String locationType) {

if (TEST_MODE_ENABLED) {
// Take the flat serversPerLocation, like {0: [0, 1, 2, 3, 4]}
// and pretend it is multi-location, like {0: [0], 1: [1], ...}
int numServers = serversPerLocation[0].length;
// Create a new serversPerLocation array where each server gets its own "location"
int[][] simulatedServersPerLocation = new int[numServers][];
for (int i = 0; i < numServers; i++) {
simulatedServersPerLocation[i] = new int[] { serversPerLocation[0][i] };
}
// Adjust serverToLocationIndex to map each server to its simulated location
int[] simulatedServerToLocationIndex = new int[numServers];
for (int i = 0; i < numServers; i++) {
simulatedServerToLocationIndex[serversPerLocation[0][i]] = i;
}
LOG.trace("Test mode enabled: Simulated {} locations for servers.", numServers);
// Use the simulated arrays for test mode
serversPerLocation = simulatedServersPerLocation;
serverToLocationIndex = simulatedServerToLocationIndex;
}

if (serversPerLocation == null) {
LOG.trace("{} violation check skipped: serversPerLocation is null", locationType);
return false;
}

if (serversPerLocation.length == 1) {
LOG.warn(
"{} violation inevitable: serversPerLocation has only 1 entry. You probably should not be using read replicas.",
locationType);
return true;
}

int destinationLocationIndex = serverToLocationIndex[destinationServerIndex];
LOG.debug("Checking {} violations for destination server index {} at location index {}",
LOG.trace("Checking {} violations for destination server index {} at location index {}",
locationType, destinationServerIndex, destinationLocationIndex);

// For every RegionServer on host/rack
for (int serverIndex : serversPerLocation[destinationLocationIndex]) {
// For every Region on RegionServer
for (int hostedRegion : regionsPerServer[serverIndex]) {
if (regionIndexToPrimaryIndex[hostedRegion] == primaryRegionIndex) {
LOG.debug("{} violation detected: region {} on {} {}", locationType, primaryRegionIndex,
RegionInfo targetRegion = regions[hostedRegion];
if (targetRegion.getEncodedName().equals(regionToBeMoved.getEncodedName())) {
// The balancer state will already show this region as having moved.
// A region's replicas will also have unique encoded names.
// So we should skip this check if the encoded name is the same.
continue;
}
boolean isReplicaForSameRegion =
RegionReplicaUtil.isReplicasForSameRegion(targetRegion, regionToBeMoved);
if (isReplicaForSameRegion) {
LOG.trace("{} violation detected: region {} on {} {}", locationType, primaryRegionIndex,
locationType, destinationLocationIndex);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.client.RegionInfo;

/**
 * A {@link TableIsolationCandidateGenerator} whose isolation criterion is "is a meta region".
 */
public class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {
  /**
   * Decides whether the given region is one this generator isolates.
   * @param regionInfo the region under consideration
   * @return the value of {@code regionInfo.isMetaRegion()}
   */
  @Override
  boolean shouldBeIsolated(RegionInfo regionInfo) {
    final boolean metaRegion = regionInfo.isMetaRegion();
    return metaRegion;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
Expand All @@ -30,13 +31,11 @@
*/
class MetaTableIsolationConditional extends RegionPlanConditional {

private final Set<ServerName> emptyServers;
private final Set<ServerName> serversHostingMeta;
private final Set<ServerName> emptyServers = new HashSet<>();
private final Set<ServerName> serversHostingMeta = new HashSet<>();

public MetaTableIsolationConditional(BalancerClusterState cluster) {
super(cluster);
this.emptyServers = new HashSet<>();
this.serversHostingMeta = new HashSet<>();
public MetaTableIsolationConditional(Configuration conf, BalancerClusterState cluster) {
super(conf, cluster);

for (int i = 0; i < cluster.servers.length; i++) {
ServerName server = cluster.servers[i];
Expand Down Expand Up @@ -76,7 +75,6 @@ protected static boolean checkViolation(RegionPlan regionPlan, Set<ServerName> s
Set<ServerName> emptyServers) {
boolean isMeta = regionPlan.getRegionInfo().getTable().equals(TableName.META_TABLE_NAME);
ServerName destination = regionPlan.getDestination();

if (isMeta) {
// meta must go to an empty server or a server already hosting meta
return !(serversHostingMeta.contains(destination) || emptyServers.contains(destination));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.RegionPlan;

public abstract class RegionPlanConditional {
public RegionPlanConditional(BalancerClusterState cluster) {
public RegionPlanConditional(Configuration conf, BalancerClusterState cluster) {
}

abstract boolean isViolating(RegionPlan regionPlan);
Expand Down
Loading

0 comments on commit e1283f4

Please sign in to comment.