9
9
"fmt"
10
10
"github.com/Shopify/sarama"
11
11
"github.com/team-ide/go-tool/util"
12
- "io/ioutil"
13
12
"sort"
14
13
"strconv"
15
14
"strings"
@@ -25,28 +24,31 @@ type Config struct {
25
24
}
26
25
27
26
// New 创建kafka服务
28
- func New (config Config ) (* KafkaService , error ) {
29
- service := & KafkaService {
27
+ func New (config Config ) (IService , error ) {
28
+ service := & Service {
30
29
Config : config ,
31
30
}
32
31
err := service .init ()
33
- return service , err
32
+ if err != nil {
33
+ return nil , err
34
+ }
35
+ return service , nil
34
36
}
35
37
36
- // KafkaService 注册处理器在线信息等
37
- type KafkaService struct {
38
+ // Service 注册处理器在线信息等
39
+ type Service struct {
38
40
Config
39
41
}
40
42
41
- func (this_ * KafkaService ) init () (err error ) {
43
+ func (this_ * Service ) init () (err error ) {
42
44
return
43
45
}
44
46
45
- func (this_ * KafkaService ) Stop () {
47
+ func (this_ * Service ) Stop () {
46
48
47
49
}
48
50
49
- func (this_ * KafkaService ) GetServers () []string {
51
+ func (this_ * Service ) GetServers () []string {
50
52
var servers []string
51
53
if this_ .Address == "" {
52
54
return servers
@@ -61,7 +63,7 @@ func (this_ *KafkaService) GetServers() []string {
61
63
return servers
62
64
}
63
65
64
- func (this_ * KafkaService ) getClient () (saramaClient sarama.Client , err error ) {
66
+ func (this_ * Service ) getClient () (saramaClient sarama.Client , err error ) {
65
67
config := sarama .NewConfig ()
66
68
config .Consumer .Return .Errors = true
67
69
config .Consumer .Offsets .Initial = sarama .OffsetOldest
@@ -77,7 +79,7 @@ func (this_ *KafkaService) getClient() (saramaClient sarama.Client, err error) {
77
79
if this_ .CertPath != "" {
78
80
certPool := x509 .NewCertPool ()
79
81
var pemCerts []byte
80
- pemCerts , err = ioutil .ReadFile (this_ .CertPath )
82
+ pemCerts , err = util .ReadFile (this_ .CertPath )
81
83
if err != nil {
82
84
return
83
85
}
@@ -102,7 +104,7 @@ func (this_ *KafkaService) getClient() (saramaClient sarama.Client, err error) {
102
104
return
103
105
}
104
106
105
- func (this_ * KafkaService ) Info () (res interface {}, err error ) {
107
+ func (this_ * Service ) Info () (res interface {}, err error ) {
106
108
107
109
return
108
110
}
@@ -118,7 +120,7 @@ type TopicInfo struct {
118
120
Topic string `json:"topic"`
119
121
}
120
122
121
- func (this_ * KafkaService ) GetTopics () (res []* TopicInfo , err error ) {
123
+ func (this_ * Service ) GetTopics () (res []* TopicInfo , err error ) {
122
124
var saramaClient sarama.Client
123
125
saramaClient , err = this_ .getClient ()
124
126
if err != nil {
@@ -142,7 +144,7 @@ func (this_ *KafkaService) GetTopics() (res []*TopicInfo, err error) {
142
144
return
143
145
}
144
146
145
- func (this_ * KafkaService ) Pull (groupId string , topics []string , PullSize int , PullTimeout int , keyType , valueType string ) (msgList []* Message , err error ) {
147
+ func (this_ * Service ) Pull (groupId string , topics []string , PullSize int , PullTimeout int , keyType , valueType string ) (msgList []* Message , err error ) {
146
148
if PullSize <= 0 {
147
149
PullSize = 10
148
150
}
@@ -217,7 +219,7 @@ func (handler *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
217
219
return nil
218
220
}
219
221
220
- func (this_ * KafkaService ) MarkOffset (groupId string , topic string , partition int32 , offset int64 ) (err error ) {
222
+ func (this_ * Service ) MarkOffset (groupId string , topic string , partition int32 , offset int64 ) (err error ) {
221
223
var saramaClient sarama.Client
222
224
saramaClient , err = this_ .getClient ()
223
225
if err != nil {
@@ -237,7 +239,7 @@ func (this_ *KafkaService) MarkOffset(groupId string, topic string, partition in
237
239
return
238
240
}
239
241
240
- func (this_ * KafkaService ) ResetOffset (groupId string , topic string , partition int32 , offset int64 ) (err error ) {
242
+ func (this_ * Service ) ResetOffset (groupId string , topic string , partition int32 , offset int64 ) (err error ) {
241
243
var saramaClient sarama.Client
242
244
saramaClient , err = this_ .getClient ()
243
245
if err != nil {
@@ -259,7 +261,7 @@ func (this_ *KafkaService) ResetOffset(groupId string, topic string, partition i
259
261
return
260
262
}
261
263
262
- func (this_ * KafkaService ) CreatePartitions (topic string , count int32 ) (err error ) {
264
+ func (this_ * Service ) CreatePartitions (topic string , count int32 ) (err error ) {
263
265
var saramaClient sarama.Client
264
266
saramaClient , err = this_ .getClient ()
265
267
if err != nil {
@@ -278,7 +280,7 @@ func (this_ *KafkaService) CreatePartitions(topic string, count int32) (err erro
278
280
return
279
281
}
280
282
281
- func (this_ * KafkaService ) CreateTopic (topic string , numPartitions int32 , replicationFactor int16 ) (err error ) {
283
+ func (this_ * Service ) CreateTopic (topic string , numPartitions int32 , replicationFactor int16 ) (err error ) {
282
284
var saramaClient sarama.Client
283
285
saramaClient , err = this_ .getClient ()
284
286
if err != nil {
@@ -306,7 +308,7 @@ func (this_ *KafkaService) CreateTopic(topic string, numPartitions int32, replic
306
308
return
307
309
}
308
310
309
- func (this_ * KafkaService ) DeleteTopic (topic string ) (err error ) {
311
+ func (this_ * Service ) DeleteTopic (topic string ) (err error ) {
310
312
var saramaClient sarama.Client
311
313
saramaClient , err = this_ .getClient ()
312
314
if err != nil {
@@ -325,7 +327,7 @@ func (this_ *KafkaService) DeleteTopic(topic string) (err error) {
325
327
return
326
328
}
327
329
328
- func (this_ * KafkaService ) DeleteConsumerGroup (groupId string ) (err error ) {
330
+ func (this_ * Service ) DeleteConsumerGroup (groupId string ) (err error ) {
329
331
var saramaClient sarama.Client
330
332
saramaClient , err = this_ .getClient ()
331
333
if err != nil {
@@ -344,7 +346,7 @@ func (this_ *KafkaService) DeleteConsumerGroup(groupId string) (err error) {
344
346
return
345
347
}
346
348
347
- func (this_ * KafkaService ) DeleteRecords (topic string , partitionOffsets map [int32 ]int64 ) (err error ) {
349
+ func (this_ * Service ) DeleteRecords (topic string , partitionOffsets map [int32 ]int64 ) (err error ) {
348
350
var saramaClient sarama.Client
349
351
saramaClient , err = this_ .getClient ()
350
352
if err != nil {
@@ -364,7 +366,7 @@ func (this_ *KafkaService) DeleteRecords(topic string, partitionOffsets map[int3
364
366
}
365
367
366
368
// NewSyncProducer 创建生产者
367
- func (this_ * KafkaService ) NewSyncProducer () (syncProducer sarama.SyncProducer , err error ) {
369
+ func (this_ * Service ) NewSyncProducer () (syncProducer sarama.SyncProducer , err error ) {
368
370
369
371
config := sarama .NewConfig ()
370
372
config .Producer .Return .Successes = true
@@ -380,7 +382,7 @@ func (this_ *KafkaService) NewSyncProducer() (syncProducer sarama.SyncProducer,
380
382
if this_ .CertPath != "" {
381
383
certPool := x509 .NewCertPool ()
382
384
var pemCerts []byte
383
- pemCerts , err = ioutil .ReadFile (this_ .CertPath )
385
+ pemCerts , err = util .ReadFile (this_ .CertPath )
384
386
if err != nil {
385
387
return
386
388
}
@@ -410,7 +412,7 @@ type ProducerMessage struct {
410
412
}
411
413
412
414
// Push 推送消息到kafka
413
- func (this_ * KafkaService ) Push (msg * Message ) (err error ) {
415
+ func (this_ * Service ) Push (msg * Message ) (err error ) {
414
416
producerMessage , err := MessageToProducerMessage (msg )
415
417
if err != nil {
416
418
return
0 commit comments