Skip to content

Commit

Permalink
fix(metrics): Fix model label metric in case of experiment (#6118)
Browse files Browse the repository at this point in the history
* use internal model header to compute model label

* add test for rest

* add test for grpc rproxy

* adjust log message
  • Loading branch information
sakoush committed Dec 2, 2024
1 parent 3d9cb60 commit 207e057
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 44 deletions.
18 changes: 16 additions & 2 deletions scheduler/pkg/agent/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,15 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e
var originalBody []byte
var err error

externalModelName := req.Header.Get(resources.SeldonModelHeader)
internalModelName := req.Header.Get(resources.SeldonInternalModelHeader)
// externalModelName is the name of the model as it is known to the client, we should not use
// resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy)
// however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
if err != nil {
t.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName)
externalModelName = req.Header.Get(resources.SeldonModelHeader)
}

// to sync between scalingMetricsSetup and scalingMetricsTearDown calls running in go routines
var wg sync.WaitGroup
Expand Down Expand Up @@ -139,8 +146,15 @@ func (rp *reverseHTTPProxy) addHandlers(proxy http.Handler) http.Handler {
rp.logger.Debugf("Received request with host %s and internal header %v", r.Host, r.Header.Values(resources.SeldonInternalModelHeader))
rewriteHostHandler(r)

externalModelName := r.Header.Get(resources.SeldonModelHeader)
internalModelName := r.Header.Get(resources.SeldonInternalModelHeader)
// externalModelName is the name of the model as it is known to the client, we should not use
// resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy)
// however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
if err != nil {
rp.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName)
externalModelName = r.Header.Get(resources.SeldonModelHeader)
}

//TODO should we return a 404 if headers not found?
if externalModelName == "" || internalModelName == "" {
Expand Down
5 changes: 4 additions & 1 deletion scheduler/pkg/agent/rproxy_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ func extractModelNamesFromHeaders(ctx context.Context) (string, string, bool) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
internalModelName := extractHeader(resources.SeldonInternalModelHeader, md)
externalModelName := extractHeader(resources.SeldonModelHeader, md)
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
if err != nil {
externalModelName = extractHeader(resources.SeldonModelHeader, md)
}
return internalModelName, externalModelName, internalModelName != "" && externalModelName != ""
}
return "", "", false
Expand Down
30 changes: 19 additions & 11 deletions scheduler/pkg/agent/rproxy_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)

func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort, rpPort, backEndServerPort int) *reverseGRPCProxy {
func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort, rpPort, backEndServerPort int, metricsHandler metrics.AgentMetricsHandler) *reverseGRPCProxy {
logger := log.New()
log.SetLevel(log.DebugLevel)

Expand All @@ -41,7 +42,7 @@ func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort,
modelscaling.NewModelReplicaLagsKeeper(), modelscaling.NewModelReplicaLastUsedKeeper(),
)
rp := NewReverseGRPCProxy(
newFakeMetricsHandler(),
metricsHandler,
logger,
"localhost",
uint(backEndGRPCPort),
Expand Down Expand Up @@ -95,7 +96,8 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rpGRPC := setupReverseGRPCService(10, dummyModelNamePrefix, backEndGRPCPort, rpPort, serverPort)
fakeMetricsHandler := newFakeMetricsHandler()
rpGRPC := setupReverseGRPCService(10, dummyModelNamePrefix, backEndGRPCPort, rpPort, serverPort, fakeMetricsHandler)
_ = rpGRPC.Start()

t.Log("Testing model found")
Expand All @@ -115,28 +117,28 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
}
defer conn.Close()

doInfer := func(modelSuffix string) (*v2.ModelInferResponse, error) {
doInfer := func(modelSuffixInternal, modelSuffix string) (*v2.ModelInferResponse, error) {
client := v2.NewGRPCInferenceServiceClient(conn)
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffixInternal, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
return client.ModelInfer(ctx, &v2.ModelInferRequest{ModelName: dummyModelNamePrefix}) // note without suffix
}

doMeta := func(modelSuffix string) (*v2.ModelMetadataResponse, error) {
client := v2.NewGRPCInferenceServiceClient(conn)
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix)
return client.ModelMetadata(ctx, &v2.ModelMetadataRequest{Name: dummyModelNamePrefix}) // note without suffix
}

doModelReady := func(modelSuffix string) (*v2.ModelReadyResponse, error) {
client := v2.NewGRPCInferenceServiceClient(conn)
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix)
return client.ModelReady(ctx, &v2.ModelReadyRequest{Name: dummyModelNamePrefix}) // note without suffix
}

