Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimistic mode #123

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions http-tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func InternalTrace(req *http.Request, resp *http.Response, reqTime, respTime tim
}

// Trace returns the trace of an HTTP request
func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Request, endpoint string) TraceInfo {
func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Request, backend *Backend) TraceInfo {
// Setup a http request body recorder
reqHeaders := r.Header.Clone()
reqHeaders.Set("Host", r.Host)
Expand Down Expand Up @@ -270,7 +270,7 @@ func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Requ

t.ReqInfo = rq
t.RespInfo = rs
t.NodeName = endpoint
t.NodeName = backend.endpoint
t.CallStats = traceCallStats{
Latency: rs.Time.Sub(rw.StartTime),
Rx: reqBodyRecorder.Size(),
Expand All @@ -286,7 +286,7 @@ func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Requ

// httpTraceHdrs calls Trace with logBody set to false, so only the headers
// are logged, and passes the resulting trace to doTrace for the given backend.
//
// NOTE(review): the two consecutive `trace :=` lines below look like the
// before/after sides of a diff hunk (Trace taking an endpoint string vs. a
// *Backend); confirm which one the real file keeps, as both cannot coexist.
func httpTraceHdrs(f http.HandlerFunc, w http.ResponseWriter, r *http.Request, backend *Backend) {
trace := Trace(f, false, w, r, backend.endpoint)
trace := Trace(f, false, w, r, backend)
doTrace(trace, backend)
}

Expand Down
161 changes: 107 additions & 54 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type logMessage struct {
// Endpoint of backend
Endpoint string `json:"Endpoint"`
// Error message
Error error `json:"Error,omitempty"`
Error string `json:"Error,omitempty"`
// Status of endpoint
Status string `json:"Status,omitempty"`
// Downtime so far
Expand All @@ -141,7 +141,7 @@ type logMessage struct {
}

func (l logMessage) String() string {
if l.Error == nil {
if l.Error == "" {
if l.DowntimeDuration > 0 {
return fmt.Sprintf("%s%2s: %s %s is %s Downtime duration: %s",
console.Colorize("LogMsgType", l.Type), "",
Expand All @@ -157,6 +157,7 @@ func (l logMessage) String() string {

// Backend is the entity to which requests get load balanced.
type Backend struct {
ctxt context.Context
siteNumber int
endpoint string
proxy *reverse.Proxy
Expand All @@ -165,6 +166,7 @@ type Backend struct {
healthCheckURL string
healthCheckDuration time.Duration
healthCheckTimeout time.Duration
healthOptimistic bool
Stats *BackendStats
}

Expand All @@ -173,12 +175,58 @@ const (
online
)

func (b *Backend) setOffline() {
atomic.StoreInt32(&b.up, offline)
// setOffline flips the backend to the offline state and reports whether this
// call performed the transition. If the backend was already offline it
// returns false without touching stats or logging.
//
// On a real transition it records the start of the downtime, clears UpSince,
// starts a non-immediate health-check goroutine when healthOptimistic is set,
// and, when global logging is enabled, emits a "down" log message carrying
// msg as the error text.
func (b *Backend) setOffline(msg string) (swapped bool) {
	previous := atomic.SwapInt32(&b.up, offline)
	if previous == offline {
		// Someone else already took the backend down; nothing to record.
		return false
	}

	downSince := time.Now().UTC()

	b.Stats.Lock()
	b.Stats.DowntimeStart = downSince
	b.Stats.UpSince = time.Time{}
	b.Stats.Unlock()

	// In optimistic mode the health checker is started on the offline
	// transition rather than assumed to be running already.
	if b.healthOptimistic {
		go b.healthCheck(false)
	}

	if !globalLoggingEnabled {
		return true
	}
	logMsg(logMessage{
		Endpoint: b.endpoint,
		Status:   "down",
		Error:    msg,
	})
	return true
}

func (b *Backend) setOnline() {
atomic.StoreInt32(&b.up, online)
// setOnline flips the backend to the online state and reports whether this
// call performed the transition. If the backend was already online it
// returns false without touching stats or logging.
//
// On a real transition it sets UpSince to the current UTC time, folds the
// just-finished downtime (if a downtime start was recorded) into
// LastDowntime and CumDowntime, clears DowntimeStart and, when global logging
// is enabled, emits an "up" log message with the last downtime duration.
func (b *Backend) setOnline() (swapped bool) {
	if atomic.SwapInt32(&b.up, online) == online {
		return false
	}
	now := time.Now().UTC()

	b.Stats.Lock()
	b.Stats.UpSince = now
	if !b.Stats.DowntimeStart.IsZero() {
		downtime := now.Sub(b.Stats.DowntimeStart)
		b.Stats.LastDowntime = downtime
		b.Stats.CumDowntime += downtime
	}
	b.Stats.DowntimeStart = time.Time{}
	// Snapshot the value for logging while the lock is still held; reading
	// b.Stats.LastDowntime after Unlock would be an unsynchronised access to
	// lock-guarded state.
	lastDowntime := b.Stats.LastDowntime
	b.Stats.Unlock()

	if globalLoggingEnabled {
		logMsg(logMessage{
			Endpoint:         b.endpoint,
			Status:           "up",
			DowntimeDuration: lastDowntime,
		})
	}

	return true
}

// Online returns true if backend is up
Expand Down Expand Up @@ -247,10 +295,7 @@ func (b *Backend) ErrorHandler(w http.ResponseWriter, r *http.Request, err error
}
}
if offline {
if globalLoggingEnabled {
logMsg(logMessage{Endpoint: b.endpoint, Status: "down", Error: err})
}
b.setOffline()
b.setOffline(err.Error())
}

writeErrorResponse(w, r, err)
Expand Down Expand Up @@ -312,18 +357,27 @@ func getHealthCheckURL(endpoint, healthCheckPath string, healthCheckPort int) (s
}

// healthCheck - background routine which checks if a backend is up or down.
func (b *Backend) healthCheck(ctxt context.Context) {
func (b *Backend) healthCheck(immediate bool) {
if immediate {
if err := b.doHealthCheck(); err != nil {
console.Errorln(err)
} else if b.healthOptimistic && b.Online() {
return
}
}

rng := rand.New(rand.NewSource(time.Now().UnixNano()))
timer := time.NewTimer(b.healthCheckDuration)
defer timer.Stop()
for {
select {
case <-ctxt.Done():
case <-b.ctxt.Done():
return
case <-timer.C:
err := b.doHealthCheck()
if err != nil {
if err := b.doHealthCheck(); err != nil {
console.Errorln(err)
} else if b.healthOptimistic && b.Online() {
return
}
// Add random jitter to call
timer.Reset(b.healthCheckDuration + time.Duration(rng.Int63n(int64(b.healthCheckDuration))))
Expand Down Expand Up @@ -353,33 +407,16 @@ func (b *Backend) doHealthCheck() error {
resp, err := b.httpClient.Do(req)
respTime := time.Now().UTC()
drainBody(resp)
if err != nil || (err == nil && resp.StatusCode != http.StatusOK) {
if globalLoggingEnabled && (!b.Online() || b.Stats.UpSince.IsZero()) {
logMsg(logMessage{Endpoint: b.endpoint, Status: "down", Error: err})
}
// observed an error, take the backend down.
b.setOffline()
if b.Stats.DowntimeStart.IsZero() {
b.Stats.DowntimeStart = time.Now().UTC()
}
} else {
var downtimeEnd time.Time
if !b.Stats.DowntimeStart.IsZero() {
now := time.Now().UTC()
b.updateDowntime(now.Sub(b.Stats.DowntimeStart))
downtimeEnd = now
}
if globalLoggingEnabled && !b.Online() && !b.Stats.UpSince.IsZero() {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "up",
DowntimeDuration: downtimeEnd.Sub(b.Stats.DowntimeStart),
})
}
b.Stats.UpSince = time.Now().UTC()
b.Stats.DowntimeStart = time.Time{}

switch {
case err != nil:
b.setOffline(err.Error())
case resp.StatusCode != http.StatusOK:
b.setOffline(fmt.Sprintf("response status %d", resp.StatusCode))
default:
b.setOnline()
}

if globalTrace != "application" {
if resp != nil {
traceHealthCheckReq(req, resp, reqTime, respTime, b, err)
Expand All @@ -389,13 +426,6 @@ func (b *Backend) doHealthCheck() error {
return nil
}

// updateDowntime stores downtime as the backend's most recent downtime and
// adds it to the cumulative downtime, holding the stats lock for both writes.
func (b *Backend) updateDowntime(downtime time.Duration) {
	stats := b.Stats

	stats.Lock()
	stats.LastDowntime = downtime
	stats.CumDowntime += downtime
	stats.Unlock()
}

// updateCallStats updates the cumulative stats for each call to backend
func (b *Backend) updateCallStats(t shortTraceMsg) {
b.Stats.Lock()
Expand Down Expand Up @@ -438,6 +468,7 @@ type healthCheckOptions struct {
healthCheckPort int
healthCheckDuration time.Duration
healthCheckTimeout time.Duration
healthOptimistic bool
}

func (m *multisite) renewSite(ctx *cli.Context, tlsMaxVersion uint16, opts healthCheckOptions) {
Expand Down Expand Up @@ -503,6 +534,14 @@ func (m *multisite) populate() {
minLatency = fmt.Sprintf("%2s", b.Stats.MinLatency.Round(time.Microsecond))
maxLatency = fmt.Sprintf("%2s", b.Stats.MaxLatency.Round(time.Microsecond))
}
cumDowntime := b.Stats.CumDowntime
lastDowntime := b.Stats.LastDowntime
if !b.Online() {
// show current downtime and cumulative downtime including
// the current downtime
lastDowntime = time.Now().UTC().Sub(b.Stats.DowntimeStart)
cumDowntime += lastDowntime
}
cellText[i*len(site.backends)+j+1] = []string{
humanize.Ordinal(b.siteNumber),
b.endpoint,
Expand All @@ -511,8 +550,8 @@ func (m *multisite) populate() {
strconv.FormatInt(b.Stats.TotCallFailures, 10),
humanize.IBytes(uint64(b.Stats.Rx)),
humanize.IBytes(uint64(b.Stats.Tx)),
b.Stats.CumDowntime.Round(time.Microsecond).String(),
b.Stats.LastDowntime.Round(time.Microsecond).String(),
cumDowntime.Round(time.Microsecond).String(),
lastDowntime.Round(time.Microsecond).String(),
minLatency,
maxLatency,
}
Expand Down Expand Up @@ -886,8 +925,8 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
var backends []*Backend
var prevScheme string
var transport http.RoundTripper
var connStats []*ConnStats
var hostName string

if len(endpoints) == 1 && ctx.GlobalBool("rr-dns-mode") {
console.Infof("RR DNS mode enabled, using %s as hostname", endpoints[0])
// guess it is LB config address
Expand All @@ -906,6 +945,15 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
endpoints = append(endpoints, strings.Replace(target.String(), hostName, ip, 1))
}
}

var connStats []*ConnStats
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
connStats = append(connStats, newConnStats(endpoint))
}
globalConnStats.Store(&connStats)

healthOptimistic := ctx.GlobalBool("health-optimistic")
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
target, err := url.Parse(endpoint)
Expand Down Expand Up @@ -956,15 +1004,13 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
if err != nil {
console.Fatalln(err)
}
backend := &Backend{siteNum, endpoint, proxy, &http.Client{
backend := &Backend{ctxt, siteNum, endpoint, proxy, &http.Client{
Transport: proxy.Transport,
}, 0, healthCheckURL, opts.healthCheckDuration, opts.healthCheckTimeout, &stats}
go backend.healthCheck(ctxt)
}, 0, healthCheckURL, opts.healthCheckDuration, opts.healthCheckTimeout, healthOptimistic, &stats}
proxy.ErrorHandler = backend.ErrorHandler
backends = append(backends, backend)
connStats = append(connStats, newConnStats(endpoint))
go backend.healthCheck(true)
}
globalConnStats.Store(&connStats)
return &site{
backends: backends,
}
Expand Down Expand Up @@ -993,6 +1039,7 @@ func sidekickMain(ctx *cli.Context) {
})
log2.SetReportCaller(true)

healthOptimistic := ctx.GlobalBool("health-optimistic")
healthCheckPath := ctx.GlobalString("health-path")
healthReadCheckPath := ctx.GlobalString("read-health-path")
healthCheckPort := ctx.GlobalInt("health-port")
Expand Down Expand Up @@ -1082,6 +1129,7 @@ func sidekickMain(ctx *cli.Context) {
healthCheckPort,
healthCheckDuration,
healthCheckTimeout,
healthOptimistic,
})
m.displayUI(!globalConsoleDisplay)

Expand Down Expand Up @@ -1179,6 +1227,7 @@ func sidekickMain(ctx *cli.Context) {
healthCheckPort,
healthCheckDuration,
healthCheckTimeout,
healthOptimistic,
})
default:
console.Infof("caught signal '%s'\n", signal)
Expand Down Expand Up @@ -1240,6 +1289,10 @@ func main() {
Name: "rr-dns-mode",
Usage: "enable round-robin DNS mode",
},
cli.BoolFlag{
Name: "health-optimistic",
Usage: "only perform health requests when nodes are down",
},
cli.StringFlag{
Name: "auto-tls-host",
Usage: "enable auto TLS mode for the specified host",
Expand Down
Loading