From d5ab84def25190441c7d2ea0b356fd63e7cc1c22 Mon Sep 17 00:00:00 2001
From: Song Gao
Date: Fri, 17 Jan 2025 11:32:54 +0800
Subject: [PATCH] fix: fix kafka writer configure (#3517)

Signed-off-by: Song Gao
---
 extensions/sinks/kafka/ext/kafka.go   | 50 ++++++++++++++-------------
 internal/topo/node/sink_node.go       | 11 ------
 internal/topo/planner/sink_planner.go |  7 ++--
 pkg/api/stream.go                     |  5 ---
 4 files changed, 28 insertions(+), 45 deletions(-)

diff --git a/extensions/sinks/kafka/ext/kafka.go b/extensions/sinks/kafka/ext/kafka.go
index c27d0ae34d..1723004f76 100644
--- a/extensions/sinks/kafka/ext/kafka.go
+++ b/extensions/sinks/kafka/ext/kafka.go
@@ -47,11 +47,11 @@ type sinkConf struct {
 }
 
 type kafkaConf struct {
-	MaxAttempts  int             `json:"maxAttempts"`
-	RequiredACKs int             `json:"requiredACKs"`
-	Key          string          `json:"key"`
-	Headers      interface{}     `json:"headers"`
-	WriterConf   kafkaWriterConf `json:"writerConf"`
+	kafkaWriterConf
+	MaxAttempts  int         `json:"maxAttempts"`
+	RequiredACKs int         `json:"requiredACKs"`
+	Key          string      `json:"key"`
+	Headers      interface{} `json:"headers"`
 }
 
 type kafkaWriterConf struct {
@@ -101,7 +101,7 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
 	}
 	m.tlsConfig = tlsConfig
 	kc := getDefaultKafkaConf()
-	if err := cast.MapToStruct(props, kc); err != nil {
+	if err := kc.configure(props); err != nil {
 		return err
 	}
 	m.kc = kc
@@ -112,15 +112,6 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
 	return m.buildKafkaWriter()
 }
 
-func (m *kafkaSink) ConfigureBatch(batchSize int, lingerInterval time.Duration) {
-	if batchSize > 0 {
-		m.kc.WriterConf.BatchSize = batchSize
-	}
-	if lingerInterval > 0 {
-		m.kc.WriterConf.BatchTimeout = lingerInterval
-	}
-}
-
 func (m *kafkaSink) buildKafkaWriter() error {
 	mechanism, err := m.sc.GetMechanism()
 	if err != nil {
@@ -136,15 +127,15 @@ func (m *kafkaSink) buildKafkaWriter() error {
 		AllowAutoTopicCreation: true,
 		MaxAttempts:            m.kc.MaxAttempts,
 		RequiredAcks:           kafkago.RequiredAcks(m.kc.RequiredACKs),
-		BatchSize:              m.kc.WriterConf.BatchSize,
-		BatchBytes:             m.kc.WriterConf.BatchBytes,
-		BatchTimeout:           m.kc.WriterConf.BatchTimeout,
+		BatchSize:              m.kc.BatchSize,
+		BatchBytes:             m.kc.BatchBytes,
+		BatchTimeout:           m.kc.BatchTimeout,
 		Transport: &kafkago.Transport{
 			SASL: mechanism,
 			TLS:  m.tlsConfig,
 		},
 	}
-	conf.Log.Infof("kafka writer batchSize:%v, batchTimeout:%v", m.kc.WriterConf.BatchSize, m.kc.WriterConf.BatchTimeout.String())
+	conf.Log.Infof("kafka writer batchSize:%v, batchTimeout:%v", m.kc.BatchSize, m.kc.BatchTimeout.String())
 	m.writer = w
 	return nil
 }
@@ -332,11 +323,22 @@ func getDefaultKafkaConf() *kafkaConf {
 	c := &kafkaConf{
 		RequiredACKs: -1,
 		MaxAttempts:  1,
-		WriterConf: kafkaWriterConf{
-			BatchSize:    5000,
-			BatchTimeout: 200 * time.Millisecond,
-			BatchBytes:   1048576 * 10, // 10MB
-		},
+	}
+	c.kafkaWriterConf = kafkaWriterConf{
+		BatchSize: 5000,
+		// send batch ASAP
+		BatchTimeout: time.Microsecond,
+		BatchBytes:   1048576 * 10, // 10MB
 	}
 	return c
 }
+
+func (kc *kafkaConf) configure(props map[string]interface{}) error {
+	if err := cast.MapToStruct(props, kc); err != nil {
+		return err
+	}
+	if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/internal/topo/node/sink_node.go b/internal/topo/node/sink_node.go
index fcb99e00b1..9d18981c15 100644
--- a/internal/topo/node/sink_node.go
+++ b/internal/topo/node/sink_node.go
@@ -17,7 +17,6 @@ package node
 import (
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -555,11 +554,6 @@ func resendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{},
 	}
 }
 
-type batchConf struct {
-	BatchSize      int           `json:"batchSize"`
-	LingerInterval time.Duration `json:"lingerInterval"`
-}
-
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	var (
 		s   api.Sink
@@ -572,11 +566,6 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 		if err != nil {
 			return nil, err
 		}
-		if bas, ok := s.(api.BatchAbleSink); ok {
-			bc := batchConf{}
-			cast.MapToStruct(newAction, &bc)
-			bas.ConfigureBatch(bc.BatchSize, bc.LingerInterval)
-		}
 		return s, nil
 	} else {
 		if err != nil {
diff --git a/internal/topo/planner/sink_planner.go b/internal/topo/planner/sink_planner.go
index 9a7ff2ef67..46651b97e5 100644
--- a/internal/topo/planner/sink_planner.go
+++ b/internal/topo/planner/sink_planner.go
@@ -44,7 +44,7 @@ func buildActions(tp *topo.Topo, rule *api.Rule, inputs []api.Emitter) error {
 		}
 		// Split sink node
 		sinkName := fmt.Sprintf("%s_%d", name, i)
-		newInputs, err := splitSink(s, tp, inputs, sinkName, rule.Options, commonConf)
+		newInputs, err := splitSink(tp, inputs, sinkName, rule.Options, commonConf)
 		if err != nil {
 			return err
 		}
@@ -65,10 +65,7 @@ func fulfillProps(rule *api.Rule, props map[string]any) map[string]any {
 }
 
 // Split sink node according to the sink configuration. Return the new input emitters.
-func splitSink(sink api.Sink, tp *topo.Topo, inputs []api.Emitter, sinkName string, options *api.RuleOption, sc *node.SinkConf) ([]api.Emitter, error) {
-	if _, ok := sink.(api.BatchAbleSink); ok {
-		return inputs, nil
-	}
+func splitSink(tp *topo.Topo, inputs []api.Emitter, sinkName string, options *api.RuleOption, sc *node.SinkConf) ([]api.Emitter, error) {
 	index := 0
 	newInputs := inputs
 	// Batch enabled
diff --git a/pkg/api/stream.go b/pkg/api/stream.go
index 02c32310f2..1423770c29 100644
--- a/pkg/api/stream.go
+++ b/pkg/api/stream.go
@@ -136,11 +136,6 @@ type LookupSource interface {
 	Closable
 }
 
-type BatchAbleSink interface {
-	Sink
-	ConfigureBatch(batchSize int, lingerDuration time.Duration)
-}
-
 type Sink interface {
 	// Open Should be sync function for normal case. The container will run it in go func
 	Open(ctx StreamContext) error