Skip to content

Commit

Permalink
Merge pull request #474 from ClickHouse/remove-redis-keystore-support
Browse files Browse the repository at this point in the history
Removed RedisStateProvider + optimze imports for project
  • Loading branch information
Paultagoras authored Nov 21, 2024
2 parents bce144d + 5c2af87 commit fef6bb7
Show file tree
Hide file tree
Showing 56 changed files with 217 additions and 250 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# 1.2.5
* Remove redis state provide since we are using KeeperMap for state storage
* Remove unused avro property from `build.gradle.kts`
* Trim schemaless data to only pass the fields that are in the table
* Allow bypassing the schema validation
Expand Down
7 changes: 5 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.gradle.api.tasks.testing.logging.TestExceptionFormat

val defaultJdkVersion = 17
java {
Expand Down Expand Up @@ -72,7 +72,7 @@ dependencies {
implementation("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}")
implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}")
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}")
implementation("io.lettuce:lettuce-core:6.5.0.RELEASE")
implementation("io.projectreactor:reactor-core:3.7.0")
implementation("com.google.code.gson:gson:2.11.0")
// https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5
implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
Expand Down Expand Up @@ -148,6 +148,9 @@ tasks.create("integrationTest", Test::class.java) {
systemProperties = System.getProperties() as Map<String, Any>
}

tasks.withType<JavaCompile> {
options.compilerArgs.addAll(listOf("-Xlint:unchecked", "-Xlint:deprecation"))
}

tasks.withType<Test> {
maxParallelForks = (Runtime.getRuntime().availableProcessors() / 2).takeIf { it > 0 } ?: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertTrue;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.*;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import org.junit.jupiter.api.*;
import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.clickhouse.ClickHouseContainer;
Expand All @@ -19,10 +22,13 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ClickHouseSinkConnectorIntegrationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkConnectorIntegrationTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI;
import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;

import java.io.*;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.*;
import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.createReplicatedMergeTreeTable;
import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.dropTable;
import static org.junit.jupiter.api.Assertions.assertTrue;


Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.clickhouse.kafka.connect.sink.helper;

import com.clickhouse.client.*;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,7 +18,11 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.*;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.clickhouse.kafka.connect.sink.helper;

import com.clickhouse.client.*;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.clickhouse.kafka.connect.sink.helper;




import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -21,9 +19,11 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Stream;


public class ConfluentPlatform {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package com.clickhouse.kafka.connect.sink.helper;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.LongStream;

public class SchemaTestData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import org.apache.kafka.connect.sink.SinkRecord;

import java.math.BigDecimal;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;

public class SchemalessTestData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ public class ClickHouseSinkConfig {
public static final Integer tableRefreshIntervalDefault = 0;
public static final Boolean exactlyOnceDefault = Boolean.FALSE;
public static final Boolean customInsertFormatDefault = Boolean.FALSE;
public enum StateStores {
NONE,
IN_MEMORY,
REDIS,
KEEPER_MAP
}

private final String hostname;
private final int port;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,18 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.*;
import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.db.ClickHouseWriter;
import com.clickhouse.kafka.connect.sink.db.DBWriter;
import com.clickhouse.kafka.connect.sink.db.InMemoryDBWriter;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.sink.kafka.RangeContainer;
import com.clickhouse.kafka.connect.sink.processing.Processing;
import com.clickhouse.kafka.connect.sink.state.State;
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.StateRecord;
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import com.clickhouse.kafka.connect.sink.state.provider.RedisStateProvider;
import com.clickhouse.kafka.connect.util.Utils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;
import java.util.Collection;
import java.util.Map;

public class ClickHouseSinkTask extends SinkTask {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider;
import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer;
import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,10 +20,10 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Timer;

public class ProxySinkTask {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.clickhouse.kafka.connect.sink.data.convert.EmptyRecordConvertor;
import com.clickhouse.kafka.connect.sink.data.convert.RecordConvertor;
import com.clickhouse.kafka.connect.sink.data.convert.SchemalessRecordConvertor;
import com.clickhouse.kafka.connect.sink.data.convert.SchemaRecordConvertor;
import com.clickhouse.kafka.connect.sink.data.convert.SchemalessRecordConvertor;
import com.clickhouse.kafka.connect.sink.data.convert.StringRecordConvertor;
import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer;
import lombok.Getter;
Expand All @@ -12,8 +12,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.clickhouse.kafka.connect.sink.data;

import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -9,7 +15,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StructToJsonMap {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package com.clickhouse.kafka.connect.sink.data.convert;

import com.clickhouse.kafka.connect.sink.data.Data;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.data.SchemaType;
import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmptyRecordConvertor extends RecordConvertor {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.clickhouse.kafka.connect.sink.data.convert;

import java.util.regex.Pattern;
import org.apache.kafka.connect.data.Schema;
import com.clickhouse.kafka.connect.sink.data.Record;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.regex.Pattern;

public abstract class RecordConvertor {
public Record convert(SinkRecord sinkRecord, boolean splitDBTopic, String dbTopicSeparatorChar, String configurationDatabase) {
String database = configurationDatabase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

Expand Down
Loading

0 comments on commit fef6bb7

Please sign in to comment.