Skip to content

Commit eb01537

Browse files
committed
Merge branch 'master' of github.com:qiniu/logkit into shangmin02
2 parents 178d6e0 + f3eebb9 commit eb01537

File tree

6 files changed

+178
-9
lines changed

6 files changed

+178
-9
lines changed

cleaner/cleaner.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package cleaner
22

33
import (
44
"path/filepath"
5+
"runtime/debug"
6+
"sync/atomic"
57
"time"
68

79
"github.com/bmatcuk/doublestar"
@@ -23,6 +25,7 @@ type Cleaner struct {
2325
cleanChan chan<- CleanSignal
2426
name string
2527
logdir string
28+
status int32
2629
}
2730

2831
type CleanSignal struct {
@@ -91,10 +94,19 @@ func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSign
9194
cleanChan: cleanChan,
9295
name: name,
9396
logdir: logdir,
97+
status: config.StatusInit,
9498
}, nil
9599
}
96100

97101
func (c *Cleaner) Run() {
102+
if !atomic.CompareAndSwapInt32(&c.status, config.StatusInit, config.StatusRunning) {
103+
if c.hasStopped() {
104+
log.Warnf("cleaner[%v] has stopped, run operation ignored", c.name)
105+
} else {
106+
log.Warnf("cleaner[%v] has already running, run operation ignored", c.name)
107+
}
108+
return
109+
}
98110
for {
99111
select {
100112
case <-c.exitChan:
@@ -110,9 +122,17 @@ func (c *Cleaner) Run() {
110122
}
111123

112124
func (c *Cleaner) Close() {
125+
if !atomic.CompareAndSwapInt32(&c.status, config.StatusRunning, config.StatusStopped) {
126+
log.Warnf("cleaner[%v] is not running, close operation ignored", c.name)
127+
return
128+
}
113129
c.exitChan <- struct{}{}
114130
}
115131

132+
func (c *Cleaner) hasStopped() bool {
133+
return atomic.LoadInt32(&c.status) == config.StatusStopped
134+
}
135+
116136
func (c *Cleaner) Name() string {
117137
return c.name
118138
}
@@ -160,6 +180,15 @@ func (c *Cleaner) checkBelong(path string) bool {
160180
}
161181

162182
func (c *Cleaner) Clean() (err error) {
183+
defer func() {
184+
if rec := recover(); rec != nil {
185+
log.Errorf("cleaner %q was panicked and recovered from %v\nstack: %s", c.Name(), rec, debug.Stack())
186+
}
187+
}()
188+
if c.hasStopped() {
189+
log.Warnf("cleaner[%v] reader %s has stopped, skip clean operation", c.name)
190+
return
191+
}
163192
var size int64 = 0
164193
var count int64 = 0
165194
beginClean := false

sender/config/config.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ var ModeKeyOptions = map[string][]Option{
861861
KeyName: KeyElasticVersion,
862862
ChooseOnly: true,
863863
Default: ElasticVersion5,
864-
ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
864+
ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6, ElasticVersion7},
865865
Description: "ES版本号(es_version)",
866866
},
867867
{
@@ -882,6 +882,17 @@ var ModeKeyOptions = map[string][]Option{
882882
DefaultNoUse: true,
883883
Description: "索引类型名称(elastic_type)",
884884
},
885+
{
886+
KeyName: KeyElasticIDField,
887+
ChooseOnly: false,
888+
Advance: false,
889+
Default: "",
890+
Required: false,
891+
Placeholder: "id",
892+
DefaultNoUse: true,
893+
Description: "索引id字段名(elastic_id_field)",
894+
ToolTip: "默认随机生成id,使用数据中该字段的值作为id,若字段不存在则由es随机生成",
895+
},
885896
OptionAuthUsername,
886897
OptionAuthPassword,
887898
{

sender/config/models.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ const (
118118
KeyElasticAlias = "elastic_keys"
119119
KeyElasticIndexStrategy = "elastic_index_strategy"
120120
KeyElasticTimezone = "elastic_time_zone"
121+
KeyElasticIDField = "elastic_id_field"
121122

122123
KeyDefaultIndexStrategy = "default"
123124
KeyYearIndexStrategy = "year"
@@ -130,6 +131,8 @@ const (
130131
ElasticVersion5 = "5.x"
131132
// ElasticVersion6 v6.x
132133
ElasticVersion6 = "6.x"
134+
// ElasticVersion7 v7.x
135+
ElasticVersion7 = "7.x"
133136

134137
//timeZone
135138
KeylocalTimezone = "Local"
@@ -208,11 +211,11 @@ const (
208211
KeyKafkaCompressionSnappy = "snappy"
209212
KeyKafkaCompressionLZ4 = "lz4"
210213

211-
KeyKafkaHost = "kafka_host" //主机地址,可以有多个
212-
KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
213-
KeyKafkaClientId = "kafka_client_id" //客户端ID
214-
KeySaslUsername = "kafka_sasl_username" //SASL用户名
215-
KeySaslPassword = "kafka_sasl_password" //SASL密码
214+
KeyKafkaHost = "kafka_host" //主机地址,可以有多个
215+
KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
216+
KeyKafkaClientId = "kafka_client_id" //客户端ID
217+
KeySaslUsername = "kafka_sasl_username" //SASL用户名
218+
KeySaslPassword = "kafka_sasl_password" //SASL密码
216219
//KeyKafkaFlushNum = "kafka_flush_num" //缓冲条数
217220
//KeyKafkaFlushFrequency = "kafka_flush_frequency" //缓冲频率
218221
KeyKafkaRetryMax = "kafka_retry_max" //最大重试次数

sender/elasticsearch/elasticsearch.go

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
elasticV6 "github.com/olivere/elastic"
1313
elasticV3 "gopkg.in/olivere/elastic.v3"
1414
elasticV5 "gopkg.in/olivere/elastic.v5"
15+
elasticV7 "gopkg.in/olivere/elastic.v7"
1516

1617
"github.com/qiniu/log"
1718
"github.com/qiniu/pandora-go-sdk/base/reqerr"
@@ -31,11 +32,13 @@ type Sender struct {
3132
host []string
3233
retention int
3334
indexName string
35+
idField string
3436
eType string
3537
eVersion string
3638
elasticV3Client *elasticV3.Client
3739
elasticV5Client *elasticV5.Client
3840
elasticV6Client *elasticV6.Client
41+
elasticV7Client *elasticV7.Client
3942

4043
aliasFields map[string]string
4144

@@ -73,6 +76,7 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
7376
return
7477
}
7578
logkitSendTime, _ := conf.GetBoolOr(KeyLogkitSendTime, true)
79+
idField, _ := conf.GetStringOr(KeyElasticIDField, "")
7680
eType, _ := conf.GetStringOr(KeyElasticType, defaultType)
7781
name, _ := conf.GetStringOr(KeyName, fmt.Sprintf("elasticSender:(elasticUrl:%s,index:%s,type:%s)", host, index, eType))
7882
fields, _ := conf.GetAliasMapOr(KeyElasticAlias, make(map[string]string))
@@ -93,7 +97,24 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
9397
var elasticV3Client *elasticV3.Client
9498
var elasticV5Client *elasticV5.Client
9599
var elasticV6Client *elasticV6.Client
100+
var elasticV7Client *elasticV7.Client
96101
switch eVersion {
102+
case ElasticVersion7:
103+
optFns := []elasticV7.ClientOptionFunc{
104+
elasticV7.SetSniff(false),
105+
elasticV7.SetHealthcheck(false),
106+
elasticV7.SetURL(host...),
107+
elasticV7.SetGzip(enableGzip),
108+
}
109+
110+
if len(authUsername) > 0 && len(authPassword) > 0 {
111+
optFns = append(optFns, elasticV7.SetBasicAuth(authUsername, authPassword))
112+
}
113+
114+
elasticV7Client, err = elasticV7.NewClient(optFns...)
115+
if err != nil {
116+
return nil, err
117+
}
97118
case ElasticVersion6:
98119
optFns := []elasticV6.ClientOptionFunc{
99120
elasticV6.SetSniff(false),
@@ -152,10 +173,12 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
152173
name: name,
153174
host: host,
154175
indexName: index,
176+
idField: idField,
155177
eVersion: eVersion,
156178
elasticV3Client: elasticV3Client,
157179
elasticV5Client: elasticV5Client,
158180
elasticV6Client: elasticV6Client,
181+
elasticV7Client: elasticV7Client,
159182
eType: eType,
160183
aliasFields: fields,
161184
intervalIndex: i,
@@ -184,6 +207,78 @@ func (s *Sender) Name() string {
184207
// Send ElasticSearchSender
185208
func (s *Sender) Send(datas []Data) error {
186209
switch s.eVersion {
210+
case ElasticVersion7:
211+
bulkService := s.elasticV7Client.Bulk()
212+
213+
makeDoc := true
214+
if len(s.aliasFields) == 0 {
215+
makeDoc = false
216+
}
217+
var indexName string
218+
for _, doc := range datas {
219+
//计算索引
220+
indexName = buildIndexName(s.indexName, s.timeZone, s.intervalIndex)
221+
//字段名称替换
222+
if makeDoc {
223+
doc = s.wrapDoc(doc)
224+
}
225+
//添加发送时间
226+
if s.logkitSendTime {
227+
doc[KeySendTime] = time.Now().In(s.timeZone).UnixNano() / 1000000
228+
}
229+
doc2 := doc
230+
231+
request := elasticV7.NewBulkIndexRequest().UseEasyJSON(true).Index(indexName).Type(s.eType).Doc(&doc2)
232+
id, ok := doc[s.idField].(string)
233+
if ok && id != "" {
234+
request.Id(id)
235+
}
236+
bulkService.Add(request)
237+
}
238+
239+
resp, err := bulkService.Do(context.Background())
240+
if err != nil {
241+
return err
242+
}
243+
244+
var (
245+
// 查找出失败的操作并回溯对应的数据返回给上层
246+
lastFailedResult *elasticV7.BulkResponseItem
247+
failedDatas = make([]map[string]interface{}, len(datas))
248+
failedDatasIdx = 0
249+
)
250+
for i, item := range resp.Items {
251+
for _, result := range item {
252+
if !(result.Status >= 200 && result.Status <= 299) {
253+
failedDatas[failedDatasIdx] = datas[i]
254+
failedDatasIdx++
255+
lastFailedResult = result
256+
break // 任一情况的失败都算该条数据整体操作失败,没有必要重复检查
257+
}
258+
}
259+
}
260+
failedDatas = failedDatas[:failedDatasIdx]
261+
if len(failedDatas) == 0 {
262+
return nil
263+
}
264+
lastError, err := jsoniter.MarshalToString(lastFailedResult)
265+
if err != nil {
266+
lastError = fmt.Sprintf("marshal to string failed: %v", lastFailedResult)
267+
}
268+
269+
return &StatsError{
270+
StatsInfo: StatsInfo{
271+
Success: int64(len(datas) - len(failedDatas)),
272+
Errors: int64(len(failedDatas)),
273+
LastError: lastError,
274+
},
275+
SendError: reqerr.NewSendError(
276+
fmt.Sprintf("bulk failed with last error: %s", lastError),
277+
failedDatas,
278+
reqerr.TypeBinaryUnpack,
279+
),
280+
}
281+
187282
case ElasticVersion6:
188283
bulkService := s.elasticV6Client.Bulk()
189284

@@ -204,7 +299,13 @@ func (s *Sender) Send(datas []Data) error {
204299
doc[KeySendTime] = time.Now().In(s.timeZone).UnixNano() / 1000000
205300
}
206301
doc2 := doc
207-
bulkService.Add(elasticV6.NewBulkIndexRequest().UseEasyJSON(true).Index(indexName).Type(s.eType).Doc(&doc2))
302+
303+
request := elasticV6.NewBulkIndexRequest().UseEasyJSON(true).Index(indexName).Type(s.eType).Doc(&doc2)
304+
id, ok := doc[s.idField].(string)
305+
if ok && id != "" {
306+
request.Id(id)
307+
}
308+
bulkService.Add(request)
208309
}
209310

210311
resp, err := bulkService.Do(context.Background())
@@ -270,7 +371,12 @@ func (s *Sender) Send(datas []Data) error {
270371
doc[KeySendTime] = curTime
271372
}
272373
doc2 := doc
273-
bulkService.Add(elasticV5.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2))
374+
request := elasticV5.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2)
375+
id, ok := doc[s.idField].(string)
376+
if ok && id != "" {
377+
request.Id(id)
378+
}
379+
bulkService.Add(request)
274380
}
275381

276382
resp, err := bulkService.Do(context.Background())
@@ -335,7 +441,12 @@ func (s *Sender) Send(datas []Data) error {
335441
doc[KeySendTime] = time.Now().In(s.timeZone).UnixNano() / 1000000
336442
}
337443
doc2 := doc
338-
bulkService.Add(elasticV3.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2))
444+
request := elasticV3.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2)
445+
id, ok := doc[s.idField].(string)
446+
if ok && id != "" {
447+
request.Id(id)
448+
}
449+
bulkService.Add(request)
339450
}
340451

341452
resp, err := bulkService.Do()
@@ -409,6 +520,9 @@ func (s *Sender) Close() error {
409520
if s.elasticV6Client != nil {
410521
s.elasticV6Client.Stop()
411522
}
523+
if s.elasticV7Client != nil {
524+
s.elasticV7Client.Stop()
525+
}
412526
return nil
413527
}
414528

transforms/mutate/replace.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ var (
1818
type Replacer struct {
1919
StageTime string `json:"stage"`
2020
Key string `json:"key"`
21+
OldKey string `json:"old"` // 兼容老版本
22+
NewKey string `json:"new"`
2123
Old string `json:"old_string"`
2224
New string `json:"new_string"`
2325
Regex bool `json:"regex"`
@@ -29,6 +31,12 @@ type Replacer struct {
2931
}
3032

3133
func (g *Replacer) Init() error {
34+
if g.Old == "" {
35+
g.Old = g.OldKey
36+
}
37+
if g.New == "" {
38+
g.New = g.NewKey
39+
}
3240
rgexpr := g.Old
3341
if !g.Regex {
3442
rgexpr = regexp.QuoteMeta(g.Old)

transforms/mutate/split.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ type Spliter struct {
1919
Key string `json:"key"`
2020
SeparateKey string `json:"sep"`
2121
ArrayName string `json:"new"`
22+
ArrayNameNew string `json:"newfield"` // 兼容老版本
2223

2324
stats StatsInfo
2425
keys []string
2526
numRoutine int
2627
}
2728

2829
func (g *Spliter) Init() error {
30+
if g.ArrayName == "" {
31+
g.ArrayName = g.ArrayNameNew
32+
}
2933
g.keys = GetKeys(g.Key)
3034

3135
numRoutine := MaxProcs

0 commit comments

Comments
 (0)