Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limited errors #195

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ require (
github.com/uber/jaeger-lib v1.5.1-0.20181102163054-1fc5c315e03c
github.com/weaveworks/promrus v1.2.0
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
gotest.tools v2.2.0+incompatible
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
Expand Down Expand Up @@ -458,6 +459,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down Expand Up @@ -521,6 +523,9 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v1.4.0 h1:BjtEgfuw8Qyd+jPvQz8CfoxiO/UjFEidWinwEXZiWv0=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
79 changes: 79 additions & 0 deletions logging/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package logging

import "golang.org/x/time/rate"

type rateLimitedLogger struct {
next Interface
limiter *rate.Limiter
}

// NewRateLimitedLogger returns a logger.Interface that is limited to a number
// of logs per second
func NewRateLimitedLogger(logger Interface, logsPerSecond rate.Limit) Interface {
return &rateLimitedLogger{
next: logger,
limiter: rate.NewLimiter(logsPerSecond, 1),
}
}

func (l *rateLimitedLogger) Debugf(format string, args ...interface{}) {
if l.limiter.Allow() {
l.next.Debugf(format, args...)
}
}

func (l *rateLimitedLogger) Debugln(args ...interface{}) {
if l.limiter.Allow() {
l.next.Debugln(args...)
}
}

func (l *rateLimitedLogger) Infof(format string, args ...interface{}) {
if l.limiter.Allow() {
l.next.Infof(format, args...)
}
}

func (l *rateLimitedLogger) Infoln(args ...interface{}) {
if l.limiter.Allow() {
l.next.Infoln(args...)
}
}

func (l *rateLimitedLogger) Errorf(format string, args ...interface{}) {
if l.limiter.Allow() {
l.next.Errorf(format, args...)
}
}

func (l *rateLimitedLogger) Errorln(args ...interface{}) {
if l.limiter.Allow() {
l.next.Errorln(args...)
}
}

func (l *rateLimitedLogger) Warnf(format string, args ...interface{}) {
if l.limiter.Allow() {
l.next.Warnf(format, args...)
}
}

func (l *rateLimitedLogger) Warnln(args ...interface{}) {
if l.limiter.Allow() {
l.next.Warnln(args...)
}
}

func (l *rateLimitedLogger) WithField(key string, value interface{}) Interface {
return &rateLimitedLogger{
next: l.next.WithField(key, value),
limiter: rate.NewLimiter(l.limiter.Limit(), 0),
}
}

func (l *rateLimitedLogger) WithFields(f Fields) Interface {
return &rateLimitedLogger{
next: l.next.WithFields(f),
limiter: rate.NewLimiter(l.limiter.Limit(), 0),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we get a new limiter here?
Also, does it work with a burst size of zero?

Copy link
Contributor Author

@joe-elliott joe-elliott Sep 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the burst limit was wrong. Good catch. Added a test and fixed.

As far as the new limiter I figured calling WithFields() created a new logger that should have an independent rate limit, but I can go either way on this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thing is, code often calls WithFields() on every operation, e.g. the example you gave in the description, or this one:

entry := user.LogWith(ctx, s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)})

I expect the case you're trying to hit is where a high volume of logs from from lots of operations, so you would want the same rate-limit to be applied across them all. Try it!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Thanks man 👍

Fixed.

}
}
43 changes: 43 additions & 0 deletions logging/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package logging

import (
"testing"

"gotest.tools/assert"
)

type counterLogger struct {
count int
}

func (c *counterLogger) Debugf(format string, args ...interface{}) { c.count++ }
func (c *counterLogger) Debugln(args ...interface{}) { c.count++ }
func (c *counterLogger) Infof(format string, args ...interface{}) { c.count++ }
func (c *counterLogger) Infoln(args ...interface{}) { c.count++ }
func (c *counterLogger) Warnf(format string, args ...interface{}) { c.count++ }
func (c *counterLogger) Warnln(args ...interface{}) { c.count++ }
func (c *counterLogger) Errorf(format string, args ...interface{}) { c.count++ }
func (c *counterLogger) Errorln(args ...interface{}) { c.count++ }
func (c *counterLogger) WithField(key string, value interface{}) Interface {
return c
}
func (c *counterLogger) WithFields(Fields) Interface {
return c
}