responseInfer, errInfer := doInfer("_0")
responseInfer, errInfer := doInfer("_0", ".experiment")
g.Expect(errInfer).To(BeNil())
g.Expect(responseInfer.ModelName).To(Equal(dummyModelNamePrefix + "_0"))
g.Expect(responseInfer.ModelVersion).To(Equal("")) // in practice this should be something else
Expand All @@ -145,6 +147,11 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
g.Expect(rpGRPC.modelScalingStatsCollector.ModelLagStats.Get(dummyModelNamePrefix + "_0")).To(Equal(uint32(0)))
g.Expect(rpGRPC.modelScalingStatsCollector.ModelLastUsedStats.Get(dummyModelNamePrefix + "_0")).Should(BeNumerically("<=", time.Now().Unix())) // only triggered when we get results back

t.Log("Testing model infer metrics")
g.Expect(fakeMetricsHandler.modelInferState[dummyModelNamePrefix].internalModelName).To(Equal(dummyModelNamePrefix + "_0"))
g.Expect(fakeMetricsHandler.modelInferState[dummyModelNamePrefix].method).To(Equal("grpc"))
g.Expect(fakeMetricsHandler.modelInferState[dummyModelNamePrefix].code).To(Equal("OK")) // note it is not 200 for grpc, should we change this?

