Skip to content

Commit

Permalink
feat: add GetEigenStateChanges rpc (#218)
Browse files Browse the repository at this point in the history
closes #210
  • Loading branch information
seanmcgary authored Feb 5, 2025
2 parents 67e3f1a + ebda852 commit bcd7412
Show file tree
Hide file tree
Showing 26 changed files with 433 additions and 211 deletions.
2 changes: 1 addition & 1 deletion cmd/debugger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {

p := pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l)
rps := proofs.NewRewardsProofsStore(rc, l)
pds := protocolDataService.NewProtocolDataService(grm, l, cfg)
pds := protocolDataService.NewProtocolDataService(sm, grm, l, cfg)
rds := rewardsDataService.NewRewardsDataService(grm, l, cfg, rc)

scc, err := sidecarClient.NewSidecarClient(cfg.SidecarPrimaryConfig.Url, !cfg.SidecarPrimaryConfig.Secure)
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var rpcCmd = &cobra.Command{

rps := proofs.NewRewardsProofsStore(rc, l)

pds := protocolDataService.NewProtocolDataService(grm, l, cfg)
pds := protocolDataService.NewProtocolDataService(sm, grm, l, cfg)
rds := rewardsDataService.NewRewardsDataService(grm, l, cfg, rc)

go rcq.Process()
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ var runCmd = &cobra.Command{

rps := proofs.NewRewardsProofsStore(rc, l)

pds := protocolDataService.NewProtocolDataService(grm, l, cfg)
pds := protocolDataService.NewProtocolDataService(sm, grm, l, cfg)
rds := rewardsDataService.NewRewardsDataService(grm, l, cfg, rc)

go rcq.Process()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1
github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13
github.com/Layr-Labs/protocol-apis v1.3.0
github.com/Layr-Labs/protocol-apis v1.4.0
github.com/ethereum/go-ethereum v1.14.9
github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1
github.com/google/uuid v1.6.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 h1:Blb4AE+jC/vddV71w4/MQA
github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13/go.mod h1:PD/HoyzZjxDw1tAcZw3yD0yGddo+yhmwQAi+lk298r4=
github.com/Layr-Labs/protocol-apis v1.3.0 h1:T0HjCEYc3ez3FMx9EP8YiLzsszYn0ccfSlHtuqTu2SA=
github.com/Layr-Labs/protocol-apis v1.3.0/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v1.3.1-0.20250204210953-d34ee63f5f2d h1:B2ToEXNCekGol5Kcwk2zjzCN1TzvueRGikOJEPRDU7E=
github.com/Layr-Labs/protocol-apis v1.3.1-0.20250204210953-d34ee63f5f2d/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v1.3.1-0.20250204211907-5477dff7df6c h1:63+br77UB2y/zlvPII1c3QD2A0NxrOgfU8+6+oGzlL4=
github.com/Layr-Labs/protocol-apis v1.3.1-0.20250204211907-5477dff7df6c/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Layr-Labs/protocol-apis v1.4.0 h1:ZmdBHvT4HMdKtH3Dz9EJeQpInXHEUdHcs7lR+UqkD6g=
github.com/Layr-Labs/protocol-apis v1.4.0/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
Expand Down
2 changes: 2 additions & 0 deletions internal/metrics/metricsTypes/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var (
Metric_Incr_BlockProcessed = "blockProcessed"

Metric_Gauge_CurrentBlockHeight = "currentBlockHeight"

Metric_Gauge_LastDistributionRootBlockHeight = "lastDistributionRootBlockHeight"
)

var MetricTypes = map[MetricsType][]MetricsTypeConfig{
Expand Down
14 changes: 14 additions & 0 deletions pkg/eigenState/avsOperators/avsOperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,17 @@ func (a *AvsOperatorsModel) sortValuesForMerkleTree(deltas []*AvsOperatorStateCh
func (a *AvsOperatorsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return a.BaseEigenState.DeleteState("avs_operator_state_changes", startBlockNumber, endBlockNumber, a.DB)
}

func (a *AvsOperatorsModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
records := make([]*AvsOperatorStateChange, 0)
res := a.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&records)
if res.Error != nil {
a.logger.Sugar().Errorw("Failed to list records for block range",
zap.Error(res.Error),
zap.Uint64("startBlockNumber", startBlockNumber),
zap.Uint64("endBlockNumber", endBlockNumber),
)
return nil, res.Error
}
return base.CastCommittedStateToInterface(records), nil
}
10 changes: 10 additions & 0 deletions pkg/eigenState/defaultOperatorSplits/defaultOperatorSplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,13 @@ func (dos *DefaultOperatorSplitModel) sortValuesForMerkleTree(splits []*DefaultO
func (dos *DefaultOperatorSplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return dos.BaseEigenState.DeleteState("default_operator_splits", startBlockNumber, endBlockNumber, dos.DB)
}

func (dos *DefaultOperatorSplitModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var splits []*DefaultOperatorSplit
res := dos.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&splits)
if res.Error != nil {
dos.logger.Sugar().Errorw("Failed to list records", zap.Error(res.Error))
return nil, res.Error
}
return base.CastCommittedStateToInterface(splits), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,17 @@ func (ddr *DisabledDistributionRootsModel) GetSubmittedRootsForBlock(blockNumber
}
return records, nil
}

func (ddr *DisabledDistributionRootsModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
records := make([]*types.DisabledDistributionRoot, 0)
res := ddr.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&records)
if res.Error != nil {
ddr.logger.Sugar().Errorw("Failed to list records for block range",
zap.Error(res.Error),
zap.Uint64("startBlockNumber", startBlockNumber),
zap.Uint64("endBlockNumber", endBlockNumber),
)
return nil, res.Error
}
return base.CastCommittedStateToInterface(records), nil
}
10 changes: 10 additions & 0 deletions pkg/eigenState/operatorAVSSplits/operatorAVSSplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,13 @@ func (oas *OperatorAVSSplitModel) sortValuesForMerkleTree(splits []*OperatorAVSS
func (oas *OperatorAVSSplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return oas.BaseEigenState.DeleteState("operator_avs_splits", startBlockNumber, endBlockNumber, oas.DB)
}

func (oar *OperatorAVSSplitModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var splits []*OperatorAVSSplit
res := oar.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&splits)
if res.Error != nil {
oar.logger.Sugar().Errorw("Failed to list records", zap.Error(res.Error))
return nil, res.Error
}
return base.CastCommittedStateToInterface(splits), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,13 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) sortValuesForMerkleTree(subm
func (odrs *OperatorDirectedRewardSubmissionsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return odrs.BaseEigenState.DeleteState("operator_directed_reward_submissions", startBlockNumber, endBlockNumber, odrs.DB)
}

func (odrs *OperatorDirectedRewardSubmissionsModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var submissions []*OperatorDirectedRewardSubmission
res := odrs.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&submissions)
if res.Error != nil {
odrs.logger.Sugar().Errorw("Failed to list records", zap.Error(res.Error))
return nil, res.Error
}
return base.CastCommittedStateToInterface(submissions), nil
}
10 changes: 10 additions & 0 deletions pkg/eigenState/operatorPISplits/operatorPISplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,13 @@ func (ops *OperatorPISplitModel) sortValuesForMerkleTree(splits []*OperatorPISpl
func (ops *OperatorPISplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return ops.BaseEigenState.DeleteState("operator_pi_splits", startBlockNumber, endBlockNumber, ops.DB)
}

func (ops *OperatorPISplitModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var splits []*OperatorPISplit
res := ops.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&splits)
if res.Error != nil {
ops.logger.Sugar().Errorw("Failed to list records", zap.Error(res.Error))
return nil, res.Error
}
return base.CastCommittedStateToInterface(splits), nil
}
9 changes: 9 additions & 0 deletions pkg/eigenState/operatorShares/operatorShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,12 @@ func (osm *OperatorSharesModel) sortValuesForMerkleTree(diffs []*OperatorShareDe
func (osm *OperatorSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return osm.BaseEigenState.DeleteState("operator_share_deltas", startBlockNumber, endBlockNumber, osm.DB)
}

func (osm *OperatorSharesModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var deltas []*OperatorShareDeltas
res := osm.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&deltas)
if res.Error != nil {
return nil, res.Error
}
return base.CastCommittedStateToInterface(deltas), nil
}
10 changes: 10 additions & 0 deletions pkg/eigenState/rewardSubmissions/rewardSubmissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,13 @@ func (rs *RewardSubmissionsModel) sortValuesForMerkleTree(submissions []*RewardS
func (rs *RewardSubmissionsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return rs.BaseEigenState.DeleteState("reward_submissions", startBlockNumber, endBlockNumber, rs.DB)
}

func (rs *RewardSubmissionsModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var submissions []*RewardSubmission
res := rs.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&submissions)
if res.Error != nil {
rs.logger.Sugar().Errorw("Failed to list records", zap.Error(res.Error))
return nil, res.Error
}
return base.CastCommittedStateToInterface(submissions), nil
}
14 changes: 14 additions & 0 deletions pkg/eigenState/stakerDelegations/stakerDelegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,17 @@ func (s *StakerDelegationsModel) sortValuesForMerkleTree(deltas []*StakerDelegat
func (s *StakerDelegationsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return s.BaseEigenState.DeleteState("staker_delegation_changes", startBlockNumber, endBlockNumber, s.DB)
}

func (s *StakerDelegationsModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var deltas []*StakerDelegationChange
res := s.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&deltas)
if res.Error != nil {
s.logger.Sugar().Errorw("Failed to list deltas for block range",
zap.Error(res.Error),
zap.Uint64("startBlockNumber", startBlockNumber),
zap.Uint64("endBlockNumber", endBlockNumber),
)
return nil, res.Error
}
return base.CastCommittedStateToInterface(deltas), nil
}
10 changes: 10 additions & 0 deletions pkg/eigenState/stakerShares/stakerShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,3 +621,13 @@ func (ss *StakerSharesModel) sortValuesForMerkleTree(diffs []*StakerShareDeltas)
func (ss *StakerSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return ss.BaseEigenState.DeleteState("staker_share_deltas", startBlockNumber, endBlockNumber, ss.DB)
}

func (ss *StakerSharesModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var deltas []*StakerShareDeltas
res := ss.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&deltas)
if res.Error != nil {
ss.logger.Sugar().Errorw("Failed to fetch staker share deltas", zap.Error(res.Error))
return nil, res.Error
}
return base.CastCommittedStateToInterface(deltas), nil
}
41 changes: 41 additions & 0 deletions pkg/eigenState/stateManager/stateManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Layr-Labs/sidecar/pkg/utils"
"github.com/ethereum/go-ethereum/common"
"slices"
"sync"
"time"

"github.com/Layr-Labs/sidecar/pkg/eigenState/types"
Expand Down Expand Up @@ -232,3 +233,43 @@ func (e *EigenStateManager) GetSubmittedDistributionRoots(blockNumber uint64) ([
}
return roots, nil
}

type EigenStateResult struct {
Results []interface{}
Error error
}

// ListForBlockRange lists all records for the block range, inclusive of start and end block numbers.
// Each model is processed concurrently in a goroutine
func (e *EigenStateManager) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) (map[string][]interface{}, error) {
channelMap := make(map[string]chan EigenStateResult)

var wg sync.WaitGroup
for _, index := range e.GetSortedModelIndexes() {
wg.Add(1)
ch := make(chan EigenStateResult, 1)
channelMap[e.StateModels[index].GetModelName()] = ch

go func(ch chan EigenStateResult) {
defer wg.Done()
state := e.StateModels[index]
res, err := state.ListForBlockRange(startBlockNumber, endBlockNumber)

ch <- EigenStateResult{
Results: res,
Error: err,
}
}(ch)
}
wg.Wait()
records := make(map[string][]interface{})
for name, ch := range channelMap {
close(ch)
result := <-ch
if result.Error != nil {
return nil, result.Error
}
records[name] = result.Results
}
return records, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,17 @@ func (sdr *SubmittedDistributionRootsModel) GetSubmittedRootsForBlock(blockNumbe
}
return records, nil
}

func (sdr *SubmittedDistributionRootsModel) ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error) {
var deltas []*types.SubmittedDistributionRoot
res := sdr.DB.Where("block_number >= ? AND block_number <= ?", startBlockNumber, endBlockNumber).Find(&deltas)
if res.Error != nil {
sdr.logger.Sugar().Errorw("Failed to list deltas for block range",
zap.Error(res.Error),
zap.Uint64("startBlockNumber", startBlockNumber),
zap.Uint64("endBlockNumber", endBlockNumber),
)
return nil, res.Error
}
return base.CastCommittedStateToInterface(deltas), nil
}
3 changes: 3 additions & 0 deletions pkg/eigenState/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type IEigenStateModel interface {
// @param startBlockNumber the block number to start deleting state from (inclusive)
// @param endBlockNumber the block number to end deleting state from (inclusive). If 0, delete all state from startBlockNumber
DeleteState(startBlockNumber uint64, endBlockNumber uint64) error

// ListForBlockRange lists all records for the block range, inclusive of start and end block numbers
ListForBlockRange(startBlockNumber uint64, endBlockNumber uint64) ([]interface{}, error)
}

// StateTransitions
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ func (p *Pipeline) RunForFetchedBlock(ctx context.Context, block *fetcher.Fetche

rewardsTotalTimeMs := time.Since(rewardStartTime).Milliseconds()

_ = p.metricsSink.Gauge(metricsTypes.Metric_Gauge_LastDistributionRootBlockHeight, float64(blockNumber), nil)

// nolint:all
if strings.ToLower(root) != strings.ToLower(rs.Root) {
if !p.globalConfig.CanIgnoreIncorrectRewardsRoot(blockNumber) {
Expand Down
Loading

0 comments on commit bcd7412

Please sign in to comment.