Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encoder interface #194

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
58 changes: 17 additions & 41 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (

"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/proto"
"github.com/golang/protobuf/jsonpb"
"github.com/birdayz/kaf/pkg/codec"
prettyjson "github.com/hokaccha/go-prettyjson"
"github.com/spf13/cobra"
"github.com/vmihailenco/msgpack/v5"
Expand All @@ -37,7 +36,11 @@ var (

limitMessagesFlag int64

reg *proto.DescriptorRegistry
reg *codec.DescriptorRegistry

protoDecoder *codec.ProtoCodec
protoKeyDecoder *codec.ProtoCodec
avroDecoder *codec.AvroCodec
)

func init() {
Expand Down Expand Up @@ -95,6 +98,11 @@ var consumeCmd = &cobra.Command{
topic := args[0]
client := getClientFromConfig(cfg)

schemaCache = getSchemaCache()
avroDecoder = codec.NewAvroCodec(-1, false, schemaCache)
protoDecoder = codec.NewProtoCodec(protoType, reg)
protoKeyDecoder = codec.NewProtoCodec(keyProtoType, reg)

switch offsetFlag {
case "oldest":
offset = sarama.OffsetOldest
Expand Down Expand Up @@ -169,8 +177,6 @@ func withoutConsumerGroup(ctx context.Context, client sarama.Client, topic strin
partitions = flagPartitions
}

schemaCache = getSchemaCache()

wg := sync.WaitGroup{}
mu := sync.Mutex{} // Synchronizes stderr and stdout.
for _, partition := range partitions {
Expand Down Expand Up @@ -230,24 +236,24 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
var err error

if protoType != "" {
dataToDisplay, err = protoDecode(reg, msg.Value, protoType)
dataToDisplay, err = protoDecoder.Decode(msg.Value)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err)
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary output. Error: %v\n", err)
}
} else {
dataToDisplay, err = avroDecode(msg.Value)
dataToDisplay, err = avroDecoder.Decode(msg.Value)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
}

if keyProtoType != "" {
keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType)
keyToDisplay, err = protoKeyDecoder.Decode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err)
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary output. Error: %v\n", err)
}
} else {
keyToDisplay, err = avroDecode(msg.Key)
keyToDisplay, err = avroDecoder.Decode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
Expand Down Expand Up @@ -314,36 +320,6 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {

}

// protoDecode renders a protobuf-encoded message as JSON using the dynamic
// message descriptor registered for msgType. If the registry has no
// descriptor for msgType, the raw bytes are returned unchanged so callers
// can fall back to binary output.
func protoDecode(reg *proto.DescriptorRegistry, b []byte, msgType string) ([]byte, error) {
	dynamicMessage := reg.MessageForType(msgType)
	if dynamicMessage == nil {
		// Unknown type: best-effort fallback to the raw payload.
		return b, nil
	}

	if err := dynamicMessage.Unmarshal(b); err != nil {
		return nil, err
	}

	var m jsonpb.Marshaler
	var w bytes.Buffer
	if err := m.Marshal(&w, dynamicMessage); err != nil {
		return nil, err
	}
	return w.Bytes(), nil
}

// avroDecode converts an Avro-encoded message to its textual form via the
// schema registry cache. When no cache is configured, the bytes pass
// through untouched.
func avroDecode(b []byte) ([]byte, error) {
	if schemaCache == nil {
		return b, nil
	}
	return schemaCache.DecodeMessage(b)
}

func formatKey(key []byte) []byte {
if b, err := keyfmt.Format(key); err == nil {
return b
Expand Down
4 changes: 2 additions & 2 deletions cmd/kaf/kaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/spf13/cobra"

"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/codec"
"github.com/birdayz/kaf/pkg/config"
"github.com/birdayz/kaf/pkg/proto"
)

var cfgFile string
Expand Down Expand Up @@ -174,7 +174,7 @@ func init() {

var setupProtoDescriptorRegistry = func(cmd *cobra.Command, args []string) {
if protoType != "" {
r, err := proto.NewDescriptorRegistry(protoFiles, protoExclude)
r, err := codec.NewDescriptorRegistry(protoFiles, protoExclude)
if err != nil {
errorExit("Failed to load protobuf files: %v\n", err)
}
Expand Down
82 changes: 34 additions & 48 deletions cmd/kaf/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

"github.com/Masterminds/sprig"
"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/codec"
"github.com/birdayz/kaf/pkg/partitioner"
pb "github.com/golang/protobuf/proto"
"github.com/spf13/cobra"
)

Expand All @@ -32,6 +32,7 @@ var (
inputModeFlag string
avroSchemaID int
avroKeySchemaID int
avroStrictFlag bool
templateFlag bool
)

Expand All @@ -54,6 +55,7 @@ func init() {

produceCmd.Flags().IntVarP(&avroSchemaID, "avro-schema-id", "", -1, "Value schema id for avro messsage encoding")
produceCmd.Flags().IntVarP(&avroKeySchemaID, "avro-key-schema-id", "", -1, "Key schema id for avro messsage encoding")
produceCmd.Flags().BoolVar(&avroStrictFlag, "avro-strict", false, "Uses strict version of the input json to parse unions")

produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]")
produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode")
Expand Down Expand Up @@ -87,6 +89,26 @@ func readFull(reader io.Reader, out chan []byte) {
close(out)
}

// valueEncoder selects the encoder for record values based on which CLI
// flags were provided: proto takes precedence, then Avro by schema id,
// otherwise the value is passed through unmodified.
func valueEncoder() codec.Encoder {
	switch {
	case protoType != "":
		return codec.NewProtoCodec(protoType, reg)
	case avroSchemaID != -1:
		return codec.NewAvroCodec(avroSchemaID, avroStrictFlag, schemaCache)
	default:
		return &codec.BypassCodec{}
	}
}

func keyEncoder() codec.Encoder {
if keyProtoType != "" {
Copy link
Owner

@birdayz birdayz Apr 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might be the time for a new flag edit: (it's actually not a breaking change)

We could add:
--key-codec auto|proto|avro-json|avro

auto would be default, and behave as-is. it would try its best to decide what to do: if proto key/value flags are given, it will try proto. if avro flags are given, it will try avro-json. but it can be set to specifically pick avro (see #195)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one, for proto I think it would be a little redundant because you have to specify --key-proto-type anyways. I guess we don't have the same problem we have with avro here.

Maybe a flag specific for avro would be enough. It also doesn't need to be specific to keys, It can apply for both keys and values at the same time. Something like:

value | kaf produce --avro-key-schema-id x -- avro-schema-id y --key 'key' --avro-input [json|avro-json]

What do you think?

There's a separate issue with avro and proto though. The current behaviour is a little misleading imo. If you specify both proto and avro, proto takes precedence. Maybe we should have some validation and throw an error in this case.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's ok for me. do you know if "avro-json" is the right term? is there any "better" term in the avro world for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not, I'll check their docs see if I find anything. Another option would be just a boolean flag like --avro-strict then the docs can say something regarding being strict about unions.

A wiki page with examples of what we've been talking would be nice too.

return codec.NewProtoCodec(keyProtoType, reg)
} else if avroKeySchemaID != -1 {
return codec.NewAvroCodec(avroKeySchemaID, avroStrictFlag, schemaCache)
} else {
return &codec.BypassCodec{}
}
}

var produceCmd = &cobra.Command{
Use: "produce TOPIC",
Short: "Produce record. Reads data from stdin.",
Expand Down Expand Up @@ -128,6 +150,9 @@ var produceCmd = &cobra.Command{
go readLines(inReader, out)
}

valueEncoder := valueEncoder()
keyEncoder := keyEncoder()

var key sarama.Encoder
if rawKeyFlag {
keyBytes, err := base64.RawStdEncoding.DecodeString(keyFlag)
Expand All @@ -136,31 +161,11 @@ var produceCmd = &cobra.Command{
}
key = sarama.ByteEncoder(keyBytes)
} else {
key = sarama.StringEncoder(keyFlag)
}
if keyProtoType != "" {
if dynamicMessage := reg.MessageForType(keyProtoType); dynamicMessage != nil {
err = dynamicMessage.UnmarshalJSON([]byte(keyFlag))
if err != nil {
errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err)
}

pb, err := pb.Marshal(dynamicMessage)
if err != nil {
errorExit("Failed to marshal proto: %v", err)
}

key = sarama.ByteEncoder(pb)
} else {
errorExit("Failed to load key proto type")
}

} else if avroKeySchemaID != -1 {
avroKey, err := schemaCache.EncodeMessage(avroKeySchemaID, []byte(keyFlag))
encodedKey, err := keyEncoder.Encode([]byte(keyFlag))
if err != nil {
errorExit("Failed to encode avro key", err)
errorExit("Error encoding key: %v", err)
}
key = sarama.ByteEncoder(avroKey)
key = sarama.ByteEncoder(encodedKey)
}

var headers []sarama.RecordHeader
Expand All @@ -175,30 +180,6 @@ var produceCmd = &cobra.Command{
}

for data := range out {
if protoType != "" {
if dynamicMessage := reg.MessageForType(protoType); dynamicMessage != nil {
err = dynamicMessage.UnmarshalJSON(data)
if err != nil {
errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err)
}

pb, err := pb.Marshal(dynamicMessage)
if err != nil {
errorExit("Failed to marshal proto: %v", err)
}

data = pb
} else {
errorExit("Failed to load payload proto type")
}
} else if avroSchemaID != -1 {
avro, err := schemaCache.EncodeMessage(avroSchemaID, data)
if err != nil {
errorExit("Failed to encode avro value", err)
}
data = avro
}

var ts time.Time
t, err := time.Parse(time.RFC3339, timestampFlag)
if err != nil {
Expand Down Expand Up @@ -230,6 +211,11 @@ var produceCmd = &cobra.Command{
input = buf.Bytes()
}

input, err = valueEncoder.Encode(input)
if err != nil {
errorExit("Error encoding value: %v", err)
}

msg := &sarama.ProducerMessage{
Topic: args[0],
Key: key,
Expand Down
7 changes: 5 additions & 2 deletions cmd/kaf/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/codec"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -46,6 +47,8 @@ var queryCmd = &cobra.Command{
}

schemaCache = getSchemaCache()
protoDecoder := codec.NewProtoCodec(protoType, reg)
protoKeyDecoder := codec.NewProtoCodec(keyProtoType, reg)

wg := sync.WaitGroup{}

Expand All @@ -72,7 +75,7 @@ var queryCmd = &cobra.Command{
var keyTextRaw string
var valueTextRaw string
if protoType != "" {
d, err := protoDecode(reg, msg.Value, protoType)
d, err := protoDecoder.Decode(msg.Value)
if err != nil {
fmt.Println("Failed proto decode")
}
Expand All @@ -82,7 +85,7 @@ var queryCmd = &cobra.Command{
}

if keyProtoType != "" {
d, err := protoDecode(reg, msg.Key, keyProtoType)
d, err := protoKeyDecoder.Decode(msg.Key)
if err != nil {
fmt.Println("Failed proto decode")
}
Expand Down
66 changes: 6 additions & 60 deletions pkg/avro/schema.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package avro

import (
"encoding/binary"
"sync"

schemaregistry "github.com/Landoop/schema-registry"
Expand Down Expand Up @@ -38,7 +37,7 @@ func NewSchemaCache(url string) (*SchemaCache, error) {
}

// getCodecForSchemaID returns a goavro codec for transforming data.
func (c *SchemaCache) getCodecForSchemaID(schemaID int) (codec *goavro.Codec, err error) {
func (c *SchemaCache) GetCodecForSchemaID(schemaID int, strict bool) (codec *goavro.Codec, err error) {
c.mu.RLock()
cc, ok := c.codecsBySchemaID[schemaID]
c.mu.RUnlock()
Expand Down Expand Up @@ -74,67 +73,14 @@ func (c *SchemaCache) getCodecForSchemaID(schemaID int) (codec *goavro.Codec, er
return nil, err
}

codec, err = goavro.NewCodec(schema)
if err != nil {
return nil, err
}

return codec, nil
}

// DecodeMessage returns a text representation of an Avro-encoded message.
func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) {
// Ensure avro header is present with the magic start-byte.
if len(b) < 5 || b[0] != 0x00 {
// The message does not contain Avro-encoded data
return b, nil
if strict {
codec, err = goavro.NewCodec(schema)
} else {
codec, err = goavro.NewCodecForStandardJSON(schema)
}

// Schema ID is stored in the 4 bytes following the magic byte.
schemaID := binary.BigEndian.Uint32(b[1:5])
codec, err := c.getCodecForSchemaID(int(schemaID))
if err != nil {
return b, err
}

// Convert binary Avro data back to native Go form
native, _, err := codec.NativeFromBinary(b[5:])
if err != nil {
return b, err
}

// Convert native Go form to textual Avro data
message, err = codec.TextualFromNative(nil, native)
if err != nil {
return b, err
}

return message, nil
}

// EncodeMessage returns a binary representation of an Avro-encoded message.
func (c *SchemaCache) EncodeMessage(schemaID int, json []byte) (message []byte, err error) {
codec, err := c.getCodecForSchemaID(schemaID)
if err != nil {
return nil, err
}

// Creates a header with an initial zero byte and
// the schema id encoded as a big endian uint32
buf := make([]byte, 5)
binary.BigEndian.PutUint32(buf[1:5], uint32(schemaID))

// Convert textual json data to native Go form
native, _, err := codec.NativeFromTextual(json)
if err != nil {
return nil, err
}

// Convert native Go form to binary Avro data
message, err = codec.BinaryFromNative(buf, native)
if err != nil {
return nil, err
}

return message, nil
return codec, nil
}
Loading