diff --git a/src/main/java/com/aliyun/hitsdb/client/Config.java b/src/main/java/com/aliyun/hitsdb/client/Config.java index bb944771..3efd6a09 100644 --- a/src/main/java/com/aliyun/hitsdb/client/Config.java +++ b/src/main/java/com/aliyun/hitsdb/client/Config.java @@ -2,14 +2,9 @@ import com.aliyun.hitsdb.client.callback.AbstractBatchPutCallback; import com.aliyun.hitsdb.client.callback.AbstractMultiFieldBatchPutCallback; -import com.aliyun.hitsdb.client.exception.http.HttpClientInitException; import com.aliyun.hitsdb.client.http.Host; -import java.io.*; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; public interface Config { String BASICTYPE = "Basic"; // it should be "Basic" according to RFC 2617 diff --git a/src/main/java/com/aliyun/hitsdb/client/HAPolicy.java b/src/main/java/com/aliyun/hitsdb/client/HAPolicy.java index 92f38772..b24bbf86 100644 --- a/src/main/java/com/aliyun/hitsdb/client/HAPolicy.java +++ b/src/main/java/com/aliyun/hitsdb/client/HAPolicy.java @@ -10,7 +10,10 @@ public class HAPolicy { private Pair secondaryCluster; private RetryRule retryRule = RetryRule.SecondaryPreferred; - private int queryRetryTimes = 0; + private int queryRetryTimes; + private boolean haSwitch; + private int checkCount; + private int intervalSeconds; /* * TODO: support queryRetryInterval */ @@ -61,6 +64,19 @@ public Builder setRetryTimes(int retryTimes) { return this; } + public Builder setHASwitch(int checkCount, int intervalSeconds) { + if (checkCount < 1) { + throw new IllegalArgumentException("checkCount must greater or equal than 1"); + } + if (intervalSeconds < 1) { + throw new IllegalArgumentException("intervalSeconds must greater or equal than 1 second"); + } + policy.haSwitch = true; + policy.checkCount = checkCount; + policy.intervalSeconds = intervalSeconds; + return this; + } + public HAPolicy build() { return policy; } @@ -86,6 +102,18 @@ public long getQueryRetryInterval() { return queryRetryInterval; } + public boolean isHaSwitch() { + return haSwitch; + } + + public int getCheckCount() { + return checkCount; + } + + public int getIntervalSeconds() { + return intervalSeconds; + } + public static class QueryContext { private HAPolicy haPolicy; private int retryTimes = 0; @@ -136,4 +164,39 @@ public HttpClient getClient() { } } } + + public static class HAContext { + private HAPolicy haPolicy; + private HttpClient primaryClient; + private HttpClient secondaryClient; + private int retryTimes = 0; + + public HAContext(HAPolicy haPolicy, HttpClient primaryClient, HttpClient secondaryClient) { + this.haPolicy = haPolicy; + this.primaryClient = primaryClient; + this.secondaryClient = secondaryClient; + } + + protected void addRetryTimes() { + retryTimes++; + if (retryTimes > haPolicy.getCheckCount()) { + HttpClient temp = secondaryClient; + secondaryClient = primaryClient; + primaryClient = temp; + retryTimes = 0; + } + } + + protected void resetRetryTimes() { + retryTimes = 0; + } + + public HttpClient getPrimaryClient() { + return primaryClient; + } + + public HttpClient getSecondaryClient() { + return secondaryClient; + } + } } diff --git a/src/main/java/com/aliyun/hitsdb/client/TSDBClient.java b/src/main/java/com/aliyun/hitsdb/client/TSDBClient.java index 3f3d3ba1..4ff439a6 100644 --- a/src/main/java/com/aliyun/hitsdb/client/TSDBClient.java +++ b/src/main/java/com/aliyun/hitsdb/client/TSDBClient.java @@ -18,6 +18,8 @@ import com.aliyun.hitsdb.client.http.response.ResultResponse; import com.aliyun.hitsdb.client.queue.DataQueue; import com.aliyun.hitsdb.client.queue.DataQueueFactory; +import com.aliyun.hitsdb.client.util.HealthManager; +import com.aliyun.hitsdb.client.util.HealthWatcher; import com.aliyun.hitsdb.client.util.LinkedHashMapUtils; import com.aliyun.hitsdb.client.value.request.UniqueUtil; import com.aliyun.hitsdb.client.value.JSONValue; @@ -51,11 +53,13 @@ public class TSDBClient implements TSDB { private final Consumer consumer; private final HttpResponseCallbackFactory httpResponseCallbackFactory; protected final boolean httpCompress; - protected final HttpClient httpclient; - private final HttpClient secondaryClient; + protected HttpClient httpclient; + private HttpClient secondaryClient; + private HealthManager healthManager; private RateLimiter rateLimiter; private final Config config; private static Field queryDeleteField; + private HAPolicy.HAContext haContext; static { try { @@ -110,6 +114,15 @@ public TSDBClient(Config config) throws HttpClientInitException { } LOGGER.info("The tsdb client has started."); + if (config.getHAPolicy() != null && config.getHAPolicy().isHaSwitch()) { + // start ha switch monitor + this.healthManager = new HealthManager(); + this.healthManager.setIntervalSeconds(config.getHAPolicy().getIntervalSeconds()); + this.healthManager.start(); + this.haContext = new HAPolicy.HAContext(config.getHAPolicy(), httpclient, secondaryClient); + this.healthManager.watch(httpclient.getHost() + ":" + httpclient.getPort(), healthWatcher); + } + try { this.checkConnection(); } catch (Exception e) { @@ -129,6 +142,25 @@ public TSDBClient(Config config) throws HttpClientInitException { } } + private final HealthWatcher healthWatcher = new HealthWatcher() { + @Override + public void health(String host, boolean health) { + // 健康则直接返回 + if (!health) { + haContext.addRetryTimes(); + HttpClient prevClient = httpclient; + httpclient = haContext.getPrimaryClient(); + secondaryClient = haContext.getSecondaryClient(); + if (prevClient != httpclient) { + healthManager.unWatchAll(); + healthManager.watch(httpclient.getHost() + ":" + httpclient.getPort(), healthWatcher); + } + } else { + haContext.resetRetryTimes(); + } + } + }; + private static final String EMPTY_HOLDER = new JSONObject().toJSONString(); private static final String VIP_API = "/api/vip_health"; diff --git a/src/main/java/com/aliyun/hitsdb/client/http/HttpAPI.java b/src/main/java/com/aliyun/hitsdb/client/http/HttpAPI.java index 2bd5fee9..b1088a21 100644 --- a/src/main/java/com/aliyun/hitsdb/client/http/HttpAPI.java +++ b/src/main/java/com/aliyun/hitsdb/client/http/HttpAPI.java @@ -18,6 +18,8 @@ public interface HttpAPI { String DELETE_ALL_TABLE = "/api/delete_all_table"; + String HEALTH = "/api/health"; + String VIP_HEALTH = "/api/vip_health"; /** diff --git a/src/main/java/com/aliyun/hitsdb/client/util/HealthManager.java b/src/main/java/com/aliyun/hitsdb/client/util/HealthManager.java index 8d99280f..8123fc3e 100644 --- a/src/main/java/com/aliyun/hitsdb/client/util/HealthManager.java +++ b/src/main/java/com/aliyun/hitsdb/client/util/HealthManager.java @@ -4,11 +4,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.File; -import java.io.InputStreamReader; -import java.net.URL; -import java.net.URLConnection; import java.util.Map; import java.util.concurrent.*; @@ -52,6 +47,10 @@ public void unWatch(final String host) { this.watchers.remove(host); } + public void unWatchAll() { + this.watchers.clear(); + } + private final class WatchRunnable implements Runnable { @@ -79,7 +78,7 @@ public boolean healthCheck(String host) { if (!host.startsWith("http")) { host = "http://" + host; } - String url = host + HttpAPI.VIP_HEALTH; + String url = host + HttpAPI.HEALTH; if (LOG.isDebugEnabled()) { LOG.debug("start to check {} ", host); } diff --git a/src/test/java/com/aliyun/hitsdb/client/TestClientHA.java b/src/test/java/com/aliyun/hitsdb/client/TestClientHA.java new file mode 100644 index 00000000..0eca9d99 --- /dev/null +++ b/src/test/java/com/aliyun/hitsdb/client/TestClientHA.java @@ -0,0 +1,255 @@ +package com.aliyun.hitsdb.client; + +import com.aliyun.hitsdb.client.value.request.*; +import com.aliyun.hitsdb.client.value.response.LastDataValue; +import com.aliyun.hitsdb.client.value.response.MultiFieldQueryLastResult; +import com.aliyun.hitsdb.client.value.response.MultiFieldQueryResult; +import com.aliyun.hitsdb.client.value.response.QueryResult; +import com.aliyun.hitsdb.client.value.type.Aggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * Created by yunxing on 2022/2/25. + */ +public class TestClientHA { + /** + * @author yunxing + * @description Test case for 4 query HA policy under HA switch : + * SecondaryPreferred, PrimaryPreferred, Primary and Secondary. + * Attention: We must start up 2 cluster before testing, and follow the instruction during testing + * @throws InterruptedException + */ + @Test + public void testHASwitch() throws InterruptedException { + final String metric = "TestClientQueryHA"; + final String tagName = "_id"; + final String tagValue = "tagv"; + final String mainIp = "127.0.0.1"; + final int mainPort = 3002; + final String secondaryIp = "127.0.0.1"; + final int secondaryPort = 3003; + int checkInterval = 5; + int checkCount = 1; + HashMap tags = new HashMap() {{ put(tagName, tagValue);}}; + + TSDBConfig config = TSDBConfig.address(mainIp, mainPort).config(); + TSDBClient tsdb = new TSDBClient(config); + + TSDBConfig config2 = TSDBConfig.address(secondaryIp, secondaryPort).config(); + TSDBClient tsdb2 = new TSDBClient(config2); + + long currentSecond = System.currentTimeMillis() / 1000; + int value = 1; + Boolean failed = false; + + Point point = Point.metric(metric).tag(tagName, tagValue).value(currentSecond, value).build(); + MultiFieldPoint mpoint = MultiFieldPoint.metric(metric).tag(tagName, tagValue).timestamp(currentSecond).field("field", value).build(); + tsdb.putSync(point); + tsdb.multiFieldPutSync(mpoint); + tsdb2.putSync(point); + tsdb2.multiFieldPutSync(mpoint); + + + // STEP 1: TEST FOR HAPolicy.RetryRule.Secondary AND HAPolicy.RetryRule.SecondaryPreferred + HAPolicy SecondaryPreferredpolicy = HAPolicy.addSecondaryCluster(secondaryIp, secondaryPort) + .setRetryRule(HAPolicy.RetryRule.SecondaryPreferred).setRetryTimes(1).setHASwitch(checkCount, checkInterval).build(); + TSDBConfig SecondaryPreferredConfig = TSDBConfig.address(mainIp, mainPort).addHAPolicy(SecondaryPreferredpolicy).config(); + TSDBClient secondaryPreferredTsdb = new TSDBClient(SecondaryPreferredConfig); + + HAPolicy secondaryPolicy = HAPolicy.addSecondaryCluster(secondaryIp, secondaryPort) + .setRetryRule(HAPolicy.RetryRule.Secondary).setRetryTimes(1).setHASwitch(checkCount, checkInterval).build(); + TSDBConfig secondaryConfig = TSDBConfig.address(mainIp, mainPort).addHAPolicy(secondaryPolicy).config(); + TSDBClient secondaryTsdb = new TSDBClient(secondaryConfig); + + System.out.println("please kill primary cluster!"); + while (true) { + try { + TSDBConfig tempConfig = TSDBConfig.address(mainIp, mainPort).config(); + new TSDBClient(tempConfig); + } catch (Exception e) { + break; + } + System.out.println("please kill primary cluster!"); + Thread.sleep(3 * 1000); + } + + + Thread.sleep(checkCount * checkInterval * 2 * 1000 + 1000); + // write should recover now + secondaryPreferredTsdb.putSync(point); + secondaryTsdb.putSync(point); + + Query query = Query.timeRange(currentSecond - 0, currentSecond + 1) + .sub(SubQuery.metric(metric).aggregator(Aggregator.NONE).tag(tags).build()).build(); + List results = secondaryPreferredTsdb.query(query); + Assert.assertTrue(results.size() > 0); + failed = false; + try { + secondaryTsdb.query(query); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + MultiFieldQuery mquery = MultiFieldQuery.timeRange(currentSecond - 0, currentSecond + 1) + .sub(MultiFieldSubQuery.metric(metric).tags(tags) + .fieldsInfo(new MultiFieldSubQueryDetails.Builder("*", Aggregator.NONE).build()) + .build()) + .build(); + List mresults = secondaryPreferredTsdb.multiFieldQuery(mquery); + Assert.assertTrue(mresults.size() > 0); + failed = false; + try { + secondaryTsdb.multiFieldQuery(mquery); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + LastPointQuery lastQuery = LastPointQuery + .builder().backScan(0).msResolution(true) + .sub(LastPointSubQuery.builder(metric, tags).build()).build(); + List lresults = secondaryPreferredTsdb.queryLast(lastQuery); + Assert.assertTrue(lresults.size() > 0); + failed = false; + try { + secondaryTsdb.queryLast(lastQuery); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + LastPointQuery mlastQuery = LastPointQuery + .builder().backScan(0).msResolution(true) + .sub(LastPointSubQuery.builder(metric, new ArrayList() {{add("*");}}, tags).build()).build(); + List mlresults = secondaryPreferredTsdb.multiFieldQueryLast(mlastQuery); + Assert.assertTrue(mlresults.size() > 0); + failed = false; + try { + secondaryTsdb.multiFieldQueryLast(mlastQuery); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + // STEP 2: TEST FOR HAPolicy.RetryRule.Primary AND HAPolicy.RetryRule.PrimaryPreferred + while (true) { + try { + TSDBConfig tempConfig = TSDBConfig.address(mainIp, mainPort).config(); + new TSDBClient(tempConfig); + break; + } catch (Exception e) { + System.out.println("please start up primary cluster!"); + Thread.sleep(5 * 1000); + } + } + + HAPolicy PrimaryPreferredPolicy = HAPolicy.addSecondaryCluster(secondaryIp, secondaryPort) + .setRetryRule(HAPolicy.RetryRule.PrimaryPreferred).setRetryTimes(1).setHASwitch(checkCount, checkInterval).build(); + TSDBConfig PrimaryPreferredConfig = TSDBConfig.address(mainIp, mainPort).addHAPolicy(PrimaryPreferredPolicy).config(); + TSDBClient primaryPreferredTsdb = new TSDBClient(PrimaryPreferredConfig); + + HAPolicy mainPolicy = HAPolicy.addSecondaryCluster(secondaryIp, secondaryPort) + .setRetryRule(HAPolicy.RetryRule.Primary).setRetryTimes(1).setHASwitch(checkCount, checkInterval).build(); + TSDBConfig mainConfig = TSDBConfig.address(mainIp, mainPort).addHAPolicy(mainPolicy).config(); + TSDBClient mainTsdb = new TSDBClient(mainConfig); + + System.out.println("please kill primary cluster again!"); + Thread.sleep(5 * 1000); + while (true) { + try { + TSDBConfig tempConfig = TSDBConfig.address(mainIp, mainPort).config(); + new TSDBClient(tempConfig); + } catch (Exception e) { + break; + } + System.out.println("please kill primary cluster again!"); + Thread.sleep(5 * 1000); + } + + results = primaryPreferredTsdb.query(query); + Assert.assertTrue(results.size() > 0); + failed = false; + try { + mainTsdb.query(query); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + mresults = primaryPreferredTsdb.multiFieldQuery(mquery); + Assert.assertTrue(mresults.size() > 0); + failed = false; + try { + mainTsdb.multiFieldQuery(mquery); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + lresults = primaryPreferredTsdb.queryLast(lastQuery); + Assert.assertTrue(lresults.size() > 0); + failed = false; + try { + mainTsdb.queryLast(lastQuery); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + + mlresults = primaryPreferredTsdb.multiFieldQueryLast(mlastQuery); + Assert.assertTrue(mlresults.size() > 0); + failed = false; + try { + mainTsdb.multiFieldQueryLast(mlastQuery); + } catch (Exception e) { + failed = true; + } + Assert.assertTrue(failed); + } + + /** + * @author yunxing + * @description this testcase will not stop, + * kill primary cluster or secondary cluster in turn to check ha swtich + * @throws InterruptedException + */ + @Test + public void testForeverWrite() throws InterruptedException { + final String metric = "TestForeverWrite"; + final String tagName = "_id"; + final String tagValue = "tagv"; + final String mainIp = "127.0.0.1"; + final int mainPort = 3002; + final String secondaryIp = "127.0.0.1"; + final int secondaryPort = 3003; + int checkInterval = 5; + int checkCount = 1; + + long currentSecond = System.currentTimeMillis() / 1000; + int value = 1; + MultiFieldPoint mpoint = MultiFieldPoint.metric(metric).tag(tagName, tagValue).timestamp(currentSecond).field("field", value).build(); + + + // STEP 1: TEST FOR HAPolicy.RetryRule.Secondary AND HAPolicy.RetryRule.SecondaryPreferred + HAPolicy SecondaryPreferredpolicy = HAPolicy.addSecondaryCluster(secondaryIp, secondaryPort) + .setRetryRule(HAPolicy.RetryRule.SecondaryPreferred).setRetryTimes(1).setHASwitch(checkCount, checkInterval).build(); + TSDBConfig SecondaryPreferredConfig = TSDBConfig.address(mainIp, mainPort).addHAPolicy(SecondaryPreferredpolicy).config(); + TSDBClient secondaryPreferredTsdb = new TSDBClient(SecondaryPreferredConfig); + + while (true) { + try { + secondaryPreferredTsdb.multiFieldPutSync(mpoint); + System.out.println("write success"); + } catch (Exception e) { + System.out.println("write failed"); + } + Thread.sleep(1000); + } + } +}