Skip to content

Commit e73b889

Browse files
committed
kafka消费异常抛出
1 parent 4a1a966 commit e73b889

File tree

2 files changed

+10
-11
lines changed

2 files changed

+10
-11
lines changed

kafka/kafka.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -217,29 +217,28 @@ func (this_ *Service) Pull(groupId string, topics []string, PullSize int, PullTi
217217
if err != nil {
218218
return
219219
}
220+
defer func() {
221+
e := group.Close()
222+
if e != nil {
223+
util.Logger.Error("group close error", zap.Error(e))
224+
}
225+
}()
220226
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(PullTimeout))
221227
handler := &consumerGroupHandler{
222228
size: PullSize,
223229
cancel: cancel,
224230
}
225231
util.Logger.Info("kafka pull start", zap.Any("topics", topics), zap.Any("groupId", groupId), zap.Any("timeout", PullTimeout))
226232
err = group.Consume(ctx, topics, handler)
227-
228233
util.Logger.Info("kafka pull end", zap.Any("topics", topics), zap.Any("groupId", groupId), zap.Any("timeout", PullTimeout))
229234
if err != nil {
230235
util.Logger.Error("group consume error", zap.Error(err))
236+
return
231237
}
232238

233-
err = group.Close()
234-
if err != nil {
235-
util.Logger.Error("group close error", zap.Error(err))
236-
}
237239
for _, one := range handler.messages {
238240
var msg *Message
239-
msg, err = ConsumerMessageToMessage(keyType, valueType, one)
240-
if err != nil {
241-
return
242-
}
241+
msg = ConsumerMessageToMessage(keyType, valueType, one)
243242
msgList = append(msgList, msg)
244243
}
245244
return
@@ -901,7 +900,7 @@ func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMes
901900
return
902901
}
903902

904-
func ConsumerMessageToMessage(keyType string, valueType string, consumerMessage *sarama.ConsumerMessage) (msg *Message, err error) {
903+
func ConsumerMessageToMessage(keyType string, valueType string, consumerMessage *sarama.ConsumerMessage) (msg *Message) {
905904
var key string
906905
var value string
907906

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
"version": "1.2.41"
2+
"version": "1.2.42"
33
}

0 commit comments

Comments
 (0)