diff --git a/dataclients/kubernetes/client_test.go b/dataclients/kubernetes/client_test.go index 0dc2ac36bf..63c2615d24 100644 --- a/dataclients/kubernetes/client_test.go +++ b/dataclients/kubernetes/client_test.go @@ -42,7 +42,7 @@ func TestClientGetEndpointAddresses(t *testing.T) { // client.LoadAll() is not called - addrs := client.GetEndpointAddresses("namespace1", "service1") + addrs := client.GetEndpointAddresses("", "namespace1", "service1") assert.Nil(t, addrs) }) @@ -55,13 +55,13 @@ func TestClientGetEndpointAddresses(t *testing.T) { _, err := client.LoadAll() require.NoError(t, err) - addrs := client.GetEndpointAddresses("namespace1", "service1") + addrs := client.GetEndpointAddresses("", "namespace1", "service1") assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs) // test subsequent call returns the expected values even when previous result was modified addrs[0] = "modified" - addrs = client.GetEndpointAddresses("namespace1", "service1") + addrs = client.GetEndpointAddresses("", "namespace1", "service1") assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs) }) @@ -76,13 +76,13 @@ func TestClientGetEndpointAddresses(t *testing.T) { _, err := client.LoadAll() require.NoError(t, err) - addrs := client.GetEndpointAddresses("namespace1", "service1") + addrs := client.GetEndpointAddresses("", "namespace1", "service1") assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs) // test subsequent call returns the expected values even when previous result was modified addrs[0] = "modified" - addrs = client.GetEndpointAddresses("namespace1", "service1") + addrs = client.GetEndpointAddresses("", "namespace1", "service1") assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs) }) } @@ -94,7 +94,7 @@ func TestClientLoadEndpointAddresses(t *testing.T) { "testdata/ingressV1/ingress-data/lb-target-multi.yaml", ) - addrs, err := client.LoadEndpointAddresses("namespace1", "service1") + addrs, err := client.LoadEndpointAddresses("", "namespace1", "service1") assert.NoError(t, err) assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs) }) @@ -107,7 +107,7 @@ func TestClientLoadEndpointAddresses(t *testing.T) { "testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-ready.yaml", ) - addrs, err := client.LoadEndpointAddresses("namespace1", "service1") + addrs, err := client.LoadEndpointAddresses("", "namespace1", "service1") assert.NoError(t, err) assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs) }) diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go index 451109e060..87b4dc00bd 100644 --- a/dataclients/kubernetes/clusterclient.go +++ b/dataclients/kubernetes/clusterclient.go @@ -562,7 +562,7 @@ func collectReadyEndpoints(endpointSlices *endpointSliceList) map[definitions.Re } // loadEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices API. -func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) { +func (c *clusterClient) loadEndpointAddresses(zone, namespace, name string) ([]string, error) { var result []string if c.enableEndpointSlices { url := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) + @@ -579,7 +579,11 @@ func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, } for _, eps := range ready { - result = eps.addresses() + if zone != "" { + result = eps.addressesByZone(zone) + } else { + result = eps.addresses() + } break } } else { diff --git a/dataclients/kubernetes/clusterstate.go b/dataclients/kubernetes/clusterstate.go index 702e6beb2c..dd2842633b 100644 --- a/dataclients/kubernetes/clusterstate.go +++ b/dataclients/kubernetes/clusterstate.go @@ -50,7 +50,7 @@ func (state *clusterState) getServiceRG(namespace, name string) (*service, error } // GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices. -func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string { +func (state *clusterState) GetEndpointsByService(zone, namespace, name, protocol string, servicePort *servicePort) []string { epID := endpointID{ ResourceID: newResourceID(namespace, name), Protocol: protocol, @@ -66,7 +66,7 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin var targets []string if state.enableEndpointSlices { if eps, ok := state.endpointSlices[epID.ResourceID]; ok { - targets = eps.targetsByServicePort("TCP", protocol, servicePort) + targets = eps.targetsByServicePort(zone, "TCP", protocol, servicePort) } else { return nil } @@ -84,7 +84,7 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin } // getEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices. -func (state *clusterState) getEndpointAddresses(namespace, name string) []string { +func (state *clusterState) getEndpointAddresses(zone, namespace, name string) []string { rID := newResourceID(namespace, name) state.mu.Lock() @@ -93,7 +93,11 @@ func (state *clusterState) getEndpointAddresses(namespace, name string) []string var addresses []string if state.enableEndpointSlices { if eps, ok := state.endpointSlices[rID]; ok { - addresses = eps.addresses() + if zone != "" { + addresses = eps.addressesByZone(zone) + } else { + addresses = eps.addresses() + } } else { return nil } @@ -110,7 +114,7 @@ func (state *clusterState) getEndpointAddresses(namespace, name string) []string } // GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices. -func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol, scheme string, target *definitions.BackendPort) []string { +func (state *clusterState) GetEndpointsByTarget(zone, namespace, name, protocol, scheme string, target *definitions.BackendPort) []string { epID := endpointID{ ResourceID: newResourceID(namespace, name), Protocol: protocol, @@ -126,7 +130,7 @@ func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol, schem var targets []string if state.enableEndpointSlices { if eps, ok := state.endpointSlices[epID.ResourceID]; ok { - targets = eps.targetsByServiceTarget(protocol, scheme, target) + targets = eps.targetsByServiceTarget(zone, protocol, scheme, target) } else { return nil } diff --git a/dataclients/kubernetes/clusterstate_test.go b/dataclients/kubernetes/clusterstate_test.go index f7c55e96a8..57d37c2fce 100644 --- a/dataclients/kubernetes/clusterstate_test.go +++ b/dataclients/kubernetes/clusterstate_test.go @@ -57,7 +57,7 @@ func benchmarkCachedEndpoints(b *testing.B, n int) { b.ResetTimer() dummy := []string{} for i := 0; i < b.N; i++ { - dummy = cs.GetEndpointsByTarget("default", "foo-0", "TCP", "http", &definitions.BackendPort{}) + dummy = cs.GetEndpointsByTarget("", "default", "foo-0", "TCP", "http", &definitions.BackendPort{}) } dummy2 = dummy } diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go index e91818c80b..8e5d020f4b 100644 --- a/dataclients/kubernetes/endpointslices.go +++ b/dataclients/kubernetes/endpointslices.go @@ -43,7 +43,7 @@ func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int return port } -func (eps *skipperEndpointSlice) targetsByServicePort(protocol, scheme string, servicePort *servicePort) []string { +func (eps *skipperEndpointSlice) targetsByServicePort(zone, protocol, scheme string, servicePort *servicePort) []string { var port int if servicePort.Name != "" { port = eps.getPort(protocol, servicePort.Name, servicePort.Port) @@ -58,21 +58,45 @@ func (eps *skipperEndpointSlice) targetsByServicePort(protocol, scheme string, s } result := make([]string, 0, len(eps.Endpoints)) + resultByZone := make([]string, 0, len(eps.Endpoints)) for _, ep := range eps.Endpoints { + if ep.Zone == zone { + resultByZone = append(resultByZone, formatEndpointString(ep.Address, scheme, port)) + } result = append(result, formatEndpointString(ep.Address, scheme, port)) } + if len(resultByZone) >= minEndpointsByZone { + return resultByZone + } return result } -func (eps *skipperEndpointSlice) targetsByServiceTarget(protocol, scheme string, serviceTarget *definitions.BackendPort) []string { +func (eps *skipperEndpointSlice) targetsByServiceTarget(zone, protocol, scheme string, serviceTarget *definitions.BackendPort) []string { pName, _ := serviceTarget.Value.(string) pValue, _ := serviceTarget.Value.(int) port := eps.getPort(protocol, pName, pValue) result := make([]string, 0, len(eps.Endpoints)) + resultByZone := make([]string, 0, len(eps.Endpoints)) for _, ep := range eps.Endpoints { + if ep.Zone == zone { + resultByZone = append(resultByZone, formatEndpointString(ep.Address, scheme, port)) + } result = append(result, formatEndpointString(ep.Address, scheme, port)) } + if len(resultByZone) >= minEndpointsByZone { + return resultByZone + } + return result +} + +func (eps *skipperEndpointSlice) addressesByZone(zone string) []string { + result := make([]string, 0, len(eps.Endpoints)) + for _, ep := range eps.Endpoints { + if ep.Zone == zone { + result = append(result, ep.Address) + } + } return result } diff --git a/dataclients/kubernetes/ingress.go b/dataclients/kubernetes/ingress.go index d44520f5c1..5e674ca113 100644 --- a/dataclients/kubernetes/ingress.go +++ b/dataclients/kubernetes/ingress.go @@ -43,6 +43,7 @@ type ingressContext struct { defaultFilters defaultFilters certificateRegistry *certregistry.CertRegistry calculateTraffic func([]*weightedIngressBackend) map[string]backendTraffic + zone string } type ingress struct { diff --git a/dataclients/kubernetes/ingressv1.go b/dataclients/kubernetes/ingressv1.go index ed2431ed5b..cb48dcd02f 100644 --- a/dataclients/kubernetes/ingressv1.go +++ b/dataclients/kubernetes/ingressv1.go @@ -118,7 +118,7 @@ func convertPathRuleV1( protocol = p } - eps = state.GetEndpointsByService(ns, svcName, protocol, servicePort) + eps = state.GetEndpointsByService(ic.zone, ns, svcName, protocol, servicePort) } if len(eps) == 0 { ic.logger.Tracef("Target endpoints not found, shuntroute for %s:%s", svcName, svcPort) @@ -370,6 +370,7 @@ func (ing *ingress) convertDefaultBackendV1( } eps = state.GetEndpointsByService( + ic.zone, ns, svcName, protocol, diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go index 523b71fdd8..cd7a392e59 100644 --- a/dataclients/kubernetes/kube.go +++ b/dataclients/kubernetes/kube.go @@ -27,6 +27,7 @@ const ( servicePortEnvVar = "KUBERNETES_SERVICE_PORT" httpRedirectRouteID = "kube__redirect" defaultEastWestDomain = "skipper.cluster.local" + minEndpointsByZone = 3 ) // PathMode values are used to control the ingress path interpretation. The path mode can @@ -240,6 +241,10 @@ type Options struct { // DefaultLoadBalancerAlgorithm sets the default algorithm to be used for load balancing between backend endpoints, // available options: roundRobin, consistentHash, random, powerOfRandomNChoices DefaultLoadBalancerAlgorithm string + + // TopologyZone + // TODO: explain its use + TopologyZone string } // Client is a Skipper DataClient implementation used to create routes based on Kubernetes Ingress settings. @@ -576,18 +581,18 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters { // GetEndpointAddresses returns the list of all addresses for the given service // loaded by previous call to LoadAll or LoadUpdate. -func (c *Client) GetEndpointAddresses(ns, name string) []string { +func (c *Client) GetEndpointAddresses(zone, ns, name string) []string { c.mu.Lock() defer c.mu.Unlock() if c.state == nil { return nil } - return c.state.getEndpointAddresses(ns, name) + return c.state.getEndpointAddresses(zone, ns, name) } // LoadEndpointAddresses returns the list of all addresses for the given service. -func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) { - return c.ClusterClient.loadEndpointAddresses(namespace, name) +func (c *Client) LoadEndpointAddresses(zone, namespace, name string) ([]string, error) { + return c.ClusterClient.loadEndpointAddresses(zone, namespace, name) } func compareStringList(a, b []string) []string { diff --git a/dataclients/kubernetes/routegroup.go b/dataclients/kubernetes/routegroup.go index 4ec23928f9..5cc00fe861 100644 --- a/dataclients/kubernetes/routegroup.go +++ b/dataclients/kubernetes/routegroup.go @@ -41,6 +41,7 @@ type routeGroupContext struct { calculateTraffic func([]*definitions.BackendReference) map[string]backendTraffic defaultLoadBalancerAlgorithm string certificateRegistry *certregistry.CertRegistry + zone string } type routeContext struct { @@ -187,6 +188,7 @@ func applyServiceBackend(ctx *routeGroupContext, backend *definitions.SkipperBac } eps := ctx.state.GetEndpointsByTarget( + ctx.zone, namespaceString(ctx.routeGroup.Metadata.Namespace), s.Meta.Name, "TCP", @@ -569,6 +571,7 @@ func (r *routeGroups) convert(s *clusterState, df defaultFilters, loggingEnabled calculateTraffic: getBackendTrafficCalculator[*definitions.BackendReference](r.options.BackendTrafficAlgorithm), defaultLoadBalancerAlgorithm: r.options.DefaultLoadBalancerAlgorithm, certificateRegistry: cr, + zone: r.options.TopologyZone, } ri, err := transformRouteGroup(ctx) diff --git a/routesrv/redishandler.go b/routesrv/redishandler.go index 8caec16a6e..1e931cc71b 100644 --- a/routesrv/redishandler.go +++ b/routesrv/redishandler.go @@ -42,7 +42,7 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func getRedisAddresses(opts *skipper.Options, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) { return func() ([]byte, error) { - a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) + a := kdc.GetEndpointAddresses("", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a) m.UpdateGauge("redis_endpoints", float64(len(a))) diff --git a/skipper.go b/skipper.go index 83a12a673a..847ce23bfa 100644 --- a/skipper.go +++ b/skipper.go @@ -286,6 +286,9 @@ type Options struct { // KubernetesRedisServicePort to be used to lookup ring shards dynamically KubernetesRedisServicePort int + // KubernetesTopologyZone TODO + KubernetesTopologyZone string + // KubernetesForceService overrides the default Skipper functionality to route traffic using Kubernetes Endpoints, // instead using Kubernetes Services. KubernetesForceService bool @@ -1423,14 +1426,14 @@ func getKubernetesRedisAddrUpdater(opts *Options, kdc *kubernetes.Client, loaded // has polled the data once or kdc.GetEndpointAdresses should be blocking // call to kubernetes API return func() ([]string, error) { - a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) + a := kdc.GetEndpointAddresses("", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) log.Debugf("GetEndpointAddresses found %d redis endpoints", len(a)) return joinPort(a, opts.KubernetesRedisServicePort), nil } } else { return func() ([]string, error) { - a, err := kdc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) + a, err := kdc.LoadEndpointAddresses("", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err) return joinPort(a, opts.KubernetesRedisServicePort), err