-
Notifications
You must be signed in to change notification settings - Fork 351
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
skipper: add server connection keepalive limits (#3246)
Clients may connect to a subset of Skipper fleet which leads to uneven request distribution and increased cpu usage. Autoscaling of Skipper fleet is not effective because clients stay connected to old instances while new instances are underutilized. This change adds ConnManager that tracks creation of new connections and closes connections when their age or number of requests served reaches configured limits. Signed-off-by: Alexander Yastrebov <[email protected]>
- Loading branch information
1 parent
85937e6
commit ce501f8
Showing
5 changed files
with
255 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package net | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/zalando/skipper/metrics" | ||
) | ||
|
||
// ConnManager tracks creation of HTTP server connections and | ||
// closes connections when their age or number of requests served reaches configured limits. | ||
// Use [ConnManager.Configure] method to setup ConnManager for an [http.Server]. | ||
type ConnManager struct { | ||
// Metrics is an optional metrics registry to count connection events. | ||
Metrics metrics.Metrics | ||
|
||
// Keepalive is the duration after which server connection is closed. | ||
Keepalive time.Duration | ||
|
||
// KeepaliveRequests is the number of requests after which server connection is closed. | ||
KeepaliveRequests int | ||
|
||
handler http.Handler | ||
} | ||
|
||
type connState struct { | ||
expiresAt time.Time | ||
requests int | ||
} | ||
|
||
type contextKey struct{} | ||
|
||
var connection contextKey | ||
|
||
func (cm *ConnManager) Configure(server *http.Server) { | ||
cm.handler = server.Handler | ||
server.Handler = http.HandlerFunc(cm.serveHTTP) | ||
|
||
if cc := server.ConnContext; cc != nil { | ||
server.ConnContext = func(ctx context.Context, c net.Conn) context.Context { | ||
ctx = cc(ctx, c) | ||
return cm.connContext(ctx, c) | ||
} | ||
} else { | ||
server.ConnContext = cm.connContext | ||
} | ||
|
||
if cs := server.ConnState; cs != nil { | ||
server.ConnState = func(c net.Conn, state http.ConnState) { | ||
cs(c, state) | ||
cm.connState(c, state) | ||
} | ||
} else { | ||
server.ConnState = cm.connState | ||
} | ||
} | ||
|
||
func (cm *ConnManager) serveHTTP(w http.ResponseWriter, r *http.Request) { | ||
state, _ := r.Context().Value(connection).(*connState) | ||
state.requests++ | ||
|
||
if cm.KeepaliveRequests > 0 && state.requests >= cm.KeepaliveRequests { | ||
w.Header().Set("Connection", "close") | ||
|
||
cm.count("lb-conn-closed.keepalive-requests") | ||
} | ||
|
||
if cm.Keepalive > 0 && time.Now().After(state.expiresAt) { | ||
w.Header().Set("Connection", "close") | ||
|
||
cm.count("lb-conn-closed.keepalive") | ||
} | ||
|
||
cm.handler.ServeHTTP(w, r) | ||
} | ||
|
||
func (cm *ConnManager) connContext(ctx context.Context, _ net.Conn) context.Context { | ||
state := &connState{ | ||
expiresAt: time.Now().Add(cm.Keepalive), | ||
} | ||
return context.WithValue(ctx, connection, state) | ||
} | ||
|
||
func (cm *ConnManager) connState(_ net.Conn, state http.ConnState) { | ||
cm.count(fmt.Sprintf("lb-conn-%s", state)) | ||
} | ||
|
||
func (cm *ConnManager) count(name string) { | ||
if cm.Metrics != nil { | ||
cm.Metrics.IncCounter(name) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package net_test | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/zalando/skipper/metrics/metricstest" | ||
snet "github.com/zalando/skipper/net" | ||
) | ||
|
||
func TestConnManager(t *testing.T) { | ||
const ( | ||
keepaliveRequests = 3 | ||
keepalive = 100 * time.Millisecond | ||
|
||
testRequests = keepaliveRequests * 5 | ||
) | ||
t.Run("does not close connection without limits", func(t *testing.T) { | ||
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusOK) | ||
})) | ||
m := &metricstest.MockMetrics{} | ||
cm := &snet.ConnManager{ | ||
Metrics: m, | ||
} | ||
cm.Configure(ts.Config) | ||
|
||
ts.Start() | ||
defer ts.Close() | ||
|
||
for i := 0; i < testRequests; i++ { | ||
resp, err := ts.Client().Get(ts.URL) | ||
require.NoError(t, err) | ||
assert.Equal(t, http.StatusOK, resp.StatusCode) | ||
assert.False(t, resp.Close) | ||
} | ||
|
||
time.Sleep(100 * time.Millisecond) // wait for connection state update | ||
|
||
m.WithCounters(func(counters map[string]int64) { | ||
assert.Equal(t, int64(1), counters["lb-conn-new"]) | ||
assert.Equal(t, int64(testRequests), counters["lb-conn-active"]) | ||
assert.Equal(t, int64(testRequests), counters["lb-conn-idle"]) | ||
assert.Equal(t, int64(0), counters["lb-conn-closed"]) | ||
}) | ||
}) | ||
t.Run("closes connection after keepalive requests", func(t *testing.T) { | ||
const keepaliveRequests = 3 | ||
|
||
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusOK) | ||
})) | ||
m := &metricstest.MockMetrics{} | ||
cm := &snet.ConnManager{ | ||
Metrics: m, | ||
KeepaliveRequests: keepaliveRequests, | ||
} | ||
cm.Configure(ts.Config) | ||
|
||
ts.Start() | ||
defer ts.Close() | ||
|
||
for i := 1; i < testRequests; i++ { | ||
resp, err := ts.Client().Get(ts.URL) | ||
require.NoError(t, err) | ||
assert.Equal(t, http.StatusOK, resp.StatusCode) | ||
|
||
if i%keepaliveRequests == 0 { | ||
assert.True(t, resp.Close) | ||
} else { | ||
assert.False(t, resp.Close) | ||
} | ||
} | ||
|
||
time.Sleep(100 * time.Millisecond) // wait for connection state update | ||
|
||
m.WithCounters(func(counters map[string]int64) { | ||
rounds := int64(testRequests / keepaliveRequests) | ||
|
||
assert.Equal(t, rounds, counters["lb-conn-new"]) | ||
assert.Equal(t, rounds-1, counters["lb-conn-closed"]) | ||
assert.Equal(t, rounds-1, counters["lb-conn-closed.keepalive-requests"]) | ||
}) | ||
}) | ||
|
||
t.Run("closes connection after keepalive timeout", func(t *testing.T) { | ||
const keepalive = 100 * time.Millisecond | ||
|
||
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusOK) | ||
})) | ||
m := &metricstest.MockMetrics{} | ||
cm := &snet.ConnManager{ | ||
Metrics: m, | ||
Keepalive: keepalive, | ||
} | ||
cm.Configure(ts.Config) | ||
|
||
ts.Start() | ||
defer ts.Close() | ||
|
||
for i := 0; i < testRequests; i++ { | ||
resp, err := ts.Client().Get(ts.URL) | ||
require.NoError(t, err) | ||
assert.Equal(t, http.StatusOK, resp.StatusCode) | ||
assert.False(t, resp.Close) | ||
} | ||
|
||
time.Sleep(2 * keepalive) | ||
|
||
resp, err := ts.Client().Get(ts.URL) | ||
require.NoError(t, err) | ||
assert.Equal(t, http.StatusOK, resp.StatusCode) | ||
assert.True(t, resp.Close) | ||
|
||
time.Sleep(100 * time.Millisecond) // wait for connection state update | ||
|
||
m.WithCounters(func(counters map[string]int64) { | ||
assert.Equal(t, int64(1), counters["lb-conn-new"]) | ||
assert.Equal(t, int64(1), counters["lb-conn-closed"]) | ||
assert.Equal(t, int64(1), counters["lb-conn-closed.keepalive"]) | ||
}) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters