diff --git a/analyzer/aggregate_stats/aggregate_stats.go b/analyzer/aggregate_stats/aggregate_stats.go index 657056912..4a80d30c9 100644 --- a/analyzer/aggregate_stats/aggregate_stats.go +++ b/analyzer/aggregate_stats/aggregate_stats.go @@ -52,7 +52,7 @@ type aggregateStatsAnalyzer struct { target storage.TargetStorage logger *log.Logger - metrics metrics.StorageMetrics + metrics metrics.AnalysisMetrics } var _ analyzer.Analyzer = (*aggregateStatsAnalyzer)(nil) @@ -66,7 +66,7 @@ func NewAggregateStatsAnalyzer(target storage.TargetStorage, logger *log.Logger) return &aggregateStatsAnalyzer{ target: target, logger: logger.With("analyzer", aggregateStatsAnalyzerName), - metrics: metrics.NewDefaultStorageMetrics(aggregateStatsAnalyzerName), + metrics: metrics.NewDefaultAnalysisMetrics(aggregateStatsAnalyzerName), }, nil } diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index a88a2da16..f4eb5f097 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -65,7 +65,7 @@ type processor struct { network sdkConfig.Network target storage.TargetStorage logger *log.Logger - metrics metrics.StorageMetrics + metrics metrics.AnalysisMetrics } var _ block.BlockProcessor = (*processor)(nil) @@ -79,7 +79,7 @@ func NewAnalyzer(blockRange config.BlockRange, batchSize uint64, mode analyzer.B network: network, target: target, logger: logger.With("analyzer", consensusAnalyzerName), - metrics: metrics.NewDefaultStorageMetrics(consensusAnalyzerName), + metrics: metrics.NewDefaultAnalysisMetrics(consensusAnalyzerName), } return block.NewAnalyzer(blockRange, batchSize, mode, consensusAnalyzerName, processor, target, logger) @@ -220,24 +220,8 @@ func (m *processor) processGenesis(ctx context.Context, genesisDoc *genesis.Docu return nil } -// Implements BlockProcessor interface. -func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error { - if uheight > math.MaxInt64 { - return fmt.Errorf("height %d is too large", uheight) - } - height := int64(uheight) - - // Fetch all data. - data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode) - if err != nil { - if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) { - return analyzer.ErrOutOfRange - } - return err - } - - // Process data, prepare updates. - batch := &storage.QueryBatch{} +// Expands `batch` with DB statements that reflect the contents of `data`. +func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data allData) error { for _, f := range []func(*storage.QueryBatch, *consensusBlockData) error{ m.queueBlockInserts, m.queueEpochInserts, @@ -300,6 +284,36 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error { return err } + return nil +} + +// Implements BlockProcessor interface. +func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error { + if uheight > math.MaxInt64 { + return fmt.Errorf("height %d is too large", uheight) + } + height := int64(uheight) + + // Fetch all data. + fetchTimer := m.metrics.BlockFetchLatencies() + data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode) + if err != nil { + if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) { + return analyzer.ErrOutOfRange + } + return err + } + fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading. 
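+	// (ObserveDuration records the elapsed time since the matching prometheus.NewTimer
+	// call inside BlockFetchLatencies into the block_fetch_latencies histogram.)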
+ + // Process data, prepare updates. + analysisTimer := m.metrics.BlockAnalysisLatencies() + batch := &storage.QueryBatch{} + err = m.queueDbUpdates(batch, *data) + analysisTimer.ObserveDuration() + if err != nil { + return err + } + // Update indexing progress. batch.Queue( queries.IndexingProgress, diff --git a/analyzer/runtime/runtime.go b/analyzer/runtime/runtime.go index 08990d14a..f9f4c51a1 100644 --- a/analyzer/runtime/runtime.go +++ b/analyzer/runtime/runtime.go @@ -31,7 +31,7 @@ type processor struct { source nodeapi.RuntimeApiLite target storage.TargetStorage logger *log.Logger - metrics metrics.StorageMetrics + metrics metrics.AnalysisMetrics } var _ block.BlockProcessor = (*processor)(nil) @@ -55,7 +55,7 @@ func NewRuntimeAnalyzer( source: sourceClient, target: target, logger: logger.With("analyzer", runtime), - metrics: metrics.NewDefaultStorageMetrics(string(runtime)), + metrics: metrics.NewDefaultAnalysisMetrics(string(runtime)), } return block.NewAnalyzer(blockRange, batchSize, mode, string(runtime), processor, target, logger) @@ -137,6 +137,7 @@ func (m *processor) FinalizeFastSync(ctx context.Context, lastFastSyncHeight int // Implements BlockProcessor interface. func (m *processor) ProcessBlock(ctx context.Context, round uint64) error { // Fetch all data. + fetchTimer := m.metrics.BlockFetchLatencies() blockHeader, err := m.source.GetBlockHeader(ctx, round) if err != nil { if strings.Contains(err.Error(), "roothash: block not found") { @@ -152,8 +153,10 @@ func (m *processor) ProcessBlock(ctx context.Context, round uint64) error { if err != nil { return err } + fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading. // Preprocess data. + analysisTimer := m.metrics.BlockAnalysisLatencies() blockData, err := ExtractRound(*blockHeader, transactionsWithResults, rawEvents, m.logger) if err != nil { return err @@ -163,6 +166,7 @@ func (m *processor) ProcessBlock(ctx context.Context, round uint64) error { batch := &storage.QueryBatch{} m.queueDbUpdates(batch, blockData) m.queueAccountsEvents(batch, blockData) + analysisTimer.ObserveDuration() // Update indexing progress. batch.Queue( diff --git a/metrics/analysis.go b/metrics/analysis.go new file mode 100644 index 000000000..542ea42ef --- /dev/null +++ b/metrics/analysis.go @@ -0,0 +1,135 @@ +package metrics + +import ( + "fmt" + "math" + + "github.com/prometheus/client_golang/prometheus" +) + +// Default service metrics for analyzer operations. +type AnalysisMetrics struct { + // Name of the runtime (or "consensus") that is being analyzed. + runtime string + + // Counts of database operations + databaseOperations *prometheus.CounterVec + + // Latencies of database operations. + databaseLatencies *prometheus.HistogramVec + + // Cache hit rates for the local cache. + localCacheReads *prometheus.CounterVec + + // Latencies of analysis. + blockAnalysisLatencies *prometheus.HistogramVec + + // Latencies of fetching a block's data from the node. + blockFetchLatencies *prometheus.HistogramVec +} + +type CacheReadStatus string + +const ( + CacheReadStatusHit CacheReadStatus = "hit" + CacheReadStatusMiss CacheReadStatus = "miss" + CacheReadStatusBadValue CacheReadStatus = "bad_value" // Value in cache was not valid (likely because of mismatched types / CBOR encoding). + CacheReadStatusError CacheReadStatus = "error" // Other internal error reading from cache. +) + +// defaultTimeBuckets returns a set of buckets for use in a timing histogram. 
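+// Prometheus treats each bucket as a cumulative upper bound on the observed
+// value, in seconds (the unit recorded by prometheus.Timer).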
+// The buckets are logarithmically spaced between two hardcoded thresholds.
+func defaultTimeBuckets() []float64 {
+	const (
+		minThreshold                  = 0.0001
+		maxThreshold                  = 10 // Due to float rounding, the last generated bucket may fall slightly short of this.
+		thresholdsPerOrderOfMagnitude = 10 // "order of magnitude" being 10x.
+	)
+
+	buckets := []float64{}
+	threshold := minThreshold
+	for threshold <= maxThreshold {
+		buckets = append(buckets, threshold)
+		threshold *= math.Pow(10, 1.0/thresholdsPerOrderOfMagnitude)
+	}
+	return buckets
+}
+
+// NewDefaultAnalysisMetrics creates Prometheus metric instrumentation
+// for metrics common to all analyzers.
+func NewDefaultAnalysisMetrics(runtime string) AnalysisMetrics {
+	metrics := AnalysisMetrics{
+		runtime: runtime,
+		databaseOperations: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: fmt.Sprintf("%s_db_operations", runtime),
+				Help: "How many database operations occur, partitioned by operation and status.",
+			},
+			[]string{"database", "operation", "status"}, // Labels.
+		),
+		databaseLatencies: prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name:    fmt.Sprintf("%s_db_latencies", runtime),
+				Help:    "How long database operations take, partitioned by operation.",
+				Buckets: defaultTimeBuckets(),
+			},
+			[]string{"database", "operation"}, // Labels.
+		),
+		localCacheReads: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "local_cache_reads",
+				Help: "How many local cache reads occur, partitioned by status (hit, miss, bad_value, error).",
+			},
+			[]string{"cache", "status"}, // Labels.
+		),
+		blockAnalysisLatencies: prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name:    "block_analysis_latencies",
+				Help:    "How long it takes to analyze a block, NOT including data fetch (from the node) or writing (to the DB).",
+				Buckets: defaultTimeBuckets(),
+			},
+			[]string{"layer"}, // Labels.
+		),
+		blockFetchLatencies: prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name:    "block_fetch_latencies",
+				Help:    "How long it takes to fetch data for a block from the node (possibly sped up by a local cache).",
+				Buckets: defaultTimeBuckets(),
+			},
+			[]string{"layer"}, // Labels.
+		),
+	}
+	metrics.databaseOperations = registerOnce(metrics.databaseOperations).(*prometheus.CounterVec)
+	metrics.databaseLatencies = registerOnce(metrics.databaseLatencies).(*prometheus.HistogramVec)
+	metrics.localCacheReads = registerOnce(metrics.localCacheReads).(*prometheus.CounterVec)
+	metrics.blockAnalysisLatencies = registerOnce(metrics.blockAnalysisLatencies).(*prometheus.HistogramVec)
+	metrics.blockFetchLatencies = registerOnce(metrics.blockFetchLatencies).(*prometheus.HistogramVec)
+	return metrics
+}
+
+// DatabaseOperations returns the counter for the database operation.
+// The provided params are used as labels.
+func (m *AnalysisMetrics) DatabaseOperations(db, operation, status string) prometheus.Counter {
+	return m.databaseOperations.WithLabelValues(db, operation, status)
+}
+
+// DatabaseLatencies returns a new latency timer for the provided
+// database operation.
+// The provided params are used as labels.
+func (m *AnalysisMetrics) DatabaseLatencies(db string, operation string) *prometheus.Timer {
+	return prometheus.NewTimer(m.databaseLatencies.WithLabelValues(db, operation))
+}
+
+// LocalCacheReads returns the counter for the local cache read.
+// The provided params are used as labels.
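+// (Note: the "cache" label is currently populated with m.runtime, i.e. the
+// analyzer name.)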
+func (m *AnalysisMetrics) LocalCacheReads(status CacheReadStatus) prometheus.Counter {
+	return m.localCacheReads.WithLabelValues(m.runtime, string(status))
+}
+
+// BlockAnalysisLatencies returns a new timer that measures how long it takes
+// to analyze a block (i.e. transform fetched data into DB statements).
+func (m *AnalysisMetrics) BlockAnalysisLatencies() *prometheus.Timer {
+	return prometheus.NewTimer(m.blockAnalysisLatencies.WithLabelValues(m.runtime))
+}
+
+// BlockFetchLatencies returns a new timer that measures how long it takes
+// to fetch a block's data from the node.
+func (m *AnalysisMetrics) BlockFetchLatencies() *prometheus.Timer {
+	return prometheus.NewTimer(m.blockFetchLatencies.WithLabelValues(m.runtime))
+}
diff --git a/metrics/storage.go b/metrics/storage.go
deleted file mode 100644
index 0dd0ee70f..000000000
--- a/metrics/storage.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package metrics
-
-import (
-	"fmt"
-
-	"github.com/prometheus/client_golang/prometheus"
-)
-
-// Default service metrics for database operations.
-type StorageMetrics struct {
-	// Name of the runtime (or "consensus") that is being analyzed.
-	runtime string
-
-	// Counts of database operations
-	databaseOperations *prometheus.CounterVec
-
-	// Latencies of database operations.
-	databaseLatencies *prometheus.HistogramVec
-
-	// Cache hit rates for the local cache.
-	localCacheReads *prometheus.CounterVec
-}
-
-type CacheReadStatus string
-
-const (
-	CacheReadStatusHit      CacheReadStatus = "hit"
-	CacheReadStatusMiss     CacheReadStatus = "miss"
-	CacheReadStatusBadValue CacheReadStatus = "bad_value" // Value in cache was not valid (likely because of mismatched types / CBOR encoding).
-	CacheReadStatusError    CacheReadStatus = "error"     // Other internal error reading from cache.
-)
-
-// NewDefaultStorageMetrics creates Prometheus metric instrumentation
-// for basic metrics common to storage accesses.
-func NewDefaultStorageMetrics(runtime string) StorageMetrics {
-	metrics := StorageMetrics{
-		runtime: runtime,
-		databaseOperations: prometheus.NewCounterVec(
-			prometheus.CounterOpts{
-				Name: fmt.Sprintf("%s_db_operations", runtime),
-				Help: "How many database operations occur, partitioned by operation and status.",
-			},
-			[]string{"database", "operation", "status"}, // Labels.
-		),
-		databaseLatencies: prometheus.NewHistogramVec(
-			prometheus.HistogramOpts{
-				Name: fmt.Sprintf("%s_db_latencies", runtime),
-				Help: "How long database operations take, partitioned by operation.",
-			},
-			[]string{"database", "operation"}, // Labels.
-		),
-		localCacheReads: prometheus.NewCounterVec(
-			prometheus.CounterOpts{
-				Name: "local_cache_reads",
-				Help: "How many local cache reads occur, partitioned by status (hit, miss, bad_data, error).",
-			},
-			[]string{"cache", "status"}, // Labels.
-		),
-	}
-	metrics.databaseOperations = registerOnce(metrics.databaseOperations).(*prometheus.CounterVec)
-	metrics.databaseLatencies = registerOnce(metrics.databaseLatencies).(*prometheus.HistogramVec)
-	metrics.localCacheReads = registerOnce(metrics.localCacheReads).(*prometheus.CounterVec)
-	return metrics
-}
-
-// DatabaseOperations returns the counter for the database operation.
-// The provided params are used as labels.
-func (m *StorageMetrics) DatabaseOperations(db, operation, status string) prometheus.Counter {
-	return m.databaseOperations.WithLabelValues(db, operation, status)
-}
-
-// DatabaseLatencies returns a new latency timer for the provided
-// database operation.
-// The provided params are used as labels.
-func (m *StorageMetrics) DatabaseLatencies(db string, operation string) *prometheus.Timer {
-	return prometheus.NewTimer(m.databaseLatencies.WithLabelValues(db, operation))
-}
-
-// LocalCacheReads returns the counter for the local cache read.
-// The provided params are used as labels. -func (m *StorageMetrics) LocalCacheReads(status CacheReadStatus) prometheus.Counter { - return m.localCacheReads.WithLabelValues(m.runtime, string(status)) -} diff --git a/storage/oasis/nodeapi/file/consensus.go b/storage/oasis/nodeapi/file/consensus.go index 2bf99326f..1d178822f 100644 --- a/storage/oasis/nodeapi/file/consensus.go +++ b/storage/oasis/nodeapi/file/consensus.go @@ -31,7 +31,7 @@ func NewFileConsensusApiLite(cacheDir string, consensusApi nodeapi.ConsensusApiL db, err := OpenKVStore( log.NewDefaultLogger("cached-node-api").With("runtime", "consensus"), cacheDir, - common.Ptr(metrics.NewDefaultStorageMetrics("consensus")), + common.Ptr(metrics.NewDefaultAnalysisMetrics("consensus")), ) if err != nil { return nil, err diff --git a/storage/oasis/nodeapi/file/kvstore.go b/storage/oasis/nodeapi/file/kvstore.go index e4288bfd1..9e8916e39 100644 --- a/storage/oasis/nodeapi/file/kvstore.go +++ b/storage/oasis/nodeapi/file/kvstore.go @@ -34,7 +34,7 @@ type pogrebKVStore struct { path string logger *log.Logger - metrics *metrics.StorageMetrics // if nil, no metrics are emitted + metrics *metrics.AnalysisMetrics // if nil, no metrics are emitted // Address of the atomic variable that indicates whether the store is initialized. // Synchronisation is required because the store is opened in background goroutine. @@ -97,7 +97,7 @@ func (s *pogrebKVStore) init() error { // Initializes a new KVStore backed by a database at `path`, or opens an existing one. // `metrics` can be `nil`, in which case no metrics are emitted during operation. -func OpenKVStore(logger *log.Logger, path string, metrics *metrics.StorageMetrics) (KVStore, error) { +func OpenKVStore(logger *log.Logger, path string, metrics *metrics.AnalysisMetrics) (KVStore, error) { store := &pogrebKVStore{ logger: logger, path: path, diff --git a/storage/oasis/nodeapi/file/runtime.go b/storage/oasis/nodeapi/file/runtime.go index 85f256f6c..60b1b41f1 100644 --- a/storage/oasis/nodeapi/file/runtime.go +++ b/storage/oasis/nodeapi/file/runtime.go @@ -25,7 +25,7 @@ func NewFileRuntimeApiLite(runtime common.Runtime, cacheDir string, runtimeApi n db, err := OpenKVStore( log.NewDefaultLogger("cached-node-api").With("runtime", runtime), cacheDir, - common.Ptr(metrics.NewDefaultStorageMetrics(string(runtime))), + common.Ptr(metrics.NewDefaultAnalysisMetrics(string(runtime))), ) if err != nil { return nil, err
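
Reviewer note: for readers unfamiliar with the prometheus.Timer idiom that the new
BlockFetchLatencies/BlockAnalysisLatencies helpers rely on, here is a minimal,
self-contained sketch of the pattern (hypothetical histogram name and label value;
not part of this change):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A stand-in histogram, mirroring the shape of block_fetch_latencies above.
	fetchLatencies := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "example_block_fetch_latencies",
			Help:    "Example histogram illustrating the timer pattern.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"layer"},
	)
	prometheus.MustRegister(fetchLatencies)

	// Start the timer, do the work, then record the elapsed time (in seconds).
	timer := prometheus.NewTimer(fetchLatencies.WithLabelValues("consensus"))
	time.Sleep(10 * time.Millisecond) // stand-in for fetching a block from the node
	timer.ObserveDuration()
}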