Skip to content

Commit 6943d7d

Browse files
author
qifanwang
committed
call batchMarkInstance with dc instance cnt
1 parent ae22a14 commit 6943d7d

File tree

10 files changed

+67
-5
lines changed

10 files changed

+67
-5
lines changed

core/src/main/java/com/ctrip/xpipe/api/migration/OuterClientService.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -732,13 +732,17 @@ class MarkInstanceRequest{
732732
private String clusterName;
733733
private String activeDc;
734734

735+
private Map<String, Integer> instanceCnt;
736+
735737
public MarkInstanceRequest() {
736738
}
737739

738-
public MarkInstanceRequest(Set<HostPortDcStatus> hostPortDcStatuses, String clusterName, String activeDc) {
740+
public MarkInstanceRequest(Set<HostPortDcStatus> hostPortDcStatuses, String clusterName,
741+
String activeDc, Map<String, Integer> instanceCnt) {
739742
this.hostPortDcStatuses = hostPortDcStatuses;
740743
this.clusterName = clusterName;
741744
this.activeDc = activeDc;
745+
this.instanceCnt = instanceCnt;
742746
}
743747

744748
public Set<HostPortDcStatus> getHostPortDcStatuses() {
@@ -765,6 +769,14 @@ public void setActiveDc(String activeDc) {
765769
this.activeDc = activeDc;
766770
}
767771

772+
public Map<String, Integer> getInstanceCnt() {
773+
return instanceCnt;
774+
}
775+
776+
public void setInstanceCnt(Map<String, Integer> instanceCnt) {
777+
this.instanceCnt = instanceCnt;
778+
}
779+
768780
@Override
769781
public String toString() {
770782
return String.format("[%s:%s]%s", clusterName, activeDc, hostPortDcStatuses);

core/src/main/java/com/ctrip/xpipe/migration/DefaultOuterClientService.java

+8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.ctrip.xpipe.endpoint.ClusterShardHostPort;
55
import com.ctrip.xpipe.endpoint.HostPort;
66
import com.ctrip.xpipe.utils.DateTimeUtils;
7+
import com.ctrip.xpipe.utils.VisibleForTesting;
78
import com.google.common.collect.Lists;
89

910
import java.net.InetSocketAddress;
@@ -19,6 +20,8 @@ public class DefaultOuterClientService extends AbstractOuterClientService {
1920

2021
private Map<HostPort, Boolean> instanceStatus = new ConcurrentHashMap<>();
2122

23+
private Map<String, Integer> cntMap = new ConcurrentHashMap<>();
24+
2225
@Override
2326
public void markInstanceUp(ClusterShardHostPort clusterShardHostPort) throws OuterClientException {
2427
logger.info("[markInstanceUp]{}", clusterShardHostPort);
@@ -131,6 +134,7 @@ public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws Ou
131134
for (HostPortDcStatus hostPortDcStatus : markInstanceRequest.getHostPortDcStatuses()) {
132135
instanceStatus.put(new HostPort(hostPortDcStatus.getHost(), hostPortDcStatus.getPort()), hostPortDcStatus.isCanRead());
133136
}
137+
this.cntMap = markInstanceRequest.getInstanceCnt();
134138
}
135139

136140
@Override
@@ -140,4 +144,8 @@ public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() th
140144
resp.setResult(Collections.emptyList());
141145
return resp;
142146
}
147+
148+
public Map<String, Integer> getCntMap() {
149+
return cntMap;
150+
}
143151
}

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@
1212
import com.ctrip.xpipe.redis.checker.alert.AlertManager;
1313
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
1414
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder;
15+
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
16+
import com.ctrip.xpipe.redis.core.entity.ShardMeta;
1517
import com.ctrip.xpipe.redis.core.meta.MetaCache;
18+
import com.ctrip.xpipe.redis.core.meta.XpipeMetaManager;
1619
import com.ctrip.xpipe.utils.XpipeThreadFactory;
1720
import com.google.common.annotations.VisibleForTesting;
1821
import org.slf4j.Logger;
1922
import org.slf4j.LoggerFactory;
2023
import org.springframework.beans.factory.annotation.Autowired;
2124
import org.springframework.stereotype.Component;
25+
import com.ctrip.xpipe.redis.core.entity.DcMeta;
2226

2327
import javax.annotation.PostConstruct;
2428
import java.util.*;
@@ -87,7 +91,8 @@ public Set<HostPortDcStatus> getNeedAdjustInstances(String cluster, Set<HostPort
8791
@Override
8892
public void doMarkInstances(String clusterName, String activeDc, Set<HostPortDcStatus> instances) throws OuterClientException {
8993
alertMarkInstance(clusterName, instances);
90-
MarkInstanceRequest markInstanceRequest = new MarkInstanceRequest(instances, clusterName, activeDc);
94+
Map<String, Integer> dcInstancesCnt = metaCache.getClusterCntMap(clusterName);
95+
MarkInstanceRequest markInstanceRequest = new MarkInstanceRequest(instances, clusterName, activeDc, dcInstancesCnt);
9196
outerClientService.batchMarkInstance(markInstanceRequest);
9297
}
9398

@@ -170,4 +175,9 @@ protected void setExecutors(Executor executors) {
170175
this.executors = executors;
171176
}
172177

178+
@VisibleForTesting
179+
protected void setMetaCache(MetaCache metaCache) {
180+
this.metaCache = metaCache;
181+
}
182+
173183
}

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/TestMetaCache.java

+5
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,9 @@ public Set<String> getAllShardNamesByClusterName(String clusterName) {
227227
return Collections.emptySet();
228228
}
229229

230+
@Override
231+
public Map<String, Integer> getClusterCntMap(String clusterName) {
232+
return Collections.emptyMap();
233+
}
234+
230235
}

redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullServiceTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -141,5 +141,4 @@ public void testQuorumWithLastMark() throws Exception {
141141
Assert.assertEquals("jq", status.getDc());
142142
Assert.assertTrue(status.isCanRead());
143143
}
144-
145144
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/AbstractMetaCache.java

+18
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,23 @@ public Set<String> getAllShardNamesByClusterName(String clusterName) {
703703
return shards;
704704
}
705705

706+
@Override
707+
public Map<String, Integer> getClusterCntMap(String clusterName) {
708+
Map<String, Integer> clusterCntMap = new HashMap<>();
709+
for (DcMeta dcMeta : meta.getKey().getDcs().values()) {
710+
ClusterMeta clusterMeta = dcMeta.getClusters().get(clusterName);
711+
if(clusterMeta == null) {
712+
continue;
713+
}
714+
int cnt = 0;
715+
for(ShardMeta shardMeta : clusterMeta.getShards().values()) {
716+
cnt += shardMeta.getRedises().size();
717+
}
718+
clusterCntMap.put(dcMeta.getId(), cnt);
719+
}
720+
return clusterCntMap;
721+
}
722+
706723
@VisibleForTesting
707724
public AbstractMetaCache setMeta(Pair<XpipeMeta, XpipeMetaManager> meta) {
708725
this.meta = meta;
@@ -714,4 +731,5 @@ public AbstractMetaCache setMonitor2ClusterShard(Map<String, Triple<String, Stri
714731
this.monitor2ClusterShard = monitorMap;
715732
return this;
716733
}
734+
717735
}

redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCacheTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,14 @@ public void checkClustersCntTest() {
356356
}
357357
}
358358

359+
@Test
360+
public void testClusterDnCnt() {
361+
Map<String, Integer> cntMap = metaCache.getClusterCntMap("cluster1");
362+
Assert.assertEquals(cntMap.size(), 2);
363+
Assert.assertEquals(cntMap.get("jq").intValue(), 3);
364+
Assert.assertEquals(cntMap.get("oy").intValue(), 2);
365+
}
366+
359367
protected String getXpipeMetaConfigFile() {
360368
return "dc-meta-test.xml";
361369
}

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaCache.java

+2
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,6 @@ public interface MetaCache {
9898
boolean isAsymmetricCluster(String clusterName);
9999

100100
Set<String> getAllShardNamesByClusterName(String clusterName);
101+
102+
Map<String, Integer> getClusterCntMap(String clusterName);
101103
}

redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RordbReplicationSupportTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public void endWriteRdb() {
198198
}
199199
});
200200
psync.execute();
201-
future.get(5, TimeUnit.SECONDS);
201+
future.get(10, TimeUnit.SECONDS);
202202

203203
return psync;
204204
}

redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisHalfRdbServerFail.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void redisFailKeeperRestartDumpNewRdb() throws Exception {
6767
SimplePsyncObserver simplePsyncObserver = new SimplePsyncObserver();
6868
InMemoryPsync inMemoryPsync = sendInmemoryPsync("localhost", redisKeeperServer.getListeningPort(), simplePsyncObserver);
6969
//wait
70-
simplePsyncObserver.getOnline().get(6000, TimeUnit.MILLISECONDS);
70+
simplePsyncObserver.getOnline().get(8000, TimeUnit.MILLISECONDS);
7171
//wait for commands
7272
sleep(1000);
7373

0 commit comments

Comments
 (0)