Skip to content

Commit

Permalink
Merge pull request #151 from ClickHouse/date_conversion_bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Paultagoras authored Jul 18, 2023
2 parents 4ad019c + 90804a5 commit c28d788
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.18 2023-07-17
* Support inline schema with org.apache.kafka.connect.data.Timestamp type
* Support inline schema with org.apache.kafka.connect.data.Time type

## 0.0.17 2023-07-12
* Updating Logo

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.0.17
v0.0.18
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -196,7 +197,13 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va
break;
case Date32:
if (value.getFieldType().equals(Schema.Type.INT32)) {
BinaryStreamUtils.writeInt32(stream, (Integer) value.getObject());
if (value.getObject().getClass().getName().endsWith(".Date")) {
Date date = (Date)value.getObject();
int time = (int)date.getTime();
BinaryStreamUtils.writeInt32(stream, time);
} else {
BinaryStreamUtils.writeInt32(stream, (Integer) value.getObject());
}
} else {
unsupported = true;
}
Expand All @@ -205,12 +212,19 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va
if (value.getFieldType().equals(Schema.Type.INT64)) {
BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value.getObject());
} else {

unsupported = true;
}
break;
case DateTime64:
if (value.getFieldType().equals(Schema.Type.INT64)) {
BinaryStreamUtils.writeInt64(stream, (Long) value.getObject());
if (value.getObject().getClass().getName().endsWith(".Date")) {
Date date = (Date)value.getObject();
long time = date.getTime();
BinaryStreamUtils.writeInt64(stream, time);
} else {
BinaryStreamUtils.writeInt64(stream, ((Long) value.getObject()).longValue());
}
} else {
unsupported = true;
}
Expand All @@ -237,10 +251,22 @@ private void doWritePrimitive(Type type, ClickHousePipedOutputStream stream, Obj
BinaryStreamUtils.writeInt16(stream, (Short) value);
break;
case INT32:
BinaryStreamUtils.writeInt32(stream, (Integer) value);
if (value.getClass().getName().endsWith(".Date")) {
Date date = (Date)value;
int time = (int)date.getTime();
BinaryStreamUtils.writeInt32(stream, time);
} else {
BinaryStreamUtils.writeInt32(stream, (Integer) value);
}
break;
case INT64:
BinaryStreamUtils.writeInt64(stream, (Long) value);
if (value.getClass().getName().endsWith(".Date")) {
Date date = (Date)value;
long time = date.getTime();
BinaryStreamUtils.writeInt64(stream, time);
} else {
BinaryStreamUtils.writeInt64(stream, (Long) value);
}
break;
case UINT8:
BinaryStreamUtils.writeUnsignedInt8(stream, (Byte) value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -390,6 +392,10 @@ public Collection<SinkRecord> createDateType(String topic, int partition) {
.field("date32_number", Schema.OPTIONAL_INT32_SCHEMA)
.field("datetime_number", Schema.INT64_SCHEMA)
.field("datetime64_number", Schema.INT64_SCHEMA)
.field("timestamp_int64", Timestamp.SCHEMA)
.field("timestamp_date", Timestamp.SCHEMA)
.field("time_int32" , Time.SCHEMA)
.field("time_date32" , Time.SCHEMA)
.build();


Expand All @@ -411,6 +417,10 @@ public Collection<SinkRecord> createDateType(String topic, int partition) {
.put("date32_number", localDateInt)
.put("datetime_number", localDateTimeLong)
.put("datetime64_number", currentTime)
.put("timestamp_int64", new Date(System.currentTimeMillis()))
.put("timestamp_date", new Date(System.currentTimeMillis()))
.put("time_int32", new Date(System.currentTimeMillis()))
.put("time_date32", new Date(System.currentTimeMillis()))
;


Expand Down Expand Up @@ -624,7 +634,7 @@ public void supportDatesTest() {

String topic = "support-dates-table-test";
dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, datetime64_number DateTime64 ) Engine = MergeTree ORDER BY off16");
createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, datetime64_number DateTime64, timestamp_int64 Int64, timestamp_date DateTime64, time_int32 Int32, time_date32 Date32 ) Engine = MergeTree ORDER BY off16");
// https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98
Collection<SinkRecord> sr = createDateType(topic, 1);

Expand All @@ -649,7 +659,7 @@ public void detectUnsupportedDataConversions() {

String topic = "support-unsupported-dates-table-test";
dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Date, date32_number Date32, datetime_number DateTime, datetime64_number DateTime64 ) Engine = MergeTree ORDER BY off16");
createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Date, date32_number Date32, datetime_number DateTime, datetime64_number DateTime64) Engine = MergeTree ORDER BY off16");

Collection<SinkRecord> sr = createUnsupportedDataConversions(topic, 1);

Expand Down

0 comments on commit c28d788

Please sign in to comment.