responseMeta, errMeta := doMeta("_0")
g.Expect(responseMeta.Name).To(Equal(dummyModelNamePrefix + "_0"))
g.Expect(responseMeta.Versions).To(Equal([]string{""})) // in practice this should be something else
Expand All @@ -157,13 +164,13 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {

t.Log("Testing lazy load")
mockMLServerState.setModelServerUnloaded(dummyModelNamePrefix + "_0")
responseInfer, errInfer = doInfer("_0")
responseInfer, errInfer = doInfer("_0", "")
g.Expect(errInfer).To(BeNil())
g.Expect(responseInfer.ModelName).To(Equal(dummyModelNamePrefix + "_0"))
g.Expect(responseInfer.ModelVersion).To(Equal("")) // in practice this should be something else

t.Log("Testing model not found")
_, errInfer = doInfer("_1")
_, errInfer = doInfer("_1", "")
g.Expect(errInfer).NotTo(BeNil())
g.Expect(mockMLServerState.isModelLoaded(dummyModelNamePrefix + "_1")).To(Equal(false))

Expand All @@ -184,7 +191,8 @@ func TestReverseGRPCServiceEarlyStop(t *testing.T) {

dummyModelNamePrefix := "dummy_model"

rpGRPC := setupReverseGRPCService(0, dummyModelNamePrefix, 1, 1, 1)
fakeMetricsHandler := newFakeMetricsHandler()
rpGRPC := setupReverseGRPCService(0, dummyModelNamePrefix, 1, 1, 1, fakeMetricsHandler)
err := rpGRPC.Stop()
g.Expect(err).To(BeNil())
ready := rpGRPC.Ready()
Expand Down
103 changes: 73 additions & 30 deletions scheduler/pkg/agent/rproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)

Expand Down Expand Up @@ -98,9 +99,16 @@ type loadModelSateValue struct {
isSoft bool
}

type inferModelSateValue struct {
internalModelName string
method string
code string
}

type fakeMetricsHandler struct {
modelLoadState map[string]loadModelSateValue
mu *sync.Mutex
modelLoadState map[string]loadModelSateValue
modelInferState map[string]inferModelSateValue
mu *sync.Mutex
}

func (f fakeMetricsHandler) AddModelHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc {
Expand All @@ -112,6 +120,14 @@ func (f fakeMetricsHandler) HttpCodeToString(code int) string {
}

func (f fakeMetricsHandler) AddModelInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64, code string) {
f.mu.Lock()
defer f.mu.Unlock()

f.modelInferState[externalModelName] = inferModelSateValue{
internalModelName: internalModelName,
method: method,
code: code,
}
}

func (f fakeMetricsHandler) AddLoadedModelMetrics(internalModelName string, memory uint64, isLoad, isSoft bool) {
Expand All @@ -130,8 +146,9 @@ func (f fakeMetricsHandler) AddServerReplicaMetrics(memory uint64, memoryWithOve

func newFakeMetricsHandler() fakeMetricsHandler {
return fakeMetricsHandler{
modelLoadState: map[string]loadModelSateValue{},
mu: &sync.Mutex{},
modelLoadState: map[string]loadModelSateValue{},
modelInferState: map[string]inferModelSateValue{},
mu: &sync.Mutex{},
}
}

Expand All @@ -141,7 +158,7 @@ func (f fakeMetricsHandler) UnaryServerInterceptor() func(ctx context.Context, r
}
}

func setupReverseProxy(logger log.FieldLogger, numModels int, modelPrefix string, rpPort, serverPort int) *reverseHTTPProxy {
func setupReverseProxy(logger log.FieldLogger, numModels int, modelPrefix string, rpPort, serverPort int, metricsHandler metrics.AgentMetricsHandler) *reverseHTTPProxy {
v2Client := testing_utils.NewV2RestClientForTest("localhost", serverPort, logger)
localCacheManager := setupLocalTestManager(numModels, modelPrefix, v2Client, numModels-2, 1)
modelScalingStatsCollector := modelscaling.NewDataPlaneStatsCollector(
Expand All @@ -153,7 +170,7 @@ func setupReverseProxy(logger log.FieldLogger, numModels int, modelPrefix string
"localhost",
uint(serverPort),
uint(rpPort),
fakeMetricsHandler{},
metricsHandler,
modelScalingStatsCollector,
)
rp.SetState(localCacheManager)
Expand All @@ -166,34 +183,51 @@ func TestReverseProxySmoke(t *testing.T) {
logger.SetLevel(log.DebugLevel)

type test struct {
name string
modelToLoad string
modelToRequest string
statusCode int
isLoadedonServer bool
name string
modelToLoad string
modelToRequest string
modelExternalHeader string
expectedModelExternalTag string
statusCode int
isLoadedonServer bool
}

tests := []test{
{
name: "model exists",
modelToLoad: "foo",
modelToRequest: "foo",
statusCode: http.StatusOK,
isLoadedonServer: true,
name: "model exists",
modelToLoad: "foo_1",
modelToRequest: "foo_1",
modelExternalHeader: "foo",
expectedModelExternalTag: "foo",
statusCode: http.StatusOK,
isLoadedonServer: true,
},
{
name: "model exists on agent but not loaded on server",
modelToLoad: "foo",
modelToRequest: "foo",
statusCode: http.StatusOK,
isLoadedonServer: false,
name: "model exists, part of experiment",
modelToLoad: "foo_1",
modelToRequest: "foo_1",
modelExternalHeader: "foo-experiment.experiment",
expectedModelExternalTag: "foo",
statusCode: http.StatusOK,
isLoadedonServer: true,
},
{
name: "model does not exists",
modelToLoad: "foo",
modelToRequest: "foo2",
statusCode: http.StatusNotFound,
isLoadedonServer: false,
name: "model exists on agent but not loaded on server",
modelToLoad: "foo_1",
modelToRequest: "foo_1",
modelExternalHeader: "foo",
expectedModelExternalTag: "foo",
statusCode: http.StatusOK,
isLoadedonServer: false,
},
{
name: "model does not exists",
modelToLoad: "foo_1",
modelToRequest: "foo2_1",
modelExternalHeader: "foo2",
expectedModelExternalTag: "foo2",
statusCode: http.StatusNotFound,
isLoadedonServer: false,
},
}

Expand All @@ -217,7 +251,8 @@ func TestReverseProxySmoke(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rpHTTP := setupReverseProxy(logger, 3, test.modelToLoad, rpPort, serverPort)
fakeMetricsHandler := newFakeMetricsHandler()
rpHTTP := setupReverseProxy(logger, 3, test.modelToLoad, rpPort, serverPort, fakeMetricsHandler)
err = rpHTTP.Start()
g.Expect(err).To(BeNil())
time.Sleep(500 * time.Millisecond)
Expand All @@ -237,7 +272,7 @@ func TestReverseProxySmoke(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, url, nil)
g.Expect(err).To(BeNil())
req.Header.Set("contentType", "application/json")
req.Header.Set(resources.SeldonModelHeader, test.modelToRequest)
req.Header.Set(resources.SeldonModelHeader, test.modelExternalHeader)
req.Header.Set(resources.SeldonInternalModelHeader, test.modelToRequest)
resp, err := http.DefaultClient.Do(req)
g.Expect(err).To(BeNil())
Expand All @@ -254,9 +289,16 @@ func TestReverseProxySmoke(t *testing.T) {
if test.statusCode == http.StatusOK {
g.Expect(rpHTTP.modelScalingStatsCollector.ModelLagStats.Get(test.modelToRequest)).To(Equal(uint32(0)))
g.Expect(rpHTTP.modelScalingStatsCollector.ModelLastUsedStats.Get(test.modelToRequest)).Should(BeNumerically("<=", time.Now().Unix())) // only triggered when we get results back

}

// test infer metrics
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].internalModelName).To(Equal(test.modelToRequest))
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].method).To(Equal("rest"))
if test.statusCode == http.StatusOK {
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].code).To(Equal("200"))
} else {
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].code).To(Equal("404"))
}
g.Expect(rpHTTP.Ready()).To(BeTrue())
_ = rpHTTP.Stop()
g.Expect(rpHTTP.Ready()).To(BeFalse())
Expand All @@ -273,7 +315,8 @@ func TestReverseEarlyStop(t *testing.T) {
logger := log.New()
logger.SetLevel(log.DebugLevel)

rpHTTP := setupReverseProxy(logger, 0, "dummy", 1, 1)
fakeMetricsHandler := newFakeMetricsHandler()
rpHTTP := setupReverseProxy(logger, 0, "dummy", 1, 1, fakeMetricsHandler)
err := rpHTTP.Stop()
g.Expect(err).To(BeNil())
ready := rpHTTP.Ready()
Expand Down

0 comments on commit 207e057

Please sign in to comment.