Skip to content

Commit

Permalink
Merge branch 'upstream_master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	go.mod
  • Loading branch information
Achillesxu committed Dec 29, 2021
2 parents 1b25575 + f3f225f commit 3448e8f
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/checkout@v2

- name: Build
run: go build -v ./cmd/kaf
run: go build -v -ldflags "-s -w -X main.version=$GITHUB_REF -X main.commit=${GITHUB_SHA::8}" ./cmd/kaf

- name: Test
run: go test -v ./...
1 change: 1 addition & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ builds:
main: ./cmd/kaf
goarch:
- amd64
- arm64
goos:
- linux
- darwin
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Kafka CLI inspired by kubectl & docker
Install from source:

```
go get -u github.com/birdayz/kaf/cmd/kaf
go install github.com/birdayz/kaf/cmd/kaf@latest
```

Install binary:
Expand All @@ -28,6 +28,10 @@ yay -S kaf

## Usage

Show the tool version

`kaf --version`

Add a local Kafka with no auth

`kaf config add-cluster local -b localhost:9092`
Expand Down
107 changes: 49 additions & 58 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"sync"
"text/tabwriter"
"time"

"strconv"

Expand All @@ -27,6 +26,7 @@ var (
groupCommitFlag bool
raw bool
follow bool
tail int32
schemaCache *avro.SchemaCache
keyfmt *prettyjson.Formatter

Expand All @@ -42,9 +42,10 @@ var (

func init() {
rootCmd.AddCommand(consumeCmd)
consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest.")
consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest, or integer.")
consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON")
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Shorthand to start consuming with offset HEAD-1 on each partition. Overrides --offset flag")
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Continue to consume messages until program execution is interrupted/terminated")
consumeCmd.Flags().Int32VarP(&tail, "tail", "n", 0, "Print last n messages per partition")
consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files")
consumeCmd.Flags().StringSliceVar(&protoExclude, "proto-exclude", []string{}, "Proto exclusions (path prefixes)")
consumeCmd.Flags().BoolVar(&decodeMsgPack, "decode-msgpack", false, "Enable deserializing msgpack")
Expand All @@ -60,30 +61,27 @@ func init() {
keyfmt.Indent = 0
}

func getAvailableOffsetsRetry(
ldr *sarama.Broker, req *sarama.OffsetRequest, d time.Duration,
) (*sarama.OffsetResponse, error) {
var (
err error
offsets *sarama.OffsetResponse
)

for {
select {
case <-time.After(d):
return nil, err
default:
offsets, err = ldr.GetAvailableOffsets(req)
if err == nil {
return offsets, err
}
}
}
type offsets struct {
newest int64
oldest int64
}

const (
offsetsRetry = 500 * time.Millisecond
)
func getOffsets(client sarama.Client, topic string, partition int32) (*offsets, error) {
newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
return nil, err
}

oldest, err := client.GetOffset(topic, partition, sarama.OffsetOldest)
if err != nil {
return nil, err
}

return &offsets{
newest: newest,
oldest: oldest,
}, nil
}

var consumeCmd = &cobra.Command{
Use: "consume TOPIC",
Expand All @@ -93,31 +91,23 @@ var consumeCmd = &cobra.Command{
PreRun: setupProtoDescriptorRegistry,
Run: func(cmd *cobra.Command, args []string) {
var offset int64
switch offsetFlag {
case "oldest":
offset = sarama.OffsetOldest
case "newest":
offset = sarama.OffsetNewest
default:
// TODO: normally we would parse this to int64 but it's
// difficult as we can have multiple partitions. need to
// find a way to give offsets from CLI with a good
// syntax.
offset = sarama.OffsetNewest
}
cfg := getConfig()
cfg.Consumer.Offsets.Initial = offset
topic := args[0]
client := getClientFromConfig(cfg)

// Switch offset to number provided by user, if it's a number
switch offsetFlag {
case "oldest":
offset = sarama.OffsetOldest
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
case "newest":
offset = sarama.OffsetNewest
cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
default:
if o, err := strconv.Atoi(offsetFlag); err == nil {
offset = int64(o)
o, err := strconv.ParseInt(offsetFlag, 10, 64)
if err != nil {
errorExit("Could not parse '%s' to int64: %w", offsetFlag, err)
}
offset = o
}

if groupFlag != "" {
Expand Down Expand Up @@ -184,44 +174,45 @@ func withoutConsumerGroup(ctx context.Context, client sarama.Client, topic strin
wg := sync.WaitGroup{}
mu := sync.Mutex{} // Synchronizes stderr and stdout.
for _, partition := range partitions {

wg.Add(1)

go func(partition int32, offset int64) {
req := &sarama.OffsetRequest{
Version: int16(1),
}
req.AddBlock(topic, partition, int64(-1), int32(0))
ldr, err := client.Leader(topic, partition)
defer wg.Done()

offsets, err := getOffsets(client, topic, partition)
if err != nil {
errorExit("Unable to get leader: %v\n", err)
errorExit("Failed to get %s offsets for partition %d: %w", topic, partition, err)
}

offsets, err := getAvailableOffsetsRetry(ldr, req, offsetsRetry)
if err != nil {
errorExit("Unable to get available offsets: %v\n", err)
if tail != 0 {
offset = offsets.newest - int64(tail)
if offset < offsets.oldest {
offset = offsets.oldest
}
}
followOffset := offsets.GetBlock(topic, partition).Offset - 1

if follow && followOffset > 0 {
offset = followOffset
fmt.Fprintf(errWriter, "Starting on partition %v with offset %v\n", partition, offset)
// Already at end of partition, return early
if !follow && offsets.newest == offsets.oldest {
return
}

pc, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
errorExit("Unable to consume partition: %v %v %v %v\n", topic, partition, offset, err)
}

defer wg.Done()

var count int64 = 0
for {
select {
case <-ctx.Done():
return
case msg := <-pc.Messages():
handleMessage(msg, &mu)
if limitMessagesFlag > 0 && msg.Offset >= offset+limitMessagesFlag {
count++
if limitMessagesFlag > 0 && count >= limitMessagesFlag {
return
}
if !follow && msg.Offset+1 >= pc.HighWaterMarkOffset() {
return
}
}
Expand Down
10 changes: 8 additions & 2 deletions cmd/kaf/kaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,15 @@ var (
colorableOut io.Writer = colorable.NewColorableStdout()
)

// Will be replaced by GitHub action and by goreleaser
// see https://goreleaser.com/customization/build/
var commit string = "HEAD"
var version string = "latest"

var rootCmd = &cobra.Command{
Use: "kaf",
Short: "Kafka Command Line utility for cluster management",
Use: "kaf",
Short: "Kafka Command Line utility for cluster management",
Version: fmt.Sprintf("%s (%s)", version, commit),
PersistentPreRun: func(cmd *cobra.Command, args []string) {
outWriter = cmd.OutOrStdout()
errWriter = cmd.ErrOrStderr()
Expand Down
84 changes: 71 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,37 +1,95 @@
module github.com/birdayz/kaf

go 1.16
go 1.17

require (
github.com/Landoop/schema-registry v0.0.0-20190327143759-50a5701c1891
github.com/Shopify/sarama v1.27.2
github.com/avast/retry-go v2.4.1+incompatible // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect
github.com/burdiyan/kafkautil v0.0.0-20190131162249-eaf83ed22d5b
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/go-zookeeper/zk v1.0.2
github.com/golang/protobuf v1.4.2
github.com/golang/protobuf v1.5.2
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e
github.com/jhump/protoreflect v1.9.0
github.com/linkedin/goavro v2.1.0+incompatible
github.com/lovoo/goka v0.1.4 // indirect
github.com/magiconair/properties v1.8.1
github.com/manifoldco/promptui v0.3.2
github.com/mattn/go-colorable v0.1.2
github.com/mitchellh/go-homedir v1.1.0
github.com/nicksnyder/go-i18n v1.10.1 // indirect
github.com/orlangure/gnomock v0.9.4
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect
github.com/spf13/cobra v1.1.2-0.20201229145732-a4ab3fa09e3d
github.com/stretchr/testify v1.6.1
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.1
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
gopkg.in/yaml.v2 v2.2.8
)

require (
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/alecthomas/gometalinter v2.0.11+incompatible // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/avast/retry-go v2.4.1+incompatible // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
github.com/client9/misspell v0.3.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1 // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/fatih/color v1.7.0 // indirect
github.com/golang/lint v0.0.0-20181026193005-c67002cb31c3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/shlex v0.0.0-20181106134648-c34317bd91bf // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a // indirect
github.com/klauspost/compress v1.11.0 // indirect
github.com/lovoo/goka v0.1.4 // indirect
github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a // indirect
github.com/mattn/go-isatty v0.0.8 // indirect
github.com/nicksnyder/go-i18n v1.10.1 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/pelletier/go-toml v1.4.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect
github.com/segmentio/kafka-go v0.4.2 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tsenart/deadcode v0.0.0-20160724212837-210d2dc333e9 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg/stringprep v1.0.0 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
go.uber.org/zap v1.15.0 // indirect
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6 // indirect
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c // indirect
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.5.0 // indirect
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
)
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand All @@ -162,8 +164,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down Expand Up @@ -567,8 +570,10 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12 h1:OwhZOOMuf7leLaSCuxtQ9FW7ui2L2L6UKOtKAUqovUQ=
google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c h1:vTxShRUnK60yd8DZU+f95p1zSLj814+5CuEh7NjF2/Y=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
Expand Down
Loading

0 comments on commit 3448e8f

Please sign in to comment.