Commit d9493f5
Merge pull request #572 from oasisprotocol/mitjat/3phase-timings
metrics: Add timings for data fetch, data analysis
mitjat authored Dec 23, 2023
2 parents 652cf7c + 816e08a commit d9493f5
Showing 8 changed files with 181 additions and 111 deletions.
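In short: each block-processing pass is now timed in two phases — fetching data from the node and analyzing it in memory — alongside the DB latencies that were already instrumented. A minimal sketch of the pattern the diffs below apply (names mirror the real accessors; the fetch/analyze closures and the module path are stand-ins):

package sketch

import "github.com/oasisprotocol/nexus/metrics" // assumed module path

// processBlock mirrors the shape of the instrumented ProcessBlock methods
// in the diffs below; fetch and analyze stand in for the real phases.
func processBlock(m metrics.AnalysisMetrics, fetch, analyze func() error) error {
	fetchTimer := m.BlockFetchLatencies()
	if err := fetch(); err != nil {
		return err // no observation on fetch errors; those timings would mislead
	}
	fetchTimer.ObserveDuration()

	analysisTimer := m.BlockAnalysisLatencies()
	err := analyze()
	analysisTimer.ObserveDuration() // observed even when analysis fails
	return err
}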
4 changes: 2 additions & 2 deletions analyzer/aggregate_stats/aggregate_stats.go
@@ -52,7 +52,7 @@ type aggregateStatsAnalyzer struct {
target storage.TargetStorage

logger *log.Logger
-	metrics metrics.StorageMetrics
+	metrics metrics.AnalysisMetrics
}

var _ analyzer.Analyzer = (*aggregateStatsAnalyzer)(nil)
@@ -66,7 +66,7 @@ func NewAggregateStatsAnalyzer(target storage.TargetStorage, logger *log.Logger)
return &aggregateStatsAnalyzer{
target: target,
logger: logger.With("analyzer", aggregateStatsAnalyzerName),
-		metrics: metrics.NewDefaultStorageMetrics(aggregateStatsAnalyzerName),
+		metrics: metrics.NewDefaultAnalysisMetrics(aggregateStatsAnalyzerName),
}, nil
}

54 changes: 34 additions & 20 deletions analyzer/consensus/consensus.go
@@ -65,7 +65,7 @@ type processor struct {
network sdkConfig.Network
target storage.TargetStorage
logger *log.Logger
-	metrics metrics.StorageMetrics
+	metrics metrics.AnalysisMetrics
}

var _ block.BlockProcessor = (*processor)(nil)
@@ -79,7 +79,7 @@ func NewAnalyzer(blockRange config.BlockRange, batchSize uint64, mode analyzer.B
network: network,
target: target,
logger: logger.With("analyzer", consensusAnalyzerName),
-		metrics: metrics.NewDefaultStorageMetrics(consensusAnalyzerName),
+		metrics: metrics.NewDefaultAnalysisMetrics(consensusAnalyzerName),
}

return block.NewAnalyzer(blockRange, batchSize, mode, consensusAnalyzerName, processor, target, logger)
@@ -220,24 +220,8 @@ func (m *processor) processGenesis(ctx context.Context, genesisDoc *genesis.Docu
return nil
}

-// Implements BlockProcessor interface.
-func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
-	if uheight > math.MaxInt64 {
-		return fmt.Errorf("height %d is too large", uheight)
-	}
-	height := int64(uheight)
-
-	// Fetch all data.
-	data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
-	if err != nil {
-		if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
-			return analyzer.ErrOutOfRange
-		}
-		return err
-	}
-
-	// Process data, prepare updates.
-	batch := &storage.QueryBatch{}
+// Expands `batch` with DB statements that reflect the contents of `data`.
+func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data allData) error {
for _, f := range []func(*storage.QueryBatch, *consensusBlockData) error{
m.queueBlockInserts,
m.queueEpochInserts,
@@ -300,6 +284,36 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
return err
}

+	return nil
+}
+
+// Implements BlockProcessor interface.
+func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
+	if uheight > math.MaxInt64 {
+		return fmt.Errorf("height %d is too large", uheight)
+	}
+	height := int64(uheight)
+
+	// Fetch all data.
+	fetchTimer := m.metrics.BlockFetchLatencies()
+	data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
+	if err != nil {
+		if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
+			return analyzer.ErrOutOfRange
+		}
+		return err
+	}
+	fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading.
+
+	// Process data, prepare updates.
+	analysisTimer := m.metrics.BlockAnalysisLatencies()
+	batch := &storage.QueryBatch{}
+	err = m.queueDbUpdates(batch, *data)
+	analysisTimer.ObserveDuration()
+	if err != nil {
+		return err
+	}
+
// Update indexing progress.
batch.Queue(
queries.IndexingProgress,
8 changes: 6 additions & 2 deletions analyzer/runtime/runtime.go
@@ -31,7 +31,7 @@ type processor struct {
source nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
-	metrics metrics.StorageMetrics
+	metrics metrics.AnalysisMetrics
}

var _ block.BlockProcessor = (*processor)(nil)
@@ -55,7 +55,7 @@ func NewRuntimeAnalyzer(
source: sourceClient,
target: target,
logger: logger.With("analyzer", runtime),
-		metrics: metrics.NewDefaultStorageMetrics(string(runtime)),
+		metrics: metrics.NewDefaultAnalysisMetrics(string(runtime)),
}

return block.NewAnalyzer(blockRange, batchSize, mode, string(runtime), processor, target, logger)
@@ -137,6 +137,7 @@ func (m *processor) FinalizeFastSync(ctx context.Context, lastFastSyncHeight int
// Implements BlockProcessor interface.
func (m *processor) ProcessBlock(ctx context.Context, round uint64) error {
// Fetch all data.
+	fetchTimer := m.metrics.BlockFetchLatencies()
blockHeader, err := m.source.GetBlockHeader(ctx, round)
if err != nil {
if strings.Contains(err.Error(), "roothash: block not found") {
@@ -152,8 +152,10 @@ func (m *processor) ProcessBlock(ctx context.Context, round uint64) error {
if err != nil {
return err
}
+	fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading.

// Preprocess data.
+	analysisTimer := m.metrics.BlockAnalysisLatencies()
blockData, err := ExtractRound(*blockHeader, transactionsWithResults, rawEvents, m.logger)
if err != nil {
return err
@@ -163,6 +166,7 @@ func (m *processor) ProcessBlock(ctx context.Context, round uint64) error {
batch := &storage.QueryBatch{}
m.queueDbUpdates(batch, blockData)
m.queueAccountsEvents(batch, blockData)
+	analysisTimer.ObserveDuration()

// Update indexing progress.
batch.Queue(
135 changes: 135 additions & 0 deletions metrics/analysis.go
@@ -0,0 +1,135 @@
package metrics

import (
"fmt"
"math"

"github.com/prometheus/client_golang/prometheus"
)

// Default service metrics for analyzer operations.
type AnalysisMetrics struct {
// Name of the runtime (or "consensus") that is being analyzed.
runtime string

// Counts of database operations.
databaseOperations *prometheus.CounterVec

// Latencies of database operations.
databaseLatencies *prometheus.HistogramVec

// Cache hit rates for the local cache.
localCacheReads *prometheus.CounterVec

// Latencies of analysis.
blockAnalysisLatencies *prometheus.HistogramVec

// Latencies of fetching a block's data from the node.
blockFetchLatencies *prometheus.HistogramVec
}

type CacheReadStatus string

const (
CacheReadStatusHit CacheReadStatus = "hit"
CacheReadStatusMiss CacheReadStatus = "miss"
CacheReadStatusBadValue CacheReadStatus = "bad_value" // Value in cache was not valid (likely because of mismatched types / CBOR encoding).
CacheReadStatusError CacheReadStatus = "error" // Other internal error reading from cache.
)

// defaultTimeBuckets returns a set of buckets for use in a timing histogram.
// The buckets are logarithmically spaced between two hardcoded thresholds.
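// With the constants below this works out to 10 buckets per decade across
// five decades: 51 thresholds from 100µs to 10s at a growth factor of 10^0.1
// (50 if the last one is lost to rounding, as noted below). For reference,
// prometheus.ExponentialBucketsRange(0.0001, 10, 51) would produce
// near-identical spacing, assuming the vendored client_golang is v1.12+.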
func defaultTimeBuckets() []float64 {
const (
minThreshold = 0.0001
maxThreshold = 10 // The resulting output might be one bucket short due to rounding errors.
thresholdsPerOrderOfMagnitude = 10 // "order of magnitude" being 10x.
)

buckets := []float64{}
threshold := minThreshold
for threshold <= maxThreshold {
buckets = append(buckets, threshold)
threshold *= math.Pow(10, 1.0/thresholdsPerOrderOfMagnitude)
}
return buckets
}

// NewDefaultAnalysisMetrics creates Prometheus metric instrumentation
// for basic metrics common to analyzer operations.
func NewDefaultAnalysisMetrics(runtime string) AnalysisMetrics {
metrics := AnalysisMetrics{
runtime: runtime,
databaseOperations: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: fmt.Sprintf("%s_db_operations", runtime),
Help: "How many database operations occur, partitioned by operation and status.",
},
[]string{"database", "operation", "status"}, // Labels.
),
databaseLatencies: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: fmt.Sprintf("%s_db_latencies", runtime),
Help: "How long database operations take, partitioned by operation.",
Buckets: defaultTimeBuckets(),
},
[]string{"database", "operation"}, // Labels.
),
localCacheReads: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "local_cache_reads",
Help: "How many local cache reads occur, partitioned by status (hit, miss, bad_data, error).",
},
[]string{"cache", "status"}, // Labels.
),
blockAnalysisLatencies: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "block_analysis_latencies",
Help: "How long it takes to analyze a block, NOT including data fetch (from the node) or writing (to the DB).",
Buckets: defaultTimeBuckets(),
},
[]string{"layer"}, // Labels.
),
blockFetchLatencies: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "block_fetch_latencies",
Help: "How long it takes to fetch data for a block from the node (possibly sped up by a local cache).",
Buckets: defaultTimeBuckets(),
},
[]string{"layer"}, // Labels.
),
}
metrics.databaseOperations = registerOnce(metrics.databaseOperations).(*prometheus.CounterVec)
metrics.databaseLatencies = registerOnce(metrics.databaseLatencies).(*prometheus.HistogramVec)
metrics.localCacheReads = registerOnce(metrics.localCacheReads).(*prometheus.CounterVec)
metrics.blockAnalysisLatencies = registerOnce(metrics.blockAnalysisLatencies).(*prometheus.HistogramVec)
metrics.blockFetchLatencies = registerOnce(metrics.blockFetchLatencies).(*prometheus.HistogramVec)
return metrics
}
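// (registerOnce, used above, is pre-existing elsewhere in this package and
// not part of this diff; presumably it wraps prometheus.Register and, on
// prometheus.AlreadyRegisteredError, returns the existing collector so that
// constructing several analyzers reuses one set of series.)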

// DatabaseOperations returns the counter for the database operation.
// The provided params are used as labels.
func (m *AnalysisMetrics) DatabaseOperations(db, operation, status string) prometheus.Counter {
return m.databaseOperations.WithLabelValues(db, operation, status)
}

// DatabaseLatencies returns a new latency timer for the provided
// database operation.
// The provided params are used as labels.
func (m *AnalysisMetrics) DatabaseLatencies(db string, operation string) *prometheus.Timer {
return prometheus.NewTimer(m.databaseLatencies.WithLabelValues(db, operation))
}

// LocalCacheReads returns the counter for the local cache read.
// The provided params are used as labels.
func (m *AnalysisMetrics) LocalCacheReads(status CacheReadStatus) prometheus.Counter {
return m.localCacheReads.WithLabelValues(m.runtime, string(status))
}

func (m *AnalysisMetrics) BlockAnalysisLatencies() *prometheus.Timer {
return prometheus.NewTimer(m.blockAnalysisLatencies.WithLabelValues(m.runtime))
}

func (m *AnalysisMetrics) BlockFetchLatencies() *prometheus.Timer {
return prometheus.NewTimer(m.blockFetchLatencies.WithLabelValues(m.runtime))
}
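Of the accessors above, the timers and DatabaseLatencies have call sites visible in this diff; LocalCacheReads is consumed from the KV-store cache layer (kvstore.go below), whose read path is not shown here. A sketch of how a read site might classify its outcome against the four CacheReadStatus values (the helper, its sentinel error, and the module path are hypothetical):

package kvstore

import (
	"errors"

	"github.com/oasisprotocol/nexus/metrics" // assumed module path
)

// errNotFound is a hypothetical sentinel for "key absent from cache".
var errNotFound = errors.New("key not found")

// observeCacheRead buckets one cache read into the four CacheReadStatus values.
func observeCacheRead(m *metrics.AnalysisMetrics, readErr, decodeErr error) {
	switch {
	case readErr == nil && decodeErr == nil:
		m.LocalCacheReads(metrics.CacheReadStatusHit).Inc() // usable value returned
	case errors.Is(readErr, errNotFound):
		m.LocalCacheReads(metrics.CacheReadStatusMiss).Inc() // not cached yet
	case readErr == nil && decodeErr != nil:
		m.LocalCacheReads(metrics.CacheReadStatusBadValue).Inc() // e.g. CBOR/type mismatch
	default:
		m.LocalCacheReads(metrics.CacheReadStatusError).Inc() // any other cache failure
	}
}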
83 changes: 0 additions & 83 deletions metrics/storage.go

This file was deleted.

2 changes: 1 addition & 1 deletion storage/oasis/nodeapi/file/consensus.go
@@ -31,7 +31,7 @@ func NewFileConsensusApiLite(cacheDir string, consensusApi nodeapi.ConsensusApiL
db, err := OpenKVStore(
log.NewDefaultLogger("cached-node-api").With("runtime", "consensus"),
cacheDir,
-		common.Ptr(metrics.NewDefaultStorageMetrics("consensus")),
+		common.Ptr(metrics.NewDefaultAnalysisMetrics("consensus")),
)
if err != nil {
return nil, err
4 changes: 2 additions & 2 deletions storage/oasis/nodeapi/file/kvstore.go
@@ -34,7 +34,7 @@ type pogrebKVStore struct {

path string
logger *log.Logger
-	metrics *metrics.StorageMetrics // if nil, no metrics are emitted
+	metrics *metrics.AnalysisMetrics // if nil, no metrics are emitted

// Address of the atomic variable that indicates whether the store is initialized.
// Synchronisation is required because the store is opened in background goroutine.
@@ -97,7 +97,7 @@ func (s *pogrebKVStore) init() error {

// Initializes a new KVStore backed by a database at `path`, or opens an existing one.
// `metrics` can be `nil`, in which case no metrics are emitted during operation.
-func OpenKVStore(logger *log.Logger, path string, metrics *metrics.StorageMetrics) (KVStore, error) {
+func OpenKVStore(logger *log.Logger, path string, metrics *metrics.AnalysisMetrics) (KVStore, error) {
store := &pogrebKVStore{
logger: logger,
path: path,
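For callers, the nil-metrics escape hatch documented above looks roughly like this (a hypothetical caller; compare NewFileConsensusApiLite above, which does pass metrics):

package kvstore

import (
	"github.com/oasisprotocol/nexus/log" // assumed module path
)

// openWithoutMetrics exercises the documented nil-metrics path; KVStore and
// OpenKVStore are the ones defined in this file.
func openWithoutMetrics(cacheDir string) (KVStore, error) {
	return OpenKVStore(
		log.NewDefaultLogger("cached-node-api"),
		cacheDir,
		nil, // nil *metrics.AnalysisMetrics: no metrics are emitted
	)
}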