Skip to content

Commit

Permalink
Adding configuration properties for the state storage location (#245)
Browse files Browse the repository at this point in the history
* Adding configuration properties for the state storage location
  • Loading branch information
Paultagoras authored Nov 13, 2023
1 parent 94c8701 commit 7e09ba0
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.0.5 2023-11-12
* Added 'zkPath' and 'zkDatabase' properties to customize exactly-once state storage

## 1.0.4 2023-11-08
* Bugfix

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.0.4
v1.0.5
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.config.ClickHouseProxyType;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -36,6 +37,8 @@ public class ClickHouseSinkConfig {
public static final String PROXY_TYPE = "proxyType";
public static final String PROXY_HOST = "proxyHost";
public static final String PROXY_PORT = "proxyPort";
public static final String ZK_PATH = "zkPath";
public static final String ZK_DATABASE = "zkDatabase";



Expand Down Expand Up @@ -75,6 +78,8 @@ public enum StateStores {
private final ClickHouseProxyType proxyType;
private final String proxyHost;
private final int proxyPort;
private final String zkPath;
private final String zkDatabase;

public enum InsertFormats {
NONE,
Expand All @@ -99,6 +104,16 @@ public String toString() {
return "utf-8 string";
}
}
public static final class ZKPathValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object o) {
String s = (String) o;
if (s == null || s.isBlank() || !s.startsWith("/")) {
throw new ConfigException("zkPath cannot be empty and must begin with a forward slash");
}
}
}

public static final class InsertFormatValidatorAndRecommender implements ConfigDef.Recommender {
@Override
Expand Down Expand Up @@ -202,6 +217,8 @@ public ClickHouseSinkConfig(Map<String, String> props) {
}
this.proxyHost = props.getOrDefault(PROXY_HOST, "");
this.proxyPort = Integer.parseInt(props.getOrDefault(PROXY_PORT, "-1"));
this.zkPath = props.getOrDefault(ZK_PATH, "/kafka-connect");
this.zkDatabase = props.getOrDefault(ZK_DATABASE, "connect_state");

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
Expand Down Expand Up @@ -409,6 +426,27 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.SHORT,
"Proxy port"
);
configDef.define(ZK_PATH,
ConfigDef.Type.STRING,
"/kafka-connect",
new ZKPathValidator(),
ConfigDef.Importance.LOW,
"This is the zookeeper path for the state store",
group,
++orderInGroup,
ConfigDef.Width.MEDIUM,
"Zookeeper path"
);
configDef.define(ZK_DATABASE,
ConfigDef.Type.STRING,
"connect_state",
ConfigDef.Importance.LOW,
"This is the database for the state store",
group,
++orderInGroup,
ConfigDef.Width.MEDIUM,
"State store database"
);

return configDef;
}
Expand Down Expand Up @@ -460,4 +498,10 @@ public String getProxyHost() {
public int getProxyPort() {
return proxyPort;
}
public String getZkPath() {
return zkPath;
}
public String getZkDatabase() {
return zkDatabase;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ public class KeeperStateProvider implements StateProvider {


private ClickHouseHelperClient chc = null;
private ClickHouseSinkConfig csc = null;

/*
create table connect_state (`key` String, minOffset BIGINT, maxOffset BIGINT, state String) ENGINE=KeeperMap('/kafka-coonect', 'localhost:9181') PRIMARY KEY `key`;
*/
public KeeperStateProvider(ClickHouseSinkConfig csc) {
this.csc = csc;

String hostname = csc.getHostname();
int port = csc.getPort();
Expand Down Expand Up @@ -66,17 +65,20 @@ public KeeperStateProvider(ClickHouseHelperClient chc) {
init();
}

private boolean init() {
String createTable = String.format("CREATE TABLE IF NOT EXISTS connect_state (`key` String, minOffset BIGINT, maxOffset BIGINT, state String) ENGINE=KeeperMap('/kafka-connect') PRIMARY KEY `key`;" );
chc.query(createTable);
return true;
private void init() {
String createTable = String.format("CREATE TABLE IF NOT EXISTS `%s` " +
"(`key` String, minOffset BIGINT, maxOffset BIGINT, state String) " +
"ENGINE=KeeperMap('%s') PRIMARY KEY `key`;",
csc.getZkDatabase(),
csc.getZkPath());
ClickHouseResponse r = chc.query(createTable);
r.close();
}

@Override
public StateRecord getStateRecord(String topic, int partition) {
//SELECT * from connect_state where `key`= ''
String key = String.format("%s-%d", topic, partition);
String selectStr = String.format("SELECT * from connect_state where `key`= '%s'", key);
String selectStr = String.format("SELECT * FROM `%s` WHERE `key`= '%s'", csc.getZkDatabase(), key);
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
Expand All @@ -97,7 +99,6 @@ public StateRecord getStateRecord(String topic, int partition) {
LOGGER.debug(String.format("read state record: topic %s partition %s with %s state max %d min %d", topic, partition, state, maxOffset, minOffset));
return new StateRecord(topic, partition, maxOffset, minOffset, state);
} catch (ClickHouseException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
Expand All @@ -108,9 +109,10 @@ public void setStateRecord(StateRecord stateRecord) {
long maxOffset = stateRecord.getMaxOffset();
String key = stateRecord.getTopicAndPartitionKey();
String state = stateRecord.getState().toString();
String insertStr = String.format("INSERT INTO connect_state values ('%s', %d, %d, '%s');", key, minOffset, maxOffset, state);
String insertStr = String.format("INSERT INTO `%s` values ('%s', %d, %d, '%s');", csc.getZkDatabase() ,key, minOffset, maxOffset, state);
ClickHouseResponse response = this.chc.query(insertStr);
LOGGER.info(String.format("write state record: topic %s partition %s with %s state max %d min %d", stateRecord.getTopic(), stateRecord.getPartition(), state, maxOffset, minOffset));
LOGGER.debug(String.format("Number of written rows [%d]", response.getSummary().getWrittenRows()));
response.close();
}
}

0 comments on commit 7e09ba0

Please sign in to comment.