Skip to content

Commit

Permalink
kaf consume add value-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Achillesxu committed Jan 6, 2022
1 parent 3e756c9 commit 073e926
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"regexp"
"sync"
"text/tabwriter"

Expand Down Expand Up @@ -37,6 +38,10 @@ var (

limitMessagesFlag int64

valueFilterFlag string

valReg *regexp.Regexp

reg *proto.DescriptorRegistry
)

Expand All @@ -55,6 +60,7 @@ func init() {
consumeCmd.Flags().Int64VarP(&limitMessagesFlag, "limit-messages", "l", 0, "Limit messages per partition")
consumeCmd.Flags().StringVarP(&groupFlag, "group", "g", "", "Consumer Group to use for consume")
consumeCmd.Flags().BoolVar(&groupCommitFlag, "commit", false, "Commit Group offset after receiving messages. Works only if consuming as Consumer Group")
consumeCmd.Flags().StringVar(&valueFilterFlag, "value-filter", "", "msg will display if it contains --value-filter regex string, but all msgs will display if the value of value-filter is empty")

keyfmt = prettyjson.NewFormatter()
keyfmt.Newline = " " // Replace newline with space to avoid condensed output.
Expand Down Expand Up @@ -95,6 +101,10 @@ var consumeCmd = &cobra.Command{
topic := args[0]
client := getClientFromConfig(cfg)

if len(valueFilterFlag) > 0 {
valReg = regexp.MustCompile(valueFilterFlag)
}

switch offsetFlag {
case "oldest":
offset = sarama.OffsetOldest
Expand Down Expand Up @@ -306,10 +316,21 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
w.Flush()
}

outputSign := true
if len(valueFilterFlag) > 0 {
isFound := len(valReg.Find(dataToDisplay))
if isFound <= 0 {
outputSign = false
}
}

mu.Lock()
stderr.WriteTo(errWriter)
_, _ = colorableOut.Write(dataToDisplay)
fmt.Fprintln(outWriter)

if outputSign {
_, _ = colorableOut.Write(dataToDisplay)
fmt.Fprintln(outWriter)
}
mu.Unlock()

}
Expand Down

0 comments on commit 073e926

Please sign in to comment.