Commit 2ba4df1 (1 parent: ee90a66). Showing 11 changed files with 373 additions and 2 deletions.
@@ -0,0 +1,262 @@
package scheduler

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
	"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"

	"github.com/jedib0t/go-pretty/v6/table"
	"go.uber.org/zap"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/duration"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/ptr"
)

// jobWatcher watches k8s jobs for failure to start a pod. The corresponding
// Buildkite job is failed with an error message if this happens. Also, if such
// a k8s job doesn't enter a terminal state on its own, jobWatcher sets a
// deadline so that it is cleaned up.
type jobWatcher struct {
	// Logs go here
	logger *zap.Logger

	k8s kubernetes.Interface
	cfg *config.Config

	// Tracks stalling jobs (jobs that have yet to create pods).
	stallingJobsMu sync.Mutex
	stallingJobs   map[*batchv1.Job]struct{}
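	// Note: keyed by object pointer. Shared informers typically deliver a
	// distinct object instance per event, so pointer identity across events
	// is an assumption here; a UID-keyed map would avoid relying on it.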

	// This is the context passed to RegisterInformer.
	// It's being stored here (grrrr!) because the k8s ResourceEventHandler
	// interface doesn't have context args. (Working around an interface in a
	// library outside of our control is a carve-out from the usual rule.)
	// The context is needed to ensure goroutines are cleaned up.
	resourceEventHandlerCtx context.Context
}

// NewJobWatcher creates a JobWatcher.
func NewJobWatcher(logger *zap.Logger, k8sClient kubernetes.Interface, cfg *config.Config) *jobWatcher {
	return &jobWatcher{
		logger:       logger,
		k8s:          k8sClient,
		cfg:          cfg,
		stallingJobs: make(map[*batchv1.Job]struct{}),
	}
}

// RegisterInformer registers the jobWatcher to listen for Kubernetes job events.
func (w *jobWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
	informer := factory.Batch().V1().Jobs()
	jobInformer := informer.Informer()
	if _, err := jobInformer.AddEventHandler(w); err != nil {
		return err
	}
	w.resourceEventHandlerCtx = ctx // See field comment
	go factory.Start(ctx.Done())
	// No need to wait for cache sync here. These are cleanup tasks, not
	// barriers to prevent creating new jobs.
	go w.stalledJobChecker(ctx)
	return nil
}

// OnAdd is called by k8s to inform us a resource is added.
func (w *jobWatcher) OnAdd(obj any, _ bool) {
	jobWatcherOnAddEventCounter.Inc()
	kjob, _ := obj.(*batchv1.Job)
	if kjob == nil {
		return
	}
	// Same logic whether we are considering pre-existing jobs, or new jobs.
	w.runChecks(w.resourceEventHandlerCtx, kjob)
}

// OnUpdate is called by k8s to inform us a resource is updated.
func (w *jobWatcher) OnUpdate(_, curr any) {
	jobWatcherOnUpdateEventCounter.Inc()
	kjob, _ := curr.(*batchv1.Job)
	if kjob == nil {
		return
	}
	// Same logic whether or not anything relevant changed about the job.
	w.runChecks(w.resourceEventHandlerCtx, kjob)
}

// OnDelete is called by k8s to inform us a resource is deleted.
func (w *jobWatcher) OnDelete(prev any) {
	jobWatcherOnDeleteEventCounter.Inc()
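	// prev may be a cache.DeletedFinalStateUnknown tombstone rather than a
	// *batchv1.Job; the type assertion below quietly skips those.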
	kjob, _ := prev.(*batchv1.Job)
	if kjob == nil {
		return
	}
	w.removeFromStalling(kjob)

	// TODO: consider catching jobs that were deleted manually?
}

func (w *jobWatcher) runChecks(ctx context.Context, kjob *batchv1.Job) {
	log := loggerForObject(w.logger, kjob)

	if model.JobFinished(kjob) {
		w.removeFromStalling(kjob)
		w.checkFinishedWithoutPod(ctx, log, kjob)
	} else {
		w.checkStalledWithoutPod(log, kjob)
	}
}

func (w *jobWatcher) checkFinishedWithoutPod(ctx context.Context, log *zap.Logger, kjob *batchv1.Job) {
	log.Debug("Checking job for finishing without a pod")

	// If the job is finished, there should be one finished pod.
	if kjob.Status.Failed+kjob.Status.Succeeded > 0 {
		// All's well with the world.
		return
	}

	// Because no pod has been created, the agent hasn't started.
	// We can acquire the Buildkite job and fail it ourselves.
log.Info("The Kuberentes job ended without starting a pod. Failing the corresponding Buildkite job") | ||

	w.fetchEventsAndFailJob(ctx, log, kjob, "The Kubernetes job ended without starting a pod.\n")
}

func (w *jobWatcher) checkStalledWithoutPod(log *zap.Logger, kjob *batchv1.Job) {
	log.Debug("Checking job for stalling without a pod")

	// If the job is not finished and there is no pod, it should start one
	// before too long. Otherwise the job is stalled.
	pods := kjob.Status.Active + kjob.Status.Failed + kjob.Status.Succeeded
	// Ready and Terminating are subsets of Active (I think)
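	// UncountedTerminatedPods holds pods that have terminated but haven't yet
	// been folded into the Failed/Succeeded counters by the job controller,
	// so count those too.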
	if utp := kjob.Status.UncountedTerminatedPods; utp != nil {
		pods += int32(len(utp.Succeeded))
		pods += int32(len(utp.Failed))
	}
	if pods > 0 {
		// All's well with the world.
		w.removeFromStalling(kjob)
		return
	}

	if kjob.Status.StartTime == nil {
		// the _job_ hasn't even started?
		return
	}

	w.addToStalling(kjob)
}

func (w *jobWatcher) fetchEventsAndFailJob(ctx context.Context, log *zap.Logger, kjob *batchv1.Job, message string) {
	// List the events for the job, which might contain useful info for
	// diagnosing the problem.
	events := w.k8s.CoreV1().Events(w.cfg.Namespace)
	evlist, err := events.List(ctx, metav1.ListOptions{
		FieldSelector: fields.AndSelectors(
			fields.OneTermEqualSelector("involvedObject.kind", "Job"),
			fields.OneTermEqualSelector("involvedObject.name", kjob.Name),
		).String(),
	})
	if err != nil {
		log.Error("Couldn't get events for job", zap.Error(err))
		message = fmt.Sprintf("%s\nCouldn't get events for job %s: %v", message, kjob.Name, err)
	}
	if evlist != nil {
		message += "\n" + w.formatEvents(evlist)
	}

	if err := acquireAndFailForObject(ctx, log, w.k8s, w.cfg, kjob, message); err != nil {
		// Maybe the job was cancelled in the meantime?
		log.Error("Could not fail Buildkite job", zap.Error(err))
		jobWatcherBuildkiteJobFailErrorsCounter.Inc()
		return
	}
	jobWatcherBuildkiteJobFailsCounter.Inc()
}

func (w *jobWatcher) formatEvents(evlist *corev1.EventList) string {
	if len(evlist.Items) == 0 {
		return "Events: none"
	}

	tw := table.NewWriter()
	tw.SetStyle(table.StyleColoredDark)
	tw.AppendHeader(table.Row{"LAST EVENT", "REPEATED", "TYPE", "REASON", "MESSAGE"})
	tw.AppendSeparator()
	for _, event := range evlist.Items {
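		// event.Series is only set when the event has repeated; otherwise
		// report the single EventTime with no repeat count.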
		if event.Series == nil {
			tw.AppendRow(table.Row{event.EventTime.Time, "-", event.Type, event.Reason, event.Message})
			continue
		}
		lastTime := event.Series.LastObservedTime.Time
		firstToLast := duration.HumanDuration(lastTime.Sub(event.EventTime.Time))
		countMsg := fmt.Sprintf("x%d over %s", event.Series.Count, firstToLast)
		tw.AppendRow(table.Row{lastTime, countMsg, event.Type, event.Reason, event.Message})
	}
	return tw.Render()
}

func (w *jobWatcher) addToStalling(kjob *batchv1.Job) {
	w.stallingJobsMu.Lock()
	defer w.stallingJobsMu.Unlock()
	w.stallingJobs[kjob] = struct{}{}
}

func (w *jobWatcher) removeFromStalling(kjob *batchv1.Job) {
	w.stallingJobsMu.Lock()
	defer w.stallingJobsMu.Unlock()
	delete(w.stallingJobs, kjob)
}

func (w *jobWatcher) stalledJobChecker(ctx context.Context) {
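	// time.Tick seems acceptable here: this goroutine runs for the life of
	// ctx (effectively the process), so the underlying ticker never needs to
	// be stopped.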
	ticker := time.Tick(time.Second)
	for {
		select {
		case <-ctx.Done():
			return

		case <-ticker:
			// continue below
		}

		// Gather stalled jobs
		var stalled []*batchv1.Job
		w.stallingJobsMu.Lock()
		for kjob := range w.stallingJobs {
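			// A job counts as stalled once EmptyJobGracePeriod has elapsed
			// since its StartTime without any pod being counted against it.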
			if time.Since(kjob.Status.StartTime.Time) >= w.cfg.EmptyJobGracePeriod {
				stalled = append(stalled, kjob)
				delete(w.stallingJobs, kjob)
			}
		}
		w.stallingJobsMu.Unlock()

		// Fail BK jobs and delete k8s jobs.
		jobs := w.k8s.BatchV1().Jobs(w.cfg.Namespace)
		for _, kjob := range stalled {
			// Fail the BK job. This lists events for the job, so delete it afterwards.
			log := loggerForObject(w.logger, kjob)
			stallDuration := duration.HumanDuration(time.Since(kjob.Status.StartTime.Time))
			message := fmt.Sprintf("The Kubernetes job spent %s without starting a pod.\n", stallDuration)
			w.fetchEventsAndFailJob(ctx, log, kjob, message)

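			// Background propagation: the API server deletes the job
			// immediately and garbage-collects any dependents asynchronously.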
			if err := jobs.Delete(ctx, kjob.Name, metav1.DeleteOptions{
				PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
			}); err != nil {
				jobWatcherJobCleanupErrorsCounter.WithLabelValues(string(kerrors.ReasonForError(err))).Inc()
				w.logger.Error("failed to delete stalled job", zap.Error(err))
				continue
			}
			jobWatcherJobCleanupsCounter.Inc()
		}
	}
}