Skip to content

Commit 2313e2b

Browse files
authored
Merge pull request kubernetes#113176 from alculquicondor/finalizer_metric
Add metric for terminated pods with tracking finalizer
2 parents b9973d2 + 12d308f commit 2313e2b

File tree

5 files changed

+197
-19
lines changed

5 files changed

+197
-19
lines changed

pkg/controller/job/job_controller.go

+5-9
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta
246246
// When a pod is created, enqueue the controller that manages it and update its expectations.
247247
func (jm *Controller) addPod(obj interface{}) {
248248
pod := obj.(*v1.Pod)
249+
recordFinishedPodWithTrackingFinalizer(nil, pod)
249250
if pod.DeletionTimestamp != nil {
250251
// on a restart of the controller, it's possible a new pod shows up in a state that
251252
// is already pending deletion. Prevent the pod from being a creation observation.
@@ -288,6 +289,7 @@ func (jm *Controller) addPod(obj interface{}) {
288289
func (jm *Controller) updatePod(old, cur interface{}) {
289290
curPod := cur.(*v1.Pod)
290291
oldPod := old.(*v1.Pod)
292+
recordFinishedPodWithTrackingFinalizer(oldPod, curPod)
291293
if curPod.ResourceVersion == oldPod.ResourceVersion {
292294
// Periodic resync will send update events for all known pods.
293295
// Two different versions of the same pod will always have different RVs.
@@ -362,6 +364,9 @@ func (jm *Controller) updatePod(old, cur interface{}) {
362364
// obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item.
363365
func (jm *Controller) deletePod(obj interface{}, final bool) {
364366
pod, ok := obj.(*v1.Pod)
367+
if final {
368+
recordFinishedPodWithTrackingFinalizer(pod, nil)
369+
}
365370

366371
// When a delete is dropped, the relist will notice a pod in the store not
367372
// in the list, leading to the insertion of a tombstone object which contains
@@ -1673,15 +1678,6 @@ func removeTrackingAnnotationPatch(job *batch.Job) []byte {
16731678
return patchBytes
16741679
}
16751680

1676-
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
1677-
for _, fin := range pod.Finalizers {
1678-
if fin == batch.JobTrackingFinalizer {
1679-
return true
1680-
}
1681-
}
1682-
return false
1683-
}
1684-
16851681
type uncountedTerminatedPods struct {
16861682
succeeded sets.String
16871683
failed sets.String

pkg/controller/job/metrics/metrics.go

+18
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ var (
8383
Help: "The number of finished Pods that are fully tracked",
8484
},
8585
[]string{"completion_mode", "result"})
86+
87+
// TerminatedPodsWithTrackingFinalizer records the addition and removal of
88+
// terminated pods that have the finalizer batch.kubernetes.io/job-tracking,
89+
// regardless of whether they are owned by a Job.
90+
TerminatedPodsTrackingFinalizerTotal = metrics.NewCounterVec(
91+
&metrics.CounterOpts{
92+
Subsystem: JobControllerSubsystem,
93+
Name: "terminated_pods_tracking_finalizer_total",
94+
Help: `The number of terminated pods (phase=Failed|Succeeded)
95+
that have the finalizer batch.kubernetes.io/job-tracking
96+
The event label can be "add" or "delete".`,
97+
}, []string{"event"})
8698
)
8799

88100
const (
@@ -109,6 +121,11 @@ const (
109121

110122
Succeeded = "succeeded"
111123
Failed = "failed"
124+
125+
// Possible values for "event" label in the terminated_pods_tracking_finalizer
126+
// metric.
127+
Add = "add"
128+
Delete = "delete"
112129
)
113130

114131
var registerMetrics sync.Once
@@ -120,5 +137,6 @@ func Register() {
120137
legacyregistry.MustRegister(JobSyncNum)
121138
legacyregistry.MustRegister(JobFinishedNum)
122139
legacyregistry.MustRegister(JobPodsFinished)
140+
legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
123141
})
124142
}

pkg/controller/job/tracking_utils.go

+32
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import (
2020
"fmt"
2121
"sync"
2222

23+
batch "k8s.io/api/batch/v1"
24+
v1 "k8s.io/api/core/v1"
2325
"k8s.io/apimachinery/pkg/util/sets"
2426
"k8s.io/client-go/tools/cache"
2527
"k8s.io/klog/v2"
28+
"k8s.io/kubernetes/pkg/controller/job/metrics"
2629
)
2730

2831
// uidSetKeyFunc to parse out the key from a uidSet.
@@ -118,3 +121,32 @@ func (u *uidTrackingExpectations) deleteExpectations(jobKey string) {
118121
func newUIDTrackingExpectations() *uidTrackingExpectations {
119122
return &uidTrackingExpectations{store: cache.NewStore(uidSetKeyFunc)}
120123
}
124+
125+
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
126+
for _, fin := range pod.Finalizers {
127+
if fin == batch.JobTrackingFinalizer {
128+
return true
129+
}
130+
}
131+
return false
132+
}
133+
134+
func recordFinishedPodWithTrackingFinalizer(oldPod, newPod *v1.Pod) {
135+
was := isFinishedPodWithTrackingFinalizer(oldPod)
136+
is := isFinishedPodWithTrackingFinalizer(newPod)
137+
if was == is {
138+
return
139+
}
140+
var event = metrics.Delete
141+
if is {
142+
event = metrics.Add
143+
}
144+
metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(event).Inc()
145+
}
146+
147+
func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool {
148+
if pod == nil {
149+
return false
150+
}
151+
return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod)
152+
}

pkg/controller/job/tracking_utils_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@ limitations under the License.
1717
package job
1818

1919
import (
20+
"fmt"
2021
"sync"
2122
"testing"
2223

2324
"github.com/google/go-cmp/cmp"
25+
batch "k8s.io/api/batch/v1"
26+
v1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/component-base/metrics/testutil"
29+
"k8s.io/kubernetes/pkg/controller/job/metrics"
2430
)
2531

2632
func TestUIDTrackingExpectations(t *testing.T) {
@@ -116,3 +122,108 @@ func TestUIDTrackingExpectations(t *testing.T) {
116122
}
117123
}
118124
}
125+
126+
func TestRecordFinishedPodWithTrackingFinalizer(t *testing.T) {
127+
metrics.Register()
128+
cases := map[string]struct {
129+
oldPod *v1.Pod
130+
newPod *v1.Pod
131+
wantAdd int
132+
wantDelete int
133+
}{
134+
"new non-finished Pod with finalizer": {
135+
newPod: &v1.Pod{
136+
ObjectMeta: metav1.ObjectMeta{
137+
Finalizers: []string{batch.JobTrackingFinalizer},
138+
},
139+
Status: v1.PodStatus{
140+
Phase: v1.PodPending,
141+
},
142+
},
143+
},
144+
"pod with finalizer fails": {
145+
oldPod: &v1.Pod{
146+
ObjectMeta: metav1.ObjectMeta{
147+
Finalizers: []string{batch.JobTrackingFinalizer},
148+
},
149+
Status: v1.PodStatus{
150+
Phase: v1.PodRunning,
151+
},
152+
},
153+
newPod: &v1.Pod{
154+
ObjectMeta: metav1.ObjectMeta{
155+
Finalizers: []string{batch.JobTrackingFinalizer},
156+
},
157+
Status: v1.PodStatus{
158+
Phase: v1.PodFailed,
159+
},
160+
},
161+
wantAdd: 1,
162+
},
163+
"pod with finalizer succeeds": {
164+
oldPod: &v1.Pod{
165+
ObjectMeta: metav1.ObjectMeta{
166+
Finalizers: []string{batch.JobTrackingFinalizer},
167+
},
168+
Status: v1.PodStatus{
169+
Phase: v1.PodRunning,
170+
},
171+
},
172+
newPod: &v1.Pod{
173+
ObjectMeta: metav1.ObjectMeta{
174+
Finalizers: []string{batch.JobTrackingFinalizer},
175+
},
176+
Status: v1.PodStatus{
177+
Phase: v1.PodSucceeded,
178+
},
179+
},
180+
wantAdd: 1,
181+
},
182+
"succeeded pod loses finalizer": {
183+
oldPod: &v1.Pod{
184+
ObjectMeta: metav1.ObjectMeta{
185+
Finalizers: []string{batch.JobTrackingFinalizer},
186+
},
187+
Status: v1.PodStatus{
188+
Phase: v1.PodSucceeded,
189+
},
190+
},
191+
newPod: &v1.Pod{
192+
Status: v1.PodStatus{
193+
Phase: v1.PodSucceeded,
194+
},
195+
},
196+
wantDelete: 1,
197+
},
198+
"pod without finalizer removed": {
199+
oldPod: &v1.Pod{
200+
Status: v1.PodStatus{
201+
Phase: v1.PodSucceeded,
202+
},
203+
},
204+
},
205+
}
206+
for name, tc := range cases {
207+
t.Run(name, func(t *testing.T) {
208+
metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
209+
recordFinishedPodWithTrackingFinalizer(tc.oldPod, tc.newPod)
210+
if err := validateTerminatedPodsTrackingFinalizerTotal(metrics.Add, tc.wantAdd); err != nil {
211+
t.Errorf("Failed validating terminated_pods_tracking_finalizer_total(add): %v", err)
212+
}
213+
if err := validateTerminatedPodsTrackingFinalizerTotal(metrics.Delete, tc.wantDelete); err != nil {
214+
t.Errorf("Failed validating terminated_pods_tracking_finalizer_total(delete): %v", err)
215+
}
216+
})
217+
}
218+
}
219+
220+
func validateTerminatedPodsTrackingFinalizerTotal(event string, want int) error {
221+
got, err := testutil.GetCounterMetricValue(metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(event))
222+
if err != nil {
223+
return err
224+
}
225+
if int(got) != want {
226+
return fmt.Errorf("got value %d, want %d", int(got), want)
227+
}
228+
return nil
229+
}

test/integration/job/job_test.go

+31-10
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,7 @@ func TestMetrics(t *testing.T) {
7878
closeFn, restConfig, clientSet, ns := setup(t, "simple")
7979
defer closeFn()
8080
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
81-
defer func() {
82-
cancel()
83-
}()
81+
defer cancel()
8482

8583
testCases := map[string]struct {
8684
job *batchv1.Job
@@ -144,13 +142,14 @@ func TestMetrics(t *testing.T) {
144142
validateJobSucceeded(ctx, t, clientSet, jobObj)
145143

146144
// verify metric values after the job is finished
147-
validateMetricValue(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
148-
validateMetricValue(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
145+
validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
146+
validateCounterMetric(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
147+
validateTerminatedPodsTrackingFinalizerMetric(t, int(*jobObj.Spec.Parallelism))
149148
})
150149
}
151150
}
152151

153-
func validateMetricValue(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
152+
func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
154153
t.Helper()
155154
var cmpErr error
156155
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
@@ -166,13 +165,24 @@ func validateMetricValue(t *testing.T, counterVec *basemetrics.CounterVec, wantM
166165
return true, nil
167166
})
168167
if err != nil {
169-
t.Errorf("Failed waiting for expected metric delta: %q", err)
168+
t.Errorf("Failed waiting for expected metric: %q", err)
170169
}
171170
if cmpErr != nil {
172171
t.Error(cmpErr)
173172
}
174173
}
175174

175+
func validateTerminatedPodsTrackingFinalizerMetric(t *testing.T, want int) {
176+
validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
177+
Value: want,
178+
Labels: []string{metrics.Add},
179+
})
180+
validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
181+
Value: want,
182+
Labels: []string{metrics.Delete},
183+
})
184+
}
185+
176186
// TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
177187
// in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
178188
// the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is
@@ -238,6 +248,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
238248
defer func() {
239249
cancel()
240250
}()
251+
resetMetrics()
241252
restConfig.QPS = 200
242253
restConfig.Burst = 200
243254

