diff --git a/docs/07.Reference/7.02.Filters.md b/docs/07.Reference/7.02.Filters.md index 210b0cb646..ebe6d18458 100644 --- a/docs/07.Reference/7.02.Filters.md +++ b/docs/07.Reference/7.02.Filters.md @@ -111,6 +111,7 @@ - [validator.OAuth2TokenIntrospect](#validatoroauth2tokenintrospect) - [validator.OAuth2JWT](#validatoroauth2jwt) - [kafka.Topic](#kafkatopic) + - [kafka.Key](#kafkakey) - [headertojson.HeaderMap](#headertojsonheadermap) - [headerlookup.HeaderSetterSpec](#headerlookupheadersetterspec) - [requestadaptor.SignerSpec](#requestadaptorsignerspec) @@ -1019,10 +1020,22 @@ Below is an example configuration. kind: Kafka name: kafka-example backend: [":9093"] +# sync determines the usage of AsyncProducer or SyncProducer for the Kafka filter. +# default is false. If set to true, encountering a message error will cause the Kafka filter +# to respond with a status code of 503 and a body containing "{err: "error message"}". +sync: false topic: + # default topic for Kafka message default: kafka-topic + # dynamic topic for Kafka message, get from http header dynamic: header: X-Kafka-Topic +key: + # default key for Kafka message + default: kafka-key + # dynamic key for Kafka message, get from http header + dynamic: + header: X-Kafka-Key ``` ### Configuration @@ -1030,7 +1043,9 @@ topic: | Name | Type | Description | Required | | ------------ | -------- | -------------------------------- | -------- | | backend | []string | Addresses of Kafka backend | Yes | +| sync | bool | Usage of AsyncProducer or SyncProducer, default is false | No | | topic | [Kafka.Topic](#kafkatopic) | the topic is Spec used to get Kafka topic used to send message to the backend | Yes | +| key | [Kafka.Key](#kafkakey) | the key is Spec used to get Kafka message key | No | ### Results @@ -2029,6 +2044,13 @@ The relationship between `methods` and `url` is `AND`. | default | string | Default topic for Kafka backend | Yes | | dynamic.header | string | The HTTP header that contains Kafka topic | Yes | +### kafka.Key + +| Name | Type | Description | Required | +| --------- | ------ | ------------------------------------------------------------------------ | -------- | +| default | string | Default key for Kafka message | Yes | +| dynamic.header | string | The HTTP header that contains Kafka key | No | + ### headertojson.HeaderMap | Name | Type | Description | Required | diff --git a/pkg/filters/kafkabackend/kafka.go b/pkg/filters/kafkabackend/kafka.go index 11cb46f705..d29302d6ce 100644 --- a/pkg/filters/kafkabackend/kafka.go +++ b/pkg/filters/kafkabackend/kafka.go @@ -28,6 +28,7 @@ import ( "github.com/megaease/easegress/v2/pkg/filters" "github.com/megaease/easegress/v2/pkg/logger" "github.com/megaease/easegress/v2/pkg/protocols/httpprot" + "github.com/megaease/easegress/v2/pkg/util/codectool" ) const ( @@ -56,10 +57,19 @@ func init() { type ( // Kafka is a kafka proxy for HTTP requests. Kafka struct { - spec *Spec - producer sarama.AsyncProducer - done chan struct{} - header string + spec *Spec + asyncProducer sarama.AsyncProducer + syncProcuder sarama.SyncProducer + + headerTopic string + headerKey string + done chan struct{} + } + + // Err is the error of Kafka + Err struct { + Err string `json:"err"` + Code int `json:"code"` } ) @@ -80,11 +90,17 @@ func (k *Kafka) Spec() filters.Spec { return k.spec } -func (k *Kafka) setHeader(spec *Spec) { +func (k *Kafka) setHeader() { if k.spec.Topic.Dynamic != nil { - k.header = http.CanonicalHeaderKey(k.spec.Topic.Dynamic.Header) - if k.header == "" { - panic("empty header") + k.headerTopic = http.CanonicalHeaderKey(k.spec.Topic.Dynamic.Header) + if k.headerTopic == "" { + panic("empty header topic") + } + } + if k.spec.Key.Dynamic != nil { + k.headerKey = http.CanonicalHeaderKey(k.spec.Key.Dynamic.Header) + if k.headerKey == "" { + panic("empty header key") } } } @@ -93,30 +109,39 @@ func (k *Kafka) setHeader(spec *Spec) { func (k *Kafka) Init() { spec := k.spec k.done = make(chan struct{}) - k.setHeader(k.spec) + k.setHeader() config := sarama.NewConfig() config.ClientID = spec.Name() config.Version = sarama.V1_0_0_0 - producer, err := sarama.NewAsyncProducer(k.spec.Backend, config) - if err != nil { - panic(fmt.Errorf("start sarama producer with address %v failed: %v", k.spec.Backend, err)) - } + if spec.Sync { + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(k.spec.Backend, config) + if err != nil { + panic(fmt.Errorf("start sarama sync producer with address %v failed: %v", spec.Backend, err)) + } + k.syncProcuder = producer + } else { + producer, err := sarama.NewAsyncProducer(k.spec.Backend, config) + if err != nil { + panic(fmt.Errorf("start sarama async producer with address %v failed: %v", k.spec.Backend, err)) + } - k.producer = producer - go k.checkProduceError() + k.asyncProducer = producer + go k.checkProduceError() + } } func (k *Kafka) checkProduceError() { for { select { case <-k.done: - err := k.producer.Close() + err := k.asyncProducer.Close() if err != nil { logger.Errorf("close kafka producer failed: %v", err) } return - case err, ok := <-k.producer.Errors(): + case err, ok := <-k.asyncProducer.Errors(): if !ok { return } @@ -141,20 +166,32 @@ func (k *Kafka) Status() interface{} { } func (k *Kafka) getTopic(req *httpprot.Request) string { - if k.header == "" { + if k.headerTopic == "" { return k.spec.Topic.Default } - topic := req.Std().Header.Get(k.header) + topic := req.Std().Header.Get(k.headerTopic) if topic == "" { return k.spec.Topic.Default } return topic } +func (k *Kafka) getKey(req *httpprot.Request) string { + if k.headerKey == "" { + return k.spec.Key.Default + } + key := req.Std().Header.Get(k.headerKey) + if key == "" { + return k.spec.Key.Default + } + return key +} + // Handle handles the context. func (k *Kafka) Handle(ctx *context.Context) (result string) { req := ctx.GetInputRequest().(*httpprot.Request) topic := k.getTopic(req) + key := k.getKey(req) body, err := io.ReadAll(req.GetPayload()) if err != nil { @@ -165,6 +202,45 @@ func (k *Kafka) Handle(ctx *context.Context) (result string) { Topic: topic, Value: sarama.ByteEncoder(body), } - k.producer.Input() <- msg + if key != "" { + msg.Key = sarama.StringEncoder(key) + } + + if k.spec.Sync { + _, _, err = k.syncProcuder.SendMessage(msg) + if err != nil { + logger.Errorf("send message to kafka failed: %v", err) + setErrResponse(ctx, err) + } else { + setSuccessResponse(ctx) + } + return "" + } + + k.asyncProducer.Input() <- msg return "" } + +func setSuccessResponse(ctx *context.Context) { + resp, _ := ctx.GetOutputResponse().(*httpprot.Response) + if resp == nil { + resp, _ = httpprot.NewResponse(nil) + } + resp.SetStatusCode(http.StatusOK) + ctx.SetOutputResponse(resp) +} + +func setErrResponse(ctx *context.Context, err error) { + resp, _ := ctx.GetOutputResponse().(*httpprot.Response) + if resp == nil { + resp, _ = httpprot.NewResponse(nil) + } + resp.SetStatusCode(http.StatusInternalServerError) + errMsg := &Err{ + Err: err.Error(), + Code: http.StatusInternalServerError, + } + data, _ := codectool.MarshalJSON(errMsg) + resp.SetPayload(data) + ctx.SetOutputResponse(resp) +} diff --git a/pkg/filters/kafkabackend/kafka_test.go b/pkg/filters/kafkabackend/kafka_test.go index c971fe95e9..0bc17163ce 100644 --- a/pkg/filters/kafkabackend/kafka_test.go +++ b/pkg/filters/kafkabackend/kafka_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strings" + "sync" "testing" "github.com/Shopify/sarama" @@ -87,6 +88,41 @@ func newMockAsyncProducer() sarama.AsyncProducer { } } +type mockSyncProducer struct { + msgs []*sarama.ProducerMessage + mu sync.Mutex +} + +func newMockSyncProducer() sarama.SyncProducer { + return &mockSyncProducer{} +} + +func (s *mockSyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.msgs = append(s.msgs, msg) + return 0, 0, nil +} +func (s *mockSyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + s.mu.Lock() + defer s.mu.Unlock() + s.msgs = append(s.msgs, msgs...) + return nil +} + +func (s *mockSyncProducer) Close() error { return nil } +func (s *mockSyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag { return 0 } +func (s *mockSyncProducer) IsTransactional() bool { return false } +func (s *mockSyncProducer) BeginTxn() error { return nil } +func (s *mockSyncProducer) CommitTxn() error { return nil } +func (s *mockSyncProducer) AbortTxn() error { return nil } +func (s *mockSyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error { + return nil +} +func (s *mockSyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error { + return nil +} + func defaultFilterSpec(t *testing.T, spec *Spec) filters.Spec { spec.BaseSpec.MetaSpec.Kind = Kind spec.BaseSpec.MetaSpec.Name = "kafka" @@ -110,6 +146,9 @@ func TestKafka(t *testing.T) { Topic: &Topic{ Default: "default-topic", }, + Key: Key{ + Default: "default-key", + }, }) k := kind.CreateInstance(spec) @@ -133,11 +172,16 @@ func TestHandleHTTP(t *testing.T) { Header: "x-kafka-topic", }, }, + Key: Key{ + Dynamic: &Dynamic{ + Header: "x-kafka-key", + }, + }, }, - producer: newMockAsyncProducer(), - done: make(chan struct{}), + asyncProducer: newMockAsyncProducer(), + done: make(chan struct{}), } - kafka.setHeader(kafka.spec) + kafka.setHeader() go kafka.checkProduceError() defer kafka.Close() @@ -147,14 +191,19 @@ func TestHandleHTTP(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "127.0.0.1", strings.NewReader("text")) assert.Nil(err) req.Header.Add("x-kafka-topic", "kafka") + req.Header.Add("x-kafka-key", "key") setRequest(t, ctx, req) ans := kafka.Handle(ctx) assert.Equal("", ans) - msg := <-kafka.producer.(*mockAsyncProducer).ch + msg := <-kafka.asyncProducer.(*mockAsyncProducer).ch assert.Equal("kafka", msg.Topic) assert.Equal(0, len(msg.Headers)) + + key, err := msg.Key.Encode() + assert.Nil(err) + assert.Equal("key", string(key)) value, err := msg.Value.Encode() assert.Nil(err) assert.Equal("text", string(value)) @@ -167,9 +216,87 @@ func TestHandleHTTP(t *testing.T) { ans = kafka.Handle(ctx) assert.Equal("", ans) - msg = <-kafka.producer.(*mockAsyncProducer).ch + msg = <-kafka.asyncProducer.(*mockAsyncProducer).ch assert.Equal("default-topic", msg.Topic) assert.Equal(0, len(msg.Headers)) + + assert.Nil(msg.Key) + value, err = msg.Value.Encode() + assert.Nil(err) + assert.Equal("text", string(value)) +} + +func TestHandleHTTPSync(t *testing.T) { + assert := assert.New(t) + kafka := Kafka{ + spec: &Spec{ + Sync: true, + Topic: &Topic{ + Default: "default-topic", + Dynamic: &Dynamic{ + Header: "x-kafka-topic", + }, + }, + Key: Key{ + Dynamic: &Dynamic{ + Header: "x-kafka-key", + }, + }, + }, + syncProcuder: newMockSyncProducer(), + done: make(chan struct{}), + } + kafka.setHeader() + defer kafka.Close() + + getMsg := func() *sarama.ProducerMessage { + p := kafka.syncProcuder.(*mockSyncProducer) + p.mu.Lock() + defer p.mu.Unlock() + if len(p.msgs) == 0 { + return nil + } + msg := p.msgs[0] + p.msgs = p.msgs[1:] + return msg + } + + ctx := context.New(nil) + + // test header + req, err := http.NewRequest(http.MethodPost, "127.0.0.1", strings.NewReader("text")) + assert.Nil(err) + req.Header.Add("x-kafka-topic", "kafka") + req.Header.Add("x-kafka-key", "key") + setRequest(t, ctx, req) + + ans := kafka.Handle(ctx) + assert.Equal("", ans) + + msg := getMsg() + assert.Equal("kafka", msg.Topic) + assert.Equal(0, len(msg.Headers)) + + key, err := msg.Key.Encode() + assert.Nil(err) + assert.Equal("key", string(key)) + value, err := msg.Value.Encode() + assert.Nil(err) + assert.Equal("text", string(value)) + + // test default + req, err = http.NewRequest(http.MethodPost, "127.0.0.1", strings.NewReader("text")) + assert.Nil(err) + setRequest(t, ctx, req) + + ans = kafka.Handle(ctx) + assert.Equal("", ans) + + msg = getMsg() + assert.Equal("default-topic", msg.Topic) + assert.Equal(0, len(msg.Headers)) + + assert.Nil(msg.Key) value, err = msg.Value.Encode() assert.Nil(err) assert.Equal("text", string(value)) diff --git a/pkg/filters/kafkabackend/spec.go b/pkg/filters/kafkabackend/spec.go index 29d9eef4ff..9b07ac342c 100644 --- a/pkg/filters/kafkabackend/spec.go +++ b/pkg/filters/kafkabackend/spec.go @@ -25,7 +25,10 @@ type ( filters.BaseSpec `json:",inline"` Backend []string `json:"backend" jsonschema:"required,uniqueItems=true"` - Topic *Topic `json:"topic" jsonschema:"required"` + Sync bool `json:"sync,omitempty"` + + Topic *Topic `json:"topic" jsonschema:"required"` + Key Key `json:"key,omitempty"` } // Topic defined ways to get Kafka topic @@ -38,4 +41,10 @@ type ( Dynamic struct { Header string `json:"header,omitempty"` } + + // Key defined ways to get Kafka key + Key struct { + Default string `json:"default"` + Dynamic *Dynamic `json:"dynamic,omitempty"` + } )