Skip to content

Commit

Permalink
feat(envoy): fixing a test (#6163)
Browse files Browse the repository at this point in the history
* moving headers to util

* removing a newline

* lint

* saving progress

* nearly there

* fixing a bug

* cleaning up

* lint

* some slight refactoring and test improvements

* changes following PR review

* try to fix flaky test

* fixing test

* another fix for the flaky test

* increase deadline

* increasing deadline

* use a background context

* remove final check

* another fix for the flaky test

* add timeout

* polling the xdscache first

* increase timeout

* remove first fetch

* remove unused var
  • Loading branch information
driev authored Dec 18, 2024
1 parent 0640876 commit dd44c03
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 67 deletions.
92 changes: 37 additions & 55 deletions scheduler/pkg/envoy/processor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processor

import (
"context"
"reflect"
"slices"
"strconv"
"testing"
Expand Down Expand Up @@ -30,7 +29,7 @@ var permanentClusterNames = []string{"pipelinegateway_http", "pipelinegateway_gr

func TestFetch(t *testing.T) {
g := NewGomegaWithT(t)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

logger := log.New()
Expand Down Expand Up @@ -73,83 +72,66 @@ func TestFetch(t *testing.T) {
}

func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) func(t *testing.T) {
secondFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http")

expectedClusters := make([][]string, 2)

expectedClusters[0] = permanentClusterNames
expectedClusters[1] = secondFetch
firstFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http")

return func(t *testing.T) {
fecthAndVerifyResponse(permanentClusterNames, c, g)

ops := []func(inc *IncrementalProcessor, g *WithT){
createTestServer("server", 1),
createTestModel("model", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}),
}

for _, op := range ops {
op(inc, g)
}

for _, expectedClusterNames := range expectedClusters {
resp, err := c.Fetch()
g.Expect(err).To(BeNil())
actualClusterNames := make([]string, 0)
for _, r := range resp.Resources {
cluster := &clusterv3.Cluster{}
err := anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
g.Expect(err).To(BeNil())
actualClusterNames = append(actualClusterNames, cluster.Name)
go func() {
for _, op := range ops {
op(inc, g)
}
slices.Sort(actualClusterNames)
slices.Sort(expectedClusterNames)
g.Expect(reflect.DeepEqual(actualClusterNames, expectedClusterNames)).To(BeTrue())
}()

err = c.Ack()
g.Expect(err).To(BeNil())
}
g.Eventually(inc.xdsCache.Clusters).WithPolling(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(HaveKey(MatchRegexp(`^model_1`)))

fecthAndVerifyResponse(firstFetch, c, g)
}
}

// testUpdateModelVersion returns a subtest verifying that after a model is
// updated to version 2, the xDS responses contain the model_2 clusters
// alongside the permanent ones.
//
// NOTE(review): this span of the scraped diff interleaved pre- and
// post-commit lines; this body is the reconstructed post-commit version.
func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSClient) func(t *testing.T) {
	secondFetch := append(permanentClusterNames, "model_2_grpc", "model_2_http")

	return func(t *testing.T) {
		ops := []func(inc *IncrementalProcessor, g *WithT){
			createTestModel("model", "server", 1, []int{0}, 2, []store.ModelReplicaState{store.Available}),
		}
		// Apply the model update concurrently with the client, mirroring
		// how the processor receives updates at runtime.
		go func() {
			for _, op := range ops {
				op(inc, g)
			}
		}()

		// Poll the xDS cache until the model_2 clusters land before fetching,
		// so we do not race the snapshot update (flaky-test fix).
		g.Eventually(inc.xdsCache.Clusters).WithPolling(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(HaveKey(MatchRegexp(`^model_2`)))

		// version 2 exists
		fecthAndVerifyResponse(secondFetch, c, g)
	}
}
// fecthAndVerifyResponse pulls the next ADS response and asserts that it
// contains every name in expectedClusterNames. The expected slice is sorted
// in place so failure output is stable.
// NOTE(review): the name carries a typo ("fecth"); callers use it, so it is kept.
func fecthAndVerifyResponse(expectedClusterNames []string, c client.ADSClient, g *WithT) {
	slices.Sort(expectedClusterNames)
	actual := fetch(c, g)
	g.Expect(actual).Should(ContainElements(expectedClusterNames))
}

// fetch retrieves a single response from the ADS client, decodes every
// resource as an Envoy Cluster, ACKs the response, and returns the cluster
// names sorted ascending.
func fetch(c client.ADSClient, g *WithT) []string {
	resp, err := c.Fetch()
	g.Expect(err).To(BeNil())

	names := make([]string, 0, len(resp.Resources))
	for _, res := range resp.Resources {
		var cluster clusterv3.Cluster
		g.Expect(anypb.UnmarshalTo(res, &cluster, proto.UnmarshalOptions{})).To(BeNil())
		names = append(names, cluster.Name)
	}
	slices.Sort(names)

	g.Expect(c.Ack()).To(BeNil())
	return names
}

func startAdsServer(inc *IncrementalProcessor, port uint) error {
Expand Down
29 changes: 17 additions & 12 deletions scheduler/pkg/envoy/xdscache/seldoncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,24 @@ const (
envoyDownstreamClientCertName = "downstream_client"
envoyUpstreamServerCertName = "upstream_server"
envoyUpstreamClientCertName = "upstream_client"
snapshotType = "snapshot"
)

type SeldonXDSCache struct {
// https://github.com/envoyproxy/go-control-plane?tab=readme-ov-file#resource-caching
// each envoy resourece is managed independently, using ADS (aggregated discovery service), so
// updates can be sequenced in a way that reduces the susceptibility to "no cluster found"
// responses
muxCache *cache.MuxCache
snapshotCache cache.SnapshotCache
cds *cache.LinearCache
lds *cache.LinearCache
sds *cache.LinearCache
snapshotCache cache.SnapshotCache // routes
cds *cache.LinearCache // clusters
lds *cache.LinearCache // listener
sds *cache.LinearCache // secrets

Clusters map[string]Cluster
Pipelines map[string]PipelineRoute
Routes map[string]Route
clustersToAdd map[string]bool
clustersToRemove map[string]bool
clustersToAdd map[string]struct{}
clustersToRemove map[string]struct{}
pipelineGatewayDetails *PipelineGatewayDetails
secrets map[string]Secret
logger logrus.FieldLogger
Expand All @@ -73,8 +76,8 @@ func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *Pipeli
Clusters: make(map[string]Cluster),
Pipelines: make(map[string]PipelineRoute),
Routes: make(map[string]Route),
clustersToAdd: make(map[string]bool),
clustersToRemove: make(map[string]bool),
clustersToAdd: make(map[string]struct{}),
clustersToRemove: make(map[string]struct{}),
pipelineGatewayDetails: pipelineGatewayDetails,
secrets: make(map[string]Secret),
logger: logger.WithField("source", "SeldonXDSCache"),
Expand Down Expand Up @@ -111,6 +114,7 @@ func (xds *SeldonXDSCache) newSnapshotVersion() string {
}

func (xds *SeldonXDSCache) init() error {
const snapshotType = "snapshot"
linearLogger := xds.logger.WithField("source", "LinearCache")
snapshotLogger := xds.logger.WithField("source", "SnapshotCache")

Expand Down Expand Up @@ -356,6 +360,7 @@ func (xds *SeldonXDSCache) RemoveClusters() error {
return xds.cds.UpdateResources(nil, clustersToRemove)
}

// updates are batched - always check if the state has changed
func (xds *SeldonXDSCache) shouldRemoveCluster(name string) bool {
cluster, ok := xds.Clusters[name]
return !ok || len(cluster.Routes) < 1
Expand Down Expand Up @@ -470,11 +475,11 @@ func (xds *SeldonXDSCache) AddClustersForRoute(

xds.Clusters[httpClusterName] = httpCluster
httpCluster.Routes[routeVersionKey] = true
xds.clustersToAdd[httpClusterName] = true
xds.clustersToAdd[httpClusterName] = struct{}{}

xds.Clusters[grpcClusterName] = grpcCluster
grpcCluster.Routes[routeVersionKey] = true
xds.clustersToAdd[grpcClusterName] = true
xds.clustersToAdd[grpcClusterName] = struct{}{}
}

func (xds *SeldonXDSCache) RemoveRoute(routeName string) error {
Expand Down Expand Up @@ -510,7 +515,7 @@ func (xds *SeldonXDSCache) removeRouteFromCluster(route Route, cluster TrafficSp
delete(cluster.Routes, RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion})
if len(cluster.Routes) == 0 {
delete(xds.Clusters, clusterName)
xds.clustersToRemove[clusterName] = true
xds.clustersToRemove[clusterName] = struct{}{}
}
return nil
}
Expand Down

0 comments on commit dd44c03

Please sign in to comment.