Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

analyzer: configurable MaxBackoffTimeout #837

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/837.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
analyzers: add configurable `MaxBackoffTimeout`
15 changes: 13 additions & 2 deletions analyzer/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type itemBasedAnalyzer[Item any] struct {
stopIfQueueEmptyFor time.Duration
fixedInterval time.Duration
interItemDelay time.Duration
maxBackoffTime time.Duration
analyzerName string

processor ItemProcessor[Item]
Expand Down Expand Up @@ -68,6 +69,10 @@ type ItemProcessor[Item any] interface {
// If fixedInterval is provided, the analyzer will process one batch every fixedInterval.
// By default, the analyzer will use a backoff mechanism that will attempt to run as
// fast as possible until encountering an error.
//
// If maxBackoffTime is provided, the backoff mechanism will cap the maximum backoff time
// to the provided value. By default (if not provided), the maximum backoff time is 6 seconds,
// which roughly corresponds to the expected consensus block time.
func NewAnalyzer[Item any](
name string,
cfg config.ItemBasedAnalyzerConfig,
Expand All @@ -83,6 +88,7 @@ func NewAnalyzer[Item any](
stopIfQueueEmptyFor: cfg.StopIfQueueEmptyFor,
fixedInterval: cfg.Interval,
interItemDelay: cfg.InterItemDelay,
maxBackoffTime: cfg.MaxBackoffTime,
analyzerName: name,
processor: processor,
target: target,
Expand Down Expand Up @@ -188,10 +194,15 @@ func processErrors(errs []error) (int, error) {

// Start starts the item based analyzer.
func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
// Cap the timeout at the expected consensus block time, if not provided.
maxBackoff := 6 * time.Second
if a.maxBackoffTime != 0 {
maxBackoff = a.maxBackoffTime
}

backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected consensus block time
6*time.Second,
maxBackoff,
)
if err != nil {
a.logger.Error("error configuring backoff policy",
Expand Down
3 changes: 2 additions & 1 deletion analyzer/item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
GRANT SELECT ON ALL TABLES IN SCHEMA analysis TO PUBLIC`

testItemInsert = `
INSERT INTO analysis.item_analyzer_test(id)
INSERT INTO analysis.item_analyzer_test(id)
VALUES ($1)`

testItemCounts = `
Expand All @@ -54,6 +54,7 @@ var testItemBasedConfig = &config.ItemBasedAnalyzerConfig{
StopIfQueueEmptyFor: time.Second,
Interval: 0, // use backoff
InterItemDelay: 0,
MaxBackoffTime: 0,
}

type mockItem struct {
Expand Down
7 changes: 7 additions & 0 deletions analyzer/validatorstakinghistory/validatorstakinghistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/jackc/pgx/v5"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
Expand Down Expand Up @@ -54,6 +55,12 @@ func NewAnalyzer(
) (analyzer.Analyzer, error) {
logger = logger.With("analyzer", validatorHistoryAnalyzerName)

if cfg.MaxBackoffTime == 0 {
// Once caught up we will receive work only after epochs are finalized,
// so increase the maximum backoff timeout a bit.
cfg.MaxBackoffTime = 5 * time.Minute
}

// Find the epoch corresponding to startHeight.
if startHeight > math.MaxInt64 {
return nil, fmt.Errorf("startHeight %d is too large", startHeight)
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ type ItemBasedAnalyzerConfig struct {
// new goroutines to process items within a batch. This is useful for cases where
// processItem() makes out of band requests to rate limited resources.
InterItemDelay time.Duration `koanf:"inter_item_delay"`

// MaxBackoffTime determines the maximum backoff time the analyzer will use when
// processing items. This is useful for cases where the expected analyzer cadence
// is different from the consensus block time (e.g. 6 seconds).
MaxBackoffTime time.Duration `koanf:"max_backoff_time"`
}

type EvmTokensAnalyzerConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/damask/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/eden/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/edenfast/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
Expand Down
Loading