From c16c7ab3649fecac89fe109fc3fccd1efbba7ec1 Mon Sep 17 00:00:00 2001 From: Abhinav Dhulipala Date: Wed, 23 Oct 2024 17:51:47 -0700 Subject: [PATCH] total pending reason metric --- .gitignore | 1 + exporter/fixtures/squeue_fallback.txt | 8 +- exporter/fixtures/squeue_out.json | 289 +++++++++++++++++++++++++- exporter/jobs.go | 68 ++++-- exporter/jobs_test.go | 34 ++- exporter/main_test.go | 2 +- exporter/server.go | 2 +- 7 files changed, 382 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index 70942a3..9decc64 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ coverage.html coverage.out .DS_Store dist +**/__debug_bin* diff --git a/exporter/fixtures/squeue_fallback.txt b/exporter/fixtures/squeue_fallback.txt index fcca1a1..437e462 100644 --- a/exporter/fixtures/squeue_fallback.txt +++ b/exporter/fixtures/squeue_fallback.txt @@ -1,7 +1,7 @@ -{"a": "account1", "id": 26515966, "end_time": "2023-09-21T00:21:42", "state": "RUNNING", "p": "hw-h", "cpu": 1, "mem": "128G", "array_id": "N/A"} -{"a": "account1", "id": 50580016, "end_time": "2023-09-21T14:31:11", "state": "RUNNING", "p": "hw-l", "cpu": 1, "mem": "62.50G", "array_id": "N/A"} -{"a": "account1", "id": 51447051, "end_time": "N/A", "state": "PENDING", "p": "hw-h", "cpu": 1, "mem": "40000M", "array_id": "N/A"} -{"a": "account1", "id": 18804, "end_time": "NONE", "state": "PENDING", "p": "magma", "cpu": 24, "mem": "118G", "array_id": "N/A"} +{"a": "account1", "id": 26515966, "end_time": "2023-09-21T00:21:42", "state": "RUNNING", "p": "hw-h", "cpu": 1, "mem": "128G", "array_id": "N/A", "r": "cs10"} +{"a": "account1", "id": 50580016, "end_time": "2023-09-21T14:31:11", "state": "RUNNING", "p": "hw-l", "cpu": 1, "mem": "62.50G", "array_id": "N/A", "r": "cs10"} +{"a": "account1", "id": 51447051, "end_time": "N/A", "state": "PENDING", "p": "hw-h", "cpu": 1, "mem": "40000M", "array_id": "N/A", "r": "(Dependency)"} +{"a": "account1", "id": 18804, "end_time": "NONE", "state": "PENDING", "p": 
"magma", "cpu": 24, "mem": "118G", "array_id": "N/A", "r": "(Priority)"} # test counter inc with faulty inputs {"a": "account1", "id": 18805, "end_time": "NONE", "state": "PENDING", "p": "magma", "cpu": xx, "mem": "118G", "array_id": "N/A"} {"a": "account1", "id": 18806, "end_time": "NONE", "state": "PENDING", "p": "magma", "cpu": xx, "mem": "118G", "array_id": "N/A"} diff --git a/exporter/fixtures/squeue_out.json b/exporter/fixtures/squeue_out.json index ae7f384..547249a 100644 --- a/exporter/fixtures/squeue_out.json +++ b/exporter/fixtures/squeue_out.json @@ -149,6 +149,293 @@ "user_name": "bkd", "wckey": "", "current_working_directory": "/somedir/on/nfs" - } + }, + { + "account": "account1", + "accrue_time": 0, + "admin_comment": "", + "allocating_node": "bkd", + "array_job_id": { + "set": true, + "infinite": false, + "number": 58948420 + }, + "array_task_id": { + "set": false, + "infinite": false, + "number": 0 + }, + "array_max_tasks": { + "set": true, + "infinite": false, + "number": 0 + }, + "array_task_string": "1-10", + "association_id": 4001, + "batch_features": "", + "batch_flag": true, + "batch_host": "", + "flags": [ + "EXACT_TASK_COUNT_REQUESTED", + "EXACT_CPU_COUNT_REQUESTED", + "EXACT_MEMORY_REQUESTED", + "USING_DEFAULT_QOS", + "USING_DEFAULT_WCKEY", + "DEPENDENT" + ], + "burst_buffer": "", + "burst_buffer_state": "", + "cluster": "rivos", + "cluster_features": "", + "command": "", + "comment": "", + "container": "", + "container_id": "", + "contiguous": false, + "core_spec": 0, + "thread_spec": 32766, + "cores_per_socket": { + "set": false, + "infinite": false, + "number": 0 + }, + "billable_tres": { + "set": false, + "infinite": false, + "number": 0.0 + }, + "cpus_per_task": { + "set": true, + "infinite": false, + "number": 1 + }, + "cpu_frequency_minimum": { + "set": false, + "infinite": false, + "number": 0 + }, + "cpu_frequency_maximum": { + "set": false, + "infinite": false, + "number": 0 + }, + "cpu_frequency_governor": { + "set": false, + 
"infinite": false, + "number": 0 + }, + "cpus_per_tres": "", + "cron": "", + "deadline": 1729802248, + "delay_boot": { + "set": true, + "infinite": false, + "number": 0 + }, + "dependency": "afterok:58948419(unfulfilled)", + "derived_exit_code": { + "set": true, + "infinite": false, + "number": 0 + }, + "eligible_time": 0, + "end_time": 0, + "excluded_nodes": "", + "exit_code": { + "set": true, + "infinite": false, + "number": 0 + }, + "extra": "", + "failed_node": "", + "features": "", + "federation_origin": "", + "federation_siblings_active": "", + "federation_siblings_viable": "", + "gres_detail": [ + ], + "group_id": 1977700000, + "group_name": "rvs", + "het_job_id": { + "set": true, + "infinite": false, + "number": 0 + }, + "het_job_id_set": "", + "het_job_offset": { + "set": true, + "infinite": false, + "number": 0 + }, + "job_id": 58948420, + "job_resources": { + }, + "job_size_str": [ + ], + "job_state": "PENDING", + "last_sched_evaluation": 1729715848, + "licenses": "rtl_single_core@r,", + "mail_type": [ + ], + "mail_user": "bkd", + "max_cpus": { + "set": true, + "infinite": false, + "number": 0 + }, + "max_nodes": { + "set": true, + "infinite": false, + "number": 0 + }, + "mcs_label": "", + "memory_per_tres": "", + "name": "some job name", + "network": "", + "nodes": "", + "nice": 0, + "tasks_per_core": { + "set": false, + "infinite": true, + "number": 0 + }, + "tasks_per_tres": { + "set": true, + "infinite": false, + "number": 0 + }, + "tasks_per_node": { + "set": true, + "infinite": false, + "number": 0 + }, + "tasks_per_socket": { + "set": false, + "infinite": true, + "number": 0 + }, + "tasks_per_board": { + "set": true, + "infinite": false, + "number": 0 + }, + "cpus": { + "set": true, + "infinite": false, + "number": 1 + }, + "node_count": { + "set": true, + "infinite": false, + "number": 1 + }, + "tasks": { + "set": true, + "infinite": false, + "number": 1 + }, + "partition": "hw-m", + "prefer": "", + "memory_per_cpu": { + "set": false, + 
"infinite": false, + "number": 0 + }, + "memory_per_node": { + "set": true, + "infinite": false, + "number": 131072 + }, + "minimum_cpus_per_node": { + "set": true, + "infinite": false, + "number": 1 + }, + "minimum_tmp_disk_per_node": { + "set": true, + "infinite": false, + "number": 0 + }, + "power": { + "flags": [ + ] + }, + "preempt_time": 0, + "preemptable_time": 0, + "pre_sus_time": 0, + "hold": false, + "priority": { + "set": true, + "infinite": false, + "number": 1368 + }, + "profile": [ + "NOT_SET" + ], + "qos": "normal", + "reboot": false, + "required_nodes": "", + "minimum_switches": 0, + "requeue": true, + "resize_time": 0, + "restart_cnt": 0, + "resv_name": "", + "scheduled_nodes": "", + "selinux_context": "", + "shared": [ + "oversubscribe" + ], + "exclusive": [ + "false" + ], + "oversubscribe": true, + "show_flags": [ + "DETAIL", + "LOCAL" + ], + "sockets_per_board": 0, + "sockets_per_node": { + "set": false, + "infinite": false, + "number": 0 + }, + "start_time": 0, + "state_description": "", + "state_reason": "Dependency", + "standard_error": "\/path\/to\/some\/dir.\/logs\/slurm-58948420.out", + "standard_input": "\/dev\/null", + "standard_output": "\/path\/to\/some\/dir\/.\/logs\/slurm-58948420.out", + "submit_time": 1729715848, + "suspend_time": 0, + "system_comment": "", + "time_limit": { + "set": true, + "infinite": false, + "number": 1439 + }, + "time_minimum": { + "set": true, + "infinite": false, + "number": 0 + }, + "threads_per_core": { + "set": false, + "infinite": false, + "number": 0 + }, + "tres_bind": "", + "tres_freq": "", + "tres_per_job": "", + "tres_per_node": "", + "tres_per_socket": "", + "tres_per_task": "", + "tres_req_str": "cpu=1,mem=128G,node=1,billing=1", + "tres_alloc_str": "", + "user_id": 1234, + "user_name": "bkd", + "maximum_switch_wait_time": 0, + "wckey": "", + "current_working_directory": "\/path\/to\/some\/dir" + } ] } diff --git a/exporter/jobs.go b/exporter/jobs.go index 4a5d3ba..a31330b 100644 --- 
a/exporter/jobs.go +++ b/exporter/jobs.go @@ -8,6 +8,7 @@ import ( "bytes" "encoding/json" "fmt" + "regexp" "strings" "time" @@ -32,6 +33,7 @@ type JobMetric struct { UserName string `json:"user_name"` Features string `json:"features"` JobResources JobResource `json:"job_resources"` + StateReason string `json:"state_reason"` } type squeueResponse struct { @@ -109,14 +111,15 @@ func (jcf *JobCliFallbackFetcher) fetch() ([]JobMetric, error) { for i, line := range bytes.Split(squeue, []byte("\n")) { var metric struct { - Account string `json:"a"` - JobId float64 `json:"id"` - EndTime NAbleTime `json:"end_time"` - JobState string `json:"state"` - Partition string `json:"p"` - UserName string `json:"u"` - Cpu int64 `json:"cpu"` - Mem string `json:"mem"` + Account string `json:"a"` + JobId float64 `json:"id"` + EndTime NAbleTime `json:"end_time"` + JobState string `json:"state"` + Partition string `json:"p"` + UserName string `json:"u"` + Cpu int64 `json:"cpu"` + Mem string `json:"mem"` + StateReason string `json:"r"` } if err := json.Unmarshal(line, &metric); err != nil { slog.Error(fmt.Sprintf("squeue fallback parse error: failed on line %d `%s`", i, line)) @@ -129,13 +132,24 @@ func (jcf *JobCliFallbackFetcher) fetch() ([]JobMetric, error) { jcf.errCounter.Inc() continue } + re := regexp.MustCompile(`^\((?P<reason>(\w)+)\)$`) + if metric.JobState == "PENDING" { + if matches := re.FindStringSubmatch(metric.StateReason); matches != nil { + metric.StateReason = matches[re.SubexpIndex("reason")] + } else { + slog.Error(fmt.Sprintf("squeue failed to pull pending state reason. 
Got state reason: %s", metric.StateReason)) + jcf.errCounter.Inc() + } + } + openapiJobMetric := JobMetric{ - Account: metric.Account, - JobId: metric.JobId, - JobState: metric.JobState, - Partition: metric.Partition, - UserName: metric.UserName, - EndTime: float64(metric.EndTime.Unix()), + Account: metric.Account, + JobId: metric.JobId, + JobState: metric.JobState, + Partition: metric.Partition, + UserName: metric.UserName, + EndTime: float64(metric.EndTime.Unix()), + StateReason: metric.StateReason, JobResources: JobResource{ AllocCpus: float64(metric.Cpu), AllocNodes: map[string]*NodeResource{"0": {Mem: mem}}, @@ -255,6 +269,23 @@ func parsePartitionJobMetrics(jobs []JobMetric) map[string]*PartitionJobMetric { return partitionMetric } +type StateReasonMetric struct { + pendingStateCount map[string]float64 +} + +func parseStateReasonMetric(jobs []JobMetric) *StateReasonMetric { + metric := StateReasonMetric{ + pendingStateCount: make(map[string]float64), + } + + for _, job := range jobs { + if job.JobState == "PENDING" { + metric.pendingStateCount[job.StateReason]++ + } + } + return &metric +} + type FeatureJobMetric struct { allocMem float64 allocCpu float64 @@ -298,6 +329,8 @@ type JobsCollector struct { featureJobMemAlloc *prometheus.Desc featureJobCpuAlloc *prometheus.Desc featureJobTotal *prometheus.Desc + // reason metrics + pendingReasonTotal *prometheus.Desc // exporter metrics jobScrapeDuration *prometheus.Desc jobScrapeError prometheus.Counter @@ -326,6 +359,7 @@ func NewJobsController(config *Config) *JobsCollector { featureJobMemAlloc: prometheus.NewDesc("slurm_feature_mem_alloc", "alloc mem consumed per feature", []string{"feature"}, nil), featureJobCpuAlloc: prometheus.NewDesc("slurm_feature_cpu_alloc", "alloc cpu consumed per feature", []string{"feature"}, nil), featureJobTotal: prometheus.NewDesc("slurm_feature_total", "alloc cpu consumed per feature", []string{"feature"}, nil), + pendingReasonTotal: 
prometheus.NewDesc("slurm_pending_reason_total", "count of the reason jobs are pending", []string{"reason"}, nil), jobScrapeDuration: prometheus.NewDesc("slurm_job_scrape_duration", fmt.Sprintf("how long the cmd %v took (ms)", cliOpts.squeue), nil, nil), jobScrapeError: prometheus.NewCounter(prometheus.CounterOpts{ Name: "slurm_job_scrape_error", @@ -347,6 +381,7 @@ func (jc *JobsCollector) Describe(ch chan<- *prometheus.Desc) { ch <- jc.featureJobMemAlloc ch <- jc.featureJobCpuAlloc ch <- jc.featureJobTotal + ch <- jc.pendingReasonTotal ch <- jc.jobScrapeDuration ch <- jc.jobScrapeError.Desc() } @@ -414,4 +449,9 @@ func (jc *JobsCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(jc.featureJobTotal, prometheus.GaugeValue, metric.total, feature) } } + + stateReasonMetric := parseStateReasonMetric(jobMetrics) + for pendingReason, pendingCount := range stateReasonMetric.pendingStateCount { + ch <- prometheus.MustNewConstMetric(jc.pendingReasonTotal, prometheus.GaugeValue, pendingCount, pendingReason) + } } diff --git a/exporter/jobs_test.go b/exporter/jobs_test.go index 91bf8c9..91b7492 100644 --- a/exporter/jobs_test.go +++ b/exporter/jobs_test.go @@ -95,7 +95,7 @@ func TestUserJobMetric(t *testing.T) { for user, metric := range parseUserJobMetrics(jms) { if user == expectedUser { - assert.Equal(1., metric.totalJobCount) + assert.Equal(2., metric.totalJobCount) assert.Equal(1., metric.allocCpu[state]) assert.Equal(1., metric.stateJobCount[state]) assert.Equal(6.4e+13, metric.allocMemory[state]) @@ -332,3 +332,35 @@ func TestJsonJobFetcherCacheMiss(t *testing.T) { // assert cache hit assert.Equal(2, scraper.CallCount) } + +func TestParseStateReasonMetric_Fallback(t *testing.T) { + assert := assert.New(t) + scraper := &MockScraper{fixture: "fixtures/squeue_fallback.txt"} + cliFallbackFetcher := &JobCliFallbackFetcher{ + scraper: scraper, + cache: NewAtomicThrottledCache[JobMetric](0), + errCounter: 
prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}), + } + jobMetrics, err := cliFallbackFetcher.FetchMetrics() + assert.NotEmpty(jobMetrics) + assert.NoError(err) + m := parseStateReasonMetric(jobMetrics) + assert.NotEmpty(m.pendingStateCount) + assert.Equal(m.pendingStateCount["Dependency"], 1.) + assert.Equal(m.pendingStateCount["Priority"], 1.) +} +func TestParseStateReasonMetric_Json(t *testing.T) { + assert := assert.New(t) + scraper := &MockScraper{fixture: "fixtures/squeue_out.json"} + JsonFetcher := &JobJsonFetcher{ + scraper: scraper, + cache: NewAtomicThrottledCache[JobMetric](0), + errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}), + } + jobMetrics, err := JsonFetcher.FetchMetrics() + assert.NotEmpty(jobMetrics) + assert.NoError(err) + m := parseStateReasonMetric(jobMetrics) + assert.NotEmpty(m.pendingStateCount) + assert.Equal(m.pendingStateCount["Dependency"], 1.) +} diff --git a/exporter/main_test.go b/exporter/main_test.go index 5262ff5..57af1f6 100644 --- a/exporter/main_test.go +++ b/exporter/main_test.go @@ -74,7 +74,7 @@ func TestNewConfig_NonDefault(t *testing.T) { cliFlags := CliFlags{SlurmCliFallback: true} config, err := NewConfig(&cliFlags) assert.Nil(err) - expected := []string{"squeue", "--states=all", "-h", "-r", "-o", `{"a": "%a", "id": %A, "end_time": "%e", "u": "%u", "state": "%T", "p": "%P", "cpu": %C, "mem": "%m", "array_id": "%K"}`} + expected := []string{"squeue", "--states=all", "-h", "-r", "-o", `{"a": "%a", "id": %A, "end_time": "%e", "u": "%u", "state": "%T", "p": "%P", "cpu": %C, "mem": "%m", "array_id": "%K", "r": "%R"}`} assert.Equal(expected, config.cliOpts.squeue) } diff --git a/exporter/server.go b/exporter/server.go index 680fe12..096264e 100644 --- a/exporter/server.go +++ b/exporter/server.go @@ -141,7 +141,7 @@ func NewConfig(cliFlags *CliFlags) (*Config, error) { if cliOpts.fallback { // we define a custom json format that we convert back into the openapi format if 
cliFlags.SlurmSqueueOverride == "" { - cliOpts.squeue = []string{"squeue", "--states=all", "-h", "-r", "-o", `{"a": "%a", "id": %A, "end_time": "%e", "u": "%u", "state": "%T", "p": "%P", "cpu": %C, "mem": "%m", "array_id": "%K"}`} + cliOpts.squeue = []string{"squeue", "--states=all", "-h", "-r", "-o", `{"a": "%a", "id": %A, "end_time": "%e", "u": "%u", "state": "%T", "p": "%P", "cpu": %C, "mem": "%m", "array_id": "%K", "r": "%R"}`} } if cliFlags.SlurmSinfoOverride == "" { cliOpts.sinfo = []string{"sinfo", "-h", "-o", `{"s": "%T", "mem": %m, "n": "%n", "l": "%O", "p": "%R", "fmem": "%e", "cstate": "%C", "w": %w}`}