Skip to content

Commit

Permalink
consume: format key correctly; avoid truncating in case of non-json v…
Browse files Browse the repository at this point in the history
…alue

Fix #113
  • Loading branch information
birdayz committed May 30, 2020
1 parent 6a2f421 commit 1541bc6
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -216,27 +217,32 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err)
}

keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err)
}
} else {
dataToDisplay, err = avroDecode(msg.Value)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
}

if keyProtoType != "" {
keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType)
if err != nil {
fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err)
}
} else {
keyToDisplay, err = avroDecode(msg.Key)
if err != nil {
fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err)
}
}

if !raw {
formatted, err := prettyjson.Format(dataToDisplay)
if err == nil {
dataToDisplay = formatted
if isJSON(dataToDisplay) {
dataToDisplay = formatValue(dataToDisplay)
}

if isJSON(keyToDisplay) {
keyToDisplay = formatKey(keyToDisplay)
}

w := tabwriter.NewWriter(&stderr, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
Expand Down Expand Up @@ -264,7 +270,7 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}

if msg.Key != nil && len(msg.Key) > 0 {
fmt.Fprintf(w, "Key:\t%v\n", formatKey(keyToDisplay))
fmt.Fprintf(w, "Key:\t%v\n", string(keyToDisplay))
}
fmt.Fprintf(w, "Partition:\t%v\nOffset:\t%v\nTimestamp:\t%v\n", msg.Partition, msg.Offset, msg.Timestamp)
w.Flush()
Expand Down Expand Up @@ -308,10 +314,25 @@ func avroDecode(b []byte) ([]byte, error) {
return b, nil
}

func formatKey(key []byte) string {
b, err := keyfmt.Format(key)
if err != nil {
return string(key)
func formatKey(key []byte) []byte {
if b, err := keyfmt.Format(key); err == nil {
return b
}
return key

}

func formatValue(key []byte) []byte {
if b, err := prettyjson.Format(key); err == nil {
return b
}
return key
}

func isJSON(data []byte) bool {
var i interface{}
if err := json.Unmarshal(data, &i); err == nil {
return true
}
return string(b)
return false
}

0 comments on commit 1541bc6

Please sign in to comment.