Merge pull request #445 from ClickHouse/adjusting-exactly-once-mismatch
Adding new state and logic around it
Paultagoras authored Sep 25, 2024
2 parents 5191584 + 6f8ffe1 commit 7bf0a11
Showing 7 changed files with 70 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
# 1.2.2
* Adding a new property `tolerateStateMismatch` to allow the connector to continue processing even if the state stored in ClickHouse does not match the current offset in Kafka

# 1.2.1
* Adding some additional logging details to help debug issues

# 1.2.0
* Adding a KeyToValue transformation to allow the key to be stored in a separate column in ClickHouse

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v1.2.1
v1.2.2
ClickHouseSinkConfig.java
@@ -47,6 +47,7 @@ public class ClickHouseSinkConfig {
public static final String DB_TOPIC_SPLIT_CHAR = "dbTopicSplitChar";
public static final String KEEPER_ON_CLUSTER = "keeperOnCluster";
public static final String DATE_TIME_FORMAT = "dateTimeFormats";
public static final String TOLERATE_STATE_MISMATCH = "tolerateStateMismatch";

public static final int MILLI_IN_A_SEC = 1000;
private static final String databaseDefault = "default";
@@ -92,6 +93,7 @@ public enum StateStores {
private final String keeperOnCluster;
private final Map<String, DateTimeFormatter> dateTimeFormats;
private final String clientVersion;
private final boolean tolerateStateMismatch;

public enum InsertFormats {
NONE,
@@ -263,6 +265,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
}
}
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1");
this.tolerateStateMismatch = Boolean.parseBoolean(props.getOrDefault(TOLERATE_STATE_MISMATCH, "false"));

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
@@ -558,6 +561,16 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.SHORT,
"Client version"
);
configDef.define(TOLERATE_STATE_MISMATCH,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Tolerate state mismatch. default: false",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Tolerate state mismatch."
);
return configDef;
}
}
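For context, a minimal sketch (not part of the commit) of how the new flag is consumed: Kafka Connect hands the connector properties to `ClickHouseSinkConfig` as a plain `String` map, the constructor above parses `tolerateStateMismatch` with a default of `false`, and the `isTolerateStateMismatch()` accessor read by `Processing` further down is defined elsewhere in the class, outside this hunk. The example class name below is illustrative only.

```java
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

import java.util.HashMap;
import java.util.Map;

public class TolerateStateMismatchExample {
    public static void main(String[] args) {
        // Connector properties as Kafka Connect would pass them to the sink.
        Map<String, String> props = new HashMap<>();
        props.put(ClickHouseSinkConfig.TOLERATE_STATE_MISMATCH, "true"); // "tolerateStateMismatch"

        ClickHouseSinkConfig config = new ClickHouseSinkConfig(props);

        // Boolean.parseBoolean(...) in the constructor makes the default false,
        // so omitting the property keeps the previous fail-fast behaviour.
        System.out.println(config.isTolerateStateMismatch()); // prints: true
    }
}
```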
RangeContainer.java
@@ -66,6 +69,9 @@ public RangeState getOverLappingState(RangeContainer rangeContainer) {
// ZEROED [10, 20] Actual [0, 10]
if (actualMinOffset == 0)
return RangeState.ZERO;
// PREVIOUS [10, 20] Actual [5, 8]
if (actualMaxOffset < minOffset)
return RangeState.PREVIOUS;
// ERROR [10, 20] Actual [8, 19]
return RangeState.ERROR;
}
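To make the new branch concrete, here is an illustrative, self-contained restatement of the tail of `getOverLappingState` (the `RangeStateSketch` and `classifyTail` names are hypothetical, not code from this commit): the stored state covers `[minOffset, maxOffset]`, the incoming batch covers `[actualMinOffset, actualMaxOffset]`, and a batch that ends before the stored range began is now classified as `PREVIOUS` (already processed) instead of falling through to `ERROR`. The real method also classifies SAME/PREFIX/SUFFIX/CONTAINS/OVER_LAPPING/NEW in code above this hunk.

```java
public class RangeStateSketch {
    // Local stand-in for com.clickhouse.kafka.connect.sink.kafka.RangeState.
    enum RangeState { ZERO, PREVIOUS, ERROR }

    // maxOffset is kept only to mirror the stored range [minOffset, maxOffset].
    static RangeState classifyTail(long minOffset, long maxOffset,
                                   long actualMinOffset, long actualMaxOffset) {
        if (actualMinOffset == 0)        // ZEROED:   stored [10, 20], batch [0, 10]
            return RangeState.ZERO;      //           topic was likely deleted and recreated
        if (actualMaxOffset < minOffset) // PREVIOUS: stored [10, 20], batch [5, 8]
            return RangeState.PREVIOUS;  //           batch ends before the stored range began,
                                         //           so it was already processed
        return RangeState.ERROR;         // ERROR:    stored [10, 20], batch [8, 19]
    }

    public static void main(String[] args) {
        System.out.println(classifyTail(10, 20, 5, 8)); // prints: PREVIOUS
    }
}
```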
RangeState.java
@@ -1,15 +1,15 @@
package com.clickhouse.kafka.connect.sink.kafka;

public enum RangeState {

ZERO(0), //This is for when it seems like the topic has been deleted/recreated
SAME(1),
PREFIX(2),
SUFFIX(3),
CONTAINS(4),
OVER_LAPPING(5),
NEW(6),
ERROR(7);
ERROR(7),
PREVIOUS(8);


private int rangeState;
Processing.java
@@ -34,12 +34,6 @@ public class Processing {

private ErrorReporter errorReporter = null;

public Processing(StateProvider stateProvider, DBWriter dbWriter) {
this.stateProvider = stateProvider;
this.dbWriter = dbWriter;
this.errorReporter = null;
}

public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, ClickHouseSinkConfig clickHouseSinkConfig) {
this.stateProvider = stateProvider;
this.dbWriter = dbWriter;
@@ -177,8 +171,8 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
doInsert(rightRecords, rightRangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case ERROR:
throw new RuntimeException(String.format("State MISMATCH given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
default: //case ERROR:
throw new RuntimeException(String.format("ERROR State given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()));
}
break;
@@ -211,8 +205,17 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException
doInsert(rightRecords, rightRangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case ERROR:
throw new RuntimeException(String.format("State MISMATCH given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
case PREVIOUS:
if (clickHouseSinkConfig.isTolerateStateMismatch()) {
LOGGER.warn("State MISMATCH as batch already processed - skipping [{}] records for topic: [{}], partition: [{}], minOffset: [{}], maxOffset: [{}], storedMinOffset: [{}], storedMaxOffset: [{}]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset());
} else {
throw new RuntimeException(String.format("State MISMATCH as batch already processed - skipping [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], storedMinOffset: [%s], storedMaxOffset: [%s]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()));
}
break;
default: //case ERROR:
throw new RuntimeException(String.format("ERROR State given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()));
}
}
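Net effect of the new case: when an incoming batch is classified as `PREVIOUS` (its offsets fall entirely before the range already recorded in the state store, so it was written in an earlier generation of the task), `doLogic` now warns and skips the batch if `tolerateStateMismatch` is enabled, and still fails the task otherwise; a genuine `ERROR` mismatch keeps failing via the `default` branch. A hedged sketch of that decision follows (hypothetical helper, not code from this commit); the new `ProcessOldRecordsTest` below exercises both paths.

```java
public class PreviousBatchDecision {
    enum Outcome { SKIP_WITH_WARNING, FAIL_TASK }

    // On a PREVIOUS batch, the config flag alone decides the outcome.
    static Outcome onPreviousBatch(boolean tolerateStateMismatch) {
        return tolerateStateMismatch ? Outcome.SKIP_WITH_WARNING : Outcome.FAIL_TASK;
    }

    public static void main(String[] args) {
        System.out.println(onPreviousBatch(true));  // SKIP_WITH_WARNING: records are dropped, processing continues
        System.out.println(onPreviousBatch(false)); // FAIL_TASK: RuntimeException, matching the old behaviour
    }
}
```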
ProcessingTest.java
@@ -1,4 +1,5 @@
package com.clickhouse.kafka.connect.sink.processing;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -18,6 +19,7 @@
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

@@ -58,7 +60,7 @@ public void ProcessAllAtOnceNewTest() throws IOException, ExecutionException, In
List<Record> records = createRecords("test", 1);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
}
@@ -73,7 +75,7 @@ public void ProcessSplitNewTest() throws IOException, ExecutionException, Interr
assertEquals(records.size(), recordsHead.size() + recordsTail.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
processing.doLogic(recordsTail);
@@ -86,7 +88,7 @@ public void ProcessAllNewTwiceTest() throws IOException, ExecutionException, Int
List<Record> records = createRecords("test", 1);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
processing.doLogic(records);
@@ -102,7 +104,7 @@ public void ProcessAllNewFailedSetStateAfterProcessingTest() throws IOException,
//List<Record> recordsTail = records.subList(splitPoint, records.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -118,7 +120,7 @@ public void ProcessContainsBeforeProcessingTest() throws IOException, ExecutionE
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -133,7 +135,7 @@ public void ProcessContainsAfterProcessingTest() throws IOException, ExecutionEx
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
processing.doLogic(containsRecords);
@@ -148,7 +150,7 @@ public void ProcessOverlappingBeforeProcessingTest() throws IOException, Executi
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
processing.doLogic(containsRecords);
@@ -166,7 +168,7 @@ public void ProcessSplitNewWithBeforeProcessingTest() throws IOException, Execut
assertEquals(records.size(), recordsHead.size() + recordsTail.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -183,7 +185,7 @@ public void ProcessDeletedTopicBeforeProcessingTest() throws IOException, Execut
List<Record> containsRecords = records.subList(0,150);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -218,7 +220,7 @@ public void ProcessPartialOverlappingBeforeProcessingTest() throws IOException,
List<Record> recordsTail = records.subList(splitPointLow, records.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -237,11 +239,30 @@ public void ProcessPartialOverlappingAfterProcessingTest() throws IOException, E
List<Record> recordsTail = records.subList(splitPointLow, records.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
processing.doLogic(recordsTail);
assertEquals(records.size(), dbWriter.recordsInserted());
}

@Test
@DisplayName("ProcessOldRecordsTest")
public void ProcessOldRecordsTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
List<Record> recordsHead = records.subList(1, 2);
StateProvider stateProvider = new InMemoryState();
stateProvider.setStateRecord(new StateRecord("test", 1, 5000, 4000, State.AFTER_PROCESSING));
DBWriter dbWriter = new InMemoryDBWriter();
Processing processingWithoutConfig = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
Assert.assertThrows(RuntimeException.class, () -> processingWithoutConfig.doLogic(recordsHead));

HashMap<String, String> config = new HashMap<>();
config.put(ClickHouseSinkConfig.TOLERATE_STATE_MISMATCH, "true");
ClickHouseSinkConfig clickHouseConfig = new ClickHouseSinkConfig(config);
Processing processing = new Processing(stateProvider, dbWriter, null, clickHouseConfig);
processing.doLogic(recordsHead);
assertEquals(0, dbWriter.recordsInserted());
}

}
