Support schema reference in record header for avro deserialization
supports producers like Spring Cloud Stream

close tchiotludo#231
relate to tchiotludo#234
tine2k authored Mar 8, 2020
1 parent 8471123 commit 75f4763
Showing 4 changed files with 199 additions and 30 deletions.
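For context, a minimal sketch of the situation this commit handles (the subject name and the schema id 100 are illustrative placeholders, not taken from the change): a producer such as Spring Cloud Stream writes the record value as raw Avro binary and references the schema in a contentType header like application/vnd.mySubject.v1+avro; the new converter rebuilds the Confluent wire format that KafkaAvroDeserializer expects by prepending the magic byte and the 4-byte schema id resolved from the registry.

import java.nio.ByteBuffer;

public class WireFormatSketch {
    public static void main(String[] args) {
        // Raw Avro binary as produced without the wire-format prefix (illustrative bytes).
        byte[] rawAvro = new byte[]{0x54, 0x08, 0x6c, 0x65, 0x65, 0x74};
        // Hypothetical id as returned by SchemaRegistryClient#getSchemaMetadata("mySubject", 1).
        int schemaId = 100;

        // Confluent wire format: magic byte 0x00, 4-byte big-endian schema id, then the unchanged payload.
        ByteBuffer wire = ByteBuffer.allocate(1 + 4 + rawAvro.length);
        wire.put((byte) 0);
        wire.putInt(schemaId);
        wire.put(rawAvro);

        System.out.println("wire-format length: " + wire.array().length); // 5 bytes of header + payload
    }
}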
4 changes: 2 additions & 2 deletions src/main/java/org/kafkahq/models/Record.java
@@ -30,15 +30,15 @@ public class Record {
private final Map<String, String> headers = new HashMap<>();
private final KafkaAvroDeserializer kafkaAvroDeserializer;

public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer) {
public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer, byte[] value) {
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
this.timestamp = record.timestamp();
this.timestampType = record.timestampType();
this.key = record.key();
this.keySchemaId = getAvroSchemaId(this.key);
this.value = record.value();
this.value = value;
this.valueSchemaId = getAvroSchemaId(this.value);
for (Header header: record.headers()) {
this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
65 changes: 65 additions & 0 deletions src/main/java/org/kafkahq/repositories/AvroWireFormatConverter.java
@@ -0,0 +1,65 @@
package org.kafkahq.repositories;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

import javax.inject.Singleton;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Converts an avro payload to the kafka avro wire format (https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format)
* Some producers (like Spring Cloud Stream) do not write this wire format, but use the raw avro binary encoding (without magic byte and schema id)
* and put the reference to the schema in a header field. This converter will add the magic byte and schema id to the byte[] to
* be wire format compatible if the following conditions are met:
* - magic byte is not already present
* - schema reference (subject and version) can be found in the message header
* - schema can be fetched from the registry
*/
@Singleton
@Slf4j
public class AvroWireFormatConverter {

private static final byte MAGIC_BYTE = 0;
private static final Pattern AVRO_CONTENT_TYPE_PATTERN = Pattern.compile("\"?application/vnd\\.(.+)\\.v(\\d+)\\+avro\"?");

public byte[] convertValueToWireFormat(ConsumerRecord<byte[], byte[]> record, SchemaRegistryClient registryClient) {
Iterator<Header> contentTypeIter = record.headers().headers("contentType").iterator();
byte[] value = record.value();
if (contentTypeIter.hasNext() &&
ArrayUtils.isNotEmpty(value) &&
ByteBuffer.wrap(value).get() != MAGIC_BYTE) {
String headerValue = new String(contentTypeIter.next().value());
Matcher matcher = AVRO_CONTENT_TYPE_PATTERN.matcher(headerValue);
if (matcher.matches()) {
String subject = matcher.group(1);
int version = Integer.parseInt(matcher.group(2));
value = prependWireFormatHeader(value, registryClient, subject, version);
}
}
return value;
}

private byte[] prependWireFormatHeader(byte[] value, SchemaRegistryClient registryClient, String subject, int version) {
try {
SchemaMetadata schemaMetadata = registryClient.getSchemaMetadata(subject, version);
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(4).putInt(schemaMetadata.getId()).array());
out.write(value);
value = out.toByteArray();
} catch (IOException | RestClientException e) {
// ignore on purpose, don't prepend anything
}
return value;
}
}
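As a small illustration of the contentType convention matched by AVRO_CONTENT_TYPE_PATTERN above (the subject and version are placeholders, not part of the change), the header value maps to a schema-registry lookup like this:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ContentTypeHeaderSketch {
    public static void main(String[] args) {
        // Same convention as AVRO_CONTENT_TYPE_PATTERN: application/vnd.<subject>.v<version>+avro
        Pattern pattern = Pattern.compile("\"?application/vnd\\.(.+)\\.v(\\d+)\\+avro\"?");
        Matcher matcher = pattern.matcher("application/vnd.mySubject.v1+avro");
        if (matcher.matches()) {
            String subject = matcher.group(1);                 // "mySubject"
            int version = Integer.parseInt(matcher.group(2));  // 1
            // The converter then calls registryClient.getSchemaMetadata(subject, version)
            // and prepends the returned schema id in the wire-format header.
            System.out.println(subject + " v" + version);
        }
    }
}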
53 changes: 25 additions & 28 deletions src/main/java/org/kafkahq/repositories/RecordRepository.java
@@ -3,31 +3,24 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.formatter.AvroMessageFormatter;
import io.confluent.kafka.formatter.AvroMessageReader;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Wither;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -42,14 +35,14 @@

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -67,6 +60,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private SchemaRegistryRepository schemaRegistryRepository;

@Inject
private AvroWireFormatConverter avroWireFormatConverter;

@Value("${kafkahq.topic-data.poll-timeout:1000}")
protected int pollTimeout;

@@ -357,7 +353,8 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
}

private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
return new Record(record, this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId));
return new Record(record, this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId)));
}


107 changes: 107 additions & 0 deletions src/test/java/org/kafkahq/repositories/AvroWireFormatConverterTest.java
@@ -0,0 +1,107 @@
package org.kafkahq.repositories;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AvroSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Slf4j
public class AvroWireFormatConverterTest {

private AvroWireFormatConverter avroWireFormatConverter;
private SchemaRegistryClient schemaRegistryClient;

@Data
@AllArgsConstructor
private static class MyRecord {
private int anInt;
private String aString;
}

@BeforeEach
@SneakyThrows
public void before() {
avroWireFormatConverter = new AvroWireFormatConverter();
schemaRegistryClient = mock(SchemaRegistryClient.class);

ReflectData reflectData = ReflectData.get();
Schema schema = reflectData.getSchema(MyRecord.class);
int id = 100;
when(schemaRegistryClient.getById(id)).thenReturn(schema);
when(schemaRegistryClient.getSchemaMetadata("mySubject", 1)).thenReturn(new SchemaMetadata(id, 1, ""));
}

@Test
public void convertValueToWireFormatNull() {
byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(new ConsumerRecord<>("topic", 1, 0, new byte[0], null), schemaRegistryClient);
assertNull(convertedValue);
}

@Test
public void convertValueToWireFormatEmptyValue() {
byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(new ConsumerRecord<>("topic", 1, 0, new byte[0], new byte[0]), schemaRegistryClient);
assertEquals(0, convertedValue.length);
}

@Test
@SneakyThrows
public void convertValueToWireFormatWrongContentType() {
MyRecord record = new MyRecord(42, "leet");
byte[] avroPayload = serializeAvro(record);

ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic", 1, 0, new byte[0], avroPayload);
consumerRecord.headers().add(new RecordHeader("contentType", "mySubject.v1".getBytes()));
byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(consumerRecord, schemaRegistryClient);

assertEquals(avroPayload, convertedValue);
}

@Test
@SneakyThrows
public void convertValueToWireFormatWireFormat() {
MyRecord record = new MyRecord(42, "leet");
byte[] avroPayload = serializeAvro(record);

ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic", 1, 0, new byte[0], avroPayload);
consumerRecord.headers().add(new RecordHeader("contentType", "application/vnd.mySubject.v1+avro".getBytes()));
byte[] convertedValue = avroWireFormatConverter.convertValueToWireFormat(consumerRecord, schemaRegistryClient);

KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
GenericData.Record deserializedRecord = (GenericData.Record) kafkaAvroDeserializer.deserialize(null, convertedValue);
assertEquals(record.getAnInt(), deserializedRecord.get(0));
assertEquals(record.getAString(), deserializedRecord.get(1).toString());
}

@SneakyThrows
private byte[] serializeAvro(MyRecord record) {
Schema schema = AvroSchemaUtils.getSchema(record, true);
DatumWriter<MyRecord> writer = new ReflectDatumWriter<>(schema);
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
writer.write(record, encoder);
encoder.flush();
return stream.toByteArray();
}
}
