Skip to content

Commit

Permalink
merge upstream_master
Browse files Browse the repository at this point in the history
  • Loading branch information
Achillesxu committed Aug 30, 2021
2 parents 9c8f75a + 79af9b5 commit 1b25575
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.13
go-version: ^1.16
id: go

- name: Check out code into the Go module directory
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ List consumer groups

Describe a given consumer group called _dispatcher_

`kafa group describe dispatcher`
`kaf group describe dispatcher`

Write message into given topic from stdin

`echo test | kaf produce mqtt.messages.incoming`

Set offset for consumer group _dispatcher_ consuming from topic _mqtt.messages.incoming_ to latest for all partitions

`kaf group commit dispatcher -t mqtt.messages.incoming --offset latest --all-partitions`

## Configuration
See the [examples](examples) folder

Expand Down
28 changes: 27 additions & 1 deletion cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"strconv"
"sync"
"text/tabwriter"
"time"

"strconv"

"github.com/Shopify/sarama"
"github.com/birdayz/kaf/pkg/avro"
"github.com/birdayz/kaf/pkg/proto"
"github.com/golang/protobuf/jsonpb"
prettyjson "github.com/hokaccha/go-prettyjson"
"github.com/spf13/cobra"
"github.com/vmihailenco/msgpack/v5"
)

