Skip to content

Commit ce18cd9

Browse files
authored
feature: fetch and use endpointslices if available (zalando#2565)
increase minor version because of the importance of the change and the required RBAC clusterrole change
refactor: split out endpointslices and endpoints from ingress definitions
refactor: split out non service resources and rename file
refactor: value receiver -> ptr receiver
doc: -enable-kubernetes-endpointslices=true to enable EndpointSlices to scale out more than 1000 endpoints
doc: change RBAC clusterroles to reflect the endpointslices change
test: add coverage for Targets()
test: dataclients/kubernetes add failing testcase for named service target port
fix: empty port name is fine, because if so there is only one allowed by kubernetes itself, otherwise port has a mandatory name
Signed-off-by: Alexander Yastrebov <[email protected]>
Signed-off-by: Sandor Szücs <[email protected]>
1 parent 8a3aba7 commit ce18cd9

File tree

94 files changed

+2727
-316
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+2727
-316
lines changed

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v0.17
1+
v0.18

config/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ type Config struct {
161161
KubernetesPathModeString string `yaml:"kubernetes-path-mode"`
162162
KubernetesPathMode kubernetes.PathMode `yaml:"-"`
163163
KubernetesNamespace string `yaml:"kubernetes-namespace"`
164+
KubernetesEnableEndpointSlices bool `yaml:"enable-kubernetes-endpointslices"`
164165
KubernetesEnableEastWest bool `yaml:"enable-kubernetes-east-west"`
165166
KubernetesEastWestDomain string `yaml:"kubernetes-east-west-domain"`
166167
KubernetesEastWestRangeDomains *listFlag `yaml:"kubernetes-east-west-range-domains"`
@@ -448,6 +449,7 @@ func NewConfig() *Config {
448449
flag.StringVar(&cfg.WhitelistedHealthCheckCIDR, "whitelisted-healthcheck-cidr", "", "sets the iprange/CIDRS to be whitelisted during healthcheck")
449450
flag.StringVar(&cfg.KubernetesPathModeString, "kubernetes-path-mode", "kubernetes-ingress", "controls the default interpretation of Kubernetes ingress paths: <kubernetes-ingress|path-regexp|path-prefix>")
450451
flag.StringVar(&cfg.KubernetesNamespace, "kubernetes-namespace", "", "watch only this namespace for ingresses")
452+
flag.BoolVar(&cfg.KubernetesEnableEndpointSlices, "enable-kubernetes-endpointslices", false, "Enables that skipper fetches Kubernetes endpointslices instead of endpoints to scale more than 1000 pods within a service")
451453
flag.BoolVar(&cfg.KubernetesEnableEastWest, "enable-kubernetes-east-west", false, "*Deprecated*: use kubernetes-east-west-range feature. Enables east-west communication, which automatically adds routes for Ingress objects with hostname <name>.<namespace>.skipper.cluster.local")
452454
flag.StringVar(&cfg.KubernetesEastWestDomain, "kubernetes-east-west-domain", "", "*Deprecated*: use kubernetes-east-west-range feature. Sets the east-west domain, defaults to .skipper.cluster.local")
453455
flag.Var(cfg.KubernetesEastWestRangeDomains, "kubernetes-east-west-range-domains", "set the the cluster internal domains for east west traffic. Identified routes to such domains will include the -kubernetes-east-west-range-predicates")
@@ -790,6 +792,7 @@ func (c *Config) ToOptions() skipper.Options {
790792
WhitelistedHealthCheckCIDR: whitelistCIDRS,
791793
KubernetesPathMode: c.KubernetesPathMode,
792794
KubernetesNamespace: c.KubernetesNamespace,
795+
KubernetesEnableEndpointslices: c.KubernetesEnableEndpointSlices,
793796
KubernetesEnableEastWest: c.KubernetesEnableEastWest,
794797
KubernetesEastWestDomain: c.KubernetesEastWestDomain,
795798
KubernetesEastWestRangeDomains: c.KubernetesEastWestRangeDomains.values,

dataclients/kubernetes/clusterclient.go

+129-37
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,18 @@ const (
2929
IngressesV1ClusterURI = "/apis/networking.k8s.io/v1/ingresses"
3030
ZalandoResourcesClusterURI = "/apis/zalando.org/v1"
3131
RouteGroupsName = "routegroups"
32-
routeGroupsClusterURI = "/apis/zalando.org/v1/routegroups"
32+
RouteGroupsClusterURI = "/apis/zalando.org/v1/routegroups"
3333
routeGroupClassKey = "zalando.org/routegroup.class"
3434
ServicesClusterURI = "/api/v1/services"
3535
EndpointsClusterURI = "/api/v1/endpoints"
36+
EndpointSlicesClusterURI = "/apis/discovery.k8s.io/v1/endpointslices"
3637
SecretsClusterURI = "/api/v1/secrets"
3738
defaultKubernetesURL = "http://localhost:8001"
3839
IngressesV1NamespaceFmt = "/apis/networking.k8s.io/v1/namespaces/%s/ingresses"
39-
routeGroupsNamespaceFmt = "/apis/zalando.org/v1/namespaces/%s/routegroups"
40+
RouteGroupsNamespaceFmt = "/apis/zalando.org/v1/namespaces/%s/routegroups"
4041
ServicesNamespaceFmt = "/api/v1/namespaces/%s/services"
4142
EndpointsNamespaceFmt = "/api/v1/namespaces/%s/endpoints"
43+
EndpointSlicesNamespaceFmt = "/apis/discovery.k8s.io/v1/namespaces/%s/endpointslices"
4244
SecretsNamespaceFmt = "/api/v1/namespaces/%s/secrets"
4345
serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount/"
4446
serviceAccountTokenKey = "token"
@@ -55,6 +57,7 @@ type clusterClient struct {
5557
routeGroupsURI string
5658
servicesURI string
5759
endpointsURI string
60+
endpointSlicesURI string
5861
secretsURI string
5962
tokenProvider secrets.SecretsProvider
6063
tokenFile string
@@ -65,11 +68,14 @@ type clusterClient struct {
6568
ingressClass *regexp.Regexp
6669
httpClient *http.Client
6770

68-
ingressLabelSelectors string
69-
servicesLabelSelectors string
70-
endpointsLabelSelectors string
71-
secretsLabelSelectors string
72-
routeGroupsLabelSelectors string
71+
ingressLabelSelectors string
72+
servicesLabelSelectors string
73+
endpointsLabelSelectors string
74+
endpointSlicesLabelSelectors string
75+
secretsLabelSelectors string
76+
routeGroupsLabelSelectors string
77+
78+
enableEndpointSlices bool
7379

7480
loggedMissingRouteGroups bool
7581
routeGroupValidator *definitions.RouteGroupValidator
@@ -149,22 +155,25 @@ func newClusterClient(o Options, apiURL, ingCls, rgCls string, quit <-chan struc
149155
}
150156

151157
c := &clusterClient{
152-
ingressesURI: IngressesV1ClusterURI,
153-
routeGroupsURI: routeGroupsClusterURI,
154-
servicesURI: ServicesClusterURI,
155-
endpointsURI: EndpointsClusterURI,
156-
secretsURI: SecretsClusterURI,
157-
ingressClass: ingClsRx,
158-
ingressLabelSelectors: toLabelSelectorQuery(o.IngressLabelSelectors),
159-
servicesLabelSelectors: toLabelSelectorQuery(o.ServicesLabelSelectors),
160-
endpointsLabelSelectors: toLabelSelectorQuery(o.EndpointsLabelSelectors),
161-
secretsLabelSelectors: toLabelSelectorQuery(o.SecretsLabelSelectors),
162-
routeGroupsLabelSelectors: toLabelSelectorQuery(o.RouteGroupsLabelSelectors),
163-
routeGroupClass: rgClsRx,
164-
httpClient: httpClient,
165-
apiURL: apiURL,
166-
certificateRegistry: o.CertificateRegistry,
167-
routeGroupValidator: &definitions.RouteGroupValidator{},
158+
ingressesURI: IngressesV1ClusterURI,
159+
routeGroupsURI: RouteGroupsClusterURI,
160+
servicesURI: ServicesClusterURI,
161+
endpointsURI: EndpointsClusterURI,
162+
endpointSlicesURI: EndpointSlicesClusterURI,
163+
secretsURI: SecretsClusterURI,
164+
ingressClass: ingClsRx,
165+
ingressLabelSelectors: toLabelSelectorQuery(o.IngressLabelSelectors),
166+
servicesLabelSelectors: toLabelSelectorQuery(o.ServicesLabelSelectors),
167+
endpointsLabelSelectors: toLabelSelectorQuery(o.EndpointsLabelSelectors),
168+
endpointSlicesLabelSelectors: toLabelSelectorQuery(o.EndpointSlicesLabelSelectors),
169+
secretsLabelSelectors: toLabelSelectorQuery(o.SecretsLabelSelectors),
170+
routeGroupsLabelSelectors: toLabelSelectorQuery(o.RouteGroupsLabelSelectors),
171+
routeGroupClass: rgClsRx,
172+
httpClient: httpClient,
173+
apiURL: apiURL,
174+
certificateRegistry: o.CertificateRegistry,
175+
routeGroupValidator: &definitions.RouteGroupValidator{},
176+
enableEndpointSlices: o.KubernetesEnableEndpointslices,
168177
}
169178

170179
if o.KubernetesInCluster {
@@ -220,9 +229,10 @@ func toLabelSelectorQuery(selectors map[string]string) string {
220229

221230
func (c *clusterClient) setNamespace(namespace string) {
222231
c.ingressesURI = fmt.Sprintf(IngressesV1NamespaceFmt, namespace)
223-
c.routeGroupsURI = fmt.Sprintf(routeGroupsNamespaceFmt, namespace)
232+
c.routeGroupsURI = fmt.Sprintf(RouteGroupsNamespaceFmt, namespace)
224233
c.servicesURI = fmt.Sprintf(ServicesNamespaceFmt, namespace)
225234
c.endpointsURI = fmt.Sprintf(EndpointsNamespaceFmt, namespace)
235+
c.endpointSlicesURI = fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace)
226236
c.secretsURI = fmt.Sprintf(SecretsNamespaceFmt, namespace)
227237
}
228238

@@ -455,6 +465,81 @@ func (c *clusterClient) loadEndpoints() (map[definitions.ResourceID]*endpoint, e
455465
return result, nil
456466
}
457467

468+
// loadEndpointSlices is different from the other load$Kind()
469+
// functions because there are 1..N endpointslices created for a given
470+
// service. endpointslices need to be deduplicated and state needs to
471+
// be checked. We read all endpointslices and create de-duplicated
472+
// business objects skipperEndpointSlice instead of raw Kubernetes
473+
// objects, because we need just a clean list of load balancer
474+
// members. The returned map contains the full list of ready
475+
// non-terminating endpoints that should be in the load balancer of a
476+
// given service, check endpointSlice.ToResourceID().
477+
func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skipperEndpointSlice, error) {
478+
var endpointSlices endpointSliceList
479+
480+
if err := c.getJSON(c.endpointSlicesURI+c.endpointSlicesLabelSelectors, &endpointSlices); err != nil {
481+
log.Debugf("requesting all endpointslices failed: %v", err)
482+
return nil, err
483+
}
484+
485+
log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))
486+
mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
487+
for _, endpointSlice := range endpointSlices.Items {
488+
resID := endpointSlice.ToResourceID() // service resource ID
489+
mapSlices[resID] = append(mapSlices[resID], endpointSlice)
490+
}
491+
492+
result := make(map[definitions.ResourceID]*skipperEndpointSlice)
493+
for resID, epSlices := range mapSlices {
494+
if len(epSlices) == 0 {
495+
continue
496+
}
497+
498+
result[resID] = &skipperEndpointSlice{
499+
Meta: epSlices[0].Meta,
500+
}
501+
502+
terminatingEps := make(map[string]struct{})
503+
resEps := make(map[string]*skipperEndpoint)
504+
505+
for i := range epSlices {
506+
507+
for _, ep := range epSlices[i].Endpoints {
508+
// Addresses [1..100] of the same AddressType; like kube-proxy, we use the first
509+
// see also https://github.com/kubernetes/kubernetes/issues/106267
510+
address := ep.Addresses[0]
511+
if _, ok := terminatingEps[address]; ok {
512+
// already known terminating
513+
} else if ep.isTerminating() {
514+
terminatingEps[address] = struct{}{}
515+
// if we had this one with a non terminating condition,
516+
// we should delete it, because of eventual consistency
517+
// it is actually terminating
518+
delete(resEps, address)
519+
} else if ep.Conditions == nil {
520+
// if conditions are nil then we need to treat it as ready
521+
resEps[address] = &skipperEndpoint{
522+
Address: address,
523+
Zone: ep.Zone,
524+
}
525+
} else if ep.isReady() {
526+
resEps[address] = &skipperEndpoint{
527+
Address: address,
528+
Zone: ep.Zone,
529+
}
530+
}
531+
}
532+
533+
result[resID].Ports = epSlices[i].Ports
534+
}
535+
for _, o := range resEps {
536+
result[resID].Endpoints = append(result[resID].Endpoints, o)
537+
}
538+
}
539+
540+
return result, nil
541+
}
542+
458543
func (c *clusterClient) logMissingRouteGroupsOnce() {
459544
if c.loggedMissingRouteGroups {
460545
return
@@ -468,7 +553,6 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) {
468553
var (
469554
err error
470555
ingressesV1 []*definitions.IngressV1Item
471-
secrets map[definitions.ResourceID]*secret
472556
)
473557
ingressesV1, err = c.loadIngressesV1()
474558
if err != nil {
@@ -492,24 +576,32 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) {
492576
return nil, err
493577
}
494578

495-
endpoints, err := c.loadEndpoints()
496-
if err != nil {
497-
return nil, err
579+
state := &clusterState{
580+
ingressesV1: ingressesV1,
581+
routeGroups: routeGroups,
582+
services: services,
583+
cachedEndpoints: make(map[endpointID][]string),
584+
enableEndpointSlices: c.enableEndpointSlices,
585+
}
586+
587+
if c.enableEndpointSlices {
588+
state.endpointSlices, err = c.loadEndpointSlices()
589+
if err != nil {
590+
return nil, err
591+
}
592+
} else {
593+
state.endpoints, err = c.loadEndpoints()
594+
if err != nil {
595+
return nil, err
596+
}
498597
}
499598

500599
if c.certificateRegistry != nil {
501-
secrets, err = c.loadSecrets()
600+
state.secrets, err = c.loadSecrets()
502601
if err != nil {
503602
return nil, err
504603
}
505604
}
506605

507-
return &clusterState{
508-
ingressesV1: ingressesV1,
509-
routeGroups: routeGroups,
510-
services: services,
511-
endpoints: endpoints,
512-
secrets: secrets,
513-
cachedEndpoints: make(map[endpointID][]string),
514-
}, nil
606+
return state, nil
515607
}

dataclients/kubernetes/clusterstate.go

+54-21
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ import (
1111
)
1212

1313
type clusterState struct {
14-
mu sync.Mutex
15-
ingressesV1 []*definitions.IngressV1Item
16-
routeGroups []*definitions.RouteGroupItem
17-
services map[definitions.ResourceID]*service
18-
endpoints map[definitions.ResourceID]*endpoint
19-
secrets map[definitions.ResourceID]*secret
20-
cachedEndpoints map[endpointID][]string
14+
mu sync.Mutex
15+
ingressesV1 []*definitions.IngressV1Item
16+
routeGroups []*definitions.RouteGroupItem
17+
services map[definitions.ResourceID]*service
18+
endpoints map[definitions.ResourceID]*endpoint
19+
endpointSlices map[definitions.ResourceID]*skipperEndpointSlice
20+
secrets map[definitions.ResourceID]*secret
21+
cachedEndpoints map[endpointID][]string
22+
enableEndpointSlices bool
2123
}
2224

2325
func (state *clusterState) getService(namespace, name string) (*service, error) {
@@ -47,6 +49,7 @@ func (state *clusterState) getServiceRG(namespace, name string) (*service, error
4749
return s, nil
4850
}
4951

52+
// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
5053
func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
5154
epID := endpointID{
5255
ResourceID: newResourceID(namespace, name),
@@ -60,18 +63,29 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin
6063
return cached
6164
}
6265

63-
ep, ok := state.endpoints[epID.ResourceID]
64-
if !ok {
65-
return nil
66+
var targets []string
67+
if state.enableEndpointSlices {
68+
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
69+
targets = eps.targetsByServicePort("TCP", protocol, servicePort)
70+
} else {
71+
return nil
72+
}
73+
} else {
74+
if ep, ok := state.endpoints[epID.ResourceID]; ok {
75+
targets = ep.targetsByServicePort(protocol, servicePort)
76+
} else {
77+
return nil
78+
}
6679
}
6780

68-
targets := ep.targetsByServicePort(protocol, servicePort)
6981
sort.Strings(targets)
7082
state.cachedEndpoints[epID] = targets
7183
return targets
7284
}
7385

74-
func (state *clusterState) GetEndpointsByName(namespace, name, protocol string) []string {
86+
// GetEndpointsByName returns the skipper endpoints for kubernetes endpoints or endpointslices.
87+
// This function only works correctly for endpointslices (and likely endpoints) with one port with the same protocol ("TCP", "UDP").
88+
func (state *clusterState) GetEndpointsByName(namespace, name, protocol, scheme string) []string {
7589
epID := endpointID{
7690
ResourceID: newResourceID(namespace, name),
7791
Protocol: protocol,
@@ -82,19 +96,29 @@ func (state *clusterState) GetEndpointsByName(namespace, name, protocol string)
8296
return cached
8397
}
8498

85-
ep, ok := state.endpoints[epID.ResourceID]
86-
if !ok {
87-
return nil
99+
var targets []string
100+
if state.enableEndpointSlices {
101+
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
102+
targets = eps.targets(protocol, scheme)
103+
} else {
104+
return nil
105+
}
106+
} else {
107+
if ep, ok := state.endpoints[epID.ResourceID]; ok {
108+
targets = ep.targets(scheme)
109+
} else {
110+
return nil
111+
}
88112
}
89113

90-
targets := ep.targets(protocol)
91114
sort.Strings(targets)
92115
state.cachedEndpoints[epID] = targets
93116
return targets
94117

95118
}
96119

97-
func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol string, target *definitions.BackendPort) []string {
120+
// GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices.
121+
func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol, scheme string, target *definitions.BackendPort) []string {
98122
epID := endpointID{
99123
ResourceID: newResourceID(namespace, name),
100124
Protocol: protocol,
@@ -107,12 +131,21 @@ func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol string
107131
return cached
108132
}
109133

110-
ep, ok := state.endpoints[epID.ResourceID]
111-
if !ok {
112-
return nil
134+
var targets []string
135+
if state.enableEndpointSlices {
136+
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
137+
targets = eps.targetsByServiceTarget(protocol, scheme, target)
138+
} else {
139+
return nil
140+
}
141+
} else {
142+
if ep, ok := state.endpoints[epID.ResourceID]; ok {
143+
targets = ep.targetsByServiceTarget(scheme, target)
144+
} else {
145+
return nil
146+
}
113147
}
114148

115-
targets := ep.targetsByServiceTarget(protocol, target)
116149
sort.Strings(targets)
117150
state.cachedEndpoints[epID] = targets
118151
return targets

0 commit comments

Comments
 (0)