Skip to content

Commit

Permalink
Merge pull request #403 from askuy/feature/slowlog20240625
Browse files Browse the repository at this point in the history
Feature/slowlog20240625
  • Loading branch information
askuy authored Jun 26, 2024
2 parents ed20329 + d7e3a72 commit 7c1de32
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 116 deletions.
25 changes: 15 additions & 10 deletions client/egrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ func (c *Container) loggerUnaryClientInterceptor() grpc.UnaryClientInterceptor {
cost := time.Since(beg)
spbStatus := ecode.Convert(err)
httpStatusCode := ecode.GrpcToHTTPStatusCode(spbStatus.Code())

event := "normal"
fields = append(fields,
elog.FieldType("unary"),
elog.FieldKey("unary"),
elog.FieldCode(int32(spbStatus.Code())),
elog.FieldUniformCode(int32(httpStatusCode)),
elog.FieldDescription(spbStatus.Message()),
Expand All @@ -342,12 +342,12 @@ func (c *Container) loggerUnaryClientInterceptor() grpc.UnaryClientInterceptor {
)

// 开启了链路,那么就记录链路id
if c.config.EnableTraceInterceptor && etrace.IsGlobalTracerRegistered() {
if etrace.IsGlobalTracerRegistered() {
fields = append(fields, elog.FieldTid(etrace.ExtractTraceID(ctx)))
}

if c.config.EnableAccessInterceptorReq {
var reqMap = map[string]interface{}{
var reqMap = map[string]any{
"payload": xstring.JSON(req),
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
Expand All @@ -358,13 +358,14 @@ func (c *Container) loggerUnaryClientInterceptor() grpc.UnaryClientInterceptor {
if c.config.EnableAccessInterceptorRes {
fields = append(fields, elog.Any("res", json.RawMessage(xstring.JSON(res))))
}

isSlowLog := false
if c.config.SlowLogThreshold > time.Duration(0) && cost > c.config.SlowLogThreshold {
c.logger.Warn("slow", fields...)
event = "slow"
isSlowLog = true
}

if err != nil {
fields = append(fields, elog.FieldEvent("error"), elog.FieldErr(err))
fields = append(fields, elog.FieldEvent(event), elog.FieldErr(err))
// 只记录系统级别错误
if httpStatusCode >= http.StatusInternalServerError {
// 只记录系统级别错误
Expand All @@ -376,9 +377,13 @@ func (c *Container) loggerUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return err
}

if c.config.EnableAccessInterceptor {
fields = append(fields, elog.FieldEvent("normal"))
c.logger.Info("access", fields...)
if c.config.EnableAccessInterceptor || isSlowLog {
fields = append(fields, elog.FieldEvent(event))
if isSlowLog {
c.logger.Warn("access", fields...)
} else {
c.logger.Info("access", fields...)
}
}
return nil
}
Expand Down
26 changes: 17 additions & 9 deletions client/ehttp/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ func logAccess(name string, config *Config, logger *elog.Component, req *resty.R

loggerKeys := transport.CustomContextKeys()

var fields = make([]elog.Field, 0, 16)
var fields = make([]elog.Field, 0, 16+transport.CustomContextKeysLength())
fields = append(fields,
elog.FieldMethod(fullMethod),
elog.FieldName(name),
elog.FieldCost(cost),
elog.FieldAddr(u.Host),
)

event := "normal"

// 支持自定义log
for _, key := range loggerKeys {
if value := req.Context().Value(key); value != nil {
Expand All @@ -65,31 +67,33 @@ func logAccess(name string, config *Config, logger *elog.Component, req *resty.R
}

// 开启了链路,那么就记录链路id
if config.EnableTraceInterceptor && etrace.IsGlobalTracerRegistered() {
if etrace.IsGlobalTracerRegistered() {
fields = append(fields, elog.FieldTid(etrace.ExtractTraceID(req.Context())))
}
if config.EnableAccessInterceptor {
if config.EnableAccessInterceptorReq {
fields = append(fields, elog.Any("req", map[string]interface{}{
fields = append(fields, elog.Any("req", map[string]any{
"metadata": req.Header,
"payload": req.Body,
}))
}

if config.EnableAccessInterceptorRes {
fields = append(fields, elog.Any("res", map[string]interface{}{
fields = append(fields, elog.Any("res", map[string]any{
"metadata": res.Header(),
"payload": respBody,
}))
}
}

isSlowLog := false
if config.SlowLogThreshold > time.Duration(0) && cost > config.SlowLogThreshold {
logger.Warn("slow", fields...)
event = "slow"
isSlowLog = true
}

if err != nil {
fields = append(fields, elog.FieldEvent("error"), elog.FieldErr(err))
fields = append(fields, elog.FieldEvent(event), elog.FieldErr(err))
if res == nil {
// 无 res 的是连接超时等系统级错误
logger.Error("access", fields...)
Expand All @@ -99,9 +103,13 @@ func logAccess(name string, config *Config, logger *elog.Component, req *resty.R
return
}

if config.EnableAccessInterceptor {
fields = append(fields, elog.FieldEvent("normal"))
logger.Info("access", fields...)
if config.EnableAccessInterceptor || isSlowLog {
fields = append(fields, elog.FieldEvent(event))
if isSlowLog {
logger.Warn("access", fields...)
} else {
logger.Info("access", fields...)
}
}
}

Expand Down
28 changes: 16 additions & 12 deletions server/egin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ type Config struct {
SlowLogThreshold time.Duration // 服务慢日志,默认500ms
EnableAccessInterceptor bool // 是否开启,记录请求数据
EnableAccessInterceptorReq bool // 是否开启记录请求参数,默认不开启
AccessInterceptorReqMaxLength int // 默认4K
EnableAccessInterceptorRes bool // 是否开启记录响应参数,默认不开启
AccessInterceptorResMaxLength int // 默认4K
AccessInterceptorReqResFilter string // AccessInterceptorReq 过滤器,只有符合过滤器的请求才会记录 Req 和 Res
EnableTrustedCustomHeader bool // 是否开启自定义header头,记录数据往链路后传递,默认不开启
EnableSentinel bool // 是否开启限流,默认不开启
Expand Down Expand Up @@ -63,18 +65,20 @@ type Config struct {
// DefaultConfig ...
func DefaultConfig() *Config {
return &Config{
Host: eflag.String("host"),
Port: 9090,
Mode: gin.ReleaseMode,
Network: "tcp",
EnableAccessInterceptor: true,
EnableTraceInterceptor: true,
EnableMetricInterceptor: true,
EnableSentinel: true,
SlowLogThreshold: xtime.Duration("500ms"),
EnableWebsocketCheckOrigin: false,
TrustedPlatform: "",
recoveryFunc: defaultRecoveryFunc,
Host: eflag.String("host"),
Port: 9090,
Mode: gin.ReleaseMode,
Network: "tcp",
EnableAccessInterceptor: true,
EnableTraceInterceptor: true,
EnableMetricInterceptor: true,
AccessInterceptorReqMaxLength: 4096,
AccessInterceptorResMaxLength: 4096,
EnableSentinel: true,
SlowLogThreshold: xtime.Duration("500ms"),
EnableWebsocketCheckOrigin: false,
TrustedPlatform: "",
recoveryFunc: defaultRecoveryFunc,
}
}

Expand Down
73 changes: 47 additions & 26 deletions server/egin/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,14 @@ func (c *Container) defaultServerInterceptor() gin.HandlerFunc {
defer func() {
cost := time.Since(beg)
fields = append(fields,
elog.FieldType("http"), // GET, POST
elog.FieldKey(ctx.Request.Method), // GET, POST
elog.FieldCost(cost),
elog.FieldMethod(ctx.Request.Method+"."+ctx.FullPath()),
elog.FieldAddr(ctx.Request.URL.RequestURI()),
elog.FieldIP(ctx.ClientIP()),
elog.FieldSize(int32(ctx.Writer.Size())),
elog.FieldPeerIP(getPeerIP(ctx.Request.RemoteAddr)),
// elog.FieldCode(int32(ctx.Writer.Status())),
// elog.FieldUniformCode(int32(ctx.Writer.Status())),
elog.FieldPeerName(getPeerName(ctx)),
)

for _, key := range loggerKeys {
Expand All @@ -161,39 +160,50 @@ func (c *Container) defaultServerInterceptor() gin.HandlerFunc {
}
}

if c.config.EnableTraceInterceptor && etrace.IsGlobalTracerRegistered() {
if etrace.IsGlobalTracerRegistered() {
fields = append(fields, elog.FieldTid(etrace.ExtractTraceID(ctx.Request.Context())))
}

c.config.mu.RLock()
if c.config.EnableAccessInterceptorReq || c.config.EnableAccessInterceptorRes {
out := c.checkFilter(ctx.Request, rw)

if c.config.EnableAccessInterceptorReq && out {
fields = append(fields, elog.Any("req", map[string]interface{}{
"metadata": copyHeaders(ctx.Request.Header),
"payload": rb.String(),
}))
if len(rb.String()) > c.config.AccessInterceptorReqMaxLength {
fields = append(fields, elog.Any("req", map[string]interface{}{
"metadata": copyHeaders(ctx.Request.Header),
"payload": rb.String()[:c.config.AccessInterceptorReqMaxLength] + "...",
}))
} else {
fields = append(fields, elog.Any("req", map[string]interface{}{
"metadata": copyHeaders(ctx.Request.Header),
"payload": rb.String(),
}))
}
}

if c.config.EnableAccessInterceptorRes && out {
fields = append(fields, elog.Any("res", map[string]interface{}{
"metadata": copyHeaders(ctx.Writer.Header()),
"payload": rw.body.String(),
}))
if len(rw.body.String()) > c.config.AccessInterceptorResMaxLength {
fields = append(fields, elog.Any("res", map[string]interface{}{
"metadata": copyHeaders(ctx.Request.Header),
"payload": rw.body.String()[:c.config.AccessInterceptorResMaxLength] + "...",
}))
} else {
fields = append(fields, elog.Any("res", map[string]interface{}{
"metadata": copyHeaders(ctx.Writer.Header()),
"payload": rw.body.String(),
}))
}
}
}
c.config.mu.RUnlock()

// slow log
isSlowLog := false
if c.config.SlowLogThreshold > time.Duration(0) && c.config.SlowLogThreshold < cost {
// 非长连接模式下,记入warn慢日志
if ctx.GetHeader("Accept") != "text/event-stream" {
// 最后添加状态码
fields = append(fields,
elog.FieldCode(int32(ctx.Writer.Status())),
elog.FieldUniformCode(int32(ctx.Writer.Status())),
)
c.logger.Warn("slow", fields...)
isSlowLog = true
event = "slow"
}
}

Expand All @@ -219,9 +229,6 @@ func (c *Container) defaultServerInterceptor() gin.HandlerFunc {
c.config.recoveryFunc(ctx, rec)
}

// 上面BrokenPipe使用的是用户ctx.Writer.Status()
// 如果不是BrokenPipe,那么会将Writer.Status()设置为500
event = "recover"
stackInfo := stack(3)
fields = append(fields,
elog.FieldEvent(event),
Expand All @@ -231,20 +238,29 @@ func (c *Container) defaultServerInterceptor() gin.HandlerFunc {
elog.FieldUniformCode(int32(ctx.Writer.Status())),
)
c.metricServerInterceptor(ctx, cost)
c.logger.Error("access", fields...)
// broken pipe 是warning
if brokenPipe {
c.logger.Warn("access", fields...)
} else {
c.logger.Error("access", fields...)
}
return
}
// todo 如果不记录日志的时候,应该早点return
if c.config.EnableAccessInterceptor {
if c.config.EnableAccessInterceptor || isSlowLog {
fields = append(fields,
elog.FieldEvent(event),
elog.FieldErrAny(ctx.Errors.ByType(gin.ErrorTypePrivate).String()),
elog.FieldCode(int32(ctx.Writer.Status())),
elog.FieldUniformCode(int32(ctx.Writer.Status())),
)
c.metricServerInterceptor(ctx, cost)
c.logger.Info("access", fields...)
if isSlowLog {
c.logger.Warn("access", fields...)
} else {
c.logger.Info("access", fields...)
}
}
c.metricServerInterceptor(ctx, cost)
}()
ctx.Next()
}
Expand Down Expand Up @@ -405,6 +421,11 @@ func getPeerIP(addr string) string {
return ""
}

func getPeerName(c *gin.Context) string {
value := c.GetHeader("app")
return value
}

func getHeaderValue(c *gin.Context, key string, enableTrustedCustomHeader bool) string {
if key == "" {
return ""
Expand Down
Loading

0 comments on commit 7c1de32

Please sign in to comment.