@@ -556,6 +567,7 @@ func TestParallelJob(t *testing.T) {
556567
defer closeFn()
557568
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
558569
defer cancel()
570+
resetMetrics()
559571

560572
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
561573
Spec: batchv1.JobSpec{
@@ -631,6 +643,9 @@ func TestParallelJob(t *testing.T) {
631643
}
632644
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
633645
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
646+
if tc.trackWithFinalizers {
647+
validateTerminatedPodsTrackingFinalizerMetric(t, 7)
648+
}
634649
})
635650
}
636651
}
@@ -803,9 +818,8 @@ func TestIndexedJob(t *testing.T) {
803818
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
804819
defer closeFn()
805820
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
806-
defer func() {
807-
cancel()
808-
}()
821+
defer cancel()
822+
resetMetrics()
809823

810824
mode := batchv1.IndexedCompletion
811825
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -863,6 +877,9 @@ func TestIndexedJob(t *testing.T) {
863877
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
864878
validateJobSucceeded(ctx, t, clientSet, jobObj)
865879
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
880+
if wFinalizers {
881+
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
882+
}
866883
})
867884
}
868885
}
@@ -957,6 +974,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
957974
restConfig.QPS = 1
958975
restConfig.Burst = 1
959976
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
977+
resetMetrics()
960978
defer cancel()
961979
restConfig.QPS = 200
962980
restConfig.Burst = 200
@@ -989,6 +1007,8 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
9891007
t.Fatalf("Failed to delete job: %v", err)
9901008
}
9911009
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
1010+
// Pods never finished, so they are not counted in the metric.
1011+
validateTerminatedPodsTrackingFinalizerMetric(t, 0)
9921012
})
9931013
}
9941014
}
@@ -1676,6 +1696,7 @@ func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.
16761696
}
16771697

16781698
func resetMetrics() {
1699+
metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
16791700
metrics.JobFinishedNum.Reset()
16801701
metrics.JobPodsFinished.Reset()
16811702
}

0 commit comments

Comments
 (0)