Skip to content

Commit

Permalink
fix: fix kafka writer configure (#3517)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Jan 17, 2025
1 parent f3cb876 commit d5ab84d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 45 deletions.
50 changes: 26 additions & 24 deletions extensions/sinks/kafka/ext/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type sinkConf struct {
}

type kafkaConf struct {
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
WriterConf kafkaWriterConf `json:"writerConf"`
kafkaWriterConf
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
}

type kafkaWriterConf struct {
Expand Down Expand Up @@ -101,7 +101,7 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
}
m.tlsConfig = tlsConfig
kc := getDefaultKafkaConf()
if err := cast.MapToStruct(props, kc); err != nil {
if err := kc.configure(props); err != nil {
return err
}
m.kc = kc
Expand All @@ -112,15 +112,6 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
return m.buildKafkaWriter()
}

func (m *kafkaSink) ConfigureBatch(batchSize int, lingerInterval time.Duration) {
if batchSize > 0 {
m.kc.WriterConf.BatchSize = batchSize
}
if lingerInterval > 0 {
m.kc.WriterConf.BatchTimeout = lingerInterval
}
}

func (m *kafkaSink) buildKafkaWriter() error {
mechanism, err := m.sc.GetMechanism()
if err != nil {
Expand All @@ -136,15 +127,15 @@ func (m *kafkaSink) buildKafkaWriter() error {
AllowAutoTopicCreation: true,
MaxAttempts: m.kc.MaxAttempts,
RequiredAcks: kafkago.RequiredAcks(m.kc.RequiredACKs),
BatchSize: m.kc.WriterConf.BatchSize,
BatchBytes: m.kc.WriterConf.BatchBytes,
BatchTimeout: m.kc.WriterConf.BatchTimeout,
BatchSize: m.kc.BatchSize,
BatchBytes: m.kc.BatchBytes,
BatchTimeout: m.kc.BatchTimeout,
Transport: &kafkago.Transport{
SASL: mechanism,
TLS: m.tlsConfig,
},
}
conf.Log.Infof("kafka writer batchSize:%v, batchTimeout:%v", m.kc.WriterConf.BatchSize, m.kc.WriterConf.BatchTimeout.String())
conf.Log.Infof("kafka writer batchSize:%v, batchTimeout:%v", m.kc.BatchSize, m.kc.BatchTimeout.String())
m.writer = w
return nil
}
Expand Down Expand Up @@ -332,11 +323,22 @@ func getDefaultKafkaConf() *kafkaConf {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
WriterConf: kafkaWriterConf{
BatchSize: 5000,
BatchTimeout: 200 * time.Millisecond,
BatchBytes: 1048576 * 10, // 10MB
},
}
c.kafkaWriterConf = kafkaWriterConf{
BatchSize: 5000,
// send batch ASAP
BatchTimeout: time.Microsecond,
BatchBytes: 1048576 * 10, // 10MB
}
return c
}

func (kc *kafkaConf) configure(props map[string]interface{}) error {
if err := cast.MapToStruct(props, kc); err != nil {
return err
}
if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
return err
}
return nil
}
11 changes: 0 additions & 11 deletions internal/topo/node/sink_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package node
import (
"fmt"
"sync"
"time"

"github.com/lf-edge/ekuiper/internal/binder/io"
"github.com/lf-edge/ekuiper/internal/conf"
Expand Down Expand Up @@ -555,11 +554,6 @@ func resendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{},
}
}

type batchConf struct {
BatchSize int `json:"batchSize"`
LingerInterval time.Duration `json:"lingerInterval"`
}

func getSink(name string, action map[string]interface{}) (api.Sink, error) {
var (
s api.Sink
Expand All @@ -572,11 +566,6 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
if err != nil {
return nil, err
}
if bas, ok := s.(api.BatchAbleSink); ok {
bc := batchConf{}
cast.MapToStruct(newAction, &bc)
bas.ConfigureBatch(bc.BatchSize, bc.LingerInterval)
}
return s, nil
} else {
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions internal/topo/planner/sink_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func buildActions(tp *topo.Topo, rule *api.Rule, inputs []api.Emitter) error {
}
// Split sink node
sinkName := fmt.Sprintf("%s_%d", name, i)
newInputs, err := splitSink(s, tp, inputs, sinkName, rule.Options, commonConf)
newInputs, err := splitSink(tp, inputs, sinkName, rule.Options, commonConf)
if err != nil {
return err
}
Expand All @@ -65,10 +65,7 @@ func fulfillProps(rule *api.Rule, props map[string]any) map[string]any {
}

// Split sink node according to the sink configuration. Return the new input emitters.
func splitSink(sink api.Sink, tp *topo.Topo, inputs []api.Emitter, sinkName string, options *api.RuleOption, sc *node.SinkConf) ([]api.Emitter, error) {
if _, ok := sink.(api.BatchAbleSink); ok {
return inputs, nil
}
func splitSink(tp *topo.Topo, inputs []api.Emitter, sinkName string, options *api.RuleOption, sc *node.SinkConf) ([]api.Emitter, error) {
index := 0
newInputs := inputs
// Batch enabled
Expand Down
5 changes: 0 additions & 5 deletions pkg/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ type LookupSource interface {
Closable
}

type BatchAbleSink interface {
Sink
ConfigureBatch(batchSize int, lingerDuration time.Duration)
}

type Sink interface {
// Open Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext) error
Expand Down

0 comments on commit d5ab84d

Please sign in to comment.