Skip to content

Commit

Permalink
add -p/--partition flag to produce command
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Jun 28, 2020
1 parent c2e8857 commit 6454620
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions cmd/kaf/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (
"github.com/spf13/cobra"
)

var keyFlag string
var headerFlag []string
var numFlag int
var partitionerFlag string
var timestampFlag string
var (
keyFlag string
headerFlag []string
numFlag int
partitionerFlag string
timestampFlag string
partitionFlag int32
)

func init() {
rootCmd.AddCommand(produceCmd)
Expand All @@ -34,6 +37,7 @@ func init() {
produceCmd.Flags().StringVar(&keyProtoType, "key-proto-type", "", "Fully qualified name of the proto key type. Example: com.test.SampleMessage")
produceCmd.Flags().StringVar(&partitionerFlag, "partitioner", "", "Select partitioner: Default or jvm")
produceCmd.Flags().StringVar(&timestampFlag, "timestamp", "", "Select timestamp for record")
produceCmd.Flags().Int32VarP(&partitionFlag, "partition", "p", -1, "Partition to produce to")

}

Expand All @@ -47,6 +51,11 @@ var produceCmd = &cobra.Command{
if partitionerFlag != "" {
cfg.Producer.Partitioner = kafkautil.NewJVMCompatiblePartitioner
}

if partitionFlag != int32(-1) {
cfg.Producer.Partitioner = sarama.NewManualPartitioner
}

producer, err := sarama.NewSyncProducer(currentCluster.Brokers, cfg)
if err != nil {
errorExit("Unable to create new sync producer: %v\n", err)
Expand Down Expand Up @@ -117,13 +126,17 @@ var produceCmd = &cobra.Command{
}

for i := 0; i < numFlag; i++ {
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
msg := &sarama.ProducerMessage{
Topic: args[0],
Key: key,
Headers: headers,
Timestamp: ts,
Value: sarama.ByteEncoder(data),
})
}
if partitionFlag != -1 {
msg.Partition = partitionFlag
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Printf("Failed to send record: %v.", err)
os.Exit(1)
Expand Down

0 comments on commit 6454620

Please sign in to comment.