Skip to content

Commit

Permalink
update current meta when keeper master switch
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Jan 17, 2025
1 parent a1edbe9 commit c1fd4f9
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ protected void makeKeepersOk(Long clusterDbId, Long shardDbId, Pair<String, Inte
currentMetaManager.getClusterRouteByDcId(currentMetaManager.getClusterMeta(clusterDbId).getActiveDc(), clusterDbId),
keyedObjectPool, 1000, 1, scheduled, executors);
try {
currentMetaManager.setKeeperMaster(clusterDbId, shardDbId, newMaster.getKey(), newMaster.getValue(), null);
// 必须先改 meta, 再修改 keepr, 不然可能被 KeeperStateAlignChecker reset 回去。
job.execute().get(waitTimeoutSeconds/2, TimeUnit.SECONDS);
logger.debug("[doRun][set]cluster_{}, shard_{}, {}", clusterDbId, shardDbId, newMaster);
executionLog.info("[makeKeepersOk]success");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error("[makeKeepersOk]" + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ public void updateUpstream(String clusterId, String shardId, String ip, int port
port);
} else {
logger.info("[updateUpstream]{},{},{},{}", clusterId, shardId, ip, port);
currentMetaManager.setKeeperMaster(clusterShard.getKey(), clusterShard.getValue(), ip, port);
currentMetaManager.setKeeperMaster(clusterShard.getKey(), clusterShard.getValue(), ip, port, null);
}
} else {
List<KeeperMeta> keepers = dcMetaCache.getShardKeepers(clusterShard.getKey(), clusterShard.getValue());
if (!CollectionUtils.isEmpty(keepers)) {
logger.info("[hetero][keeper][updateUpstream]{},{},{},{}", clusterId, shardId, ip, port);
currentMetaManager.setKeeperMaster(clusterShard.getKey(), clusterShard.getValue(), ip, port);
currentMetaManager.setKeeperMaster(clusterShard.getKey(), clusterShard.getValue(), ip, port, null);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.StringUtil;

import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -33,6 +34,7 @@ public AbstractKeeperMasterChooser(Long clusterDbId, Long shardDbId, DcMetaCache

@Override
protected void work() {
String dcName = dcMetaCache.getPrimaryDc(clusterDbId, shardDbId);
Pair<String, Integer> keeperMaster = chooseKeeperMaster();
logger.debug("[doRun]cluster_{}, shard_{}, {}", clusterDbId, shardDbId, keeperMaster);
Pair<String, Integer> currentMaster = currentMetaManager.getKeeperMaster(clusterDbId, shardDbId);
Expand All @@ -41,7 +43,7 @@ protected void work() {
return;
}
logger.debug("[doRun][set]cluster_{}, shard_{}, {}", clusterDbId, shardDbId, keeperMaster);
currentMetaManager.setKeeperMaster(clusterDbId, shardDbId, keeperMaster.getKey(), keeperMaster.getValue());
currentMetaManager.setKeeperMaster(clusterDbId, shardDbId, keeperMaster.getKey(), keeperMaster.getValue(), dcName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public interface CurrentMetaManager extends Observable {

void setKeeperMaster(Long clusterDbId, Long shardDbId, String addr);

void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port);
void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port, String expectedPrimaryDc);

void setApplierMasterAndNotify(Long clusterDbId, Long shardDbId, String ip, int port, String sids);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.IpUtils;
import com.ctrip.xpipe.utils.ObjectUtils;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -533,9 +534,13 @@ public String getSrcSids(Long clusterDbId, Long shardDbId) {
}

@Override
public void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port) {


public void setKeeperMaster(Long clusterDbId, Long shardDbId, String ip, int port, String expectedPrimaryDc) {
String dcName = dcMetaCache.getPrimaryDc(clusterDbId, shardDbId);
if(expectedPrimaryDc != null && !StringUtil.trimEquals(dcName, expectedPrimaryDc)) {
// 如果 expectedDc 为null, 不进行校验。发生了dr切换,禁止修改。
// 如果任务基于 PrimaryDc 来修改 keeper meta 就要校验检测过程是否 dc 切换。
return;
}
Pair<String, Integer> keeperMaster = new Pair<String, Integer>(ip, port);
if(currentMeta.setKeeperMaster(clusterDbId, shardDbId, keeperMaster)){
logger.info("[setKeeperMaster]cluster_{},shard_{},{}:{}", clusterDbId, shardDbId, ip, port);
Expand Down Expand Up @@ -574,7 +579,7 @@ public void setKeeperMaster(Long clusterDbId, Long shardDbId, String addr) {

logger.info("[setKeeperMaster]cluster_{},shard_{},{}", clusterDbId, shardDbId, addr);
Pair<String, Integer> inetAddr = IpUtils.parseSingleAsPair(addr);
setKeeperMaster(clusterDbId, shardDbId, inetAddr.getKey(), inetAddr.getValue());
setKeeperMaster(clusterDbId, shardDbId, inetAddr.getKey(), inetAddr.getValue(), null);
}

@Override
Expand Down

0 comments on commit c1fd4f9

Please sign in to comment.