From 3afae44c23d0baad51282343ffecccc79e79d9c4 Mon Sep 17 00:00:00 2001
From: abhinavDhulipala <46908860+abhinavDhulipala@users.noreply.github.com>
Date: Thu, 29 Aug 2024 17:57:00 -0700
Subject: [PATCH] [exporter/jobs] fix JobMetric Fetcher cache bug (#99)

* fix and cover cli fetcher cache bug

* refactor test utils into separate file

* cover json job fetcher
---
 exporter/jobs.go       | 119 ++++++++++++++++----------------
 exporter/jobs_test.go  | 152 +++++++++++++++++++++++++++++++++--------
 exporter/mock_utils.go |  71 +++++++++++++++++++
 exporter/utils.go      |  31 ---------
 exporter/utils_test.go |  26 -------
 5 files changed, 255 insertions(+), 144 deletions(-)
 create mode 100644 exporter/mock_utils.go

diff --git a/exporter/jobs.go b/exporter/jobs.go
index c671b42..4a5d3ba 100644
--- a/exporter/jobs.go
+++ b/exporter/jobs.go
@@ -55,13 +55,28 @@ type JobJsonFetcher struct {
 	errCounter prometheus.Counter
 }
 
-func (jjf *JobJsonFetcher) FetchMetrics() ([]JobMetric, error) {
+func (jjf *JobJsonFetcher) fetch() ([]JobMetric, error) {
 	data, err := jjf.scraper.FetchRawBytes()
 	if err != nil {
 		jjf.errCounter.Inc()
 		return nil, err
 	}
-	return jjf.cache.FetchOrThrottle(func() ([]JobMetric, error) { return parseJobMetrics(data) })
+	var squeue squeueResponse
+	err = json.Unmarshal(data, &squeue)
+	if err != nil {
+		slog.Error("Unmarshaling node metrics %q", err)
+		return nil, err
+	}
+	for _, j := range squeue.Jobs {
+		for _, resource := range j.JobResources.AllocNodes {
+			resource.Mem *= 1e9
+		}
+	}
+	return squeue.Jobs, nil
+}
+
+func (jjf *JobJsonFetcher) FetchMetrics() ([]JobMetric, error) {
+	return jjf.cache.FetchOrThrottle(jjf.fetch)
 }
 
 func (jjf *JobJsonFetcher) ScrapeDuration() time.Duration {
@@ -78,65 +93,11 @@ type JobCliFallbackFetcher struct {
 	errCounter prometheus.Counter
 }
 
-func (jcf *JobCliFallbackFetcher) FetchMetrics() ([]JobMetric, error) {
-	data, err := jcf.scraper.FetchRawBytes()
+func (jcf *JobCliFallbackFetcher) fetch() ([]JobMetric, error) {
+	squeue, err := jcf.scraper.FetchRawBytes()
 	if err != nil {
-		jcf.errCounter.Inc()
 		return nil, err
 	}
-	return jcf.cache.FetchOrThrottle(func() ([]JobMetric, error) { return parseCliFallback(data, jcf.errCounter) })
-}
-
-func (jcf *JobCliFallbackFetcher) ScrapeDuration() time.Duration {
-	return jcf.scraper.Duration()
-}
-
-func (jcf *JobCliFallbackFetcher) ScrapeError() prometheus.Counter {
-	return jcf.errCounter
-}
-
-func totalAllocMem(resource *JobResource) float64 {
-	var allocMem float64
-	for _, node := range resource.AllocNodes {
-		allocMem += node.Mem
-	}
-	return allocMem
-}
-
-func parseJobMetrics(jsonJobList []byte) ([]JobMetric, error) {
-	var squeue squeueResponse
-	err := json.Unmarshal(jsonJobList, &squeue)
-	if err != nil {
-		slog.Error("Unmarshaling node metrics %q", err)
-		return nil, err
-	}
-	for _, j := range squeue.Jobs {
-		for _, resource := range j.JobResources.AllocNodes {
-			resource.Mem *= 1e9
-		}
-	}
-	return squeue.Jobs, nil
-}
-
-type NAbleTime struct{ time.Time }
-
-// report beginning of time in the case of N/A
-func (nat *NAbleTime) UnmarshalJSON(data []byte) error {
-	var tString string
-	if err := json.Unmarshal(data, &tString); err != nil {
-		return err
-	}
-	nullSet := map[string]struct{}{"N/A": {}, "NONE": {}}
-	if _, ok := nullSet[tString]; ok {
-		nat.Time = time.Time{}
-		return nil
-	}
-	t, err := time.Parse("2006-01-02T15:04:05", tString)
-	nat.Time = t
-	return err
-}
-
-func parseCliFallback(squeue []byte, errorCounter prometheus.Counter) ([]JobMetric, error) {
 	jobMetrics := make([]JobMetric, 0)
 	// clean input
 	squeue = bytes.TrimSpace(squeue)
@@ -159,13 +120,13 @@ func parseCliFallback(squeue []byte, errorCounter prometheus.Counter) ([]JobMetr
 		}
 		if err := json.Unmarshal(line, &metric); err != nil {
 			slog.Error(fmt.Sprintf("squeue fallback parse error: failed on line %d `%s`", i, line))
-			errorCounter.Inc()
+			jcf.errCounter.Inc()
 			continue
 		}
 		mem, err := MemToFloat(metric.Mem)
 		if err != nil {
 			slog.Error(fmt.Sprintf("squeue fallback parse error: failed on line %d `%s` with err `%q`", i, line, err))
-			errorCounter.Inc()
+			jcf.errCounter.Inc()
 			continue
 		}
 		openapiJobMetric := JobMetric{
@@ -185,6 +146,44 @@ func parseCliFallback(squeue []byte, errorCounter prometheus.Counter) ([]JobMetr
 	}
 	return jobMetrics, nil
 }
+func (jcf *JobCliFallbackFetcher) FetchMetrics() ([]JobMetric, error) {
+	return jcf.cache.FetchOrThrottle(jcf.fetch)
+}
+
+func (jcf *JobCliFallbackFetcher) ScrapeDuration() time.Duration {
+	return jcf.scraper.Duration()
+}
+
+func (jcf *JobCliFallbackFetcher) ScrapeError() prometheus.Counter {
+	return jcf.errCounter
+}
+
+func totalAllocMem(resource *JobResource) float64 {
+	var allocMem float64
+	for _, node := range resource.AllocNodes {
+		allocMem += node.Mem
+	}
+	return allocMem
+}
+
+type NAbleTime struct{ time.Time }
+
+// report beginning of time in the case of N/A
+func (nat *NAbleTime) UnmarshalJSON(data []byte) error {
+	var tString string
+	if err := json.Unmarshal(data, &tString); err != nil {
+		return err
+	}
+	nullSet := map[string]struct{}{"N/A": {}, "NONE": {}}
+	if _, ok := nullSet[tString]; ok {
+		nat.Time = time.Time{}
+		return nil
+	}
+	t, err := time.Parse("2006-01-02T15:04:05", tString)
+	nat.Time = t
+	return err
+}
+
 type UserJobMetric struct {
 	stateJobCount map[string]float64
 	totalJobCount float64
diff --git a/exporter/jobs_test.go b/exporter/jobs_test.go
index d77eed7..91bf8c9 100644
--- a/exporter/jobs_test.go
+++ b/exporter/jobs_test.go
@@ -44,11 +44,14 @@ func TestNewJobsController(t *testing.T) {
 
 func TestParseJobMetrics(t *testing.T) {
 	assert := assert.New(t)
-	fixture, err := MockJobInfoScraper.FetchRawBytes()
-	assert.Nil(err)
-	jms, err := parseJobMetrics(fixture)
-	assert.Nil(err)
-	assert.NotEmpty(jms)
+	scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
+	fetcher := &JobJsonFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
+	}
+	jms, err := fetcher.fetch()
+	assert.NoError(err)
 	// test parse of single job
 	var job *JobMetric
 	for _, m := range jms {
@@ -63,22 +66,27 @@ func TestParseJobMetrics(t *testing.T) {
 
 func TestParseCliFallback(t *testing.T) {
 	assert := assert.New(t)
-	fetcher := MockScraper{fixture: "fixtures/squeue_fallback.txt"}
-	data, err := fetcher.FetchRawBytes()
-	assert.Nil(err)
-	counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"})
-	metrics, err := parseCliFallback(data, counter)
+	cliFallbackFetcher := &JobCliFallbackFetcher{
+		scraper:    &MockScraper{fixture: "fixtures/squeue_fallback.txt"},
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
+	}
+	metrics, err := cliFallbackFetcher.fetch()
 	assert.Nil(err)
 	assert.NotEmpty(metrics)
-	assert.Equal(2., CollectCounterValue(counter))
+	assert.Equal(2., CollectCounterValue(cliFallbackFetcher.errCounter))
 }
 
 func TestUserJobMetric(t *testing.T) {
 	// setup
 	assert := assert.New(t)
-	fixture, err := MockJobInfoScraper.FetchRawBytes()
-	assert.Nil(err)
-	jms, err := parseJobMetrics(fixture)
+	scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
+	fetcher := &JobJsonFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
+	}
+	jms, err := fetcher.fetch()
 	assert.Nil(err)
 
 	//test
@@ -158,9 +166,13 @@ func TestJobCollect_Fallback(t *testing.T) {
 
 func TestParsePartitionJobMetrics(t *testing.T) {
 	assert := assert.New(t)
-	fixture, err := MockJobInfoScraper.FetchRawBytes()
-	assert.Nil(err)
-	jms, err := parseJobMetrics(fixture)
+	scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
+	fetcher := &JobJsonFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
+	}
+	jms, err := fetcher.fetch()
 	assert.Nil(err)
 
 	partitionJobMetrics := parsePartitionJobMetrics(jms)
@@ -169,9 +181,13 @@ func TestParsePartitionJobMetrics(t *testing.T) {
 
 func TestParsePartMetrics(t *testing.T) {
 	assert := assert.New(t)
-	fixture, err := MockJobInfoScraper.FetchRawBytes()
-	assert.Nil(err)
-	jms, err := parseJobMetrics(fixture)
+	scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
+	fetcher := &JobJsonFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
+	}
+	jms, err := fetcher.fetch()
 	assert.Nil(err)
 
 	featureMetrics := parseFeatureMetric(jms)
@@ -222,15 +238,97 @@ func TestNAbleTimeJson_NA(t *testing.T) {
 
 func TestParseCliFallbackEmpty(t *testing.T) {
 	assert := assert.New(t)
-	counter := prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "validation_counter",
-	})
-	metrics, err := parseCliFallback([]byte(""), counter)
+	scraper := &StringByteScraper{msg: ""}
+	cliFallbackFetcher := &JobCliFallbackFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
+	}
+	metrics, err := cliFallbackFetcher.fetch()
 	assert.NoError(err)
 	assert.Empty(metrics)
-	assert.Zero(CollectCounterValue(counter))
-	metrics, err = parseCliFallback([]byte("\n "), counter)
+	assert.Zero(CollectCounterValue(cliFallbackFetcher.errCounter))
+	assert.Equal(1, scraper.Callcount)
+	scraper.msg = "\n"
+	metrics, err = cliFallbackFetcher.fetch()
 	assert.NoError(err)
 	assert.Empty(metrics)
-	assert.Zero(CollectCounterValue(counter))
+	assert.Zero(CollectCounterValue(cliFallbackFetcher.errCounter))
+	assert.Equal(2, scraper.Callcount)
+}
+
+func TestCliJobFetcherCacheHit(t *testing.T) {
+	assert := assert.New(t)
+	scraper := &MockScraper{fixture: "fixtures/squeue_fallback.txt"}
+	cliFallbackFetcher := &JobCliFallbackFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
+	}
+	metrics, err := cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	assert.Equal(1, scraper.CallCount)
+	metrics, err = cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	// assert cache hit
+	assert.Equal(1, scraper.CallCount)
+}
+
+func TestCliJobFetcherCacheMiss(t *testing.T) {
+	assert := assert.New(t)
+	scraper := &MockScraper{fixture: "fixtures/squeue_fallback.txt"}
+	cliFallbackFetcher := &JobCliFallbackFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](0),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
+	}
+	metrics, err := cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	assert.Equal(1, scraper.CallCount)
+	metrics, err = cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	// assert cache miss
+	assert.Equal(2, scraper.CallCount)
+}
+
+func TestJsonJobFetcherCacheHit(t *testing.T) {
+	assert := assert.New(t)
+	scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
+	cliFallbackFetcher := &JobJsonFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](100),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
+	}
+	metrics, err := cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	assert.Equal(1, scraper.CallCount)
+	metrics, err = cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	// assert cache hit
+	assert.Equal(1, scraper.CallCount)
+}
+
+func TestJsonJobFetcherCacheMiss(t *testing.T) {
+	assert := assert.New(t)
+	scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
+	cliFallbackFetcher := &JobJsonFetcher{
+		scraper:    scraper,
+		cache:      NewAtomicThrottledCache[JobMetric](0),
+		errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
+	}
+	metrics, err := cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	assert.Equal(1, scraper.CallCount)
+	metrics, err = cliFallbackFetcher.FetchMetrics()
+	assert.NotEmpty(metrics)
+	assert.NoError(err)
+	// assert cache miss
+	assert.Equal(2, scraper.CallCount)
 }
diff --git a/exporter/mock_utils.go b/exporter/mock_utils.go
new file mode 100644
index 0000000..3d2a1d6
--- /dev/null
+++ b/exporter/mock_utils.go
@@ -0,0 +1,71 @@
+// SPDX-FileCopyrightText: 2023 Rivos Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package exporter
+
+import (
+	"bytes"
+	"errors"
+	"os"
+	"time"
+)
+
+type MockFetchErrored struct{}
+
+func (f *MockFetchErrored) FetchRawBytes() ([]byte, error) {
+	return nil, errors.New("mock fetch error")
+}
+
+func (f *MockFetchErrored) Duration() time.Duration {
+	return 1
+}
+
+// implements SlurmByteScraper by pulling fixtures instead
+// used exclusively for testing
+type MockScraper struct {
+	fixture   string
+	duration  time.Duration
+	CallCount int
+}
+
+func (f *MockScraper) FetchRawBytes() ([]byte, error) {
+	defer func(t time.Time) {
+		f.duration = time.Since(t)
+	}(time.Now())
+	f.CallCount++
+	file, err := os.ReadFile(f.fixture)
+	if err != nil {
+		return nil, err
+	}
+	// allow commenting in text files
+	sep := []byte("\n")
+	lines := bytes.Split(file, sep)
+	filtered := make([][]byte, 0)
+	for _, line := range lines {
+		if !bytes.HasPrefix(line, []byte("#")) {
+			filtered = append(filtered, line)
+		}
+	}
+	return bytes.Join(filtered, sep), nil
+}
+
+func (f *MockScraper) Duration() time.Duration {
+	return f.duration
+}
+
+// implements SlurmByteScraper by emitting string payload instead
+// used exclusively for testing
+type StringByteScraper struct {
+	msg       string
+	Callcount int
+}
+
+func (es *StringByteScraper) FetchRawBytes() ([]byte, error) {
+	es.Callcount++
+	return []byte(es.msg), nil
+}
+
+func (es *StringByteScraper) Duration() time.Duration {
+	return time.Duration(1)
+}
diff --git a/exporter/utils.go b/exporter/utils.go
index 5e18502..ffe0f04 100644
--- a/exporter/utils.go
+++ b/exporter/utils.go
@@ -133,37 +133,6 @@ func NewCliScraper(args ...string) *CliScraper {
 	}
 }
 
-// implements SlurmByteScraper by pulling fixtures instead
-// used exclusively for testing
-type MockScraper struct {
-	fixture  string
-	duration time.Duration
-}
-
-func (f *MockScraper) FetchRawBytes() ([]byte, error) {
-	defer func(t time.Time) {
-		f.duration = time.Since(t)
-	}(time.Now())
-	file, err := os.ReadFile(f.fixture)
-	if err != nil {
-		return nil, err
-	}
-	// allow commenting in text files
-	sep := []byte("\n")
-	lines := bytes.Split(file, sep)
-	filtered := make([][]byte, 0)
-	for _, line := range lines {
-		if !bytes.HasPrefix(line, []byte("#")) {
-			filtered = append(filtered, line)
-		}
-	}
-	return bytes.Join(filtered, sep), nil
-}
-
-func (f *MockScraper) Duration() time.Duration {
-	return f.duration
-}
-
 // convert slurm mem string to float64 bytes
 func MemToFloat(mem string) (float64, error) {
 	if num, err := strconv.ParseFloat(mem, 64); err == nil {
diff --git a/exporter/utils_test.go b/exporter/utils_test.go
index b98c12d..012c56e 100644
--- a/exporter/utils_test.go
+++ b/exporter/utils_test.go
@@ -5,7 +5,6 @@
 package exporter
 
 import (
-	"errors"
 	"fmt"
 	"math"
 	"math/rand"
@@ -34,31 +33,6 @@ func generateRandString(n int) string {
 	return string(randBytes)
 }
 
-// used to ensure the fetch function was called
-type MockFetchTriggered struct {
-	msg    []byte
-	called bool
-}
-
-func (f *MockFetchTriggered) Fetch() ([]byte, error) {
-	f.called = true
-	return f.msg, nil
-}
-
-func (f *MockFetchTriggered) Duration() time.Duration {
-	return 1
-}
-
-type MockFetchErrored struct{}
-
-func (f *MockFetchErrored) FetchRawBytes() ([]byte, error) {
-	return nil, errors.New("mock fetch error")
-}
-
-func (f *MockFetchErrored) Duration() time.Duration {
-	return 1
-}
-
 func TestCliFetcher(t *testing.T) {
 	assert := assert.New(t)
 	cliFetcher := NewCliScraper("ls")