func TestRateLimitedLoggerLogs(t *testing.T) {
c := &counterLogger{}
r := NewRateLimitedLogger(c, 1)

r.Errorln("asdf")
assert.Equal(t, 1, c.count)
}

func TestRateLimitedLoggerLimits(t *testing.T) {
c := &counterLogger{}
r := NewRateLimitedLogger(c, 1)

r.Errorln("asdf")
r.Infoln("asdf")
assert.Equal(t, 1, c.count)
}
27 changes: 23 additions & 4 deletions middleware/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ import (

// Log middleware logs http requests
type Log struct {
Log logging.Interface
LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
SourceIPs *SourceIPExtractor
Log logging.Interface
HighVolumeErrorLog logging.Interface
LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
SourceIPs *SourceIPExtractor
}

// logWithRequest information from the request and context as fields.
func (l Log) logWithRequest(r *http.Request) logging.Interface {
return l.logWithRequestAndLog(r, l.Log)
}

func (l Log) logWithRequestAndLog(r *http.Request, logger logging.Interface) logging.Interface {
localLog := l.Log
traceID, ok := ExtractTraceID(r.Context())
if ok {
Expand Down Expand Up @@ -62,18 +67,32 @@ func (l Log) Wrap(next http.Handler) http.Handler {

return
}
if 100 <= statusCode && statusCode < 500 || statusCode == http.StatusBadGateway || statusCode == http.StatusServiceUnavailable {
if 100 <= statusCode && statusCode < 500 {
l.logWithRequest(r).Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
if l.LogRequestHeaders && headers != nil {
l.logWithRequest(r).Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
}
} else if statusCode == http.StatusBadGateway || statusCode == http.StatusServiceUnavailable {
if l.LogRequestHeaders {
l.logHighVolumeError(r, "%s %s (%d) %s %v %s", r.Method, uri, statusCode, time.Since(begin), IsWSHandshakeRequest(r), string(headers))
} else {
l.logHighVolumeError(r, "%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
}
} else {
l.logWithRequest(r).Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)
}
})
}

func (l Log) logHighVolumeError(r *http.Request, format string, args ...interface{}) {
if l.HighVolumeErrorLog != nil {
l.logWithRequestAndLog(r, l.HighVolumeErrorLog).Warnf(format, args...)
} else {
l.logWithRequest(r).Debugf(format, args...)
}
}

// Logging middleware logs each HTTP request method, path, response code and
// duration for all HTTP requests.
var Logging = Log{
Expand Down
12 changes: 10 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
node_https "github.com/prometheus/node_exporter/https"
"golang.org/x/net/context"
"golang.org/x/net/netutil"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -79,6 +80,7 @@ type Config struct {
LogFormat logging.Format `yaml:"log_format"`
LogLevel logging.Level `yaml:"log_level"`
Log logging.Interface `yaml:"-"`
LogErrorRate int `yaml:"log_error_rate"`
LogSourceIPs bool `yaml:"log_source_ips_enabled"`
LogSourceIPsHeader string `yaml:"log_source_ips_header"`
LogSourceIPsRegex string `yaml:"log_source_ips_regex"`
Expand Down Expand Up @@ -170,6 +172,11 @@ func New(cfg Config) (*Server, error) {
log = logging.NewLogrus(cfg.LogLevel)
}

var highVolumeErrorLog logging.Interface
if cfg.LogErrorRate > 0 {
highVolumeErrorLog = logging.NewRateLimitedLogger(log, rate.Limit(cfg.LogErrorRate))
}

// Setup TLS
var httpTLSConfig *tls.Config
if len(cfg.HTTPTLSConfig.TLSCertPath) > 0 && len(cfg.HTTPTLSConfig.TLSKeyPath) > 0 {
Expand Down Expand Up @@ -292,8 +299,9 @@ func New(cfg Config) (*Server, error) {
SourceIPs: sourceIPs,
},
middleware.Log{
Log: log,
SourceIPs: sourceIPs,
Log: log,
HighVolumeErrorLog: highVolumeErrorLog,
SourceIPs: sourceIPs,
},
middleware.Instrument{
RouteMatcher: router,
Expand Down