Skip to content

feat: ha swtich [failover] #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/main/java/com/aliyun/hitsdb/client/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@

import com.aliyun.hitsdb.client.callback.AbstractBatchPutCallback;
import com.aliyun.hitsdb.client.callback.AbstractMultiFieldBatchPutCallback;
import com.aliyun.hitsdb.client.exception.http.HttpClientInitException;
import com.aliyun.hitsdb.client.http.Host;

import java.io.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public interface Config {
String BASICTYPE = "Basic"; // it should be "Basic" according to RFC 2617
Expand Down
65 changes: 64 additions & 1 deletion src/main/java/com/aliyun/hitsdb/client/HAPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
public class HAPolicy {
private Pair<String, Integer> secondaryCluster;
private RetryRule retryRule = RetryRule.SecondaryPreferred;
private int queryRetryTimes = 0;
private int queryRetryTimes;
private boolean haSwitch;
private int checkCount;
private int intervalSeconds;
/*
* TODO: support queryRetryInterval
*/
Expand Down Expand Up @@ -61,6 +64,19 @@ public Builder setRetryTimes(int retryTimes) {
return this;
}

public Builder setHASwitch(int checkCount, int intervalSeconds) {
if (checkCount < 1) {
throw new IllegalArgumentException("checkCount must greater or equal than 1");
}
if (intervalSeconds < 1) {
throw new IllegalArgumentException("intervalSeconds must greater or equal than 1 second");
}
policy.haSwitch = true;
policy.checkCount = checkCount;
policy.intervalSeconds = intervalSeconds;
return this;
}

public HAPolicy build() {
return policy;
}
Expand All @@ -86,6 +102,18 @@ public long getQueryRetryInterval() {
return queryRetryInterval;
}

public boolean isHaSwitch() {
return haSwitch;
}

public int getCheckCount() {
return checkCount;
}

public int getIntervalSeconds() {
return intervalSeconds;
}

public static class QueryContext {
private HAPolicy haPolicy;
private int retryTimes = 0;
Expand Down Expand Up @@ -136,4 +164,39 @@ public HttpClient getClient() {
}
}
}

public static class HAContext {
private HAPolicy haPolicy;
private HttpClient primaryClient;
private HttpClient secondaryClient;
private int retryTimes = 0;

public HAContext(HAPolicy haPolicy, HttpClient primaryClient, HttpClient secondaryClient) {
this.haPolicy = haPolicy;
this.primaryClient = primaryClient;
this.secondaryClient = secondaryClient;
}

protected void addRetryTimes() {
retryTimes++;
if (retryTimes > haPolicy.getCheckCount()) {
HttpClient temp = secondaryClient;
secondaryClient = primaryClient;
primaryClient = temp;
retryTimes = 0;
}
}

protected void resetRetryTimes() {
retryTimes = 0;
}

public HttpClient getPrimaryClient() {
return primaryClient;
}

public HttpClient getSecondaryClient() {
return secondaryClient;
}
}
}
36 changes: 34 additions & 2 deletions src/main/java/com/aliyun/hitsdb/client/TSDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.aliyun.hitsdb.client.http.response.ResultResponse;
import com.aliyun.hitsdb.client.queue.DataQueue;
import com.aliyun.hitsdb.client.queue.DataQueueFactory;
import com.aliyun.hitsdb.client.util.HealthManager;
import com.aliyun.hitsdb.client.util.HealthWatcher;
import com.aliyun.hitsdb.client.util.LinkedHashMapUtils;
import com.aliyun.hitsdb.client.value.request.UniqueUtil;
import com.aliyun.hitsdb.client.value.JSONValue;
Expand Down Expand Up @@ -51,11 +53,13 @@ public class TSDBClient implements TSDB {
private final Consumer consumer;
private final HttpResponseCallbackFactory httpResponseCallbackFactory;
protected final boolean httpCompress;
protected final HttpClient httpclient;
private final HttpClient secondaryClient;
protected HttpClient httpclient;
private HttpClient secondaryClient;
private HealthManager healthManager;
private RateLimiter rateLimiter;
private final Config config;
private static Field queryDeleteField;
private HAPolicy.HAContext haContext;

static {
try {
Expand Down Expand Up @@ -110,6 +114,15 @@ public TSDBClient(Config config) throws HttpClientInitException {
}
LOGGER.info("The tsdb client has started.");

if (config.getHAPolicy() != null && config.getHAPolicy().isHaSwitch()) {
// start ha switch monitor
this.healthManager = new HealthManager();
this.healthManager.setIntervalSeconds(config.getHAPolicy().getIntervalSeconds());
this.healthManager.start();
this.haContext = new HAPolicy.HAContext(config.getHAPolicy(), httpclient, secondaryClient);
this.healthManager.watch(httpclient.getHost() + ":" + httpclient.getPort(), healthWatcher);
}

try {
this.checkConnection();
} catch (Exception e) {
Expand All @@ -129,6 +142,25 @@ public TSDBClient(Config config) throws HttpClientInitException {
}
}

private final HealthWatcher healthWatcher = new HealthWatcher() {
@Override
public void health(String host, boolean health) {
// 健康则直接返回
if (!health) {
haContext.addRetryTimes();
HttpClient prevClient = httpclient;
httpclient = haContext.getPrimaryClient();
secondaryClient = haContext.getSecondaryClient();
if (prevClient != httpclient) {
healthManager.unWatchAll();
healthManager.watch(httpclient.getHost() + ":" + httpclient.getPort(), healthWatcher);
}
} else {
haContext.resetRetryTimes();
}
}
};

private static final String EMPTY_HOLDER = new JSONObject().toJSONString();
private static final String VIP_API = "/api/vip_health";

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/aliyun/hitsdb/client/http/HttpAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface HttpAPI {

String DELETE_ALL_TABLE = "/api/delete_all_table";

String HEALTH = "/api/health";

String VIP_HEALTH = "/api/vip_health";

/**
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/com/aliyun/hitsdb/client/util/HealthManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Map;
import java.util.concurrent.*;

Expand Down Expand Up @@ -52,6 +47,10 @@ public void unWatch(final String host) {
this.watchers.remove(host);
}

public void unWatchAll() {
this.watchers.clear();
}


private final class WatchRunnable implements Runnable {

Expand Down Expand Up @@ -79,7 +78,7 @@ public boolean healthCheck(String host) {
if (!host.startsWith("http")) {
host = "http://" + host;
}
String url = host + HttpAPI.VIP_HEALTH;
String url = host + HttpAPI.HEALTH;
if (LOG.isDebugEnabled()) {
LOG.debug("start to check {} ", host);
}
Expand Down
Loading