feat(metrics): revise kafka sink conf/metrics (#3494)
Signed-off-by: Song Gao <[email protected]>
Yisaer authored Feb 7, 2025
1 parent 553e7be commit b77de3d
Showing 9 changed files with 125 additions and 123 deletions.
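
At a glance, the commit retires the per-connector metric vectors defined in the two deleted metrics.go files below (KafkaCounter and KafkaHist for kafka, SQLCounter and SQLHist for sql) in favor of shared metrics.IOCounter and metrics.IODurationHist vectors keyed by a connector label. Those shared definitions live in the internal metrics package and are not part of this diff, so the sketch below only reconstructs their plausible shape from the call sites; the metric and label names are assumptions, not the verified declarations.

// Hedged sketch, inferred from call sites such as
// metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, status, ruleID, opID).
package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
	// Five label slots: connector ("kafka"/"sql"), direction (source/sink IO),
	// a status or event kind, the rule id, and the operator id.
	IOCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "io_total", // assumed name
		Help: "IO events per connector, direction and status.",
	}, []string{"connector", "direction", "status", "rule", "op"})

	// The duration histogram drops the status slot; callers observe microseconds.
	IODurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "io_duration_us", // assumed name
		Help: "IO request latency in microseconds.",
	}, []string{"connector", "direction", "rule", "op"})
)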
43 changes: 0 additions & 43 deletions extensions/impl/kafka/metrics.go

This file was deleted.

69 changes: 48 additions & 21 deletions extensions/impl/kafka/sink.go
@@ -32,6 +32,11 @@ import (
"github.com/lf-edge/ekuiper/v2/pkg/cert"
)

const (
LblKafka = "kafka"
LblMsg = "msg"
)

type KafkaSink struct {
writer *kafkago.Writer
kc *kafkaConf
@@ -40,9 +45,11 @@ type KafkaSink struct {
headerTemplate string
saslConf *saslConf
mechanism sasl.Mechanism
LastStats kafkago.WriterStats
}

type kafkaConf struct {
kafkaWriterConf
Brokers string `json:"brokers"`
Topic string `json:"topic"`
MaxAttempts int `json:"maxAttempts"`
@@ -54,6 +61,12 @@ type kafkaConf struct {
Compression string `json:"compression"`
}

type kafkaWriterConf struct {
BatchSize int `json:"batchSize"`
BatchTimeout time.Duration `json:"-"`
BatchBytes int64 `json:"batchBytes"`
}

func (c *kafkaConf) validate() error {
if c.Topic == "" {
return fmt.Errorf("topic can not be empty")
@@ -65,11 +78,8 @@ func (c *kafkaConf) validate() error {
}

func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
err := cast.MapToStruct(configs, c)
c := getDefaultKafkaConf()
err := c.configure(configs)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), castConfErr)
})
@@ -149,7 +159,9 @@ func (k *KafkaSink) buildKafkaWriter() error {
AllowAutoTopicCreation: true,
MaxAttempts: k.kc.MaxAttempts,
RequiredAcks: kafkago.RequiredAcks(k.kc.RequiredACKs),
BatchSize: 1,
BatchSize: k.kc.BatchSize,
BatchBytes: k.kc.BatchBytes,
BatchTimeout: k.kc.BatchTimeout,
Transport: &kafkago.Transport{
SASL: k.mechanism,
TLS: k.tlsConfig,
@@ -176,29 +188,21 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler)

func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Inc()
}()
msgs, err := k.collect(ctx, item)
if err != nil {
return err
}
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
metrics.IODurationHist.WithLabelValues(LblKafka, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
}()
metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
return k.writer.WriteMessages(ctx, msgs...)
}

func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
allMsgs := make([]kafkago.Message, 0)
items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool {
msgs, err := k.collect(ctx, tuple)
@@ -208,13 +212,13 @@ func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleLis
allMsgs = append(allMsgs, msgs...)
return true
})
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
metrics.IODurationHist.WithLabelValues(LblKafka, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
}()
return k.writer.WriteMessages(ctx, allMsgs...)
err = k.writer.WriteMessages(ctx, allMsgs...)
metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
return err
}

func (k *KafkaSink) collect(ctx api.StreamContext, item api.MessageTuple) ([]kafkago.Message, error) {
@@ -338,3 +342,26 @@ var (
_ api.TupleCollector = &KafkaSink{}
_ util.PingableConn = &KafkaSink{}
)

func getDefaultKafkaConf() *kafkaConf {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
c.kafkaWriterConf = kafkaWriterConf{
BatchSize: 100,
BatchTimeout: time.Microsecond,
BatchBytes: 10485760, // 10MB
}
return c
}

func (kc *kafkaConf) configure(props map[string]interface{}) error {
if err := cast.MapToStruct(props, kc); err != nil {
return err
}
if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
return err
}
return nil
}
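
The configuration change above splits defaults from parsing: getDefaultKafkaConf seeds RequiredACKs and MaxAttempts plus the new writer batching defaults, and configure overlays user props, filling the embedded kafkaWriterConf in a second cast.MapToStruct pass (presumably because the first pass does not promote embedded fields). A minimal usage sketch with a hypothetical props map:

// Sketch only: the props values are illustrative, not taken from the commit.
props := map[string]any{
	"brokers":    "localhost:9092",
	"topic":      "demo",
	"batchSize":  500,     // overrides the default of 100
	"batchBytes": 1048576, // overrides the default of 10 MB
}
c := getDefaultKafkaConf()
if err := c.configure(props); err != nil {
	// surface the config error to Provision
}
// Note: BatchTimeout carries a json:"-" tag, so as the diff stands it cannot
// be set from props and always keeps the time.Microsecond default.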
2 changes: 1 addition & 1 deletion extensions/impl/kafka/source.go
@@ -194,7 +194,7 @@ func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, i
ingestError(ctx, err)
continue
}
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblKafka, metrics.LblSourceIO, LblMsg, ctx.GetRuleId(), ctx.GetOpId()).Inc()
ingest(ctx, msg.Value, nil, timex.GetNow())
}
}
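The source path now reports through the same shared counter, passing LblMsg in the slot that the sink fills with an error-derived status. A hedged sketch of that convention using the constants from the diff; recordSourceMsg is a hypothetical helper, not part of the commit:

// The third IOCounter label doubles as an outcome (sinks, via
// metrics.GetStatusValue(err)) or an event kind (sources, LblMsg here).
func recordSourceMsg(ctx api.StreamContext) {
	metrics.IOCounter.WithLabelValues(
		LblKafka,            // connector
		metrics.LblSourceIO, // direction
		LblMsg,              // event kind in the status slot
		ctx.GetRuleId(),
		ctx.GetOpId(),
	).Inc()
}
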
43 changes: 0 additions & 43 deletions extensions/impl/sql/metrics.go

This file was deleted.

18 changes: 12 additions & 6 deletions extensions/impl/sql/sink.go
@@ -33,6 +33,12 @@ import (
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
)

const (
LblInsert = "insert"
LblUpdate = "update"
LblDel = "del"
)

type SQLSinkConnector struct {
config *sqlSinkConfig
cw *connection.ConnWrapper
@@ -156,10 +162,10 @@ func (s *SQLSinkConnector) Close(ctx api.StreamContext) error {
func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
defer func() {
if err != nil {
SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return s.collect(ctx, item.ToMap())
}

@@ -185,10 +191,10 @@ func (s *SQLSinkConnector) collect(ctx api.StreamContext, item map[string]any) (
func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
defer func() {
if err != nil {
SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return s.collectList(ctx, items.ToMaps())
}

Expand Down Expand Up @@ -283,7 +289,7 @@ func (s *SQLSinkConnector) save(ctx api.StreamContext, table string, data map[st
func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error {
ctx.GetLogger().Debugf(sqlStr)
if s.needReconnect {
SQLCounter.WithLabelValues(LblReconn, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReconn, ctx.GetRuleId(), ctx.GetOpId()).Inc()
err := s.conn.Reconnect()
if err != nil {
return errorx.NewIOErr(err.Error())
Expand All @@ -298,7 +304,7 @@ func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error
s.needReconnect = true
return errorx.NewIOErr(err.Error())
}
SQLHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
metrics.IODurationHist.WithLabelValues(LblSql, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
s.needReconnect = false
d, err := r.RowsAffected()
if err != nil {
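The SQL sink keeps its request/exception accounting but routes it through the shared vector. One asymmetry worth noting: CollectList's deferred error branch increments LblReq, while Collect's increments LblException. If the two paths were meant to mirror each other, the shape would be the hypothetical wrapper below (instrument is not part of the commit):

// Sketch of the symmetric sink-side pattern: count the request up front,
// count an exception in a deferred error check.
func instrument(ctx api.StreamContext, do func() error) (err error) {
	defer func() {
		if err != nil {
			metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
		}
	}()
	metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSinkIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
	return do()
}
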
25 changes: 17 additions & 8 deletions extensions/impl/sql/source.go
@@ -35,6 +35,14 @@ import (
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

const (
LblSql = "sql"
LblReq = "req"
LblReconn = "reconn"
LblException = "exception"
LblRecv = "recv"
)

type SQLSourceConnector struct {
id string
conf *SQLConf
@@ -128,19 +136,19 @@ func (s *SQLSourceConnector) Close(ctx api.StreamContext) error {
}

func (s *SQLSourceConnector) Pull(ctx api.StreamContext, recvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
SQLCounter.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblReq, ctx.GetRuleId(), ctx.GetOpId()).Inc()
s.queryData(ctx, recvTime, ingest, ingestError)
}

func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
logger := ctx.GetLogger()
if s.needReconnect {
SQLCounter.WithLabelValues(LblReconn, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblReconn, ctx.GetRuleId(), ctx.GetOpId()).Inc()
err := s.conn.Reconnect()
if err != nil {
logger.Errorf("reconnect db error %v", err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
}
@@ -151,7 +159,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
if err != nil {
logger.Errorf("Get sql query error %v", err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
logger.Debugf("Query the database with %s", query)
@@ -160,12 +168,12 @@
failpoint.Inject("QueryErr", func() {
err = errors.New("QueryErr")
})
SQLHist.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
metrics.IODurationHist.WithLabelValues(LblSql, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
if err != nil {
logger.Errorf("query sql error %v", err)
s.needReconnect = true
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
} else if s.needReconnect {
s.needReconnect = false
@@ -178,7 +186,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
if err != nil {
logger.Errorf("query %v row ColumnTypes error %v", query, err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
for rows.Next() {
@@ -192,12 +200,13 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
if err != nil {
logger.Errorf("Run sql scan(%s) error %v", query, err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblException, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
scanIntoMap(data, columns, cols)
s.Query.UpdateMaxIndexValue(data)
ingest(ctx, data, nil, rcvTime)
metrics.IOCounter.WithLabelValues(LblSql, metrics.LblSourceIO, LblRecv, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}

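With the per-row LblRecv increment added at the end of the scan loop, a single Pull can now emit several distinct shared-counter events. A summary sketch using the diff's own constants; the descriptions are inferred from the call sites above:

// IOCounter events one Pull of the SQL source can produce.
var pullEvents = map[string]string{
	LblReq:       "once per Pull invocation",
	LblReconn:    "once per reconnect attempt",
	LblException: "once per error path (reconnect, query build, query, scan)",
	LblRecv:      "once per row ingested",
}
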
1 change: 0 additions & 1 deletion internal/topo/planner/planner_sink.go
@@ -170,7 +170,6 @@ func findTemplateProps(props map[string]any) []string {
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
index := 0
result := make([]node.TopNode, 0)
// Batch enabled
if sc.BatchSize > 0 || sc.LingerInterval > 0 {
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
if err != nil {