diff --git a/core/services/ocr2/plugins/promwrapper/plugin.go b/core/services/ocr2/plugins/promwrapper/plugin.go index cc6c9d135dd..448c2ed2ddf 100644 --- a/core/services/ocr2/plugins/promwrapper/plugin.go +++ b/core/services/ocr2/plugins/promwrapper/plugin.go @@ -5,17 +5,27 @@ package promwrapper import ( "context" + "encoding/json" "fmt" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/patrickmn/go-cache" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) +const ( + // DefaultExpiration is the default expiration time for cache items. + DefaultExpiration = 2 * time.Hour + + // DefaultCleanupInterval is the default interval for cache cleanup. + DefaultCleanupInterval = 30 * time.Minute +) + // Type assertions, buckets and labels. var ( _ types.ReportingPlugin = &promPlugin{} @@ -162,7 +172,7 @@ type ( configDigest string queryEndTimes sync.Map observationEndTimes sync.Map - reportEndTimes sync.Map + reportEndTimes *cache.Cache acceptFinalizedReportEndTimes sync.Map prometheusBackend PrometheusBackend } @@ -230,6 +240,7 @@ func New( oracleID: fmt.Sprintf("%d", config.OracleID), configDigest: common.Bytes2Hex(config.ConfigDigest[:]), prometheusBackend: prometheusBackend, + reportEndTimes: cache.New(DefaultExpiration, DefaultCleanupInterval), } } @@ -280,21 +291,33 @@ func (p *promPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp defer func() { duration := float64(time.Now().UTC().Sub(start)) p.prometheusBackend.SetReportDuration(labelValues, duration) - p.reportEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Report() + + // hash the timestamp + key := timestampToKey(timestamp) + p.reportEndTimes.Set(key, time.Now().UTC(), cache.DefaultExpiration) // note time at end of Report() }() return p.wrapped.Report(ctx, timestamp, query, observations) } +func timestampToKey(timestamp types.ReportTimestamp) string { + jsonData, _ := json.Marshal(timestamp) + key := string(jsonData) + return key +} + func (p *promPlugin) ShouldAcceptFinalizedReport(ctx context.Context, timestamp types.ReportTimestamp, report types.Report) (bool, error) { start := time.Now().UTC() + jsonData, _ := json.Marshal(timestamp) + key := string(jsonData) + // Report latency between Report() and ShouldAcceptFinalizedReport(). labelValues := getLabelsValues(p, timestamp) - if reportEndTime, ok := p.reportEndTimes.Load(timestamp); ok { + if reportEndTime, ok := p.reportEndTimes.Get(key); ok { latency := float64(start.Sub(reportEndTime.(time.Time))) p.prometheusBackend.SetReportToAcceptFinalizedReportLatency(labelValues, latency) - p.reportEndTimes.Delete(timestamp) + p.reportEndTimes.Delete(key) } // Report latency for ShouldAcceptFinalizedReport() at end of call. diff --git a/core/services/ocr2/plugins/promwrapper/plugin_test.go b/core/services/ocr2/plugins/promwrapper/plugin_test.go index b4de7f027f3..12ecf080ba1 100644 --- a/core/services/ocr2/plugins/promwrapper/plugin_test.go +++ b/core/services/ocr2/plugins/promwrapper/plugin_test.go @@ -217,13 +217,13 @@ func TestPlugin_GetLatencies(t *testing.T) { require.NoError(t, err) _, ok = promPlugin.observationEndTimes.Load(reportTimestamp) require.Equal(t, false, ok) - _, ok = promPlugin.reportEndTimes.Load(reportTimestamp) + _, ok = promPlugin.reportEndTimes.Get(timestampToKey(reportTimestamp)) require.Equal(t, true, ok) time.Sleep(rToALatency) _, err = promPlugin.ShouldAcceptFinalizedReport(ctx, reportTimestamp, nil) require.NoError(t, err) - _, ok = promPlugin.reportEndTimes.Load(reportTimestamp) + _, ok = promPlugin.reportEndTimes.Get(timestampToKey(reportTimestamp)) require.Equal(t, false, ok) _, ok = promPlugin.acceptFinalizedReportEndTimes.Load(reportTimestamp) require.Equal(t, true, ok)