Skip to content

Commit

Permalink
[Caplin] beginnings of instrumentation (#7486)
Browse files Browse the repository at this point in the history
this pr is ready for review, but it is waiting on this PR 

VictoriaMetrics/metrics#45

so that we do not need to use a replace directive.
  • Loading branch information
elee1766 authored May 11, 2023
1 parent 404e395 commit fd6acd4
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 40 deletions.
1 change: 0 additions & 1 deletion cl/merkle_tree/hasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func (m *merkleHasher) merkleizeTrieLeaves(leaves [][32]byte) ([32]byte, error)
func (m *merkleHasher) merkleizeTrieLeavesFlat(leaves []byte, out []byte) (err error) {
m.mu.Lock()
defer m.mu.Unlock()

layer := m.getBufferFromFlat(leaves)
for len(layer) > 1 {
if err := gohashtree.Hash(layer, layer); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions cmd/erigon-cl/core/state/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"crypto/sha256"
"encoding/binary"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/lru"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/raw"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/shuffling"
)
Expand Down Expand Up @@ -194,13 +194,13 @@ func (b *BeaconState) _refreshActiveBalances() {

func (b *BeaconState) initCaches() error {
var err error
if b.activeValidatorsCache, err = lru.New[uint64, []uint64](5); err != nil {
if b.activeValidatorsCache, err = lru.New[uint64, []uint64]("beacon_active_validators_cache", 5); err != nil {
return err
}
if b.shuffledSetsCache, err = lru.New[common.Hash, []uint64](5); err != nil {
if b.shuffledSetsCache, err = lru.New[common.Hash, []uint64]("beacon_shuffled_sets_cache", 5); err != nil {
return err
}
if b.committeeCache, err = lru.New[[16]byte, []uint64](256); err != nil {
if b.committeeCache, err = lru.New[[16]byte, []uint64]("beacon_committee_cache", 256); err != nil {
return err
}
return nil
Expand Down
1 change: 1 addition & 0 deletions cmd/erigon-cl/core/state/cache_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (b *BeaconState) GetBeaconCommitee(slot, committeeIndex uint64) ([]uint64,
var cacheKey [16]byte
binary.BigEndian.PutUint64(cacheKey[:], slot)
binary.BigEndian.PutUint64(cacheKey[8:], committeeIndex)

if cachedCommittee, ok := b.committeeCache.Get(cacheKey); ok {
return cachedCommittee, nil
}
Expand Down
33 changes: 33 additions & 0 deletions cmd/erigon-cl/core/state/lru/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package lru

import (
"fmt"

"github.com/VictoriaMetrics/metrics"
lru "github.com/hashicorp/golang-lru/v2"
)

// Cache is a wrapper around hashicorp lru but with metric for Get
type Cache[K comparable, V any] struct {
*lru.Cache[K, V]

metricName string
}

func New[K comparable, V any](metricName string, size int) (*Cache[K, V], error) {
v, err := lru.NewWithEvict[K, V](size, nil)
if err != nil {
return nil, err
}
return &Cache[K, V]{Cache: v, metricName: metricName}, nil
}

func (c *Cache[K, V]) Get(k K) (V, bool) {
v, ok := c.Cache.Get(k)
if ok {
metrics.GetOrCreateCounter(fmt.Sprintf(`golang_lru_cache_hit{%s="%s"}`, "cache", c.metricName)).Inc()
} else {
metrics.GetOrCreateCounter(fmt.Sprintf(`golang_lru_cache_miss{%s="%s"}`, "cache", c.metricName)).Inc()
}
return v, ok
}
19 changes: 17 additions & 2 deletions cmd/erigon-cl/core/state/ssz.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
package state

import (
"github.com/ledgerwatch/erigon/metrics/methelp"

"github.com/ledgerwatch/erigon-lib/types/clonable"
)

func (b *BeaconState) EncodeSSZ(buf []byte) ([]byte, error) {
return b.BeaconState.EncodeSSZ(buf)
h := methelp.NewHistTimer("encode_ssz_beacon_state_dur")
bts, err := b.BeaconState.EncodeSSZ(buf)
if err != nil {
return nil, err
}
h.PutSince()
sz := methelp.NewHistTimer("encode_ssz_beacon_state_size")
sz.Update(float64(len(bts)))
return bts, err
}

func (b *BeaconState) DecodeSSZ(buf []byte, version int) error {
h := methelp.NewHistTimer("decode_ssz_beacon_state_dur")
if err := b.BeaconState.DecodeSSZ(buf, version); err != nil {
return err
}
sz := methelp.NewHistTimer("decode_ssz_beacon_state_size")
sz.Update(float64(len(buf)))
h.PutSince()
return b.initBeaconState()
}

// SSZ size of the Beacon State
func (b *BeaconState) EncodingSizeSSZ() (size int) {
return b.BeaconState.EncodingSizeSSZ()
sz := b.BeaconState.EncodingSizeSSZ()
return sz
}

func (b *BeaconState) Clone() clonable.Clonable {
Expand Down
5 changes: 1 addition & 4 deletions cmd/erigon-cl/core/state/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package state
import (
"sort"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state/lru"
)

func copyLRU[K comparable, V any](dst *lru.Cache[K, V], src *lru.Cache[K, V]) *lru.Cache[K, V] {
if dst == nil {
dst = new(lru.Cache[K, V])
}
dst.Purge()
for _, key := range src.Keys() {
val, has := src.Get(key)
Expand Down
38 changes: 38 additions & 0 deletions cmd/erigon-cl/core/transition/block_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
"github.com/ledgerwatch/erigon/metrics/methelp"
)

// processBlock takes a block and transitions the state to the next slot, using the provided execution payload if enabled.
Expand All @@ -19,90 +20,127 @@ func processBlock(state *state.BeaconState, signedBlock *cltypes.SignedBeaconBlo
return fmt.Errorf("processBlock: wrong state version for block at slot %d", block.Slot)
}

h := methelp.NewHistTimer("beacon_process_block")

c := h.Tag("process_step", "block_header")
// Process the block header.
if err := ProcessBlockHeader(state, block, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process block header: %v", err)
}
c.PutSince()

// Process execution payload if enabled.
if version >= clparams.BellatrixVersion && executionEnabled(state, block.Body.ExecutionPayload) {
if state.Version() >= clparams.CapellaVersion {
// Process withdrawals in the execution payload.
c = h.Tag("process_step", "withdrawals")
if err := ProcessWithdrawals(state, block.Body.ExecutionPayload.Withdrawals, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process withdrawals: %v", err)
}
c.PutSince()
}

// Process the execution payload.
c = h.Tag("process_step", "execution_payload")
if err := ProcessExecutionPayload(state, block.Body.ExecutionPayload); err != nil {
return fmt.Errorf("processBlock: failed to process execution payload: %v", err)
}
c.PutSince()
}

// Process RANDAO reveal.
c = h.Tag("process_step", "randao_reveal")
if err := ProcessRandao(state, block.Body.RandaoReveal, block.ProposerIndex, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process RANDAO reveal: %v", err)
}
c.PutSince()

// Process Eth1 data.
c = h.Tag("process_step", "eth1_data")
if err := ProcessEth1Data(state, block.Body.Eth1Data); err != nil {
return fmt.Errorf("processBlock: failed to process Eth1 data: %v", err)
}
c.PutSince()

// Process block body operations.
c = h.Tag("process_step", "operations")
if err := processOperations(state, block.Body, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process block body operations: %v", err)
}
c.PutSince()

// Process sync aggregate in case of Altair version.
if version >= clparams.AltairVersion {
c = h.Tag("process_step", "sync_aggregate")
if err := ProcessSyncAggregate(state, block.Body.SyncAggregate, fullValidation); err != nil {
return fmt.Errorf("processBlock: failed to process sync aggregate: %v", err)
}
c.PutSince()
}

h.PutSince()
return nil
}

func processOperations(state *state.BeaconState, blockBody *cltypes.BeaconBody, fullValidation bool) error {
if len(blockBody.Deposits) != int(maximumDeposits(state)) {
return errors.New("outstanding deposits do not match maximum deposits")
}
h := methelp.NewHistTimer("beacon_process_block_operations")

// Process each proposer slashing
c := h.Tag("operation", "proposer_slashings")
for _, slashing := range blockBody.ProposerSlashings {
if err := ProcessProposerSlashing(state, slashing); err != nil {
return fmt.Errorf("ProcessProposerSlashing: %s", err)
}
}
c.PutSince()
// Process each attester slashing
c = h.Tag("operation", "attester_slashings")
for _, slashing := range blockBody.AttesterSlashings {
if err := ProcessAttesterSlashing(state, slashing); err != nil {
return fmt.Errorf("ProcessAttesterSlashing: %s", err)
}
}
c.PutSince()

// Process each attestations
c = h.Tag("operation", "attestations", "validation", "false")
if fullValidation {
c = h.Tag("operation", "attestations", "validation", "true")
}
if err := ProcessAttestations(state, blockBody.Attestations, fullValidation); err != nil {
return fmt.Errorf("ProcessAttestation: %s", err)
}
c.PutSince()

// Process each deposit
c = h.Tag("operation", "deposit")
for _, dep := range blockBody.Deposits {
if err := ProcessDeposit(state, dep, fullValidation); err != nil {
return fmt.Errorf("ProcessDeposit: %s", err)
}
}
c.PutSince()

// Process each voluntary exit.
c = h.Tag("operation", "voluntary_exit")
for _, exit := range blockBody.VoluntaryExits {
if err := ProcessVoluntaryExit(state, exit, fullValidation); err != nil {
return fmt.Errorf("ProcessVoluntaryExit: %s", err)
}
}
c.PutSince()

// Process each execution change. this will only have entries after the capella fork.
c = h.Tag("operation", "execution_change")
for _, addressChange := range blockBody.ExecutionChanges {
if err := ProcessBlsToExecutionChange(state, addressChange, fullValidation); err != nil {
return err
}
}
c.PutSince()
return nil
}

Expand Down
18 changes: 17 additions & 1 deletion cmd/erigon-cl/core/transition/process_attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,33 @@ import (
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
"github.com/ledgerwatch/erigon/metrics/methelp"
"golang.org/x/exp/slices"
)

func ProcessAttestations(s *state.BeaconState, attestations []*cltypes.Attestation, fullValidation bool) error {
var err error
attestingIndiciesSet := make([][]uint64, len(attestations))

h := methelp.NewHistTimer("beacon_process_attestations")
baseRewardPerIncrement := s.BaseRewardPerIncrement()

c := h.Tag("attestation_step", "process")
for i, attestation := range attestations {
if attestingIndiciesSet[i], err = processAttestation(s, attestation, baseRewardPerIncrement); err != nil {
return err
}
}
c.PutSince()
if fullValidation {
c = h.Tag("attestation_step", "validate")
valid, err := verifyAttestations(s, attestations, attestingIndiciesSet)
if err != nil {
return err
}
if !valid {
return errors.New("ProcessAttestation: wrong bls data")
}
c.PutSince()
}

return nil
Expand All @@ -41,19 +46,27 @@ func processAttestationPostAltair(s *state.BeaconState, attestation *cltypes.Att
stateSlot := s.Slot()
beaconConfig := s.BeaconConfig()

h := methelp.NewHistTimer("beacon_process_attestation_post_altair")

c := h.Tag("step", "get_participation_flag")
participationFlagsIndicies, err := s.GetAttestationParticipationFlagIndicies(attestation.Data, stateSlot-data.Slot)
if err != nil {
return nil, err
}
c.PutSince()

c = h.Tag("step", "get_attesting_indices")
attestingIndicies, err := s.GetAttestingIndicies(attestation.Data, attestation.AggregationBits, true)
if err != nil {
return nil, err
}
c.PutSince()

var proposerRewardNumerator uint64

isCurrentEpoch := data.Target.Epoch == currentEpoch

c = h.Tag("step", "update_attestation")
for _, attesterIndex := range attestingIndicies {
val, err := s.ValidatorEffectiveBalance(int(attesterIndex))
if err != nil {
Expand All @@ -70,11 +83,14 @@ func processAttestationPostAltair(s *state.BeaconState, attestation *cltypes.Att
proposerRewardNumerator += baseReward * weight
}
}
c.PutSince()
// Reward proposer
c = h.Tag("step", "get_proposer_index")
proposer, err := s.GetBeaconProposerIndex()
if err != nil {
return nil, err
}
c.PutSince()
proposerRewardDenominator := (beaconConfig.WeightDenominator - beaconConfig.ProposerWeight) * beaconConfig.WeightDenominator / beaconConfig.ProposerWeight
reward := proposerRewardNumerator / proposerRewardDenominator
return attestingIndicies, state.IncreaseBalance(s.BeaconState, proposer, reward)
Expand Down
Loading

0 comments on commit fd6acd4

Please sign in to comment.