From 9062a82707f18f2fe12242956a6c7bcc075f04e4 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Mon, 2 Dec 2024 15:18:07 +1100 Subject: [PATCH] Add job watcher --- .buildkite/rbac.yaml | 6 + .../agent-stack-k8s/templates/rbac.yaml.tpl | 7 + cmd/controller/controller.go | 5 + cmd/controller/controller_test.go | 1 + examples/config.yaml | 1 + internal/controller/config/config.go | 2 + internal/controller/controller.go | 13 + internal/controller/scheduler/job_watcher.go | 271 ++++++++++++++++++ internal/controller/scheduler/metrics.go | 74 ++++- .../fixtures/missing-service-account.yaml | 12 + internal/integration/integration_test.go | 15 + 11 files changed, 405 insertions(+), 2 deletions(-) create mode 100644 internal/controller/scheduler/job_watcher.go create mode 100644 internal/integration/fixtures/missing-service-account.yaml diff --git a/.buildkite/rbac.yaml b/.buildkite/rbac.yaml index 3fc6cf70..cac0aec0 100644 --- a/.buildkite/rbac.yaml +++ b/.buildkite/rbac.yaml @@ -34,6 +34,12 @@ rules: - pods/eviction verbs: - create + - apiGroups: + - "" + resources: + - events + verbs: + - list --- apiVersion: v1 kind: ServiceAccount diff --git a/charts/agent-stack-k8s/templates/rbac.yaml.tpl b/charts/agent-stack-k8s/templates/rbac.yaml.tpl index e50196b1..703abcf1 100644 --- a/charts/agent-stack-k8s/templates/rbac.yaml.tpl +++ b/charts/agent-stack-k8s/templates/rbac.yaml.tpl @@ -13,6 +13,7 @@ rules: - watch - create - update + - delete - apiGroups: - "" resources: @@ -33,6 +34,12 @@ rules: - pods/eviction verbs: - create + - apiGroups: + - "" + resources: + - events + verbs: + - list --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index ecb6fad1..370ba057 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -110,6 +110,11 @@ func AddConfigFlags(cmd *cobra.Command) { config.DefaultJobCancelCheckerPollInterval, "Controls the interval between job state queries while a pod is still Pending", ) + cmd.Flags().Duration( + "empty-job-grace-period", + config.DefaultEmptyJobGracePeriod, + "Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account)", + ) cmd.Flags().Bool( "prohibit-kubernetes-plugin", false, diff --git a/cmd/controller/controller_test.go b/cmd/controller/controller_test.go index 6e2dda64..e4fdb1c3 100644 --- a/cmd/controller/controller_test.go +++ b/cmd/controller/controller_test.go @@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) { JobTTL: 300 * time.Second, ImagePullBackOffGracePeriod: 60 * time.Second, JobCancelCheckerPollInterval: 10 * time.Second, + EmptyJobGracePeriod: 50 * time.Second, PollInterval: 5 * time.Second, StaleJobDataTimeout: 10 * time.Second, JobCreationConcurrency: 5, diff --git a/examples/config.yaml b/examples/config.yaml index 21b6d725..b941a6ee 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest job-ttl: 5m image-pull-backoff-grace-period: 60s job-cancel-checker-poll-interval: 10s +empty-job-grace-period: 50s poll-interval: 5s stale-job-data-timeout: 10s job-creation-concurrency: 5 diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go index 167a2068..83a1c589 100644 --- a/internal/controller/config/config.go +++ b/internal/controller/config/config.go @@ -18,6 +18,7 @@ const ( DefaultStaleJobDataTimeout = 10 * time.Second DefaultImagePullBackOffGracePeriod = 30 * time.Second DefaultJobCancelCheckerPollInterval = 5 * time.Second + DefaultEmptyJobGracePeriod = 30 * time.Second DefaultJobCreationConcurrency = 5 ) @@ -51,6 +52,7 @@ type Config struct { PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"` ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"` JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"` + EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"` // WorkspaceVolume allows supplying a volume for /workspace. By default // an EmptyDir volume is created for it. diff --git a/internal/controller/controller.go b/internal/controller/controller.go index f830210b..3cca6362 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -136,7 +136,20 @@ func Run( logger.Fatal("failed to register completions informer", zap.Error(err)) } + // JobWatcher watches for jobs in bad conditions to clean up: + // * Jobs that fail without ever creating a pod + // * Jobs that stall forever without ever creating a pod + jobWatcher := scheduler.NewJobWatcher( + logger.Named("jobWatcher"), + k8sClient, + cfg, + ) + if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil { + logger.Fatal("failed to register jobWatcher informer", zap.Error(err)) + } + // PodWatcher watches for other conditions to clean up pods: + // * Pods where an init container failed for any reason // * Pods where a container is in ImagePullBackOff for too long // * Pods that are still pending, but the Buildkite job has been cancelled podWatcher := scheduler.NewPodWatcher( diff --git a/internal/controller/scheduler/job_watcher.go b/internal/controller/scheduler/job_watcher.go new file mode 100644 index 00000000..031902cc --- /dev/null +++ b/internal/controller/scheduler/job_watcher.go @@ -0,0 +1,271 @@ +package scheduler + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/model" + + "github.com/jedib0t/go-pretty/v6/table" + "go.uber.org/zap" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/duration" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" +) + +// jobWatcher watches k8s jobs for failure to start a pod. The corresponding +// Buildkite job is failed with an error message if this happens. Also, if such +// a k8s job doesn't enter a terminal state on its own, jobWatcher sets a +// deadline so that it is cleaned up. +type jobWatcher struct { + // Logs go here + logger *zap.Logger + + k8s kubernetes.Interface + cfg *config.Config + + // Tracks stalling jobs (jobs that have yet to create pods). + stallingJobsMu sync.Mutex + stallingJobs map[*batchv1.Job]struct{} + + // This is the context passed to RegisterInformer. + // It's being stored here (grrrr!) because the k8s ResourceEventHandler + // interface doesn't have context args. (Working around an interface in a + // library outside of our control is a carve-out from the usual rule.) + // The context is needed to ensure goroutines are cleaned up. + resourceEventHandlerCtx context.Context +} + +// NewJobWatcher creates a JobWatcher. +func NewJobWatcher(logger *zap.Logger, k8sClient kubernetes.Interface, cfg *config.Config) *jobWatcher { + w := &jobWatcher{ + logger: logger, + k8s: k8sClient, + cfg: cfg, + stallingJobs: make(map[*batchv1.Job]struct{}), + } + jobsStallingGaugeFunc = func() int { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + return len(w.stallingJobs) + } + return w +} + +// RegisterInformer registers the limiter to listen for Kubernetes job events. +func (w *jobWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error { + informer := factory.Batch().V1().Jobs() + jobInformer := informer.Informer() + if _, err := jobInformer.AddEventHandler(w); err != nil { + return err + } + w.resourceEventHandlerCtx = ctx // See field comment + go factory.Start(ctx.Done()) + // No need to wait for cache sync here. These are cleanup tasks, not + // barriers to prevent creating new jobs. + go w.stalledJobChecker(ctx) + return nil +} + +// OnAdd is called by k8s to inform us a resource is added. +func (w *jobWatcher) OnAdd(obj any, _ bool) { + jobWatcherOnAddEventCounter.Inc() + kjob, _ := obj.(*batchv1.Job) + if kjob == nil { + return + } + // Same logic whether we are considering pre-existing jobs, or new jobs. + w.runChecks(w.resourceEventHandlerCtx, kjob) +} + +// OnUpdate is called by k8s to inform us a resource is updated. +func (w *jobWatcher) OnUpdate(_, curr any) { + jobWatcherOnUpdateEventCounter.Inc() + kjob, _ := curr.(*batchv1.Job) + if kjob == nil { + return + } + // Same logic whether or not anything relevant changed about the job. + w.runChecks(w.resourceEventHandlerCtx, kjob) +} + +// OnDelete is called by k8s to inform us a resource is deleted. +func (w *jobWatcher) OnDelete(prev any) { + jobWatcherOnDeleteEventCounter.Inc() + kjob, _ := prev.(*batchv1.Job) + if kjob == nil { + return + } + w.removeFromStalling(kjob) + + // TODO: consider catching jobs that were deleted manually? +} + +func (w *jobWatcher) runChecks(ctx context.Context, kjob *batchv1.Job) { + log := loggerForObject(w.logger, kjob) + + if model.JobFinished(kjob) { + w.removeFromStalling(kjob) + w.checkFinishedWithoutPod(ctx, log, kjob) + } else { + w.checkStalledWithoutPod(log, kjob) + } +} + +func (w *jobWatcher) checkFinishedWithoutPod(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) { + log.Debug("Checking job for finishing without a pod") + + // If the job is finished, there should be one finished pod. + if kjob.Status.Failed+kjob.Status.Succeeded > 0 { + // All's well with the world. + return + } + + jobWatcherFinishedWithoutPodCounter.Inc() + + // Because no pod has been created, the agent hasn't started. + // We can acquire the Buildkite job and fail it ourselves. + log.Info("The Kubernetes job ended without starting a pod. Failing the corresponding Buildkite job") + w.fetchEventsAndFailJob(ctx, log, kjob, "The Kubernetes job ended without starting a pod.\n") +} + +func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) { + log.Debug("Checking job for stalling without a pod") + + // If the job is not finished and there is no pod, it should start one + // before too long. Otherwise the job is stalled. + pods := kjob.Status.Active + kjob.Status.Failed + kjob.Status.Succeeded + // Ready and Terminating are subsets of Active (I think) + if utp := kjob.Status.UncountedTerminatedPods; utp != nil { + pods += int32(len(utp.Succeeded)) + pods += int32(len(utp.Failed)) + } + if pods > 0 { + // All's well with the world. + w.removeFromStalling(kjob) + return + } + + if kjob.Status.StartTime == nil { + // the _job_ hasn't even started? + return + } + + w.addToStalling(kjob) +} + +func (w *jobWatcher) fetchEventsAndFailJob(ctx context.Context, log *zap.Logger, kjob *batchv1.Job, message string) { + // List the events for the job, which might contain useful info for + // diagnosing the problem. + events := w.k8s.CoreV1().Events(w.cfg.Namespace) + evlist, err := events.List(ctx, metav1.ListOptions{ + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.kind", "Job"), + fields.OneTermEqualSelector("involvedObject.name", kjob.Name), + ).String(), + }) + if err != nil { + log.Error("Couldn't get events for job", zap.Error(err)) + message = fmt.Sprintf("%s\nCouldn't get events for job %s: %v", message, kjob.Name, err) + } + if evlist != nil { + message += "\n" + w.formatEvents(evlist) + } + + if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, kjob, message); err != nil { + // Maybe the job was cancelled in the meantime? + log.Error("Could not fail Buildkite job", zap.Error(err)) + jobWatcherBuildkiteJobFailErrorsCounter.Inc() + return + } + jobWatcherBuildkiteJobFailsCounter.Inc() +} + +func (w *jobWatcher) formatEvents(evlist *corev1.EventList) string { + if len(evlist.Items) == 0 { + return "Events: none" + } + + tw := table.NewWriter() + tw.SetStyle(table.StyleColoredDark) + tw.AppendHeader(table.Row{"LAST EVENT", "REPEATED", "TYPE", "REASON", "MESSAGE"}) + tw.AppendSeparator() + for _, event := range evlist.Items { + if event.Series == nil { + tw.AppendRow(table.Row{event.EventTime.Time, "-", event.Type, event.Reason, event.Message}) + continue + } + lastTime := event.Series.LastObservedTime.Time + firstToLast := duration.HumanDuration(lastTime.Sub(event.EventTime.Time)) + countMsg := fmt.Sprintf("x%d over %s", event.Series.Count, firstToLast) + tw.AppendRow(table.Row{lastTime, countMsg, event.Type, event.Reason, event.Message}) + } + return tw.Render() +} + +func (w *jobWatcher) addToStalling(kjob *batchv1.Job) { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + w.stallingJobs[kjob] = struct{}{} +} + +func (w *jobWatcher) removeFromStalling(kjob *batchv1.Job) { + w.stallingJobsMu.Lock() + defer w.stallingJobsMu.Unlock() + delete(w.stallingJobs, kjob) +} + +func (w *jobWatcher) stalledJobChecker(ctx context.Context) { + ticker := time.Tick(time.Second) + for { + select { + case <-ctx.Done(): + return + + case <-ticker: + // continue below + } + + // Gather stalled jobs + var stalled []*batchv1.Job + w.stallingJobsMu.Lock() + for kjob := range w.stallingJobs { + if time.Since(kjob.Status.StartTime.Time) >= w.cfg.EmptyJobGracePeriod { + stalled = append(stalled, kjob) + delete(w.stallingJobs, kjob) + } + } + w.stallingJobsMu.Unlock() + + jobWatcherStalledWithoutPodCounter.Add(float64(len(stalled))) + + // Fail BK jobs and delete k8s jobs. + jobs := w.k8s.BatchV1().Jobs(w.cfg.Namespace) + for _, kjob := range stalled { + // Fail the BK job. This lists events for the job, so delete it afterwards. + log := loggerForObject(w.logger, kjob) + stallDuration := duration.HumanDuration(time.Since(kjob.Status.StartTime.Time)) + message := fmt.Sprintf("The Kubernetes job spent %s without starting a pod.\n", stallDuration) + w.fetchEventsAndFailJob(ctx, log, kjob, message) + + if err := jobs.Delete(ctx, kjob.Name, metav1.DeleteOptions{ + PropagationPolicy: ptr.To(metav1.DeletePropagationBackground), + }); err != nil { + jobWatcherJobCleanupErrorsCounter.WithLabelValues(string(kerrors.ReasonForError(err))).Inc() + w.logger.Error("failed to delete stalled job", zap.Error(err)) + continue + } + jobWatcherJobCleanupsCounter.Inc() + } + } +} diff --git a/internal/controller/scheduler/metrics.go b/internal/controller/scheduler/metrics.go index 49e7d542..1b346816 100644 --- a/internal/controller/scheduler/metrics.go +++ b/internal/controller/scheduler/metrics.go @@ -53,6 +53,76 @@ var ( }) ) +// Job watcher metrics +var ( + // Overridden by NewJobWatcher + jobsStallingGaugeFunc = func() int { return 0 } + + _ = promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "num_stalling_jobs", + Help: "Current number of jobs that are running but have no pods", + }, func() float64 { return float64(jobsStallingGaugeFunc()) }) + + jobWatcherOnAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "onadd_events_total", + Help: "Count of OnAdd informer events", + }) + jobWatcherOnUpdateEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "onupdate_events_total", + Help: "Count of OnUpdate informer events", + }) + jobWatcherOnDeleteEventCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "ondelete_events_total", + Help: "Count of OnDelete informer events", + }) + + jobWatcherFinishedWithoutPodCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_finished_without_pod_total", + Help: "Count of jobs that entered a terminal state (Failed or Succeeded) without a pod", + }) + jobWatcherStalledWithoutPodCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_stalled_without_pod_total", + Help: "Count of jobs that ran for too long without a pod", + }) + + jobWatcherBuildkiteJobFailsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "jobs_failed_on_buildkite_total", + Help: "Count of jobs that jobWatcher successfully acquired and failed on Buildkite", + }) + jobWatcherBuildkiteJobFailErrorsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "job_fail_on_buildkite_errors_total", + Help: "Count of errors when jobWatcher tried to acquire and fail a job on Buildkite", + }) + jobWatcherJobCleanupsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "cleanups_total", + Help: "Count of stalled jobs successfully cleaned up", + }) + jobWatcherJobCleanupErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: "job_watcher", + Name: "cleanup_errors_total", + Help: "Count of errors during attempts to clean up a stalled job", + }, []string{"reason"}) +) + // Pod watcher metrics var ( @@ -151,12 +221,12 @@ var ( Namespace: promNamespace, Subsystem: "completion_watcher", Name: "cleanups_total", - Help: "Count of jobs successfully cleaned up", + Help: "Count of jobs with finished agents successfully cleaned up", }) completionWatcherJobCleanupErrorsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: promNamespace, Subsystem: "completion_watcher", Name: "cleanup_errors_total", - Help: "Count of errors during attempts to clean up a job", + Help: "Count of errors during attempts to clean up a job with a finished agent", }, []string{"reason"}) ) diff --git a/internal/integration/fixtures/missing-service-account.yaml b/internal/integration/fixtures/missing-service-account.yaml new file mode 100644 index 00000000..cc2a128f --- /dev/null +++ b/internal/integration/fixtures/missing-service-account.yaml @@ -0,0 +1,12 @@ +steps: + - label: ":x:" + agents: + queue: "{{.queue}}" + plugins: + - kubernetes: + podSpec: + serviceAccount: does-not-exist + containers: + - image: buildkite/agent:latest + command: + - "true" diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index f8e7c1ae..7427772a 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -391,6 +391,21 @@ func TestInvalidPodJSON(t *testing.T) { ) } +func TestMissingServiceAccount(t *testing.T) { + tc := testcase{ + T: t, + Fixture: "missing-service-account.yaml", + Repo: repoHTTP, + GraphQL: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint), + }.Init() + ctx := context.Background() + pipelineID := tc.PrepareQueueAndPipelineWithCleanup(ctx) + tc.StartController(ctx, cfg) + build := tc.TriggerBuild(ctx, pipelineID) + tc.AssertFail(ctx, build) + tc.AssertLogsContain(build, "error looking up service account") +} + func TestEnvVariables(t *testing.T) { tc := testcase{ T: t,