Skip to content

Commit 56fb553

Browse files
author
Chen Ming
committed
异步处理日志切换后文件关闭工作
1 parent c55f0a7 commit 56fb553

7 files changed

+88
-65
lines changed

file.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,26 @@ import (
99
// filepath 文件默认地址
1010
// nextTheTime下一个整点时间
1111
// 返回整点时间日志文件地址
12-
func GenLogFilepath(filepath string, nextTheTime time.Time) string {
12+
func GenLogFilepath(filepath string, nextTheTime time.Time, tk time.Duration) string {
1313
fileSlice := strings.Split(filepath, ".")
14-
return strings.Join([]string{fileSlice[0], nextTheTime.Format("2006-01-02 15:04:05"), ".log"}, "")
14+
var formatStyle string
15+
switch tk {
16+
case time.Nanosecond:
17+
formatStyle = "20060102150405000000000"
18+
case time.Second:
19+
formatStyle = "20060102150405"
20+
case time.Minute:
21+
formatStyle = "200601021504"
22+
case time.Hour:
23+
formatStyle = "2006010215"
24+
case TheDay:
25+
formatStyle = "20060102"
26+
case TheMonth:
27+
formatStyle = "200601"
28+
case TheYear:
29+
formatStyle = "2006"
30+
default:
31+
formatStyle = "2006010215"
32+
}
33+
return strings.Join([]string{fileSlice[0], nextTheTime.Format(formatStyle), ".log"}, "")
1534
}
16-
17-

file_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ func TestGenLogFilepath(t *testing.T) {
99
result := "my2021-10-09 00:00:00.log"
1010
file := "my.log"
1111
dur := time.Date(2021, 10, 9, 0, 0, 0, 0, time.Local)
12-
log_name := GenLogFilepath(file, dur)
12+
log_name := GenLogFilepath(file, dur,time.Minute)
1313
if log_name != result {
1414
t.Errorf("生成log_name不正确 result:%s log_naem:%s", result, log_name)
1515
return

logger.go

+4-16
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,8 @@ import (
77
"go.uber.org/zap/zapcore"
88
)
99

10-
// zapper 默认的logger的配置
11-
const (
12-
defaultLevel = zap.DebugLevel
13-
defaultProduct = "zapper"
14-
defaultModule = "zapper"
15-
defaultFilePath = "./zapper.log"
16-
)
17-
1810
// defaultLogger 默认的日志对象
19-
var defaultLogger *zap.Logger = NewLogger(defaultLevel, defaultProduct, defaultModule, defaultFilePath)
20-
21-
func init() {
22-
defaultLogger.Info("zapper init finished", zap.Bool("success", true),zap.Int("int", 100),zap.String("string", "val string"))
23-
}
11+
var defaultLogger *zap.Logger
2412

2513
// 初始化函数
2614
// level 日志收集的最低级别
@@ -34,14 +22,14 @@ func Zapper(level zapcore.Level, product, module string, outputPath string) {
3422
}
3523

3624
// Sync 在服务停止前调用,日志罗盘goroutine生命周期管理的收尾工作
25+
// 通知writer 及时停止;
3726
// defer zapper.Sync()
3827
func Sync() {
39-
defaultLogger.Sync()
40-
// TODO: 通知writer 及时停止;
28+
defaultLogger.Sync()
4129
defaultWriter.Close()
4230
}
4331

44-
// SetDefaultLogger
32+
// SetDefaultLogger
4533
// logger 用于替换zapper中默认的logger
4634
func SetDefaultLogger(logger *zap.Logger) { defaultLogger = logger }
4735

middleware.go

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ func LogMiddleware(h http.Handler) http.Handler {
1111

1212
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1313

14+
// TODO:
15+
// req body
16+
// resp body
17+
// latency 响应时长
1418
newLogger := wrapReqFields(defaultLogger, r)
1519

1620
r.WithContext(context.WithValue(r.Context(), "logger", newLogger))
@@ -27,6 +31,7 @@ func LogMiddleware(h http.Handler) http.Handler {
2731
func wrapReqFields(logger *zap.Logger, r *http.Request) *zap.Logger {
2832
urlQuery := r.URL.Query()
2933
return logger.With(
34+
// TODO: 找到字段所在报文位置
3035
zap.String("logid", urlQuery.Get("")),
3136
zap.String("caller_ip", r.Header.Get("")),
3237
zap.String("host_ip", r.Host),

middleware_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"log"
77
"net/http"
88
"testing"
9+
10+
"go.uber.org/zap"
911
)
1012

1113
func TestLogMiddleware(t *testing.T) {
12-
1314

1415
fooHandler := func(w http.ResponseWriter, r *http.Request) {
15-
w.Write([]byte("I am ok"))
16+
Error("LogMiddleware", zap.String("net/http middleware", "net/http的handler打印"))
17+
w.Write([]byte("I'm OK"))
1618
}
1719

1820
http.Handle("/foo", LogMiddleware(http.HandlerFunc(fooHandler)))

write.go

+45-39
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ func SetDefaultWriter(w *Writer) { defaultWriter = w }
2323
// tk 为整点间隔单位
2424
// tkSStop 为日志切割停止信号
2525
type Writer struct {
26-
buf chan string
27-
signal chan struct{}
28-
f *os.File
29-
nextTheTime time.Time
30-
filepath string
31-
tk time.Duration
32-
tkStop chan struct{}
26+
buf chan string // 日志数据缓冲
27+
rotateSignal chan struct{} // 日志切割信号
28+
f *os.File // 当前落盘日志文件指针
29+
nextTheTime time.Time // 当前日志所在整点时间
30+
filepath string // 默认日志文件地址
31+
tk time.Duration // 日志切割间隔整点时间
32+
tkStop chan struct{} // 日志切割停止信号
33+
syncFileBuf chan *os.File // 需要异步关闭的文件指针
3334
}
3435

3536
// Clone 复制一个新的Writer
@@ -64,19 +65,17 @@ func (f WriterOptionFunc) Apply(w *Writer) {
6465
// file 文件名
6566
// tk 日志切割的间隔整点时间单位
6667
// cacheMax 缓存的最大值
67-
// TODO: Writer 的配置项做成Options方式处理
68+
// Writer 的配置项做成Options方式处理
6869
func NewWriter(file string, tk time.Duration, cacheMax int64) (w *Writer) {
6970
// 路径转换(相对路径转绝对路径)
70-
if !filepath.IsAbs(file) {
71-
var err error
72-
file, err = filepath.Abs(file)
73-
if err != nil {
74-
panic(err)
75-
}
71+
var err error
72+
file, err = filepath.Abs(file) //绝对路径会直接返回绝对路径
73+
if err != nil {
74+
panic(err)
7675
}
7776

7877
firstTheTime := NextTheTime(time.Now(), tk)
79-
firstFile := GenLogFilepath(file, firstTheTime)
78+
firstFile := GenLogFilepath(file, firstTheTime, tk)
8079
f, err := os.OpenFile(firstFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
8180
if err != nil {
8281
panic(err)
@@ -87,17 +86,19 @@ func NewWriter(file string, tk time.Duration, cacheMax int64) (w *Writer) {
8786
}
8887

8988
w = &Writer{
90-
buf: make(chan string, cacheMax),
91-
signal: make(chan struct{}),
92-
f: f,
93-
nextTheTime: firstTheTime,
94-
filepath: file,
95-
tk: tk,
96-
tkStop: make(chan struct{}),
89+
buf: make(chan string, cacheMax),
90+
rotateSignal: make(chan struct{}),
91+
f: f,
92+
nextTheTime: firstTheTime,
93+
filepath: file,
94+
tk: tk,
95+
tkStop: make(chan struct{}),
96+
syncFileBuf: make(chan *os.File, 128),
9797
}
9898

9999
go w.run()
100100
go w.rotate()
101+
go w.syncSync()
101102

102103
return
103104
}
@@ -106,7 +107,6 @@ func NewWriter(file string, tk time.Duration, cacheMax int64) (w *Writer) {
106107
// io.Writer interface的实现
107108
func (w *Writer) Write(p []byte) (n int, err error) {
108109
w.buf <- string(p)
109-
// TODO: whether n,err have been used by zap
110110
return
111111
}
112112

@@ -125,61 +125,67 @@ func (w *Writer) run() {
125125
case b, ok := <-w.buf:
126126
if !ok {
127127
w.tkStop <- struct{}{}
128-
w.f.Sync()
128+
w.f.Sync() // TODO: 检查logger.Sync()
129129
w.f.Close()
130130
return
131131
}
132-
// TODO: check json repeated key
132+
// check json repeated key
133133
fmt.Println("准备写入文件的日志内容:", b)
134134
bm := make(map[string]interface{}, 0)
135135
err := json.Unmarshal([]byte(b), &bm)
136136
if err != nil {
137137
fmt.Println(err)
138138
}
139139

140-
fmt.Println("准备写入文件的日志内容,json->map:", bm) //TODO: delete
141140
bb, err := json.Marshal(bm)
142141
if err != nil {
143142
fmt.Println(err)
144143
}
145-
fmt.Println("当前日志写入的文件名", w.f.Name()) //TODO: delete
146144
// w.f.WriteString(b)
147145
w.f.WriteString(string(bb) + "\n")
148-
case <-w.signal:
149-
w.f.Sync()
150-
w.f.Close()
146+
case <-w.rotateSignal:
147+
// 异步化处理文件同步、关闭
148+
w.syncFileBuf <- w.f
151149
// replace os.File
152-
fmt.Println("收到替换信号,当前日志文件:", w.f.Name())
153-
fmt.Println("收到替换信号,w.nextTheTime:", w.nextTheTime.Format(time.RFC3339))
154-
newFile := GenLogFilepath(w.filepath, w.nextTheTime)
155-
fmt.Println("收到替换信号,newFile生成的文件名:", newFile)
150+
newFile := GenLogFilepath(w.filepath, w.nextTheTime, w.tk)
156151
var err error
157152
w.f, err = os.OpenFile(newFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
158153
if err != nil {
159154
fmt.Println(err)
160155
}
161-
fmt.Println("收到替换信号,已替换替换的日志文件名:", w.f.Name())
162156
}
163157
}
164158
}
165159

166-
// rotate
160+
// rotate
167161
// 定时发送日志切割信号
168162
// 监听切割终止信号
169163
func (w *Writer) rotate() {
170164
for {
171165
t := time.NewTimer(w.nextTheTime.Sub(time.Now()))
172166
select {
173167
case <-w.tkStop:
168+
close(w.syncFileBuf) // 通知异步任务关闭
174169
return
175170
case <-t.C:
176171
// change nextTheTime
177-
fmt.Println("发送替换日志文件信号时间,NextTheTime前:", w.nextTheTime.Format(time.RFC3339)) // TODO: delete
178172
w.nextTheTime = NextTheTime(w.nextTheTime, w.tk)
179-
fmt.Println("发送替换日志文件信号时间,NextTheTime后:", w.nextTheTime.Format(time.RFC3339)) // TODO: delete
180173
// it's about time to repalce os.File.
181-
w.signal <- struct{}{}
174+
w.rotateSignal <- struct{}{}
182175
}
183176

184177
}
185178
}
179+
180+
func (w *Writer) syncSync() {
181+
for {
182+
select {
183+
case f, ok := <-w.syncFileBuf:
184+
if !ok {
185+
return
186+
}
187+
f.Sync()
188+
f.Close()
189+
}
190+
}
191+
}

write_test.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package zapper
22

33
import (
44
"errors"
5+
"fmt"
56
"path/filepath"
67
"testing"
78
"time"
@@ -22,11 +23,11 @@ func TestNewWriter(t *testing.T) {
2223

2324
syncLock := make(chan struct{}, 0)
2425
go func() {
25-
for i := 0; i <= 65; i++ {
26-
26+
for i := 0; i <= 240; i++ {
27+
err:=errors.New("text string001")
2728
Error("test cron zaper ",
2829
zap.Int("int", 10),
29-
zap.Error(errors.New("text string")),
30+
zap.Error(fmt.Errorf("text string: %w",err)),
3031
zap.String("msg", "val string1"),
3132
zap.String("msg", "val string2"),
3233
zap.String("msg", "val string3"),
@@ -51,4 +52,8 @@ func TestAbs(t *testing.T) {
5152
f,e:=filepath.Abs(s)
5253
println(f,e)
5354
}
55+
56+
f:="/Users/yymt/Documents/zaper/zapper2021-11-08 17:00:00.log"
57+
f2,e:=filepath.Abs(f)
58+
t.Log(f2,e)
5459
}

0 commit comments

Comments
 (0)