forked from tchiotludo/akhq
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(topicdata) support JSONSchema while producing message (tchiotlud…
- Loading branch information
Showing
18 changed files
with
406 additions
and
82 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,15 @@ | ||
package org.akhq.configs; | ||
|
||
import lombok.Getter; | ||
|
||
@Getter | ||
public enum SchemaRegistryType { | ||
CONFLUENT, | ||
TIBCO | ||
CONFLUENT((byte) 0x0), | ||
TIBCO((byte) 0x80); | ||
|
||
private byte magicByte; | ||
|
||
SchemaRegistryType(byte magicByte) { | ||
this.magicByte = magicByte; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
70 changes: 70 additions & 0 deletions
70
src/main/java/org/akhq/modules/schemaregistry/JsonSchemaSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package org.akhq.modules.schemaregistry; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import io.confluent.kafka.schemaregistry.ParsedSchema; | ||
import io.confluent.kafka.schemaregistry.json.JsonSchema; | ||
import io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.akhq.configs.SchemaRegistryType; | ||
import org.everit.json.schema.ValidationException; | ||
import org.json.JSONObject; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Objects; | ||
|
||
@Slf4j | ||
public class JsonSchemaSerializer extends AbstractKafkaJsonSchemaSerializer<String> implements SchemaSerializer { | ||
private final int schemaId; | ||
private final JsonSchema jsonSchema; | ||
private final SchemaRegistryType schemaRegistryType; | ||
|
||
public static JsonSchemaSerializer newInstance(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) { | ||
if (supports(parsedSchema)) { | ||
return new JsonSchemaSerializer(schemaId, (JsonSchema) parsedSchema, schemaRegistryType); | ||
} | ||
String errorMsg = String.format("Schema %s has not supported schema type expected %s but found %s", parsedSchema.name(), JsonSchema.TYPE, parsedSchema.schemaType()); | ||
throw new IllegalArgumentException(errorMsg); | ||
} | ||
|
||
@Override | ||
public byte[] serialize(String json) { | ||
try { | ||
JSONObject jsonObject = new JSONObject(json); | ||
jsonSchema.validate(jsonObject); | ||
} catch (JsonProcessingException e) { | ||
String errorMsg = String.format("Provided json [%s] is not valid according to schema", json); | ||
log.error(errorMsg); | ||
throw new RuntimeException(errorMsg, e); | ||
} catch (ValidationException e) { | ||
String validationErrorMsg = String.format( | ||
"Provided json message is not valid according to jsonSchema (id=%d): %s", | ||
schemaId, | ||
e.getMessage() | ||
); | ||
throw new IllegalArgumentException(validationErrorMsg); | ||
} | ||
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { | ||
out.write(schemaRegistryType.getMagicByte()); | ||
out.write(ByteBuffer.allocate(idSize).putInt(schemaId).array()); | ||
out.write(json.getBytes(StandardCharsets.UTF_8)); | ||
byte[] bytes = out.toByteArray(); | ||
out.close(); | ||
return bytes; | ||
} catch (IOException e) { | ||
throw new RuntimeException(String.format("Could not serialize json [%s]", json), e); | ||
} | ||
} | ||
|
||
public static boolean supports(ParsedSchema parsedSchema) { | ||
return Objects.equals(JsonSchema.TYPE, parsedSchema.schemaType()); | ||
} | ||
|
||
private JsonSchemaSerializer(int schemaId, JsonSchema jsonSchema, SchemaRegistryType schemaRegistryType) { | ||
this.schemaId = schemaId; | ||
this.jsonSchema = jsonSchema; | ||
this.schemaRegistryType = schemaRegistryType; | ||
} | ||
} |
57 changes: 57 additions & 0 deletions
57
src/main/java/org/akhq/modules/schemaregistry/RecordWithSchemaSerializerFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package org.akhq.modules.schemaregistry; | ||
|
||
import io.confluent.kafka.schemaregistry.ParsedSchema; | ||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; | ||
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.akhq.configs.Connection; | ||
import org.akhq.configs.SchemaRegistryType; | ||
import org.akhq.modules.KafkaModule; | ||
|
||
import javax.inject.Singleton; | ||
import java.io.IOException; | ||
|
||
@Singleton | ||
@RequiredArgsConstructor | ||
@Slf4j | ||
public class RecordWithSchemaSerializerFactory { | ||
private final KafkaModule kafkaModule; | ||
|
||
public SchemaSerializer createSerializer(String clusterId, int schemaId) { | ||
ParsedSchema parsedSchema = retrieveSchema(clusterId, schemaId); | ||
SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId); | ||
return createSerializer(schemaId, parsedSchema, schemaRegistryType); | ||
} | ||
|
||
public SchemaSerializer createSerializer(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) { | ||
if (JsonSchemaSerializer.supports(parsedSchema)) { | ||
return JsonSchemaSerializer.newInstance(schemaId, parsedSchema, schemaRegistryType); | ||
} if (AvroSerializer.supports(parsedSchema)) { | ||
return AvroSerializer.newInstance(schemaId, parsedSchema, schemaRegistryType); | ||
} else { | ||
String errorMsg = String.format("Schema with id %d has unsupported schema type %s", schemaId, parsedSchema.schemaType()); | ||
throw new IllegalStateException(errorMsg); | ||
} | ||
} | ||
|
||
private ParsedSchema retrieveSchema(String clusterId, int schemaId) { | ||
SchemaRegistryClient registryClient = kafkaModule.getRegistryClient(clusterId); | ||
try { | ||
return registryClient.getSchemaById(schemaId); | ||
} catch (IOException|RestClientException e) { | ||
String errorMsg = String.format("Can't retrieve schema %d in registry", schemaId); | ||
log.error(errorMsg, e); | ||
throw new RuntimeException(errorMsg, e); | ||
} | ||
} | ||
|
||
private SchemaRegistryType getSchemaRegistryType(String clusterId) { | ||
SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT; | ||
Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry(); | ||
if (schemaRegistry != null) { | ||
schemaRegistryType = schemaRegistry.getType(); | ||
} | ||
return schemaRegistryType; | ||
} | ||
} |
7 changes: 7 additions & 0 deletions
7
src/main/java/org/akhq/modules/schemaregistry/SchemaSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package org.akhq.modules.schemaregistry; | ||
|
||
public interface SchemaSerializer { | ||
|
||
byte[] serialize(String value); | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.