[exporter/jobs] fix JobMetric Fetcher cache bug (#99)
* fix and cover cli fetcher cache bug

* refactor test utils into separate file

* cover json job fetcher
abhinavDhulipala authored Aug 30, 2024
1 parent 6e7dada commit 3afae44
Showing 5 changed files with 255 additions and 144 deletions.
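Note on the bug: before this change, both job fetchers scraped the squeue output first and only wrapped the parse step in cache.FetchOrThrottle, so a warm cache skipped parsing but never the expensive scrape itself. The new private fetch() methods move the scrape inside the throttled closure, and FetchMetrics simply delegates to cache.FetchOrThrottle(fetch). The self-contained sketch below illustrates the difference; toyCache and toyScraper are invented stand-ins that only assume FetchOrThrottle's apparent contract (serve a fresh cached value, otherwise run the closure), not the real AtomicThrottledCache implementation.

package main

import (
	"fmt"
	"time"
)

// toyCache stands in for AtomicThrottledCache[T]; the real implementation may differ.
type toyCache[T any] struct {
	limit  time.Duration
	last   time.Time
	cached []T
}

// FetchOrThrottle serves the cached slice while it is fresh, otherwise runs fetch.
func (c *toyCache[T]) FetchOrThrottle(fetch func() ([]T, error)) ([]T, error) {
	if c.cached != nil && time.Since(c.last) < c.limit {
		return c.cached, nil // throttled: reuse cached metrics
	}
	v, err := fetch()
	if err != nil {
		return nil, err
	}
	c.cached, c.last = v, time.Now()
	return v, nil
}

// toyScraper counts how often the "expensive" scrape actually runs.
type toyScraper struct{ calls int }

func (s *toyScraper) FetchRawBytes() ([]byte, error) {
	s.calls++
	return []byte("RUNNING"), nil
}

func main() {
	scraper := &toyScraper{}
	cache := &toyCache[string]{limit: time.Minute}

	// Old shape: scrape first, throttle only the parse.
	oldFetchMetrics := func() ([]string, error) {
		data, err := scraper.FetchRawBytes() // runs on every call, cache hit or not
		if err != nil {
			return nil, err
		}
		return cache.FetchOrThrottle(func() ([]string, error) { return []string{string(data)}, nil })
	}
	oldFetchMetrics()
	oldFetchMetrics()
	fmt.Println("old shape, scrapes:", scraper.calls) // 2: the cache never saved a scrape

	// New shape: scrape and parse both live inside the throttled closure.
	scraper.calls = 0
	cache = &toyCache[string]{limit: time.Minute}
	newFetchMetrics := func() ([]string, error) {
		return cache.FetchOrThrottle(func() ([]string, error) {
			data, err := scraper.FetchRawBytes() // skipped entirely on a cache hit
			if err != nil {
				return nil, err
			}
			return []string{string(data)}, nil
		})
	}
	newFetchMetrics()
	newFetchMetrics()
	fmt.Println("new shape, scrapes:", scraper.calls) // 1: second call is a cache hit
}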
119 changes: 59 additions & 60 deletions exporter/jobs.go
@@ -55,13 +55,28 @@ type JobJsonFetcher struct {
errCounter prometheus.Counter
}

func (jjf *JobJsonFetcher) FetchMetrics() ([]JobMetric, error) {
func (jjf *JobJsonFetcher) fetch() ([]JobMetric, error) {
data, err := jjf.scraper.FetchRawBytes()
if err != nil {
jjf.errCounter.Inc()
return nil, err
}
return jjf.cache.FetchOrThrottle(func() ([]JobMetric, error) { return parseJobMetrics(data) })
var squeue squeueResponse
err = json.Unmarshal(data, &squeue)
if err != nil {
slog.Error("Unmarshaling node metrics %q", err)
return nil, err
}
for _, j := range squeue.Jobs {
for _, resource := range j.JobResources.AllocNodes {
resource.Mem *= 1e9
}
}
return squeue.Jobs, nil
}

func (jjf *JobJsonFetcher) FetchMetrics() ([]JobMetric, error) {
return jjf.cache.FetchOrThrottle(jjf.fetch)
}

func (jjf *JobJsonFetcher) ScrapeDuration() time.Duration {
@@ -78,65 +93,11 @@ type JobCliFallbackFetcher struct {
errCounter prometheus.Counter
}

func (jcf *JobCliFallbackFetcher) FetchMetrics() ([]JobMetric, error) {
data, err := jcf.scraper.FetchRawBytes()
func (jcf *JobCliFallbackFetcher) fetch() ([]JobMetric, error) {
squeue, err := jcf.scraper.FetchRawBytes()
if err != nil {
jcf.errCounter.Inc()
return nil, err
}
return jcf.cache.FetchOrThrottle(func() ([]JobMetric, error) { return parseCliFallback(data, jcf.errCounter) })
}

func (jcf *JobCliFallbackFetcher) ScrapeDuration() time.Duration {
return jcf.scraper.Duration()
}

func (jcf *JobCliFallbackFetcher) ScrapeError() prometheus.Counter {
return jcf.errCounter
}

func totalAllocMem(resource *JobResource) float64 {
var allocMem float64
for _, node := range resource.AllocNodes {
allocMem += node.Mem
}
return allocMem
}

func parseJobMetrics(jsonJobList []byte) ([]JobMetric, error) {
var squeue squeueResponse
err := json.Unmarshal(jsonJobList, &squeue)
if err != nil {
slog.Error("Unmarshaling node metrics %q", err)
return nil, err
}
for _, j := range squeue.Jobs {
for _, resource := range j.JobResources.AllocNodes {
resource.Mem *= 1e9
}
}
return squeue.Jobs, nil
}

type NAbleTime struct{ time.Time }

// report beginning of time in the case of N/A
func (nat *NAbleTime) UnmarshalJSON(data []byte) error {
var tString string
if err := json.Unmarshal(data, &tString); err != nil {
return err
}
nullSet := map[string]struct{}{"N/A": {}, "NONE": {}}
if _, ok := nullSet[tString]; ok {
nat.Time = time.Time{}
return nil
}
t, err := time.Parse("2006-01-02T15:04:05", tString)
nat.Time = t
return err
}

func parseCliFallback(squeue []byte, errorCounter prometheus.Counter) ([]JobMetric, error) {
jobMetrics := make([]JobMetric, 0)
// clean input
squeue = bytes.TrimSpace(squeue)
@@ -159,13 +120,13 @@ func parseCliFallback(squeue []byte, errorCounter prometheus.Counter) ([]JobMetr
}
if err := json.Unmarshal(line, &metric); err != nil {
slog.Error(fmt.Sprintf("squeue fallback parse error: failed on line %d `%s`", i, line))
errorCounter.Inc()
jcf.errCounter.Inc()
continue
}
mem, err := MemToFloat(metric.Mem)
if err != nil {
slog.Error(fmt.Sprintf("squeue fallback parse error: failed on line %d `%s` with err `%q`", i, line, err))
errorCounter.Inc()
jcf.errCounter.Inc()
continue
}
openapiJobMetric := JobMetric{
@@ -185,6 +146,44 @@
return jobMetrics, nil
}

func (jcf *JobCliFallbackFetcher) FetchMetrics() ([]JobMetric, error) {
return jcf.cache.FetchOrThrottle(jcf.fetch)
}

func (jcf *JobCliFallbackFetcher) ScrapeDuration() time.Duration {
return jcf.scraper.Duration()
}

func (jcf *JobCliFallbackFetcher) ScrapeError() prometheus.Counter {
return jcf.errCounter
}

func totalAllocMem(resource *JobResource) float64 {
var allocMem float64
for _, node := range resource.AllocNodes {
allocMem += node.Mem
}
return allocMem
}

type NAbleTime struct{ time.Time }

// report beginning of time in the case of N/A
func (nat *NAbleTime) UnmarshalJSON(data []byte) error {
var tString string
if err := json.Unmarshal(data, &tString); err != nil {
return err
}
nullSet := map[string]struct{}{"N/A": {}, "NONE": {}}
if _, ok := nullSet[tString]; ok {
nat.Time = time.Time{}
return nil
}
t, err := time.Parse("2006-01-02T15:04:05", tString)
nat.Time = t
return err
}

type UserJobMetric struct {
stateJobCount map[string]float64
totalJobCount float64
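NAbleTime (relocated further down jobs.go by this commit) maps Slurm's "N/A" and "NONE" sentinels to the zero time.Time and otherwise parses the 2006-01-02T15:04:05 layout. A small usage sketch, assuming a hypothetical wrapper struct and JSON field name (the exporter's real struct tags are not shown in this diff):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Same definition as in jobs.go above; repeated only to keep the sketch self-contained.
type NAbleTime struct{ time.Time }

func (nat *NAbleTime) UnmarshalJSON(data []byte) error {
	var tString string
	if err := json.Unmarshal(data, &tString); err != nil {
		return err
	}
	nullSet := map[string]struct{}{"N/A": {}, "NONE": {}}
	if _, ok := nullSet[tString]; ok {
		nat.Time = time.Time{}
		return nil
	}
	t, err := time.Parse("2006-01-02T15:04:05", tString)
	nat.Time = t
	return err
}

func main() {
	// Hypothetical wrapper struct and JSON tag, purely for illustration.
	var row struct {
		SubmitTime NAbleTime `json:"submit_time"`
	}
	if err := json.Unmarshal([]byte(`{"submit_time":"N/A"}`), &row); err != nil {
		panic(err)
	}
	fmt.Println(row.SubmitTime.IsZero()) // true: N/A reports the beginning of time

	if err := json.Unmarshal([]byte(`{"submit_time":"2024-08-30T12:00:00"}`), &row); err != nil {
		panic(err)
	}
	fmt.Println(row.SubmitTime.IsZero()) // false: parsed with the 2006-01-02T15:04:05 layout
}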
152 changes: 125 additions & 27 deletions exporter/jobs_test.go
@@ -44,11 +44,14 @@ func TestNewJobsController(t *testing.T) {

func TestParseJobMetrics(t *testing.T) {
assert := assert.New(t)
fixture, err := MockJobInfoScraper.FetchRawBytes()
assert.Nil(err)
jms, err := parseJobMetrics(fixture)
assert.Nil(err)
assert.NotEmpty(jms)
scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
fetcher := &JobJsonFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
}
jms, err := fetcher.fetch()
assert.NoError(err)
// test parse of single job
var job *JobMetric
for _, m := range jms {
@@ -63,22 +66,27 @@ func TestParseJobMetrics(t *testing.T) {

func TestParseCliFallback(t *testing.T) {
assert := assert.New(t)
fetcher := MockScraper{fixture: "fixtures/squeue_fallback.txt"}
data, err := fetcher.FetchRawBytes()
assert.Nil(err)
counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"})
metrics, err := parseCliFallback(data, counter)
cliFallbackFetcher := &JobCliFallbackFetcher{
scraper: &MockScraper{fixture: "fixtures/squeue_fallback.txt"},
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
}
metrics, err := cliFallbackFetcher.fetch()
assert.Nil(err)
assert.NotEmpty(metrics)
assert.Equal(2., CollectCounterValue(counter))
assert.Equal(2., CollectCounterValue(cliFallbackFetcher.errCounter))
}

func TestUserJobMetric(t *testing.T) {
// setup
assert := assert.New(t)
fixture, err := MockJobInfoScraper.FetchRawBytes()
assert.Nil(err)
jms, err := parseJobMetrics(fixture)
scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
fetcher := &JobJsonFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
}
jms, err := fetcher.fetch()
assert.Nil(err)

//test
@@ -158,9 +166,13 @@ func TestJobCollect_Fallback(t *testing.T) {

func TestParsePartitionJobMetrics(t *testing.T) {
assert := assert.New(t)
fixture, err := MockJobInfoScraper.FetchRawBytes()
assert.Nil(err)
jms, err := parseJobMetrics(fixture)
scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
fetcher := &JobJsonFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
}
jms, err := fetcher.fetch()
assert.Nil(err)

partitionJobMetrics := parsePartitionJobMetrics(jms)
@@ -169,9 +181,13 @@ func TestParsePartMetrics(t *testing.T) {

func TestParsePartMetrics(t *testing.T) {
assert := assert.New(t)
fixture, err := MockJobInfoScraper.FetchRawBytes()
assert.Nil(err)
jms, err := parseJobMetrics(fixture)
scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
fetcher := &JobJsonFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{}),
}
jms, err := fetcher.fetch()
assert.Nil(err)

featureMetrics := parseFeatureMetric(jms)
@@ -222,15 +238,97 @@ func TestNAbleTimeJson_NA(t *testing.T) {

func TestParseCliFallbackEmpty(t *testing.T) {
assert := assert.New(t)
counter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "validation_counter",
})
metrics, err := parseCliFallback([]byte(""), counter)
scraper := &StringByteScraper{msg: ""}
cliFallbackFetcher := &JobCliFallbackFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
}
metrics, err := cliFallbackFetcher.fetch()
assert.NoError(err)
assert.Empty(metrics)
assert.Zero(CollectCounterValue(counter))
metrics, err = parseCliFallback([]byte("\n "), counter)
assert.Zero(CollectCounterValue(cliFallbackFetcher.errCounter))
assert.Equal(1, scraper.Callcount)
scraper.msg = "\n"
metrics, err = cliFallbackFetcher.fetch()
assert.NoError(err)
assert.Empty(metrics)
assert.Zero(CollectCounterValue(counter))
assert.Zero(CollectCounterValue(cliFallbackFetcher.errCounter))
assert.Equal(2, scraper.Callcount)
}

func TestCliJobFetcherCacheHit(t *testing.T) {
assert := assert.New(t)
scraper := &MockScraper{fixture: "fixtures/squeue_fallback.txt"}
cliFallbackFetcher := &JobCliFallbackFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
}
metrics, err := cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
assert.Equal(1, scraper.CallCount)
metrics, err = cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
// assert cache hit
assert.Equal(1, scraper.CallCount)
}

func TestCliJobFetcherCacheMiss(t *testing.T) {
assert := assert.New(t)
scraper := &MockScraper{fixture: "fixtures/squeue_fallback.txt"}
cliFallbackFetcher := &JobCliFallbackFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](0),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
}
metrics, err := cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
assert.Equal(1, scraper.CallCount)
metrics, err = cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
// assert cache miss: a zero-width cache window forces a second scrape
assert.Equal(2, scraper.CallCount)
}

func TestJsonJobFetcherCacheHit(t *testing.T) {
assert := assert.New(t)
scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
cliFallbackFetcher := &JobJsonFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](100),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
}
metrics, err := cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
assert.Equal(1, scraper.CallCount)
metrics, err = cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
// assert cache hit
assert.Equal(1, scraper.CallCount)
}

func TestJsonJobFetcherCacheMiss(t *testing.T) {
assert := assert.New(t)
scraper := &MockScraper{fixture: "fixtures/squeue_out.json"}
cliFallbackFetcher := &JobJsonFetcher{
scraper: scraper,
cache: NewAtomicThrottledCache[JobMetric](0),
errCounter: prometheus.NewCounter(prometheus.CounterOpts{Name: "errors"}),
}
metrics, err := cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
assert.Equal(1, scraper.CallCount)
metrics, err = cliFallbackFetcher.FetchMetrics()
assert.NotEmpty(metrics)
assert.NoError(err)
// assert cache miss: a zero-width cache window forces a second scrape
assert.Equal(2, scraper.CallCount)
}
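The cache tests above rely on test doubles that this commit moves into a separate file (per the commit message) and that therefore don't appear in this diff: MockScraper, which serves a fixture file and counts scrapes via CallCount, and StringByteScraper, which serves an in-memory string and counts via Callcount. A plausible sketch of the fixture-backed scraper, inferred only from how the tests use it; the repository's actual implementation may differ:

package exporter

import (
	"os"
	"time"
)

// MockScraper (sketch, not the repository's actual implementation): serves a fixture
// file and records how many times the fetcher really scraped, so the tests above can
// assert a cache hit (CallCount stays at 1) versus a miss (CallCount climbs to 2 when
// the cache window is 0).
type MockScraper struct {
	fixture   string
	duration  time.Duration
	CallCount int
}

func (m *MockScraper) FetchRawBytes() ([]byte, error) {
	m.CallCount++
	return os.ReadFile(m.fixture)
}

func (m *MockScraper) Duration() time.Duration {
	return m.duration
}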