From 75f47637a5c473d40e2377d1d2c9bf8b7c6ed696 Mon Sep 17 00:00:00 2001
From: Martin Maier-Moessner
Date: Sun, 8 Mar 2020 20:30:57 +0100
Subject: [PATCH] Support schema reference in record header for avro
 deserialization

supports producers like Spring Cloud Stream

close #231
relates to #234
---
 src/main/java/org/kafkahq/models/Record.java  |   4 +-
 .../repositories/AvroWireFormatConverter.java |  65 +++++++++++
 .../repositories/RecordRepository.java        |  53 ++++-----
 .../AvroWireFormatConverterTest.java          | 107 ++++++++++++++++++
 4 files changed, 199 insertions(+), 30 deletions(-)
 create mode 100644 src/main/java/org/kafkahq/repositories/AvroWireFormatConverter.java
 create mode 100644 src/test/java/org/kafkahq/repositories/AvroWireFormatConverterTest.java

diff --git a/src/main/java/org/kafkahq/models/Record.java b/src/main/java/org/kafkahq/models/Record.java
index 7b1638550..d93a8fb80 100644
--- a/src/main/java/org/kafkahq/models/Record.java
+++ b/src/main/java/org/kafkahq/models/Record.java
@@ -30,7 +30,7 @@ public class Record {
     private final Map<String, String> headers = new HashMap<>();
     private final KafkaAvroDeserializer kafkaAvroDeserializer;
 
-    public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer) {
+    public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer, byte[] value) {
         this.topic = record.topic();
         this.partition = record.partition();
         this.offset = record.offset();
@@ -38,7 +38,7 @@ public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafka
         this.timestampType = record.timestampType();
         this.key = record.key();
         this.keySchemaId = getAvroSchemaId(this.key);
-        this.value = record.value();
+        this.value = value;
         this.valueSchemaId = getAvroSchemaId(this.value);
         for (Header header: record.headers()) {
             this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
diff --git a/src/main/java/org/kafkahq/repositories/AvroWireFormatConverter.java b/src/main/java/org/kafkahq/repositories/AvroWireFormatConverter.java
new file mode 100644
index 000000000..666da01a6
--- /dev/null
+++ b/src/main/java/org/kafkahq/repositories/AvroWireFormatConverter.java
@@ -0,0 +1,65 @@
+package org.kafkahq.repositories;
+
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+
+import javax.inject.Singleton;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Converts an avro payload to the kafka avro wire format (https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format)
+ * Some producers (like Spring Cloud Stream) do not write this wire format, but use the raw avro binary encoding (without magic byte and schema id)
+ * and put the reference to the schema in a header field. This converter will add the magic byte and schema id to the byte[] to
+ * be wire format compatible if the following conditions are met:
+ * - magic byte is not already present
+ * - schema reference (subject and version) can be found in the message header
+ * - schema can be fetched from the registry
+ */
+@Singleton
+@Slf4j
+public class AvroWireFormatConverter {
+
+    private static final byte MAGIC_BYTE = 0;
+    private static final Pattern AVRO_CONTENT_TYPE_PATTERN = Pattern.compile("\"?application/vnd\\.(.+)\\.v(\\d+)\\+avro\"?");
+
+    public byte[] convertValueToWireFormat(ConsumerRecord<byte[], byte[]> record, SchemaRegistryClient registryClient) {
+        Iterator<Header> contentTypeIter = record.headers().headers("contentType").iterator();
+        byte[] value = record.value();
+        if (contentTypeIter.hasNext() &&
+                ArrayUtils.isNotEmpty(value) &&
+                ByteBuffer.wrap(value).get() != MAGIC_BYTE) {
+            String headerValue = new String(contentTypeIter.next().value());
+            Matcher matcher = AVRO_CONTENT_TYPE_PATTERN.matcher(headerValue);
+            if (matcher.matches()) {
+                String subject = matcher.group(1);
+                int version = Integer.parseInt(matcher.group(2));
+                value = prependWireFormatHeader(value, registryClient, subject, version);
+            }
+        }
+        return value;
+    }
+
+    private byte[] prependWireFormatHeader(byte[] value, SchemaRegistryClient registryClient, String subject, int version) {
+        try {
+            SchemaMetadata schemaMetadata = registryClient.getSchemaMetadata(subject, version);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            out.write(MAGIC_BYTE);
+            out.write(ByteBuffer.allocate(4).putInt(schemaMetadata.getId()).array());
+            out.write(value);
+            value = out.toByteArray();
+        } catch (IOException | RestClientException e) {
+            // ignore on purpose, don't prepend anything
+        }
+        return value;
+    }
+}
diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java
index 6a252487c..e2ffe79cd 100644
--- a/src/main/java/org/kafkahq/repositories/RecordRepository.java
+++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java
@@ -3,31 +3,24 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
-import io.confluent.kafka.formatter.AvroMessageFormatter;
-import io.confluent.kafka.formatter.AvroMessageReader;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import io.micronaut.context.annotation.Value;
 import io.micronaut.context.env.Environment;
 import io.micronaut.http.sse.Event;
 import io.reactivex.Flowable;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import lombok.experimental.Wither;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -42,14 +35,14 @@
 import javax.inject.Inject;
 import javax.inject.Singleton;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -67,6 +60,9 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private SchemaRegistryRepository schemaRegistryRepository;
 
+    @Inject
+    private AvroWireFormatConverter avroWireFormatConverter;
+
     @Value("${kafkahq.topic-data.poll-timeout:1000}")
     protected int pollTimeout;
 
@@ -357,7 +353,8 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
     }
 
     private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
-        return new Record(record, this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId));
+        return new Record(record, this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
+            avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId)));
     }
diff --git a/src/test/java/org/kafkahq/repositories/AvroWireFormatConverterTest.java b/src/test/java/org/kafkahq/repositories/AvroWireFormatConverterTest.java
new file mode 100644
index 000000000..50c52c7b6
--- /dev/null
+++ b/src/test/java/org/kafkahq/repositories/AvroWireFormatConverterTest.java
@@ -0,0 +1,107 @@
+package org.kafkahq.repositories;
+
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AvroSchemaUtils;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Slf4j
+public class AvroWireFormatConverterTest {
+
+    private AvroWireFormatConverter avroWireFormatConverter;
+    private SchemaRegistryClient schemaRegistryClient;
+
+    @Data
+    @AllArgsConstructor
+    private static class MyRecord {
+        private int anInt;
+        private String aString;
+    }
+
+    @BeforeEach
+    @SneakyThrows
+    public void before() {
+        avroWireFormatConverter = new AvroWireFormatConverter();
+        schemaRegistryClient = mock(SchemaRegistryClient.class);
+
+        ReflectData reflectData = ReflectData.get();
+        Schema schema = reflectData.getSchema(MyRecord.class);
+        int id = 100;
+        when(schemaRegistryClient.getById(id)).thenReturn(schema);
+        when(schemaRegistryClient.getSchemaMetadata("mySubject", 1)).thenReturn(new SchemaMetadata(id, 1, ""));
+    }
+
+    @Test
+    public void convertValueToWireFormatNull() {
+        byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(new ConsumerRecord<>("topic", 1, 0, new byte[0], null), schemaRegistryClient);
+        assertNull(convertedValue);
+    }
+
+    @Test
+    public void convertValueToWireFormatEmptyValue() {
+        byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(new ConsumerRecord<>("topic", 1, 0, new byte[0], new byte[0]), schemaRegistryClient);
+        assertEquals(0, convertedValue.length);
+    }
+
+    @Test
+    @SneakyThrows
+    public void convertValueToWireFormatWrongContentType() {
+        MyRecord record = new MyRecord(42, "leet");
+        byte[] avroPayload = serializeAvro(record);
+
+        ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic", 1, 0, new byte[0], avroPayload);
+        consumerRecord.headers().add(new RecordHeader("contentType", "mySubject.v1".getBytes()));
+        byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(consumerRecord, schemaRegistryClient);
+
+        assertEquals(avroPayload, convertedValue);
+    }
+
+    @Test
+    @SneakyThrows
+    public void convertValueToWireFormatWireFormat() {
+        MyRecord record = new MyRecord(42, "leet");
+        byte[] avroPayload = serializeAvro(record);
+
+        ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic", 1, 0, new byte[0], avroPayload);
+        consumerRecord.headers().add(new RecordHeader("contentType", "application/vnd.mySubject.v1+avro".getBytes()));
+        byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(consumerRecord, schemaRegistryClient);
+
+        KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+        GenericData.Record deserializedRecord = (GenericData.Record) kafkaAvroDeserializer.deserialize(null, convertedValue);
+        assertEquals(record.getAnInt(), deserializedRecord.get(0));
+        assertEquals(record.getAString(), deserializedRecord.get(1).toString());
+    }
+
+    @SneakyThrows
+    private byte[] serializeAvro(MyRecord record) {
+        Schema schema = AvroSchemaUtils.getSchema(record, true);
+        DatumWriter<MyRecord> writer = new ReflectDatumWriter<>(schema);
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
+        writer.write(record, encoder);
+        encoder.flush();
+        return stream.toByteArray();
+    }
+}
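
Reviewer note (illustrative sketch, not part of the diff): the framing produced by prependWireFormatHeader above is one 0x00 magic byte, then the schema id as a 4-byte big-endian int, then the unchanged avro binary payload. The standalone snippet below reproduces that framing so it can be checked in isolation; the class and method names here are made up for this example.

import java.nio.ByteBuffer;

public class WireFormatDemo {
    // [0x00 magic byte][4-byte big-endian schema id][raw avro binary payload]
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put((byte) 0)      // magic byte, same constant as MAGIC_BYTE in the converter
                .putInt(schemaId)   // id resolved from subject/version via the registry
                .put(avroPayload)   // payload bytes are copied through unchanged
                .array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(100, new byte[]{1, 2, 3});
        ByteBuffer buf = ByteBuffer.wrap(framed);
        // prints "magic=0 schemaId=100", which is what a Confluent deserializer reads back first
        System.out.println("magic=" + buf.get() + " schemaId=" + buf.getInt());
    }
}

A matching producer-side header, as Spring Cloud Stream writes it, would be contentType=application/vnd.mySubject.v1+avro (optionally quoted), which AVRO_CONTENT_TYPE_PATTERN splits into subject "mySubject" and version 1.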