Skip to content

Commit

Permalink
refactored and improved stats
Browse files Browse the repository at this point in the history
  • Loading branch information
ramondeklein committed Dec 10, 2024
1 parent e3142f8 commit 93ab282
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 48 deletions.
14 changes: 0 additions & 14 deletions http-tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,20 +248,6 @@ func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Requ
rw.LogBody = logBody
f(rw, r)

if backend.healthOptimistic {
// when running in optimistic mode the node will be taken out
// of the pool when it returns one of the following HTTP
// response status:
// - 502 Bad gateway (i.e. can't connect to remote)
// - 503 Service unavailable
// - 504 Gateway timeout
if rw.StatusCode == http.StatusBadGateway || rw.StatusCode == http.StatusServiceUnavailable || rw.StatusCode == http.StatusGatewayTimeout {
if backend.setOffline() {
go backend.healthCheck()
}
}
}

rq := traceRequestInfo{
Time: time.Now().UTC(),
Method: r.Method,
Expand Down
98 changes: 64 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type logMessage struct {
// Endpoint of backend
Endpoint string `json:"Endpoint"`
// Error message
Error error `json:"Error,omitempty"`
Error string `json:"Error,omitempty"`
// Status of endpoint
Status string `json:"Status,omitempty"`
// Downtime so far
Expand All @@ -141,7 +141,7 @@ type logMessage struct {
}

func (l logMessage) String() string {
if l.Error == nil {
if l.Error == "" {
if l.DowntimeDuration > 0 {
return fmt.Sprintf("%s%2s: %s %s is %s Downtime duration: %s",
console.Colorize("LogMsgType", l.Type), "",
Expand Down Expand Up @@ -175,18 +175,30 @@ const (
online
)

func (b *Backend) setOffline() (swapped bool) {
func (b *Backend) setOffline(msg string) (swapped bool) {
if atomic.SwapInt32(&b.up, offline) == offline {
return false
}

now := time.Now().UTC()

b.Stats.Lock()
defer b.Stats.Unlock()

b.Stats.DowntimeStart = now
b.Stats.UpSince = time.Time{}
b.Stats.Unlock()

if b.healthOptimistic {
go b.healthCheck(false)
}

if globalLoggingEnabled {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "down",
Error: msg,
})
}

return true
}

Expand All @@ -197,15 +209,22 @@ func (b *Backend) setOnline() (swapped bool) {
now := time.Now().UTC()

b.Stats.Lock()
defer b.Stats.Unlock()

b.Stats.UpSince = now
if !b.Stats.DowntimeStart.IsZero() {
downtime := now.Sub(b.Stats.DowntimeStart)
b.Stats.LastDowntime = downtime
b.Stats.CumDowntime += downtime
}
b.Stats.DowntimeStart = time.Time{}
b.Stats.Unlock()

if globalLoggingEnabled {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "up",
DowntimeDuration: b.Stats.LastDowntime,
})
}

return true
}
Expand Down Expand Up @@ -276,10 +295,7 @@ func (b *Backend) ErrorHandler(w http.ResponseWriter, r *http.Request, err error
}
}
if offline {
if globalLoggingEnabled {
logMsg(logMessage{Endpoint: b.endpoint, Status: "down", Error: err})
}
b.setOffline()
b.setOffline(err.Error())
}

writeErrorResponse(w, r, err)
Expand Down Expand Up @@ -341,7 +357,15 @@ func getHealthCheckURL(endpoint, healthCheckPath string, healthCheckPort int) (s
}

// healthCheck - background routine which checks if a backend is up or down.
func (b *Backend) healthCheck() {
func (b *Backend) healthCheck(immediate bool) {
if immediate {
if err := b.doHealthCheck(); err != nil {
console.Errorln(err)
} else if b.healthOptimistic && b.Online() {
return
}
}

rng := rand.New(rand.NewSource(time.Now().UnixNano()))
timer := time.NewTimer(b.healthCheckDuration)
defer timer.Stop()
Expand Down Expand Up @@ -383,23 +407,16 @@ func (b *Backend) doHealthCheck() error {
resp, err := b.httpClient.Do(req)
respTime := time.Now().UTC()
drainBody(resp)
if err != nil || resp.StatusCode != http.StatusOK {
if b.setOffline() && globalLoggingEnabled {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "down",
Error: err,
})
}
} else {
if b.setOnline() && globalLoggingEnabled {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "up",
DowntimeDuration: b.Stats.LastDowntime,
})
}

switch {
case err != nil:
b.setOffline(err.Error())
case resp.StatusCode != http.StatusOK:
b.setOffline(fmt.Sprintf("response status %d", resp.StatusCode))
default:
b.setOnline()
}

if globalTrace != "application" {
if resp != nil {
traceHealthCheckReq(req, resp, reqTime, respTime, b, err)
Expand Down Expand Up @@ -517,6 +534,14 @@ func (m *multisite) populate() {
minLatency = fmt.Sprintf("%2s", b.Stats.MinLatency.Round(time.Microsecond))
maxLatency = fmt.Sprintf("%2s", b.Stats.MaxLatency.Round(time.Microsecond))
}
cumDowntime := b.Stats.CumDowntime
lastDowntime := b.Stats.LastDowntime
if !b.Online() {
// show current downtime and cumulative downtime including
// the current downtime
lastDowntime = time.Now().UTC().Sub(b.Stats.DowntimeStart)
cumDowntime += lastDowntime
}
cellText[i*len(site.backends)+j+1] = []string{
humanize.Ordinal(b.siteNumber),
b.endpoint,
Expand All @@ -525,8 +550,8 @@ func (m *multisite) populate() {
strconv.FormatInt(b.Stats.TotCallFailures, 10),
humanize.IBytes(uint64(b.Stats.Rx)),
humanize.IBytes(uint64(b.Stats.Tx)),
b.Stats.CumDowntime.Round(time.Microsecond).String(),
b.Stats.LastDowntime.Round(time.Microsecond).String(),
cumDowntime.Round(time.Microsecond).String(),
lastDowntime.Round(time.Microsecond).String(),
minLatency,
maxLatency,
}
Expand Down Expand Up @@ -900,8 +925,8 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
var backends []*Backend
var prevScheme string
var transport http.RoundTripper
var connStats []*ConnStats
var hostName string

if len(endpoints) == 1 && ctx.GlobalBool("rr-dns-mode") {
console.Infof("RR DNS mode enabled, using %s as hostname", endpoints[0])
// guess it is LB config address
Expand All @@ -921,6 +946,13 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
}
}

var connStats []*ConnStats
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
connStats = append(connStats, newConnStats(endpoint))
}
globalConnStats.Store(&connStats)

healthOptimistic := ctx.GlobalBool("health-optimistic")
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
Expand Down Expand Up @@ -975,12 +1007,10 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
backend := &Backend{ctxt, siteNum, endpoint, proxy, &http.Client{
Transport: proxy.Transport,
}, 0, healthCheckURL, opts.healthCheckDuration, opts.healthCheckTimeout, healthOptimistic, &stats}
go backend.healthCheck()
proxy.ErrorHandler = backend.ErrorHandler
backends = append(backends, backend)
connStats = append(connStats, newConnStats(endpoint))
go backend.healthCheck(true)
}
globalConnStats.Store(&connStats)
return &site{
backends: backends,
}
Expand Down

0 comments on commit 93ab282

Please sign in to comment.