Skip to content

Commit

Permalink
some renaming and test updates
Browse files Browse the repository at this point in the history
  • Loading branch information
driev committed Nov 29, 2024
1 parent dbee329 commit 1d66c06
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 45 deletions.
18 changes: 9 additions & 9 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,14 @@ func (p *IncrementalProcessor) addTrafficForExperiment(routeName string, exp *ex
switch exp.ResourceType {
case experiment.PipelineResourceType:

var mirrorSplit *resources.PipelineTrafficSplits
trafficSplits := make([]resources.PipelineTrafficSplits, len(exp.Candidates))
var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates))

for _, candidate := range exp.Candidates {
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplits{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
mirrorSplit = &resources.PipelineTrafficSplits{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)
Expand Down Expand Up @@ -467,20 +467,20 @@ func (p *IncrementalProcessor) addPipeline(pipelineName string) error {
if exp.Deleted {
return fmt.Errorf("Experiment on pipeline %s, but %s is deleted", pip.Name, *exp.Default)
}
var mirrorSplit *resources.PipelineTrafficSplits
trafficSplits := make([]resources.PipelineTrafficSplits, len(exp.Candidates))
var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates))

for _, candidate := range exp.Candidates {
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplits{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
mirrorSplit = &resources.PipelineTrafficSplits{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)
} else {
logger.Infof("Adding normal pipeline route %s", routeName)
p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplits{{PipelineName: pip.Name, TrafficWeight: 100}}, nil)
p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil)
}

return p.updateEnvoy()
Expand Down
52 changes: 46 additions & 6 deletions scheduler/pkg/envoy/processor/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,14 +873,18 @@ func TestEnvoySettings(t *testing.T) {

count := 0
for _, rawMessage := range rawMessages {
snapshotRoute := &routev3.RouteConfiguration{}
err := protojson.Unmarshal(rawMessage, snapshotRoute)
snapshotRouteConfig := &routev3.RouteConfiguration{}
err := protojson.Unmarshal(rawMessage, snapshotRouteConfig)
g.Expect(err).To(BeNil())

resultingRoute := resultingRoutes[snapshotRoute.Name]
g.Expect(resultingRoute).To(Not(BeNil()))

g.Expect(len(snapshotRoute.VirtualHosts[0].Routes)).Should(Equal(len(resultingRoute.VirtualHosts[0].Routes)))
resultingRouteConfig := resultingRoutes[snapshotRouteConfig.Name]
g.Expect(resultingRouteConfig).To(Not(BeNil()))
g.Expect(len(snapshotRouteConfig.VirtualHosts)).Should(Equal(1))
g.Expect(len(resultingRouteConfig.VirtualHosts)).Should(Equal(1))
snapshotRoutes := getTrafficSplits(snapshotRouteConfig.VirtualHosts[0])
resultingRoutes := getTrafficSplits(resultingRouteConfig.VirtualHosts[0])
g.Expect(len(resultingRoutes)).Should(Equal(len(snapshotRoutes)))
g.Expect(resultingRoutes).Should(ConsistOf(snapshotRoutes))
count++
}
g.Expect(len(resultingRoutes)).To(Equal(count))
Expand Down Expand Up @@ -960,7 +964,43 @@ func createSnapshot(g Gomega, resources []types.Resource, filename string) {

_, err = file.Write(jsonData)
g.Expect(err).To(BeNil())
}

func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
trafficSplits := make([]resources.Route, 0)

for _, route := range virtualHost.Routes {
trafficSplit := resources.Route{
RouteName: route.Name,
Clusters: make([]resources.TrafficSplit, 0),
}

clusterSpecificer := route.GetRoute().GetClusterSpecifier()

fmt.Printf("%v", clusterSpecificer)

switch route.GetRoute().GetClusterSpecifier().(type) {
case *routev3.RouteAction_WeightedClusters:
weightedClusters := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_WeightedClusters)

for _, weightedCluster := range weightedClusters.WeightedClusters.Clusters {
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
ModelName: weightedCluster.Name,
TrafficWeight: weightedCluster.Weight.Value,
})
}
case *routev3.RouteAction_Cluster:
cluster := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_Cluster)
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
ModelName: cluster.Cluster,
TrafficWeight: 100,
})

}

}

return trafficSplits
}

