Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(envoy): fixing a test #6163

Merged
merged 25 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions scheduler/pkg/envoy/processor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ var permanentClusterNames = []string{"pipelinegateway_http", "pipelinegateway_gr

func TestFetch(t *testing.T) {
g := NewGomegaWithT(t)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// The timeout might be a bit conservative when running as part of the CI test suite
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

logger := log.New()
Expand Down Expand Up @@ -81,15 +82,16 @@ func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) f
expectedClusters[1] = secondFetch

return func(t *testing.T) {

ops := []func(inc *IncrementalProcessor, g *WithT){
createTestServer("server", 1),
createTestModel("model", "server", 1, []int{0}, 1, []store.ModelReplicaState{store.Available}),
}

for _, op := range ops {
op(inc, g)
}
go func() {
for _, op := range ops {
op(inc, g)
}
}()

for _, expectedClusterNames := range expectedClusters {
resp, err := c.Fetch()
Expand All @@ -103,7 +105,7 @@ func testInitialFetch(g *WithT, inc *IncrementalProcessor, c client.ADSClient) f
}
slices.Sort(actualClusterNames)
slices.Sort(expectedClusterNames)
g.Expect(reflect.DeepEqual(actualClusterNames, expectedClusterNames)).To(BeTrue())
g.Expect(reflect.DeepEqual(actualClusterNames, expectedClusterNames)).To(BeTrue(), "%v does not equal %v", actualClusterNames, expectedClusterNames)

err = c.Ack()
g.Expect(err).To(BeNil())
Expand All @@ -121,7 +123,6 @@ func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSCli
expectedClusters[1] = secondFetch

return func(t *testing.T) {

ops := []func(inc *IncrementalProcessor, g *WithT){
createTestModel("model", "server", 1, []int{0}, 2, []store.ModelReplicaState{store.Available}),
}
Expand All @@ -148,7 +149,6 @@ func testUpdateModelVersion(g *WithT, inc *IncrementalProcessor, c client.ADSCli
err = c.Ack()
g.Expect(err).To(BeNil())
}

}
}

Expand Down
29 changes: 17 additions & 12 deletions scheduler/pkg/envoy/xdscache/seldoncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,24 @@ const (
envoyDownstreamClientCertName = "downstream_client"
envoyUpstreamServerCertName = "upstream_server"
envoyUpstreamClientCertName = "upstream_client"
snapshotType = "snapshot"
)

type SeldonXDSCache struct {
// https://github.com/envoyproxy/go-control-plane?tab=readme-ov-file#resource-caching
// each envoy resourece is managed independently, using ADS (aggregated discovery service), so
// updates can be sequenced in a way that reduces the susceptibility to "no cluster found"
// responses
muxCache *cache.MuxCache
snapshotCache cache.SnapshotCache
cds *cache.LinearCache
lds *cache.LinearCache
sds *cache.LinearCache
snapshotCache cache.SnapshotCache // routes
cds *cache.LinearCache // clusters
lds *cache.LinearCache // listener
sds *cache.LinearCache // secrets

Clusters map[string]Cluster
Pipelines map[string]PipelineRoute
Routes map[string]Route
clustersToAdd map[string]bool
clustersToRemove map[string]bool
clustersToAdd map[string]struct{}
clustersToRemove map[string]struct{}
pipelineGatewayDetails *PipelineGatewayDetails
secrets map[string]Secret
logger logrus.FieldLogger
Expand All @@ -73,8 +76,8 @@ func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *Pipeli
Clusters: make(map[string]Cluster),
Pipelines: make(map[string]PipelineRoute),
Routes: make(map[string]Route),
clustersToAdd: make(map[string]bool),
clustersToRemove: make(map[string]bool),
clustersToAdd: make(map[string]struct{}),
clustersToRemove: make(map[string]struct{}),
pipelineGatewayDetails: pipelineGatewayDetails,
secrets: make(map[string]Secret),
logger: logger.WithField("source", "SeldonXDSCache"),
Expand Down Expand Up @@ -111,6 +114,7 @@ func (xds *SeldonXDSCache) newSnapshotVersion() string {
}

func (xds *SeldonXDSCache) init() error {
const snapshotType = "snapshot"
linearLogger := xds.logger.WithField("source", "LinearCache")
snapshotLogger := xds.logger.WithField("source", "SnapshotCache")

Expand Down Expand Up @@ -356,6 +360,7 @@ func (xds *SeldonXDSCache) RemoveClusters() error {
return xds.cds.UpdateResources(nil, clustersToRemove)
}

// updates are batched - always check if the state has changed
func (xds *SeldonXDSCache) shouldRemoveCluster(name string) bool {
cluster, ok := xds.Clusters[name]
return !ok || len(cluster.Routes) < 1
Expand Down Expand Up @@ -470,11 +475,11 @@ func (xds *SeldonXDSCache) AddClustersForRoute(

xds.Clusters[httpClusterName] = httpCluster
httpCluster.Routes[routeVersionKey] = true
xds.clustersToAdd[httpClusterName] = true
xds.clustersToAdd[httpClusterName] = struct{}{}

xds.Clusters[grpcClusterName] = grpcCluster
grpcCluster.Routes[routeVersionKey] = true
xds.clustersToAdd[grpcClusterName] = true
xds.clustersToAdd[grpcClusterName] = struct{}{}
}

func (xds *SeldonXDSCache) RemoveRoute(routeName string) error {
Expand Down Expand Up @@ -510,7 +515,7 @@ func (xds *SeldonXDSCache) removeRouteFromCluster(route Route, cluster TrafficSp
delete(cluster.Routes, RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion})
if len(cluster.Routes) == 0 {
delete(xds.Clusters, clusterName)
xds.clustersToRemove[clusterName] = true
xds.clustersToRemove[clusterName] = struct{}{}
}
return nil
}
Expand Down
Loading