diff --git a/config/config.go b/config/config.go
index 16b205fb7f..6674d2a625 100644
--- a/config/config.go
+++ b/config/config.go
@@ -174,6 +177,9 @@ type Config struct {
 	KubernetesRedisServiceNamespace         string                             `yaml:"kubernetes-redis-service-namespace"`
 	KubernetesRedisServiceName              string                             `yaml:"kubernetes-redis-service-name"`
 	KubernetesRedisServicePort              int                                `yaml:"kubernetes-redis-service-port"`
+	KubernetesZoneAwareEnabled              bool                               `yaml:"kubernetes-zone-aware"`
+	KubernetesEndpointsURL                  string                             `yaml:"kubernetes-endpoints-url"`
+	KubernetesPodZone                       string                             `yaml:"kubernetes-pod-zone"`
 	KubernetesBackendTrafficAlgorithmString string                             `yaml:"kubernetes-backend-traffic-algorithm"`
 	KubernetesBackendTrafficAlgorithm       kubernetes.BackendTrafficAlgorithm `yaml:"-"`
 	KubernetesDefaultLoadBalancerAlgorithm  string                             `yaml:"kubernetes-default-lb-algorithm"`
@@ -473,6 +476,11 @@ func NewConfig() *Config {
 	flag.StringVar(&cfg.KubernetesRedisServiceNamespace, "kubernetes-redis-service-namespace", "", "Sets namespace for redis to be used to lookup endpoints")
 	flag.StringVar(&cfg.KubernetesRedisServiceName, "kubernetes-redis-service-name", "", "Sets name for redis to be used to lookup endpoints")
 	flag.IntVar(&cfg.KubernetesRedisServicePort, "kubernetes-redis-service-port", 6379, "Sets the port for redis to be used to lookup endpoints")
+
+	flag.BoolVar(&cfg.KubernetesZoneAwareEnabled, "kubernetes-zone-aware", false, "Enables Kubernetes zone aware routes, requires -kubernetes-endpoints-url")
+	flag.StringVar(&cfg.KubernetesEndpointsURL, "kubernetes-endpoints-url", "", "Sets URL to lookup /endpoints from routesrv")
+	flag.StringVar(&cfg.KubernetesPodZone, "kubernetes-pod-zone", "", "Sets the Zone of the pod, you should set it via the Kubernetes downward API to enable skipper to know in which zone it runs.")
+
 	flag.StringVar(&cfg.KubernetesBackendTrafficAlgorithmString, "kubernetes-backend-traffic-algorithm", kubernetes.TrafficPredicateAlgorithm.String(), "sets the algorithm to be used for traffic splitting between backends: traffic-predicate or traffic-segment-predicate")
 	flag.StringVar(&cfg.KubernetesDefaultLoadBalancerAlgorithm, "kubernetes-default-lb-algorithm", kubernetes.DefaultLoadBalancerAlgorithm, "sets the default algorithm to be used for load balancing between backend endpoints, available options: roundRobin, consistentHash, random, powerOfRandomNChoices")
@@ -830,6 +838,9 @@ func (c *Config) ToOptions() skipper.Options {
 		KubernetesRedisServiceNamespace:         c.KubernetesRedisServiceNamespace,
 		KubernetesRedisServiceName:              c.KubernetesRedisServiceName,
 		KubernetesRedisServicePort:              c.KubernetesRedisServicePort,
+		KubernetesZoneAwareEnabled:              c.KubernetesZoneAwareEnabled,
+		KubernetesEndpointsURL:                  c.KubernetesEndpointsURL,
+		KubernetesPodZone:                       c.KubernetesPodZone,
 		KubernetesBackendTrafficAlgorithm:       c.KubernetesBackendTrafficAlgorithm,
 		KubernetesDefaultLoadBalancerAlgorithm:  c.KubernetesDefaultLoadBalancerAlgorithm,
diff --git a/dataclients/kubernetes/clusterstate.go b/dataclients/kubernetes/clusterstate.go
index 702e6beb2c..413ae4159b 100644
--- a/dataclients/kubernetes/clusterstate.go
+++ b/dataclients/kubernetes/clusterstate.go
@@ -19,6 +19,7 @@ type clusterState struct {
 	endpointSlices       map[definitions.ResourceID]*skipperEndpointSlice
 	secrets              map[definitions.ResourceID]*secret
 	cachedEndpoints      map[endpointID][]string
+	ridToEpID            map[definitions.ResourceID]endpointID
 	enableEndpointSlices bool
 }
@@ -49,13 +50,17 @@ func (state *clusterState) getServiceRG(namespace, name string) (*service, error
 	return s, nil
 }

-// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
-func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
-	epID := endpointID{
+func getEpID(namespace, name, protocol string, servicePort *servicePort) endpointID {
+	return endpointID{
 		ResourceID: newResourceID(namespace, name),
 		Protocol:   protocol,
 		TargetPort: servicePort.TargetPort.String(),
 	}
+}
+
+// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
+func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
+	epID := getEpID(namespace, name, protocol, servicePort)

 	state.mu.Lock()
 	defer state.mu.Unlock()
diff --git a/dataclients/kubernetes/ingressv1.go b/dataclients/kubernetes/ingressv1.go
index ed2431ed5b..ef3927b20d 100644
--- a/dataclients/kubernetes/ingressv1.go
+++ b/dataclients/kubernetes/ingressv1.go
@@ -100,6 +100,7 @@ func convertPathRuleV1(
 		return nil, err
 	}

+	protocol := "http"
 	servicePort, err := svc.getServicePortV1(svcPort)
 	if err != nil {
 		// service definition is wrong or no pods
@@ -113,7 +114,6 @@ func convertPathRuleV1(
 	} else if forceKubernetesService {
 		eps = []string{serviceNameBackend(svcName, ns, servicePort)}
 	} else {
-		protocol := "http"
 		if p, ok := metadata.Annotations[skipperBackendProtocolAnnotationKey]; ok {
 			protocol = p
 		}
@@ -155,6 +155,12 @@ func convertPathRuleV1(
 		LBAlgorithm: getLoadBalancerAlgorithm(metadata, defaultLoadBalancerAlgorithm),
 		HostRegexps: hostRegexp,
 	}
+	// zone aware
+	// TODO(sszuecs): lookup and store in clusterState ridToEpID
+	//epID := getEpID(ns, name, protocol, servicePort)
+	// r.Id
+	// ic.state.
+
 	setPathV1(pathMode, r, prule.PathType, prule.Path)
 	traffic.apply(r)
 	return r, nil
diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go
index 523b71fdd8..4b24c803bf 100644
--- a/dataclients/kubernetes/kube.go
+++ b/dataclients/kubernetes/kube.go
@@ -400,6 +400,45 @@ func mapRoutes(routes []*eskip.Route) (map[string]*eskip.Route, []*eskip.Route)
 	return routesById, uniqueRoutes
 }

+type EndpointsMap map[string][]*Endpoint
+
+type Endpoint struct {
+	Address string `json:"addr"`
+	Zone    string `json:"zone"`
+	Port    int    `json:"port"`
+}
+
+func endpointMapName(ns, name string, port int) string {
+	return fmt.Sprintf("%s/%s/%d", ns, name, port)
+}
+
+func (c *Client) GetEndpointsMap() EndpointsMap {
+	if c.state == nil {
+		return nil
+	}
+	result := make(EndpointsMap)
+
+	c.mu.Lock()
+	for resID, epSlice := range c.state.endpointSlices {
+		for _, port := range epSlice.Ports {
+			id := endpointMapName(resID.Namespace, resID.Name, port.Port)
+			eps := make([]*Endpoint, 0, len(epSlice.Endpoints))
+
+			for _, ep := range epSlice.Endpoints {
+				eps = append(eps, &Endpoint{
+					Address: ep.Address,
+					Zone:    ep.Zone,
+					Port:    port.Port,
+				})
+			}
+			result[id] = eps
+		}
+	}
+	c.mu.Unlock()
+
+	return result
+}
+
 func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
 	c.mu.Lock()
 	state, err := c.ClusterClient.fetchClusterState()
diff --git a/proxy/healthy_endpoints.go b/proxy/healthy_endpoints.go
index 70c0366361..8a1c9c6092 100644
--- a/proxy/healthy_endpoints.go
+++ b/proxy/healthy_endpoints.go
@@ -9,7 +9,6 @@ import (

 type healthyEndpoints struct {
 	rnd                        *rand.Rand
-	endpointRegistry           *routing.EndpointRegistry
 	maxUnhealthyEndpointsRatio float64
 }
diff --git a/proxy/proxy.go b/proxy/proxy.go
index 0babad8da5..32ae3386d7 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -360,6 +360,12 @@ type Params struct {
 	// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
 	PassiveHealthCheck *PassiveHealthCheck
+
+	// ZoneAwareEndpoints
+	ZoneAwareEndpoints bool
+
+	// Zone
+	Zone string
 }

 type (
@@ -438,6 +444,7 @@ type Proxy struct {
 	registry             *routing.EndpointRegistry
 	fadein               *fadeIn
 	heathlyEndpoints     *healthyEndpoints
+	zoneAwareEndpoints   *zoneAwareEndpoints
 	roundTripper         http.RoundTripper
 	priorityRoutes       []PriorityRoute
 	flags                Flags
@@ -581,6 +588,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
 	endpoints := rt.LBEndpoints
 	endpoints = p.fadein.filterFadeIn(endpoints, rt)
 	endpoints = p.heathlyEndpoints.filterHealthyEndpoints(ctx, endpoints, p.metrics)
+	endpoints = p.zoneAwareEndpoints.filterZoneEndpoints(ctx, endpoints)

 	lbctx := &routing.LBContext{
 		Request: ctx.request,
@@ -841,10 +849,18 @@ func WithParams(p Params) *Proxy {
 	if p.EnablePassiveHealthCheck {
 		healthyEndpointsChooser = &healthyEndpoints{
 			rnd:                        rand.New(loadbalancer.NewLockedSource()),
-			endpointRegistry:           p.EndpointRegistry,
 			maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
 		}
 	}
+
+	var zoneAwareEP *zoneAwareEndpoints
+	if p.ZoneAwareEndpoints {
+		zoneAwareEP = &zoneAwareEndpoints{
+			zone:             p.Zone,
+			endpointRegistry: p.EndpointRegistry,
+		}
+	}
+
 	return &Proxy{
 		routing:  p.Routing,
 		registry: p.EndpointRegistry,
@@ -853,6 +869,7 @@ func WithParams(p Params) *Proxy {
 			endpointRegistry: p.EndpointRegistry,
 		},
 		heathlyEndpoints:   healthyEndpointsChooser,
+		zoneAwareEndpoints: zoneAwareEP,
 		roundTripper:       p.CustomHttpRoundTripperWrap(tr),
 		priorityRoutes:     p.PriorityRoutes,
 		flags:              p.Flags,
diff --git a/routesrv/eskipbytes.go b/routesrv/eskipbytes.go
index da9f47c593..3ba4d67aa4 100644
--- a/routesrv/eskipbytes.go
+++ b/routesrv/eskipbytes.go
@@ -65,6 +65,9 @@ type eskipBytes struct {
 	tracer  ot.Tracer
 	metrics metrics.Metrics
 	now     func() time.Time
+
+	// zone aware
+	epMap []byte
 }

 // formatAndSet takes a slice of routes and stores them eskip-formatted
@@ -111,6 +114,11 @@ func (e *eskipBytes) compressLocked(data []byte) []byte {
 	return buf.Bytes()
 }

+func (e *eskipBytes) endpointsGetHandler(rw http.ResponseWriter, r *http.Request) {
+	rw.WriteHeader(200)
+	rw.Write(e.epMap)
+}
+
 func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer)
 	defer span.Finish()
diff --git a/routesrv/polling.go b/routesrv/polling.go
index ff54136cf5..20a2a5a699 100644
--- a/routesrv/polling.go
+++ b/routesrv/polling.go
@@ -31,6 +31,10 @@ type poller struct {
 	timeout time.Duration
 	quit    chan struct{}

+	// zone awareness
+	updateEndpoints func() ([]byte, error)
+	//epMap []byte
+
 	// Preprocessors
 	defaultFilters *eskip.DefaultFilters
 	oauth2Config   *auth.OAuthConfig
@@ -100,6 +104,13 @@ func (p *poller) poll(wg *sync.WaitGroup) {

 		span.Finish()

+		epMapBytes, err := p.updateEndpoints()
+		if err != nil {
+			log.Errorf("Failed to update endpoints: %v", err)
+		} else {
+			p.b.epMap = epMapBytes
+		}
+
 		select {
 		case <-p.quit:
 			log.Info(LogPollingStopped)
diff --git a/routesrv/routesrv.go b/routesrv/routesrv.go
index 4c4d249a5c..de3494679b 100644
--- a/routesrv/routesrv.go
+++ b/routesrv/routesrv.go
@@ -2,6 +2,7 @@ package routesrv

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"os"
@@ -77,6 +78,7 @@ func New(opts skipper.Options) (*RouteServer, error) {
 	mux := http.NewServeMux()
 	mux.Handle("/health", bs)
 	mux.Handle("/routes", b)
+
 	supportHandler := http.NewServeMux()
 	supportHandler.Handle("/metrics", metricsHandler)
 	supportHandler.Handle("/metrics/", metricsHandler)
@@ -113,6 +115,11 @@ func New(opts skipper.Options) (*RouteServer, error) {
 		mux.Handle("/swarm/redis/shards", rh)
 	}

+	if opts.KubernetesZoneAwareEnabled {
+		log.Infof("Expose /endpoints for zone aware traffic")
+		mux.HandleFunc("GET /endpoints", b.endpointsGetHandler)
+	}
+
 	rs.server = &http.Server{
 		Addr:    opts.Address,
 		Handler: mux,
@@ -128,10 +135,18 @@ func New(opts skipper.Options) (*RouteServer, error) {
 	}

 	rs.poller = &poller{
-		client:  dataclient,
-		timeout: opts.SourcePollTimeout,
-		b:       b,
-		quit:    make(chan struct{}),
+		client:  dataclient,
+		timeout: opts.SourcePollTimeout,
+		b:       b,
+		quit:    make(chan struct{}),
+		updateEndpoints: func() ([]byte, error) {
+			epMap := dataclient.GetEndpointsMap()
+			epMapBytes, err := json.Marshal(epMap)
+			if err != nil {
+				return nil, fmt.Errorf("failed to marshal EndpointsMap: %w", err)
+			}
+			return epMapBytes, nil
+		},
 		defaultFilters: opts.DefaultFilters,
 		editRoute:      opts.EditRoute,
 		cloneRoute:     opts.CloneRoute,
diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go
index 4609d7bc96..fc1af8b918 100644
--- a/routing/endpointregistry.go
+++ b/routing/endpointregistry.go
@@ -1,6 +1,11 @@
 package routing

 import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -8,6 +13,7 @@ import (
 	log "github.com/sirupsen/logrus"

 	"github.com/zalando/skipper/eskip"
+	"github.com/zalando/skipper/net"
 )

 const defaultLastSeenTimeout = 1 * time.Minute
@@ -94,9 +100,15 @@ func newEntry() *entry {
 	return result
 }

+type member struct {
+	endpoint string
+	zone     string
+}
+
 type EndpointRegistry struct {
 	lastSeenTimeout               time.Duration
 	statsResetPeriod              time.Duration
+	endpointsUpdatePeriod         time.Duration
 	minRequests                   int64
 	minHealthCheckDropProbability float64
 	maxHealthCheckDropProbability float64
@@ -105,6 +117,11 @@ type EndpointRegistry struct {
 	now func() time.Time

 	data sync.Map // map[string]*entry
+
+	endpoints       sync.Map // map[string][]*kubernetes.Endpoint
+	endpointsClient *net.Client
+	endpointsURL    *url.URL
+	zone            string
 }

 var _ PostProcessor = &EndpointRegistry{}
@@ -116,6 +133,10 @@ type RegistryOptions struct {
 	MinRequests                   int64
 	MinHealthCheckDropProbability float64
 	MaxHealthCheckDropProbability float64
+	ZoneAwareEnabled              bool
+	EndpointsUpdatePeriod         time.Duration
+	EndpointsURL                  string
+	Zone                          string
 }

 func (r *EndpointRegistry) Do(routes []*Route) []*Route {
@@ -156,6 +177,7 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {

 func (r *EndpointRegistry) updateStats() {
 	ticker := time.NewTicker(r.statsResetPeriod)
+	defer ticker.Stop()

 	for {
 		r.data.Range(func(key, value any) bool {
@@ -190,6 +212,114 @@ func (r *EndpointRegistry) updateStats() {
 	}
 }

+type EndpointsMap map[string][]*Endpoint
+
+type Endpoint struct {
+	Address string `json:"addr"`
+	Zone    string `json:"zone"`
+	Port    int    `json:"port"`
+}
+
+func (r *EndpointRegistry) GetEndpoints(svc string) ([]*Endpoint, bool) {
+	dat, ok := r.endpoints.Load(svc)
+	if !ok {
+		return nil, false
+	}
+	members, ok := dat.([]*Endpoint)
+	if !ok {
+		return nil, false
+	}
+	return members, true
+}
+
+func (r *EndpointRegistry) fetchEndpoints() (EndpointsMap, error) {
+
+	req, err := http.NewRequest("GET", r.endpointsURL.String(), nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create fetch endpoints request: %w", err)
+	}
+
+	rsp, err := r.endpointsClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get fetch endpoints: %w", err)
+	}
+	defer rsp.Body.Close()
+
+	// TODO(sszuecs): whatever we have in routesrv as API response we need to adapt here
+	buf, err := io.ReadAll(rsp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read body: %w", err)
+	}
+	log.Debugf("Got %s", buf)
+
+	var result EndpointsMap
+	err = json.Unmarshal(buf, &result)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal into kubernetes.EndpointsMap: %w", err)
+	}
+	return result, nil
+}
+
+func (r *EndpointRegistry) updateEndpoints() {
+	ticker := time.NewTicker(r.endpointsUpdatePeriod)
+	defer ticker.Stop()
+
+	for {
+		mapMembers, err := r.fetchEndpoints()
+		if err != nil {
+			log.Errorf("Failed to fetch endpoints: %v", err)
+		} else {
+			active := make(map[string]struct{})
+			log.Infof("Got %d members", len(mapMembers))
+
+			// create/update
+			for svc, members := range mapMembers {
+				active[svc] = struct{}{}
+				for _, member := range members {
+					log.Debugf("service: %s, with addr: %s, in zone: %s, with port: %d", svc, member.Address, member.Zone, member.Port)
+				}
+
+				// constraint: to make sense we have >=2 zones and 3 pods in each as minimum
+				if len(members) >= 6 {
+					filteredMembers := make([]*Endpoint, 0)
+					for _, member := range members {
+						if member.Zone == r.zone {
+							filteredMembers = append(filteredMembers, member)
+						}
+					}
+					// constraint: 3 pods in zone
+					if len(filteredMembers) >= 3 {
+						r.endpoints.Store(svc, filteredMembers)
+						log.Infof("Endpointregistry zone aware service: %s with %d endpoints in zone: %s", svc, len(filteredMembers), r.zone)
+					} else {
+						log.Infof("Endpointregistry zone aware service fallback to all zones: %s too few 3 >= %d endpoints found in zone %s", svc, len(filteredMembers), r.zone)
+						r.endpoints.Store(svc, members)
+					}
+
+				} else {
+					log.Infof("Endpointregistry zone aware service fallback to all zones: %s too few 6 >= %d endpoints found", svc, len(members))
+					r.endpoints.Store(svc, members)
+				}
+			}
+
+			// cleanup endpoints
+			r.endpoints.Range(func(a, _ any) bool {
+				svc := a.(string)
+				if _, ok := active[svc]; !ok {
+					r.endpoints.Delete(svc)
+				}
+				return true
+			})
+		}
+
+		select {
+		case <-r.quit:
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
 func (r *EndpointRegistry) Close() {
 	close(r.quit)
 }
@@ -199,12 +329,24 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
 		o.LastSeenTimeout = defaultLastSeenTimeout
 	}

+	var epURL *url.URL
+	if u, err := url.Parse(o.EndpointsURL); err != nil {
+		log.Errorf("Failed to parse %q: %v", o.EndpointsURL, err)
+		log.Infof("Disable zone awareness endpoint updating")
+		o.ZoneAwareEnabled = false
+	} else {
+		epURL = u
+	}
+
 	registry := &EndpointRegistry{
 		lastSeenTimeout:               o.LastSeenTimeout,
 		statsResetPeriod:              o.StatsResetPeriod,
 		minRequests:                   o.MinRequests,
 		minHealthCheckDropProbability: o.MinHealthCheckDropProbability,
 		maxHealthCheckDropProbability: o.MaxHealthCheckDropProbability,
+		endpointsURL:                  epURL,
+		endpointsUpdatePeriod:         5 * time.Second,
+		zone:                          o.Zone,

 		quit: make(chan struct{}),

@@ -214,6 +356,15 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
 	if o.PassiveHealthCheckEnabled {
 		go registry.updateStats()
 	}
+	if o.ZoneAwareEnabled {
+		registry.endpointsClient = net.NewClient(net.Options{
+			MaxIdleConnsPerHost: 3,
+			MaxIdleConns:        10,
+			IdleConnTimeout:     10 * time.Second,
+		})
+
+		go registry.updateEndpoints()
+	}

 	return registry
 }
diff --git a/skipper.go b/skipper.go
index bce6097b9d..66433dd900 100644
--- a/skipper.go
+++ b/skipper.go
@@ -286,6 +286,15 @@ type Options struct {
 	// KubernetesRedisServicePort to be used to lookup ring shards dynamically
 	KubernetesRedisServicePort int

+	// KubernetesZoneAwareEnabled
+	KubernetesZoneAwareEnabled bool
+
+	// KubernetesEndpointsURL
+	KubernetesEndpointsURL string
+
+	// KubernetesPodZone
+	KubernetesPodZone string
+
 	// KubernetesForceService overrides the default Skipper functionality to route traffic using Kubernetes Endpoints,
 	// instead using Kubernetes Services.
 	KubernetesForceService bool
@@ -1992,6 +2001,10 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
 		MinRequests:                   passiveHealthCheck.MinRequests,
 		MinHealthCheckDropProbability: passiveHealthCheck.MinDropProbability,
 		MaxHealthCheckDropProbability: passiveHealthCheck.MaxDropProbability,
+		ZoneAwareEnabled:              o.KubernetesZoneAwareEnabled,
+		EndpointsUpdatePeriod:         10 * time.Second,
+		EndpointsURL:                  o.KubernetesEndpointsURL,
+		Zone:                          o.KubernetesPodZone,
 	})
 	ro := routing.Options{
 		FilterRegistry: o.filterRegistry(),