Use actual pods from K8S Informer for scaling rate (knative#3055)
* refactor where we use rate

* extract concurrencyPerPod

* use approximateZero

* use pods count from Informer

* get pods when record

* use 1 as min actual pod

* add unit test

* add unit tests for cmd

* lint

* address comments

* remove unused func

* short locked scope

* address comment

* wrap func into func

* address comments

* add dot

* revert handle file

* exclude test coverage check for main files

* clean cache in tests

* removed cache

* remove extra func call
yanweiguo authored and knative-prow-robot committed Feb 8, 2019
1 parent 507454e commit 6ff096f
Showing 6 changed files with 267 additions and 67 deletions.
1 change: 1 addition & 0 deletions .gitattributes
@@ -12,3 +12,4 @@
/pkg/**/testing/** coverage-excluded=true
/vendor/** coverage-excluded=true
/test/** coverage-excluded=true
/cmd/**/main.go coverage-excluded=true
31 changes: 21 additions & 10 deletions cmd/autoscaler/main.go
@@ -18,6 +18,7 @@ package main

import (
"flag"
"fmt"
"log"
"time"

@@ -41,6 +42,7 @@ import (
"k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
@@ -123,8 +125,6 @@ func main() {
// Watch the autoscaler config map and dynamically update autoscaler config.
configMapWatcher.Watch(autoscaler.ConfigName, dynConfig.Update)

multiScaler := autoscaler.NewMultiScaler(dynConfig, stopCh, uniScalerFactory, logger)

opt := reconciler.Options{
KubeClientSet: kubeClientSet,
ServingClientSet: servingClientSet,
@@ -138,6 +138,8 @@
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
hpaInformer := kubeInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers()

// uniScalerFactory depends endpointsInformer to be set.
multiScaler := autoscaler.NewMultiScaler(dynConfig, stopCh, uniScalerFactoryFunc(endpointsInformer), logger)
kpaScaler := kpa.NewKPAScaler(servingClientSet, scaleClient, logger, configMapWatcher)
kpaCtl := kpa.NewController(&opt, paInformer, endpointsInformer, multiScaler, kpaScaler, dynConfig)
hpaCtl := hpa.NewController(&opt, paInformer, hpaInformer)
@@ -215,15 +217,24 @@ func buildRESTMapper(kubeClientSet kubernetes.Interface, stopCh <-chan struct{})
return rm
}

func uniScalerFactory(metric *autoscaler.Metric, dynamicConfig *autoscaler.DynamicConfig) (autoscaler.UniScaler, error) {
// Create a stats reporter which tags statistics by PA namespace, configuration name, and PA name.
reporter, err := autoscaler.NewStatsReporter(metric.Namespace,
labelValueOrEmpty(metric, serving.ServiceLabelKey), labelValueOrEmpty(metric, serving.ConfigurationLabelKey), metric.Name)
if err != nil {
return nil, err
}
func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) func(metric *autoscaler.Metric, dynamicConfig *autoscaler.DynamicConfig) (autoscaler.UniScaler, error) {
return func(metric *autoscaler.Metric, dynamicConfig *autoscaler.DynamicConfig) (autoscaler.UniScaler, error) {
// Create a stats reporter which tags statistics by PA namespace, configuration name, and PA name.
reporter, err := autoscaler.NewStatsReporter(metric.Namespace,
labelValueOrEmpty(metric, serving.ServiceLabelKey), labelValueOrEmpty(metric, serving.ConfigurationLabelKey), metric.Name)
if err != nil {
return nil, err
}

return autoscaler.New(dynamicConfig, metric.Spec.TargetConcurrency, reporter), nil
revName := metric.Labels[serving.RevisionLabelKey]
if revName == "" {
return nil, fmt.Errorf("No Revision label found in Metric: %v", metric)
}

return autoscaler.New(dynamicConfig, metric.Namespace,
reconciler.GetServingK8SServiceNameForObj(revName), endpointsInformer,
metric.Spec.TargetConcurrency, reporter)
}
}

func labelValueOrEmpty(metric *autoscaler.Metric, labelKey string) string {
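The factory is now a closure: main() builds one shared EndpointsInformer and uniScalerFactoryFunc captures it, so every per-revision Autoscaler reads ready-pod counts from the same informer cache instead of holding its own client. Below is a minimal standalone sketch of that pattern; the names (endpointsLister, scaler, newScalerFactory) are simplified stand-ins for illustration, not the real knative types.

package main

import "fmt"

// endpointsLister stands in for the shared informer's lister.
type endpointsLister interface {
	readyAddresses(namespace, service string) (int, error)
}

type staticLister struct{ count int }

func (l staticLister) readyAddresses(namespace, service string) (int, error) { return l.count, nil }

// scaler stands in for a per-revision Autoscaler.
type scaler struct {
	namespace, service string
	lister             endpointsLister
}

// newScalerFactory captures the shared lister so each scaler it creates reads
// pod counts from the same cache rather than constructing its own.
func newScalerFactory(lister endpointsLister) func(namespace, service string) *scaler {
	return func(namespace, service string) *scaler {
		return &scaler{namespace: namespace, service: service, lister: lister}
	}
}

func main() {
	factory := newScalerFactory(staticLister{count: 2})
	s := factory("default", "my-revision-service")
	n, _ := s.lister.readyAddresses(s.namespace, s.service)
	fmt.Println("ready pods:", n) // ready pods: 2
}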
46 changes: 46 additions & 0 deletions cmd/autoscaler/main_test.go
@@ -18,7 +18,16 @@ package main
import (
"testing"

"github.com/knative/serving/pkg/apis/serving"
"github.com/knative/serving/pkg/autoscaler"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
fakeK8s "k8s.io/client-go/kubernetes/fake"
)

const (
testNamespace = "test-namespace"
testRevision = "test-Revision"
)

func TestLabelValueOrEmpty(t *testing.T) {
@@ -53,3 +62,40 @@ func TestLabelValueOrEmpty(t *testing.T) {
})
}
}

func TestUniScalerFactoryFunc(t *testing.T) {
uniScalerFactory := getTestUniScalerFactory()
metric := &autoscaler.Metric{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testRevision,
Labels: map[string]string{serving.RevisionLabelKey: testRevision},
},
}
dynamicConfig := &autoscaler.DynamicConfig{}

if _, err := uniScalerFactory(metric, dynamicConfig); err != nil {
t.Errorf("got error from uniScalerFactory: %v", err)
}
}

func TestUniScalerFactoryFunc_FailWhenRevisionLabelMissing(t *testing.T) {
uniScalerFactory := getTestUniScalerFactory()
metric := &autoscaler.Metric{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testRevision,
},
}
dynamicConfig := &autoscaler.DynamicConfig{}

if _, err := uniScalerFactory(metric, dynamicConfig); err == nil {
t.Errorf("expected error when revision label missing but got none")
}
}

func getTestUniScalerFactory() func(metric *autoscaler.Metric, dynamicConfig *autoscaler.DynamicConfig) (autoscaler.UniScaler, error) {
kubeClient := fakeK8s.NewSimpleClientset()
kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
return uniScalerFactoryFunc(kubeInformer.Core().V1().Endpoints())
}
169 changes: 113 additions & 56 deletions pkg/autoscaler/autoscaler.go
@@ -18,12 +18,17 @@ package autoscaler

import (
"context"
"errors"
"math"
"strings"
"sync"
"time"

"github.com/knative/pkg/logging"
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
)

const (
@@ -199,25 +204,44 @@ func (agg *perPodAggregation) podWeight(now time.Time) float64 {
// Autoscaler stores current state of an instance of an autoscaler
type Autoscaler struct {
*DynamicConfig
key string
target float64
stats map[statKey]Stat
statsMutex sync.Mutex
panicking bool
panicTime *time.Time
maxPanicPods float64
reporter StatsReporter
targetMutex sync.RWMutex
key string
namespace string
revisionService string
endpointsLister corev1listers.EndpointsLister
panicking bool
panicTime *time.Time
maxPanicPods float64
reporter StatsReporter

// targetMutex guards the elements in the block below.
targetMutex sync.RWMutex
target float64

// statsMutex guards the elements in the block below.
statsMutex sync.Mutex
stats map[statKey]Stat
}

// New creates a new instance of autoscaler
func New(dynamicConfig *DynamicConfig, target float64, reporter StatsReporter) *Autoscaler {
return &Autoscaler{
DynamicConfig: dynamicConfig,
target: target,
stats: make(map[statKey]Stat),
reporter: reporter,
func New(
dynamicConfig *DynamicConfig,
namespace string,
revisionService string,
endpointsInformer corev1informers.EndpointsInformer,
target float64,
reporter StatsReporter) (*Autoscaler, error) {
if endpointsInformer == nil {
return nil, errors.New("Empty interface of EndpointsInformer")
}
return &Autoscaler{
DynamicConfig: dynamicConfig,
namespace: namespace,
revisionService: revisionService,
endpointsLister: endpointsInformer.Lister(),
target: target,
stats: make(map[statKey]Stat),
reporter: reporter,
}, nil
}

// Update reconfigures the UniScaler according to the MetricSpec.
@@ -235,6 +259,7 @@ func (a *Autoscaler) Record(ctx context.Context, stat Stat) {
logger.Errorf("Missing time from stat: %+v", stat)
return
}

a.statsMutex.Lock()
defer a.statsMutex.Unlock()

Expand All @@ -249,46 +274,15 @@ func (a *Autoscaler) Record(ctx context.Context, stat Stat) {
func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
logger := logging.FromContext(ctx)

a.targetMutex.RLock()
defer a.targetMutex.RUnlock()

a.statsMutex.Lock()
defer a.statsMutex.Unlock()
readyPods, err := a.readyPods()
if err != nil {
logger.Errorw("Failed to get Endpoints via K8S Lister", zap.Error(err))
return 0, false
}

config := a.Current()

// 60 second window
stableData := newTotalAggregation(config.StableWindow)

// 6 second window
panicData := newTotalAggregation(config.PanicWindow)

// Last stat per Pod
lastStat := make(map[string]Stat)

// accumulate stats into their respective buckets
for key, stat := range a.stats {
instant := key.time
if instant.Add(config.PanicWindow).After(now) {
panicData.aggregate(stat)
}
if instant.Add(config.StableWindow).After(now) {
stableData.aggregate(stat)

// If there's no last stat for this pod, set it
if _, ok := lastStat[stat.PodName]; !ok {
lastStat[stat.PodName] = stat
}
// If the current last stat is older than the new one, override
if lastStat[stat.PodName].Time.Before(*stat.Time) {
lastStat[stat.PodName] = stat
}
} else {
// Drop metrics after 60 seconds
delete(a.stats, key)
}
}

stableData, panicData, lastStat := a.aggregateData(now, config.StableWindow, config.PanicWindow)
observedStablePods := stableData.observedPods(now)
// Do nothing when we have no data.
if observedStablePods < 1.0 {
@@ -310,15 +304,17 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
observedPanicConcurrency := panicData.observedConcurrency(now)
observedStableConcurrencyPerPod := stableData.observedConcurrencyPerPod(now)
observedPanicConcurrencyPerPod := panicData.observedConcurrencyPerPod(now)

target := a.targetConcurrency()
// Desired pod count is observed concurrency of revision over desired (stable) concurrency per pod.
// The scaling up rate limited to within MaxScaleUpRate.
desiredStablePodCount := a.podCountLimited(observedStableConcurrency/a.target, observedStablePods)
desiredPanicPodCount := a.podCountLimited(observedPanicConcurrency/a.target, observedStablePods)
desiredStablePodCount := a.podCountLimited(observedStableConcurrency/target, readyPods)
desiredPanicPodCount := a.podCountLimited(observedPanicConcurrency/target, readyPods)

a.reporter.ReportObservedPodCount(observedStablePods)
a.reporter.ReportStableRequestConcurrency(observedStableConcurrencyPerPod)
a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrencyPerPod)
a.reporter.ReportTargetRequestConcurrency(a.target)
a.reporter.ReportTargetRequestConcurrency(target)

logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedStableConcurrencyPerPod, config.StableWindow, stableData.probeCount, observedStablePods)
@@ -334,7 +330,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
}

// Begin panicking when we cross the 6 second concurrency threshold.
if !a.panicking && observedPanicPods > 0.0 && observedPanicConcurrencyPerPod >= (a.target*2) {
if !a.panicking && observedPanicPods > 0.0 && observedPanicConcurrencyPerPod >= (target*2) {
logger.Info("PANICKING")
a.panicking = true
a.panicTime = &now
@@ -361,10 +357,71 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
return desiredPodCount, true
}

func (a *Autoscaler) aggregateData(now time.Time, stableWindow, panicWindow time.Duration) (*totalAggregation, *totalAggregation, map[string]Stat) {
a.statsMutex.Lock()
defer a.statsMutex.Unlock()

// 60 second window
stableData := newTotalAggregation(stableWindow)

// 6 second window
panicData := newTotalAggregation(panicWindow)

// Last stat per Pod
lastStat := make(map[string]Stat)

// Accumulate stats into their respective buckets
for key, stat := range a.stats {
instant := key.time
if instant.Add(panicWindow).After(now) {
panicData.aggregate(stat)
}
if instant.Add(stableWindow).After(now) {
stableData.aggregate(stat)

// If there's no last stat for this pod, set it
if _, ok := lastStat[stat.PodName]; !ok {
lastStat[stat.PodName] = stat
} else if lastStat[stat.PodName].Time.Before(*stat.Time) {
// If the current last stat is older than the new one, override
lastStat[stat.PodName] = stat
}
} else {
// Drop metrics after 60 seconds
delete(a.stats, key)
}
}
return stableData, panicData, lastStat
}

func (a *Autoscaler) targetConcurrency() float64 {
a.targetMutex.RLock()
defer a.targetMutex.RUnlock()
return a.target
}

func (a *Autoscaler) podCountLimited(desiredPodCount, currentPodCount float64) float64 {
return math.Min(desiredPodCount, a.Current().MaxScaleUpRate*currentPodCount)
}

func (a *Autoscaler) readyPods() (float64, error) {
readyPods := 0
endpoints, err := a.endpointsLister.Endpoints(a.namespace).Get(a.revisionService)
if apierrors.IsNotFound(err) {
// Treat not found as zero endpoints, it either hasn't been created
// or it has been torn down.
} else if err != nil {
return 0, err
} else {
for _, es := range endpoints.Subsets {
readyPods += len(es.Addresses)
}
}

// Use 1 as minimum for multiplication and division.
return math.Max(1, float64(readyPods)), nil
}

func isActivator(podName string) bool {
// TODO(#2282): This can cause naming collisions.
return strings.HasPrefix(podName, ActivatorPodName)
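The net effect of readyPods and podCountLimited above: the desired pod count is the observed concurrency divided by the target concurrency per pod, capped at MaxScaleUpRate times the number of ready addresses reported by the Endpoints lister (floored at 1), rather than the number of pods that happened to report stats. A minimal standalone sketch of that arithmetic with illustrative numbers (MaxScaleUpRate 10, target 10, 2 ready pods) follows.

package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		maxScaleUpRate    = 10.0 // illustrative value for DynamicConfig.MaxScaleUpRate
		targetConcurrency = 10.0 // desired concurrent requests per pod
	)
	observedConcurrency := 250.0  // total concurrency observed across the revision
	readyPods := math.Max(1, 2.0) // ready addresses from the Endpoints lister, never below 1

	desired := observedConcurrency / targetConcurrency     // 25 pods wanted
	limited := math.Min(desired, maxScaleUpRate*readyPods) // capped at 10x the 2 ready pods = 20
	fmt.Printf("desired=%.0f, rate-limited=%.0f\n", desired, limited)
}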