func getEndpoints(loadAssignment *endpointv3.ClusterLoadAssignment) []resources.Endpoint {
Expand Down
12 changes: 6 additions & 6 deletions scheduler/pkg/envoy/resources/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ type Listener struct {
type Route struct {
RouteName string
LogPayloads bool
Clusters []TrafficSplits
Mirror *TrafficSplits
Clusters []TrafficSplit
Mirror *TrafficSplit
}

type TrafficSplits struct {
type TrafficSplit struct {
ModelName string
ModelVersion uint32
TrafficWeight uint32
Expand Down Expand Up @@ -61,11 +61,11 @@ type Endpoint struct {

type PipelineRoute struct {
RouteName string
Clusters []PipelineTrafficSplits
Mirror *PipelineTrafficSplits
Clusters []PipelineTrafficSplit
Mirror *PipelineTrafficSplit
}

type PipelineTrafficSplits struct {
type PipelineTrafficSplit struct {
PipelineName string
TrafficWeight uint32
}
Expand Down
24 changes: 12 additions & 12 deletions scheduler/pkg/envoy/resources/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func createMirrorRouteAction(trafficWeight uint32, rest bool) []*route.RouteActi

// weighted clusters do not play well with session affinity see https://github.com/envoyproxy/envoy/issues/8167
// Traffic shifting may need to be reinvesigated https://github.com/envoyproxy/envoy/pull/18207
func createWeightedModelClusterAction(clusterTraffics []TrafficSplits, mirrorTraffics []TrafficSplits, rest bool) *route.Route_Route {
func createWeightedModelClusterAction(clusterTraffics []TrafficSplit, mirrorTraffics []TrafficSplit, rest bool) *route.Route_Route {
// Add Weighted Clusters with given traffic percentages to each internal model
var splits []*route.WeightedCluster_ClusterWeight
var mirrors []*route.RouteAction_RequestMirrorPolicy
Expand Down Expand Up @@ -263,17 +263,17 @@ func makeModelHttpRoute(r *Route, rt *route.Route, isMirror bool) {
}

if isMirror {
rt.Action = createWeightedModelClusterAction([]TrafficSplits{*r.Mirror}, []TrafficSplits{}, true)
rt.Action = createWeightedModelClusterAction([]TrafficSplit{*r.Mirror}, []TrafficSplit{}, true)
} else {
rt.Action = createWeightedModelClusterAction(r.Clusters, []TrafficSplits{}, true)
rt.Action = createWeightedModelClusterAction(r.Clusters, []TrafficSplit{}, true)
}

if r.LogPayloads {
rt.ResponseHeadersToAdd = modelRouteHeaders
}
}

func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *route.Route, isGrpc bool) {
func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplit, rt *route.Route, isGrpc bool) {
if isGrpc {
rt.Name = r.RouteName + "_grpc_experiment"
rt.Match.PathSpecifier = modelRouteMatchPathGrpc
Expand Down Expand Up @@ -373,9 +373,9 @@ func makeModelGrpcRoute(r *Route, rt *route.Route, isMirror bool) {
}

if isMirror {
rt.Action = createWeightedModelClusterAction([]TrafficSplits{*r.Mirror}, []TrafficSplits{}, false)
rt.Action = createWeightedModelClusterAction([]TrafficSplit{*r.Mirror}, []TrafficSplit{}, false)
} else {
rt.Action = createWeightedModelClusterAction(r.Clusters, []TrafficSplits{}, false)
rt.Action = createWeightedModelClusterAction(r.Clusters, []TrafficSplit{}, false)
}

if r.LogPayloads {
Expand All @@ -392,7 +392,7 @@ func getPipelineModelName(pipelineName string) string {
return fmt.Sprintf("%s.%s", pipelineName, SeldonPipelineHeaderSuffix)
}

func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplits, mirrorTraffics []PipelineTrafficSplits, rest bool) *route.Route_Route {
func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplit, mirrorTraffics []PipelineTrafficSplit, rest bool) *route.Route_Route {
// Add Weighted Clusters with given traffic percentages to each internal model
var splits []*route.WeightedCluster_ClusterWeight
var mirrors []*route.RouteAction_RequestMirrorPolicy
Expand Down Expand Up @@ -466,9 +466,9 @@ func makePipelineHttpRoute(r *PipelineRoute, rt *route.Route, isMirror bool) {
}

if isMirror {
rt.Action = createWeightedPipelineClusterAction([]PipelineTrafficSplits{*r.Mirror}, []PipelineTrafficSplits{}, true)
rt.Action = createWeightedPipelineClusterAction([]PipelineTrafficSplit{*r.Mirror}, []PipelineTrafficSplit{}, true)
} else {
rt.Action = createWeightedPipelineClusterAction(r.Clusters, []PipelineTrafficSplits{}, true)
rt.Action = createWeightedPipelineClusterAction(r.Clusters, []PipelineTrafficSplit{}, true)
}
}

Expand All @@ -493,13 +493,13 @@ func makePipelineGrpcRoute(r *PipelineRoute, rt *route.Route, isMirror bool) {
}

if isMirror {
rt.Action = createWeightedPipelineClusterAction([]PipelineTrafficSplits{*r.Mirror}, []PipelineTrafficSplits{}, false)
rt.Action = createWeightedPipelineClusterAction([]PipelineTrafficSplit{*r.Mirror}, []PipelineTrafficSplit{}, false)
} else {
rt.Action = createWeightedPipelineClusterAction(r.Clusters, []PipelineTrafficSplits{}, false)
rt.Action = createWeightedPipelineClusterAction(r.Clusters, []PipelineTrafficSplit{}, false)
}
}

func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTrafficSplits, rt *route.Route, isGrpc bool) {
func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTrafficSplit, rt *route.Route, isGrpc bool) {
if isGrpc {
rt.Name = r.RouteName + "_grpc_experiment"
rt.Match.PathSpecifier = pipelineRoutePathGrpc
Expand Down
16 changes: 8 additions & 8 deletions scheduler/pkg/envoy/resources/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMakeRoute(t *testing.T) {
modelRoutes: []*Route{
{
RouteName: "r1",
Clusters: []TrafficSplits{
Clusters: []TrafficSplit{
{
ModelName: "m1",
ModelVersion: 1,
Expand All @@ -50,7 +50,7 @@ func TestMakeRoute(t *testing.T) {
pipelineRoutes: []*PipelineRoute{
{
RouteName: "r1",
Clusters: []PipelineTrafficSplits{
Clusters: []PipelineTrafficSplit{
{
PipelineName: "p1",
TrafficWeight: 100,
Expand All @@ -66,7 +66,7 @@ func TestMakeRoute(t *testing.T) {
pipelineRoutes: []*PipelineRoute{
{
RouteName: "r1",
Clusters: []PipelineTrafficSplits{
Clusters: []PipelineTrafficSplit{
{
PipelineName: "p1",
TrafficWeight: 50,
Expand All @@ -86,7 +86,7 @@ func TestMakeRoute(t *testing.T) {
pipelineRoutes: []*PipelineRoute{
{
RouteName: "r1",
Clusters: []PipelineTrafficSplits{
Clusters: []PipelineTrafficSplit{
{
PipelineName: "p1",
TrafficWeight: 50,
Expand All @@ -96,7 +96,7 @@ func TestMakeRoute(t *testing.T) {
TrafficWeight: 50,
},
},
Mirror: &PipelineTrafficSplits{
Mirror: &PipelineTrafficSplit{
PipelineName: "p3",
TrafficWeight: 100,
},
Expand All @@ -110,7 +110,7 @@ func TestMakeRoute(t *testing.T) {
modelRoutes: []*Route{
{
RouteName: "r1",
Clusters: []TrafficSplits{
Clusters: []TrafficSplit{
{
ModelName: "m1",
ModelVersion: 1,
Expand All @@ -136,7 +136,7 @@ func TestMakeRoute(t *testing.T) {
modelRoutes: []*Route{
{
RouteName: "r1",
Clusters: []TrafficSplits{
Clusters: []TrafficSplit{
{
ModelName: "m1",
ModelVersion: 1,
Expand All @@ -152,7 +152,7 @@ func TestMakeRoute(t *testing.T) {
GrpcCluster: "g1",
},
},
Mirror: &TrafficSplits{
Mirror: &TrafficSplit{
ModelName: "m3",
ModelVersion: 1,
TrafficWeight: 100,
Expand Down
6 changes: 3 additions & 3 deletions scheduler/pkg/envoy/xdscache/seldoncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (xds *SeldonXDSCache) SecretContents() []types.Resource {
return r
}

func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, trafficSplits []resources.PipelineTrafficSplits, mirror *resources.PipelineTrafficSplits) {
func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, trafficSplits []resources.PipelineTrafficSplit, mirror *resources.PipelineTrafficSplit) {
xds.RemovePipelineRoute(routeName)
pipelineRoute, ok := xds.Pipelines[routeName]
if !ok {
Expand Down Expand Up @@ -274,7 +274,7 @@ func (xds *SeldonXDSCache) AddRouteClusterTraffic(
route.LogPayloads = true
}

clusterTraffic := resources.TrafficSplits{
clusterTraffic := resources.TrafficSplit{
ModelName: modelName,
ModelVersion: modelVersion.GetVersion(),
TrafficWeight: trafficPercent,
Expand Down Expand Up @@ -358,7 +358,7 @@ func getClusterNames(modelVersion *store.ModelVersion) (string, string) {
return httpClusterName, grpcClusterName
}

func (xds *SeldonXDSCache) removeRouteFromCluster(routeName string, route resources.Route, cluster resources.TrafficSplits) error {
func (xds *SeldonXDSCache) removeRouteFromCluster(routeName string, route resources.Route, cluster resources.TrafficSplit) error {
httpCluster, ok := xds.Clusters[cluster.HttpCluster]
if !ok {
return fmt.Errorf("Can't find http cluster for route %s cluster %s route %+v", routeName, cluster.HttpCluster, route)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func benchmarkRouteContents(b *testing.B, numResources uint) {
x := NewSeldonXDSCache(logrus.New(), nil)

for n := 0; n < int(numResources); n++ {
x.AddPipelineRoute(strconv.Itoa(n), []resources.PipelineTrafficSplits{{PipelineName: strconv.Itoa(n), TrafficWeight: 100}}, nil)
x.AddPipelineRoute(strconv.Itoa(n), []resources.PipelineTrafficSplit{{PipelineName: strconv.Itoa(n), TrafficWeight: 100}}, nil)

x.AddRouteClusterTraffic(fmt.Sprintf("model-%d", n), store.NewDefaultModelVersion(&scheduler.Model{Meta: &scheduler.MetaData{Name: fmt.Sprintf("model-%d", n)}}, 1), 100, false)
}
Expand Down

0 comments on commit 1d66c06

Please sign in to comment.