Feature/add headers and metadata v2 (#118)
* Fix e2e tests
ag-ramachandran authored May 10, 2024
1 parent c5f53b5 commit 9a24b7d
Showing 19 changed files with 60,346 additions and 111 deletions.
@@ -126,20 +126,17 @@ public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(Ku
String format = mapping.getFormat();
log.debug("Using format {} ", format);
if (format != null && !format.isEmpty()) {
if (isSchemaType(format)) {
props.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
} else {
props.setDataFormat(format);
}
props.setDataFormat(format);
// if (isSchemaType(format)) {
// props.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
// } else {
// props.setDataFormat(format);
// }
}
String mappingRef = mapping.getMapping();
if (mappingRef != null && !mappingRef.isEmpty() && format != null) {
if (isSchemaType(format)) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.JSON);
} else {
if (StringUtils.isNotEmpty(mappingRef) && StringUtils.isNotEmpty(format)) {
props.setIngestionMapping(mappingRef,
IngestionMapping.IngestionMappingKind.valueOf(format.toUpperCase(Locale.ROOT)));
}
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.ingestionProperties = props;
@@ -152,14 +149,6 @@ public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(Ku
}
}

private static boolean isSchemaType(@NotNull String format) {
return format.equalsIgnoreCase(IngestionProperties.DataFormat.JSON.name())
|| format.equalsIgnoreCase(IngestionProperties.DataFormat.SINGLEJSON.name())
|| format.equalsIgnoreCase(IngestionProperties.DataFormat.MULTIJSON.name())
|| format.equalsIgnoreCase(IngestionProperties.DataFormat.APACHEAVRO.name())
|| format.equalsIgnoreCase(IngestionProperties.DataFormat.AVRO.name());
}

/**
* This function validates whether the user has the read and write access to the
* intended table
@@ -176,9 +165,6 @@ private static void validateTableAccess(Client engineClient, TopicToTableMapping
String format = mapping.getFormat();
String mappingName = mapping.getMapping();
boolean streamingEnabled = mapping.isStreaming();
if (isSchemaType(format)) {
format = IngestionProperties.DataFormat.JSON.name();
}
boolean hasAccess = false;
boolean shouldCheckStreaming = streamingEnabled;

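The hunks above simplify how getTopicsToIngestionProps builds ingestion properties: the configured format is now applied as-is, and the ingestion-mapping kind is derived directly from the format name instead of being special-cased for JSON/Avro (that special-casing moves to ingestion time in TopicPartitionWriter below). A minimal standalone sketch of the resulting behaviour, not part of the commit; the class, method, and variable names are illustrative:

import java.util.Locale;

import org.apache.commons.lang3.StringUtils;

import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;

public final class TopicPropsSketch {
    private TopicPropsSketch() {
    }

    // Mirrors the simplified flow above for a single topic-to-table mapping.
    static IngestionProperties buildIngestionProps(String database, String table, String format, String mappingRef) {
        IngestionProperties props = new IngestionProperties(database, table);
        if (StringUtils.isNotEmpty(format)) {
            // The format is stored as configured; JSON/Avro are no longer rewritten to MULTIJSON here.
            props.setDataFormat(format);
        }
        if (StringUtils.isNotEmpty(mappingRef) && StringUtils.isNotEmpty(format)) {
            // The mapping kind follows the format name directly, e.g. "csv" -> CSV, "avro" -> AVRO.
            props.setIngestionMapping(mappingRef,
                    IngestionMapping.IngestionMappingKind.valueOf(format.toUpperCase(Locale.ROOT)));
        }
        return props;
    }
}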
@@ -26,6 +26,8 @@

import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
@@ -35,6 +37,9 @@
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;

import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.*;
import static com.microsoft.azure.kusto.kafka.connect.sink.formatwriter.FormatWriterHelper.isSchemaFormat;

public class TopicPartitionWriter {

private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
@@ -53,13 +58,13 @@ public class TopicPartitionWriter {
private final String dlqTopicName;
private final Producer<byte[], byte[]> dlqProducer;
private final BehaviorOnError behaviorOnError;
private final ReentrantReadWriteLock reentrantReadWriteLock;
FileWriter fileWriter;
long currentOffset;
Long lastCommittedOffset;
private final ReentrantReadWriteLock reentrantReadWriteLock;

TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
@NotNull KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer) {
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer) {
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps;
@@ -76,13 +81,13 @@ public class TopicPartitionWriter {
this.dlqProducer = dlqProducer;
}

static @NotNull String getTempDirectoryName(String tempDirPath) {
static String getTempDirectoryName(String tempDirPath) {
String tempDir = String.format("kusto-sink-connector-%s", UUID.randomUUID());
Path path = Paths.get(tempDirPath, tempDir).toAbsolutePath();
return path.toString();
}

public void handleRollFile(@NotNull SourceFile fileDescriptor) {
public void handleRollFile(SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);

/*
@@ -93,7 +98,7 @@ public void handleRollFile(@NotNull SourceFile fileDescriptor) {
*/
for (int retryAttempts = 0; true; retryAttempts++) {
try {
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, updateIngestionPropertiesWithTargetFormat());
if (ingestionProps.streaming && ingestionResult instanceof IngestionStatusResult) {
// If IngestionStatusResult returned then the ingestion status is from streaming ingest
IngestionStatus ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
@@ -103,8 +108,11 @@ public void handleRollFile(@NotNull SourceFile fileDescriptor) {
continue;
}
}
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes,
currentOffset));
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s) " +
"to target table (%s) in database (%s)",
fileDescriptor.path, fileDescriptor.rawBytes, currentOffset,
ingestionProps.ingestionProperties.getTableName(),
ingestionProps.ingestionProperties.getDatabaseName()));
this.lastCommittedOffset = currentOffset;
return;
} catch (IngestionServiceException exception) {
Expand All @@ -124,7 +132,7 @@ public void handleRollFile(@NotNull SourceFile fileDescriptor) {
}
}

private boolean hasStreamingSucceeded(@NotNull IngestionStatus status) {
private boolean hasStreamingSucceeded(IngestionStatus status) {
switch (status.status) {
case Succeeded:
case Queued:
@@ -136,8 +144,8 @@ private boolean hasStreamingSucceeded(@NotNull IngestionStatus status) {
String details = status.getDetails();
UUID ingestionSourceId = status.getIngestionSourceId();
log.warn("A batch of streaming records has {} ingestion: table:{}, database:{}, operationId: {}," +
"ingestionSourceId: {}{}{}.\n" +
"Status is final and therefore ingestion won't be retried and data won't reach dlq",
"ingestionSourceId: {}{}{}.\n" +
"Status is final and therefore ingestion won't be retried and data won't reach dlq",
status.getStatus(),
status.getTable(),
status.getDatabase(),
@@ -176,9 +184,9 @@ private void backOffForRemainingAttempts(int retryAttempts, Exception exception,
}
}

public void sendFailedRecordToDlq(@NotNull SinkRecord record) {
public void sendFailedRecordToDlq(SinkRecord record) {
byte[] recordKey = String.format("Failed to write record to KustoDB with the following kafka coordinates, "
+ "topic=%s, partition=%s, offset=%s.",
+ "topic=%s, partition=%s, offset=%s.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset()).getBytes(StandardCharsets.UTF_8);
@@ -204,7 +212,7 @@ String getFilePath(@Nullable Long offset) {
long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;

return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset,
"json", COMPRESSION_EXTENSION)).toString();
ingestionProps.ingestionProperties.getDataFormat(), COMPRESSION_EXTENSION)).toString();
}

void writeRecord(SinkRecord record) throws ConnectException {
@@ -264,8 +272,27 @@ void close() {
log.error("Unable to delete temporary connector folder {}", basePath);
}
}

void stop() {
fileWriter.stop();
}
private @NotNull IngestionProperties updateIngestionPropertiesWithTargetFormat() {
IngestionProperties updatedIngestionProperties = new IngestionProperties(this.ingestionProps.ingestionProperties);
IngestionProperties.DataFormat sourceFormat = ingestionProps.ingestionProperties.getDataFormat();
if (isSchemaFormat(sourceFormat)) {
log.info("Incoming dataformat {}, setting target format to MULTIJSON", sourceFormat);
updatedIngestionProperties.setDataFormat(MULTIJSON);
} else {
updatedIngestionProperties.setDataFormat(ingestionProps.ingestionProperties.getDataFormat());
}
// Just to make it clear , split the conditional
if (isSchemaFormat(sourceFormat)) {
IngestionMapping mappingReference = ingestionProps.ingestionProperties.getIngestionMapping();
if (mappingReference != null && StringUtils.isNotEmpty(mappingReference.getIngestionMappingReference())) {
String ingestionMappingReferenceName = mappingReference.getIngestionMappingReference();
updatedIngestionProperties.setIngestionMapping(ingestionMappingReferenceName, IngestionMapping.IngestionMappingKind.JSON);
}
}
return updatedIngestionProperties;
}
}
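The new updateIngestionPropertiesWithTargetFormat() is where the removed JSON/Avro special-casing resurfaces, now applied per ingestion call: schema-bearing formats (JSON, SINGLEJSON, MULTIJSON, AVRO) are re-targeted to MULTIJSON, and any mapping reference is re-applied as a JSON mapping, while other formats pass through unchanged. A standalone sketch of that behaviour, not part of the commit; the class name and the inlined schema-format check are illustrative:

import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;

public final class TargetFormatSketch {
    private TargetFormatSketch() {
    }

    static IngestionProperties retarget(IngestionProperties source) {
        IngestionProperties updated = new IngestionProperties(source);
        IngestionProperties.DataFormat format = source.getDataFormat();
        // Same set of formats as FormatWriterHelper.isSchemaFormat in this commit.
        boolean schemaFormat = format == IngestionProperties.DataFormat.JSON
                || format == IngestionProperties.DataFormat.SINGLEJSON
                || format == IngestionProperties.DataFormat.MULTIJSON
                || format == IngestionProperties.DataFormat.AVRO;
        if (schemaFormat) {
            // Schema-bearing formats are ingested as MULTIJSON.
            updated.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
            IngestionMapping mapping = source.getIngestionMapping();
            if (mapping != null && mapping.getIngestionMappingReference() != null
                    && !mapping.getIngestionMappingReference().isEmpty()) {
                // The configured mapping reference is kept, but applied as a JSON mapping.
                updated.setIngestionMapping(mapping.getIngestionMappingReference(),
                        IngestionMapping.IngestionMappingKind.JSON);
            }
        }
        return updated;
    }
}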

@@ -1,22 +1,21 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatwriter;

import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,18 +29,26 @@
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.serializers.NonRecordContainer;

import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.*;
import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.SINGLEJSON;

public class FormatWriterHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(KustoRecordWriter.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS);
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private static final AvroData AVRO_DATA = new AvroData(50);
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE
= new TypeReference<Map<String, Object>>() {
};

private FormatWriterHelper() {
}

public static boolean isSchemaFormat(IngestionProperties.DataFormat dataFormat) {
return dataFormat == JSON || dataFormat == MULTIJSON
|| dataFormat == AVRO || dataFormat == SINGLEJSON;

}

protected static boolean isAvro(IngestionProperties.DataFormat dataFormat) {
return IngestionProperties.DataFormat.AVRO.equals(dataFormat)
|| IngestionProperties.DataFormat.APACHEAVRO.equals(dataFormat);
@@ -78,13 +85,7 @@ public static boolean isCsv(IngestionProperties.DataFormat dataFormat) {
return Collections.emptyMap();
}
if (isAvro(dataformat)) {
GenericRecord genericRecord = bytesToAvroRecord(messageBytes);
if (genericRecord != null) {
return convertAvroRecordToMap(AVRO_DATA.toConnectSchema(genericRecord.getSchema()), genericRecord);
} else {
LOGGER.error("Failed to convert bytes to Avro record. Bytes: {}", ArrayUtils.toString(messageBytes));
throw new IOException("Unable to convert bytes to AVRO record");
}
return bytesToAvroRecord(defaultKeyOrValueField,messageBytes);
}
String bytesAsJson = new String(messageBytes, StandardCharsets.UTF_8);
if (isJson(dataformat)) {
@@ -118,6 +119,7 @@ private static boolean isValidJson(String defaultKeyOrValueField, String json) {
}
OBJECT_MAPPER.readTree(json);
} catch (IOException e) {
LOGGER.debug("Parsed data is not json {} , failed with {}", json, e.getMessage());
return false;
}
return true;
@@ -141,9 +143,44 @@ private static boolean isJson(IngestionProperties.DataFormat dataFormat) {
|| IngestionProperties.DataFormat.SINGLEJSON.equals(dataFormat);
}

private static @Nullable GenericRecord bytesToAvroRecord(byte[] received_message) throws IOException {
DatumReader<GenericRecord> avroBytesReader = new GenericDatumReader<>();
Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
return avroBytesReader.read(null, decoder);
private static Map<String, Object> bytesToAvroRecord(String defaultKeyOrValueField,byte[] received_message) {
Map<String, Object> returnValue = new HashMap<>();
try {
// avro input parser
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
DataFileReader<GenericRecord> dataFileReader;
try {
dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(received_message), datumReader);
} catch (Exception e) {
LOGGER.error("Failed to parse AVRO record(1)\n{}", e.getMessage());
throw new ConnectException(
"Failed to parse AVRO " + "record\n" + e.getMessage());
}
while (dataFileReader.hasNext()) {
String jsonString = dataFileReader.next().toString();
try {
Map<String, Object> nodeMap = OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
returnValue.putAll(nodeMap);
} catch (IOException e) {
throw new ConnectException(
"Failed to parse JSON"
+ " "
+ "record\nInput String: "
+ jsonString
+ "\n"
+ e.getMessage());
}
}
try {
dataFileReader.close();
} catch (IOException e) {
throw new ConnectException(
"Failed to parse AVRO (2) " + "record\n" + e);
}
return returnValue;
} catch (Exception e) {
LOGGER.error("Failed to parse AVRO record (3) \n", e);
return Collections.singletonMap(defaultKeyOrValueField, Base64.getEncoder().encodeToString(received_message));
}
}
}
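bytesToAvroRecord changes shape in this commit: instead of decoding a single binary-encoded GenericRecord, the payload is read as an Avro Object Container File, each record is rendered to JSON and flattened into a map, and a parse failure falls back to a Base64-encoded single-field map instead of surfacing an IOException. A standalone sketch of that flow, not part of the commit; the class name and the 'value' fallback key are illustrative:

import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public final class AvroBytesSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {
    };

    private AvroBytesSketch() {
    }

    static Map<String, Object> toMap(byte[] avroContainerBytes) {
        Map<String, Object> result = new HashMap<>();
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new SeekableByteArrayInput(avroContainerBytes), datumReader)) {
            while (reader.hasNext()) {
                // GenericRecord.toString() renders the record as JSON, which Jackson then turns into a map.
                result.putAll(MAPPER.readValue(reader.next().toString(), MAP_TYPE));
            }
            return result;
        } catch (Exception e) {
            // Mirror the fallback above: keep the raw payload as a Base64 string under a single key.
            return Collections.singletonMap("value", Base64.getEncoder().encodeToString(avroContainerBytes));
        }
    }
}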
@@ -1,6 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink.formatwriter;

import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
@@ -20,6 +21,8 @@

import io.confluent.kafka.serializers.NonRecordContainer;

import static com.microsoft.azure.kusto.kafka.connect.sink.formatwriter.FormatWriterHelper.isSchemaFormat;

public abstract class HeaderAndMetadataWriter {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
public static final String LINE_SEPARATOR = System.lineSeparator();
Expand Down Expand Up @@ -90,11 +93,20 @@ public Map<String, Object> convertSinkRecordToMap(@NotNull SinkRecord record, bo
return (Map<String, Object>) recordValue;
}
// is a byte array
if (recordValue instanceof byte[]) {
return FormatWriterHelper.convertBytesToMap((byte[]) recordValue, defaultKeyOrValueField, dataFormat);
if(isSchemaFormat(dataFormat)){
if (recordValue instanceof byte[]) {
return FormatWriterHelper.convertBytesToMap((byte[]) recordValue, defaultKeyOrValueField, dataFormat);
}
else {
String fieldName = isKey ? KEY_FIELD : VALUE_FIELD;
return Collections.singletonMap(fieldName, recordValue);
}
} else {
String errorMessage = String.format("DataFormat %s is not supported in the connector though " +
"it may be supported for ingestion in ADX. Please raise a feature request if a " +
"new format has to be supported.", dataFormat);
throw new ConnectException(errorMessage);
}
String fieldName = isKey ? KEY_FIELD : VALUE_FIELD;
return Collections.singletonMap(fieldName, recordValue);
}
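convertSinkRecordToMap now gates on the data format before any fallback: non-schema formats are rejected outright, byte[] payloads are decoded through FormatWriterHelper.convertBytesToMap, and any other value is wrapped under the key or value field. A standalone sketch of that dispatch, not part of the commit; the class name, the 'key'/'value' literals, and the use of Kafka Connect's unchecked org.apache.kafka.connect.errors.ConnectException are illustrative assumptions:

package com.microsoft.azure.kusto.kafka.connect.sink.formatwriter;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;

import com.microsoft.azure.kusto.ingest.IngestionProperties;

public final class SinkValueSketch {
    private SinkValueSketch() {
    }

    // Placed in the connector's formatwriter package so the helper methods are in scope.
    static Map<String, Object> wrap(Object recordValue, boolean isKey,
            IngestionProperties.DataFormat dataFormat, String defaultField) throws IOException {
        if (!FormatWriterHelper.isSchemaFormat(dataFormat)) {
            // Fail fast for formats the connector does not serialize itself.
            throw new ConnectException(String.format(
                    "DataFormat %s is not supported in the connector.", dataFormat));
        }
        if (recordValue instanceof byte[]) {
            // Bytes are decoded according to the declared format (JSON text or an Avro container file).
            return FormatWriterHelper.convertBytesToMap((byte[]) recordValue, defaultField, dataFormat);
        }
        // Structs and Maps are handled earlier in the real method; anything else is wrapped as one field.
        return Collections.singletonMap(isKey ? "key" : "value", recordValue);
    }
}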

