diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go
index ad18dd18a..d9ab1e0ea 100644
--- a/pkg/controllers/controllers.go
+++ b/pkg/controllers/controllers.go
@@ -67,7 +67,7 @@ func NewControllers(
 ) []controller.Controller {
 	cluster := state.NewCluster(clock, kubeClient)
-	p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
+	p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
 	evictionQueue := terminator.NewQueue(kubeClient, recorder)
 	disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)
diff --git a/pkg/controllers/disruption/orchestration/suite_test.go b/pkg/controllers/disruption/orchestration/suite_test.go
index fcbd12e7a..bad7f7fe5 100644
--- a/pkg/controllers/disruption/orchestration/suite_test.go
+++ b/pkg/controllers/disruption/orchestration/suite_test.go
@@ -80,7 +80,7 @@ var _ = BeforeSuite(func() {
 	nodeStateController = informer.NewNodeController(env.Client, cluster)
 	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
 	recorder = test.NewEventRecorder()
-	prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
+	prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
 	queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
 })
diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go
index 0cb929ae5..44e2adc69 100644
--- a/pkg/controllers/disruption/suite_test.go
+++ b/pkg/controllers/disruption/suite_test.go
@@ -92,7 +92,7 @@ var _ = BeforeSuite(func() {
 	nodeStateController = informer.NewNodeController(env.Client, cluster)
 	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
 	recorder = test.NewEventRecorder()
-	prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
+	prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
 	queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
 	disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
 })
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 59e2fe0b9..5bf01d161 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/clock"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -83,10 +84,12 @@ type Provisioner struct {
 	cluster       *state.Cluster
 	recorder      events.Recorder
 	cm            *pretty.ChangeMonitor
+	clock         clock.Clock
 }
 
 func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 	cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
+	clock clock.Clock,
 ) *Provisioner {
 	p := &Provisioner{
 		batcher:       NewBatcher(),
@@ -96,6 +99,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
 		cluster:       cluster,
 		recorder:      recorder,
 		cm:            pretty.NewChangeMonitor(),
+		clock:         clock,
 	}
 	return p
 }
@@ -302,7 +306,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
 	if err != nil {
 		return nil, fmt.Errorf("getting daemon pods, %w", err)
 	}
-	return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil
+	return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil
 }
 
 func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go
index 1bdda17ab..0a9a8348f 100644
--- a/pkg/controllers/provisioning/scheduling/scheduler.go
+++ b/pkg/controllers/provisioning/scheduling/scheduler.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samber/lo"
@@ -29,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
@@ -47,7 +49,7 @@ import (
 func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool,
 	cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology,
 	instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*corev1.Pod,
-	recorder events.Recorder) *Scheduler {
+	recorder events.Recorder, clock clock.Clock) *Scheduler {
 
 	// if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint
 	// during preference relaxation
@@ -74,6 +76,7 @@ func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool,
 		remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) {
 			return np.Name, corev1.ResourceList(np.Spec.Limits)
 		}),
+		clock:              clock,
 	}
 	s.calculateExistingNodeClaims(stateNodes, daemonSetPods)
 	return s
@@ -92,6 +95,7 @@ type Scheduler struct {
 	cluster            *state.Cluster
 	recorder           events.Recorder
 	kubeClient         client.Client
+	clock              clock.Clock
 }
 
 // Results contains the results of the scheduling operation
@@ -206,10 +210,19 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
 	errors := map[*corev1.Pod]error{}
 	QueueDepth.DeletePartialMatch(prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around
 	q := NewQueue(pods...)
+
+	startTime := s.clock.Now()
+	lastLogTime := s.clock.Now()
+	batchSize := len(q.pods)
 	for {
 		QueueDepth.With(
 			prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)},
 		).Set(float64(len(q.pods)))
+
+		if s.clock.Since(lastLogTime) > time.Minute {
+			log.FromContext(ctx).WithValues("pods-scheduled", batchSize-len(q.pods), "pods-remaining", len(q.pods), "duration", s.clock.Since(startTime).Truncate(time.Second), "scheduling-id", string(s.id)).Info("computing pod scheduling...")
+			lastLogTime = s.clock.Now()
+		}
 		// Try the next pod
 		pod, ok := q.Pop()
 		if !ok {
diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
index 1e4ab2540..507d9e111 100644
--- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
+++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
@@ -166,7 +166,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 	client := fakecr.NewFakeClient()
 	pods := makeDiversePods(podCount)
-	cluster = state.NewCluster(&clock.RealClock{}, client)
+	clock := &clock.RealClock{}
+	cluster = state.NewCluster(clock, client)
 	domains := map[string]sets.Set[string]{}
 	topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
 	if err != nil {
@@ -176,7 +177,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 	scheduler := scheduling.NewScheduler(client, []*v1.NodePool{nodePool},
 		cluster, nil, topology,
 		map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil,
-		events.NewRecorder(&record.FakeRecorder{}))
+		events.NewRecorder(&record.FakeRecorder{}), clock)
 	b.ResetTimer()
 
 	// Pack benchmark
diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go
index dab8a6d03..b494a67f3 100644
--- a/pkg/controllers/provisioning/scheduling/suite_test.go
+++ b/pkg/controllers/provisioning/scheduling/suite_test.go
@@ -98,7 +98,7 @@ var _ = BeforeSuite(func() {
 	nodeStateController = informer.NewNodeController(env.Client, cluster)
 	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
 	podStateController = informer.NewPodController(env.Client, cluster)
-	prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
+	prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock)
 })
 
 var _ = AfterSuite(func() {
diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go
index 1d0530b5c..2461fdf8b 100644
--- a/pkg/controllers/provisioning/suite_test.go
+++ b/pkg/controllers/provisioning/suite_test.go
@@ -79,7 +79,7 @@ var _ = BeforeSuite(func() {
 	fakeClock = clock.NewFakeClock(time.Now())
 	cluster = state.NewCluster(fakeClock, env.Client)
 	nodeController = informer.NewNodeController(env.Client, cluster)
-	prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
+	prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock)
 	daemonsetController = informer.NewDaemonSetController(env.Client, cluster)
 	instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil)
 	instanceTypeMap = map[string]*cloudprovider.InstanceType{}
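Not part of the patch above: a minimal, standalone sketch of the pattern the `Solve()` hunk introduces, progress logging throttled to once per minute against an injected `clock.Clock`. Injecting the clock (rather than calling `time.Now()` directly) is what lets a test step time forward deterministically with the fake clock from `k8s.io/utils/clock/testing`. The `progressLogger` and `maybeLog` names below are illustrative only and do not exist in Karpenter.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	clocktesting "k8s.io/utils/clock/testing"
)

// progressLogger (hypothetical) mirrors the throttled "computing pod scheduling..."
// log added to Scheduler.Solve: record a start time, then report at most once per
// minute, with every time read going through the injected clock.Clock.
type progressLogger struct {
	clk         clock.Clock
	startTime   time.Time
	lastLogTime time.Time
}

func newProgressLogger(clk clock.Clock) *progressLogger {
	now := clk.Now()
	return &progressLogger{clk: clk, startTime: now, lastLogTime: now}
}

// maybeLog emits a progress line only if at least a minute has elapsed since the last one.
func (p *progressLogger) maybeLog(scheduled, remaining int) {
	if p.clk.Since(p.lastLogTime) > time.Minute {
		fmt.Printf("pods-scheduled=%d pods-remaining=%d duration=%s\n",
			scheduled, remaining, p.clk.Since(p.startTime).Truncate(time.Second))
		p.lastLogTime = p.clk.Now()
	}
}

func main() {
	// A fake clock advances time deterministically instead of sleeping,
	// which is the reason the constructors now accept clock.Clock.
	fakeClock := clocktesting.NewFakeClock(time.Now())
	p := newProgressLogger(fakeClock)

	p.maybeLog(10, 90)              // suppressed: under a minute since start
	fakeClock.Step(2 * time.Minute) // advance the fake clock past the threshold
	p.maybeLog(25, 75)              // prints a progress line
}
```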