diff --git a/scheduler/pkg/envoy/processor/server_test.go b/scheduler/pkg/envoy/processor/server_test.go
index 9b3a199814..13ec5e2f03 100644
--- a/scheduler/pkg/envoy/processor/server_test.go
+++ b/scheduler/pkg/envoy/processor/server_test.go
@@ -2,7 +2,6 @@ package processor
 
 import (
 	"context"
-	"reflect"
 	"slices"
 	"strconv"
 	"testing"
@@ -30,7 +29,7 @@ var permanentClusterNames = []string{"pipelinegateway_http", "pipelinegateway_gr
 func TestFetch(t *testing.T) {
 	g := NewGomegaWithT(t)
 
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
 	defer cancel()
 
 	logger := log.New()
@@ -73,83 +72,66 @@ func TestFetch(t *testing.T) {
 }
 
 func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) func(t *testing.T) {
-	secondFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http")
-
-	expectedClusters := make([][]string, 2)
-
-	expectedClusters[0] = permanentClusterNames
-	expectedClusters[1] = secondFetch
+	firstFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http")
 
 	return func(t *testing.T) {
+		fetchAndVerifyResponse(permanentClusterNames, c, g)
 		ops := []func(inc *IncrementalProcessor, g *WithT){
 			createTestServer("server", 1),
 			createTestModel("model", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}),
 		}
-
-		for _, op := range ops {
-			op(inc, g)
-		}
-
-		for _, expectedClusterNames := range expectedClusters {
-			resp, err := c.Fetch()
-			g.Expect(err).To(BeNil())
-			actualClusterNames := make([]string, 0)
-			for _, r := range resp.Resources {
-				cluster := &clusterv3.Cluster{}
-				err := anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
-				g.Expect(err).To(BeNil())
-				actualClusterNames = append(actualClusterNames, cluster.Name)
+		go func() {
+			for _, op := range ops {
+				op(inc, g)
 			}
-			slices.Sort(actualClusterNames)
-			slices.Sort(expectedClusterNames)
-			g.Expect(reflect.DeepEqual(actualClusterNames, expectedClusterNames)).To(BeTrue())
+		}()
 
-			err = c.Ack()
-			g.Expect(err).To(BeNil())
-		}
+		g.Eventually(inc.xdsCache.Clusters).WithPolling(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(HaveKey(MatchRegexp(`^model_1`)))
+
+		fetchAndVerifyResponse(firstFetch, c, g)
 	}
 }
 
 func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSClient) func(t *testing.T) {
-	firstFetch := append(permanentClusterNames, "model_1_grpc", "model_1_http", "model_2_grpc", "model_2_http")
 	secondFetch := append(permanentClusterNames, "model_2_grpc", "model_2_http")
 
-	expectedClusters := make([][]string, 2)
-
-	expectedClusters[0] = firstFetch
-	expectedClusters[1] = secondFetch
-
 	return func(t *testing.T) {
-
 		ops := []func(inc *IncrementalProcessor, g *WithT){
 			createTestModel("model", "server", 1, []int{0}, 2, []store.ModelReplicaState{store.Available}),
 		}
+		go func() {
+			for _, op := range ops {
+				op(inc, g)
+			}
+		}()
 
-		for _, op := range ops {
-			op(inc, g)
-		}
-
-		for _, expectedClusterNames := range expectedClusters {
+		g.Eventually(inc.xdsCache.Clusters).WithPolling(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(HaveKey(MatchRegexp(`^model_2`)))
 
-			resp, err := c.Fetch()
-			g.Expect(err).To(BeNil())
-			actualClusterNames := make([]string, 0)
-			for _, r := range resp.Resources {
-				cluster := &clusterv3.Cluster{}
-				err := anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
-				g.Expect(err).To(BeNil())
-				actualClusterNames = append(actualClusterNames, cluster.Name)
-			}
-			slices.Sort(actualClusterNames)
-			slices.Sort(expectedClusterNames)
-			g.Expect(reflect.DeepEqual(actualClusterNames, expectedClusterNames)).To(BeTrue())
+		// version 2 exists
+		fetchAndVerifyResponse(secondFetch, c, g)
+	}
+}
 
-			err = c.Ack()
-			g.Expect(err).To(BeNil())
-		}
+func fetchAndVerifyResponse(expectedClusterNames []string, c client.ADSClient, g *WithT) {
+	slices.Sort(expectedClusterNames)
+	g.Expect(fetch(c, g)).Should(ContainElements(expectedClusterNames))
+}
+func fetch(c client.ADSClient, g *WithT) []string {
+	resp, err := c.Fetch()
+	g.Expect(err).To(BeNil())
+	actualClusterNames := make([]string, 0)
+	for _, r := range resp.Resources {
+		cluster := &clusterv3.Cluster{}
+		err := anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
+		g.Expect(err).To(BeNil())
+		actualClusterNames = append(actualClusterNames, cluster.Name)
 	}
+	slices.Sort(actualClusterNames)
+	err = c.Ack()
+	g.Expect(err).To(BeNil())
+	return actualClusterNames
 }
 
 func startAdsServer(inc *IncrementalProcessor, port uint) error {
diff --git a/scheduler/pkg/envoy/xdscache/seldoncache.go b/scheduler/pkg/envoy/xdscache/seldoncache.go
index 1355fff34f..4c02118bb0 100644
--- a/scheduler/pkg/envoy/xdscache/seldoncache.go
+++ b/scheduler/pkg/envoy/xdscache/seldoncache.go
@@ -40,21 +40,24 @@ const (
 	envoyDownstreamClientCertName = "downstream_client"
 	envoyUpstreamServerCertName   = "upstream_server"
 	envoyUpstreamClientCertName   = "upstream_client"
-	snapshotType                  = "snapshot"
 )
 
 type SeldonXDSCache struct {
+	// https://github.com/envoyproxy/go-control-plane?tab=readme-ov-file#resource-caching
+	// each Envoy resource is managed independently, using ADS (aggregated discovery service), so
+	// updates can be sequenced in a way that reduces susceptibility to "no cluster found"
+	// responses
 	muxCache      *cache.MuxCache
-	snapshotCache cache.SnapshotCache
-	cds           *cache.LinearCache
-	lds           *cache.LinearCache
-	sds           *cache.LinearCache
+	snapshotCache cache.SnapshotCache // routes
+	cds           *cache.LinearCache  // clusters
+	lds           *cache.LinearCache  // listeners
+	sds           *cache.LinearCache  // secrets
 
 	Clusters  map[string]Cluster
 	Pipelines map[string]PipelineRoute
 	Routes    map[string]Route
-	clustersToAdd          map[string]bool
-	clustersToRemove       map[string]bool
+	clustersToAdd          map[string]struct{}
+	clustersToRemove       map[string]struct{}
 	pipelineGatewayDetails *PipelineGatewayDetails
 	secrets                map[string]Secret
 	logger                 logrus.FieldLogger
@@ -73,8 +76,8 @@ func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *Pipeli
 		Clusters:  make(map[string]Cluster),
 		Pipelines: make(map[string]PipelineRoute),
 		Routes:    make(map[string]Route),
-		clustersToAdd:    make(map[string]bool),
-		clustersToRemove: make(map[string]bool),
+		clustersToAdd:    make(map[string]struct{}),
+		clustersToRemove: make(map[string]struct{}),
 		pipelineGatewayDetails: pipelineGatewayDetails,
 		secrets:                make(map[string]Secret),
 		logger:                 logger.WithField("source", "SeldonXDSCache"),
@@ -111,6 +114,7 @@ func (xds *SeldonXDSCache) newSnapshotVersion() string {
 }
 
 func (xds *SeldonXDSCache) init() error {
+	const snapshotType = "snapshot"
 	linearLogger := xds.logger.WithField("source", "LinearCache")
 	snapshotLogger := xds.logger.WithField("source", "SnapshotCache")
 
@@ -356,6 +360,7 @@ func (xds *SeldonXDSCache) RemoveClusters() error {
 	return xds.cds.UpdateResources(nil, clustersToRemove)
 }
 
+// updates are batched, so always check whether the state has changed
 func (xds *SeldonXDSCache) shouldRemoveCluster(name string) bool {
 	cluster, ok := xds.Clusters[name]
 	return !ok || len(cluster.Routes) < 1
@@ -470,11 +475,11 @@ func (xds *SeldonXDSCache) AddClustersForRoute(
 	xds.Clusters[httpClusterName] = httpCluster
 	httpCluster.Routes[routeVersionKey] = true
-	xds.clustersToAdd[httpClusterName] = true
+	xds.clustersToAdd[httpClusterName] = struct{}{}
 
 	xds.Clusters[grpcClusterName] = grpcCluster
 	grpcCluster.Routes[routeVersionKey] = true
-	xds.clustersToAdd[grpcClusterName] = true
+	xds.clustersToAdd[grpcClusterName] = struct{}{}
 }
 
 func (xds *SeldonXDSCache) RemoveRoute(routeName string) error {
@@ -510,7 +515,7 @@ func (xds *SeldonXDSCache) removeRouteFromCluster(route Route, cluster TrafficSp
 	delete(cluster.Routes, RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion})
 	if len(cluster.Routes) == 0 {
 		delete(xds.Clusters, clusterName)
-		xds.clustersToRemove[clusterName] = true
+		xds.clustersToRemove[clusterName] = struct{}{}
 	}
 	return nil
 }
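
Note on the test changes: because the ops now run in a goroutine, the test synchronises on xDS cache state with Gomega's Eventually before fetching, rather than assuming the ops have completed. A minimal, self-contained sketch of the same polling pattern, assuming a hypothetical state map and snapshot accessor standing in for inc.xdsCache.Clusters:

    package sketch

    import (
    	"sync"
    	"testing"
    	"time"

    	. "github.com/onsi/gomega"
    )

    var (
    	mu    sync.Mutex
    	state = map[string]struct{}{} // hypothetical stand-in for inc.xdsCache.Clusters
    )

    // snapshot copies the map under the lock; Eventually re-invokes the
    // function on every poll tick, so each attempt sees fresh state.
    func snapshot() map[string]struct{} {
    	mu.Lock()
    	defer mu.Unlock()
    	out := make(map[string]struct{}, len(state))
    	for k := range state {
    		out[k] = struct{}{}
    	}
    	return out
    }

    func TestEventuallySketch(t *testing.T) {
    	g := NewGomegaWithT(t)

    	// Simulate the async ops goroutine from testInitialFetch.
    	go func() {
    		time.Sleep(200 * time.Millisecond)
    		mu.Lock()
    		state["model_1_http"] = struct{}{}
    		mu.Unlock()
    	}()

    	// Same settings as the diff: poll every 100ms, fail the test
    	// if the key never appears within 5s.
    	g.Eventually(snapshot).
    		WithPolling(100 * time.Millisecond).
    		WithTimeout(5 * time.Second).
    		Should(HaveKey(MatchRegexp(`^model_1`)))
    }

Passing a function rather than the map itself also keeps each poll behind the lock, avoiding unsynchronised reads while the goroutine is still writing.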
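On the seldoncache.go side, the move from map[string]bool to map[string]struct{} for clustersToAdd/clustersToRemove is the standard Go set idiom: the empty struct carries no payload, and membership is expressed by key presence alone, so there is no "present but false" state to misread. A small illustrative sketch (names are hypothetical):

    package main

    import "fmt"

    func main() {
    	// A set of cluster names pending removal; struct{} values occupy
    	// zero bytes, so only the keys carry information.
    	clustersToRemove := make(map[string]struct{})

    	// Adding members: the empty struct literal struct{}{} is the value.
    	clustersToRemove["model_1_http"] = struct{}{}
    	clustersToRemove["model_1_grpc"] = struct{}{}

    	// Membership uses the comma-ok form; with map[string]bool a plain
    	// lookup of a missing key also yields false, which this avoids.
    	if _, ok := clustersToRemove["model_1_http"]; ok {
    		fmt.Println("model_1_http is marked for removal")
    	}

    	delete(clustersToRemove, "model_1_http")
    	fmt.Printf("%d cluster(s) still marked\n", len(clustersToRemove))
    }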