var (
Expand Down Expand Up @@ -45,6 +47,7 @@ func init() {
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Shorthand to start consuming with offset HEAD-1 on each partition. Overrides --offset flag")
consumeCmd.Flags().StringSliceVar(&protoFiles, "proto-include", []string{}, "Path to proto files")
consumeCmd.Flags().StringSliceVar(&protoExclude, "proto-exclude", []string{}, "Proto exclusions (path prefixes)")
consumeCmd.Flags().BoolVar(&decodeMsgPack, "decode-msgpack", false, "Enable deserializing msgpack")
consumeCmd.Flags().StringVar(&protoType, "proto-type", "", "Fully qualified name of the proto message type. Example: com.test.SampleMessage")
consumeCmd.Flags().StringVar(&keyProtoType, "key-proto-type", "", "Fully qualified name of the proto key type. Example: com.test.SampleMessage")
consumeCmd.Flags().Int32SliceVarP(&flagPartitions, "partitions", "p", []int32{}, "Partitions to consume from")
Expand Down Expand Up @@ -107,6 +110,16 @@ var consumeCmd = &cobra.Command{
topic := args[0]
client := getClientFromConfig(cfg)

// Switch offset to number provided by user, if it's a number
switch offsetFlag {
case "oldest":
case "newest":
default:
if o, err := strconv.Atoi(offsetFlag); err == nil {
offset = int64(o)
}
}

if groupFlag != "" {
withConsumerGroup(cmd.Context(), client, topic, groupFlag)
} else {
Expand Down Expand Up @@ -249,6 +262,19 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) {
}
}

if decodeMsgPack {
var obj interface{}
err = msgpack.Unmarshal(msg.Value, &obj)
if err != nil {
fmt.Fprintf(&stderr, "could not decode msgpack data: %v\n", err)
}

dataToDisplay, err = json.Marshal(obj)
if err != nil {
fmt.Fprintf(&stderr, "could not decode msgpack data: %v\n", err)
}
}

if !raw {
if isJSON(dataToDisplay) {
dataToDisplay = formatValue(dataToDisplay)
Expand Down
92 changes: 60 additions & 32 deletions cmd/kaf/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ func createGroupCommitOffsetCmd() *cobra.Command {
var allPartitions bool
var noconfirm bool
res := &cobra.Command{
Use: "commit",
Args: cobra.ExactArgs(1),
Use: "commit",
Short: "Set offset for given consumer group",
Long: "Set offset for a given consumer group, creates one if it does not exist. Offsets cannot be set on a consumer group with active consumers.",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client := getClient()

Expand All @@ -164,47 +166,73 @@ func createGroupCommitOffsetCmd() *cobra.Command {
errorExit("Either --partition or --all-partitions flag must be provided")
}

admin := getClusterAdmin()
groupDescs, err := admin.DescribeConsumerGroups([]string{args[0]})
if err != nil {
errorExit("Unable to describe consumer groups: %v\n", err)
}
for _, detail := range groupDescs {
state := detail.State
if state != "Empty" {
errorExit("Consumer group %s has active consumers in it, cannot set offset\n", group)
}
}

sort.Slice(partitions, func(i int, j int) bool { return partitions[i] < partitions[j] })

partitionOffsets := make(map[int32]int64)
type Assignment struct {
partition int32
offset int64
}
assignments := make(chan Assignment, len(partitions))

// TODO offset must be calced per partition

var wg sync.WaitGroup
for _, partition := range partitions {
i, err := strconv.ParseInt(offset, 10, 64)
if err != nil {
// Try oldest/newest/..
if offset == "oldest" {
i = sarama.OffsetOldest
} else if offset == "newest" || offset == "latest" {
i = sarama.OffsetNewest
} else {
// Try timestamp
t, err := time.Parse(time.RFC3339, offset)
if err != nil {
errorExit("offset is neither offset nor timestamp", nil)
wg.Add(1)
go func(partition int32) {
defer wg.Done()
i, err := strconv.ParseInt(offset, 10, 64)
if err != nil {
// Try oldest/newest/..
if offset == "oldest" {
i = sarama.OffsetOldest
} else if offset == "newest" || offset == "latest" {
i = sarama.OffsetNewest
} else {
// Try timestamp
t, err := time.Parse(time.RFC3339, offset)
if err != nil {
errorExit("offset is neither offset nor timestamp", nil)
}
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
i = t.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}

o, err := client.GetOffset(topic, partition, i)
if err != nil {
errorExit("Failed to determine offset for timestamp: %v", err)
}
o, err := client.GetOffset(topic, partition, i)
if err != nil {
errorExit("Failed to determine offset for timestamp: %v", err)
}

if o == -1 {
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
continue
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
}
if o == -1 {
fmt.Printf("Partition %v: could not determine offset from timestamp. Skipping.\n", partition)
return
//errorExit("Determined offset -1 from timestamp. Skipping.", o)
}

partitionOffsets[partition] = o
assignments <- Assignment{partition: partition, offset: o}

fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
} else {
partitionOffsets[partition] = i
}
fmt.Printf("Partition %v: determined offset %v from timestamp.\n", partition, o)
} else {
assignments <- Assignment{partition: partition, offset: i}
}
}(partition)
}
wg.Wait()
close(assignments)

partitionOffsets := make(map[int32]int64, len(partitions))
for assign := range assignments {
partitionOffsets[assign.partition] = assign.offset
}

fmt.Printf("Resetting offsets to: %v\n", partitionOffsets)
Expand Down
1 change: 1 addition & 0 deletions cmd/kaf/kaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ var (
schemaRegistryURL string
protoFiles []string
protoExclude []string
decodeMsgPack bool
verbose bool
clusterOverride string
)
Expand Down
6 changes: 4 additions & 2 deletions cmd/kaf/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ func TestMain(m *testing.M) {
}

func testMain(m *testing.M) (code int) {
p := kafka.Preset(kafka.WithTopics("kaf-testing", "gnomock-kafka"))
c, err := gnomock.Start(p)
c, err := gnomock.Start(
kafka.Preset(kafka.WithTopics("kaf-testing", "gnomock-kafka")),
gnomock.WithContainerName("kaf-kafka"),
)
if err != nil {
return 1
}
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/birdayz/kaf

go 1.13
go 1.16

require (
github.com/Landoop/schema-registry v0.0.0-20190327143759-50a5701c1891
Expand All @@ -12,9 +12,9 @@ require (
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/go-zookeeper/zk v1.0.2
github.com/golang/protobuf v1.3.2
github.com/golang/protobuf v1.4.2
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e
github.com/jhump/protoreflect v1.5.0
github.com/jhump/protoreflect v1.9.0
github.com/linkedin/goavro v2.1.0+incompatible
github.com/lovoo/goka v0.1.4 // indirect
github.com/magiconair/properties v1.8.1
Expand All @@ -27,6 +27,7 @@ require (
github.com/spf13/cobra v1.1.2-0.20201229145732-a4ab3fa09e3d
github.com/stretchr/testify v1.6.1
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.1
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
Expand Down
Loading

0 comments on commit 1b25575

Please sign in to comment.