Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operator state cache to IndexedChainState #983

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func setup(t *testing.T) {
indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()

ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
ics, err := coreindexer.NewIndexedChainState(chainState, indexer, 0)
if err != nil {
panic("failed to create a new indexed chain state")
}
Expand Down
39 changes: 37 additions & 2 deletions core/indexer/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,43 @@ package indexer

import (
"context"
"encoding/binary"
"errors"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/indexer"
lru "github.com/hashicorp/golang-lru/v2"
)

type IndexedChainState struct {
core.ChainState

Indexer indexer.Indexer
Indexer indexer.Indexer
operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
}

var _ core.IndexedChainState = (*IndexedChainState)(nil)

func NewIndexedChainState(
chainState core.ChainState,
indexer indexer.Indexer,
cacheSize int,
) (*IndexedChainState, error) {
var operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
var err error

if cacheSize > 0 {
operatorStateCache, err = lru.New[string, *core.IndexedOperatorState](cacheSize)
if err != nil {
return nil, err
}
}

return &IndexedChainState{
ChainState: chainState,
Indexer: indexer,

operatorStateCache: operatorStateCache,
}, nil
}

Expand All @@ -32,6 +47,13 @@ func (ics *IndexedChainState) Start(ctx context.Context) error {
}

func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) {
// Check if the indexed operator state has been cached
cacheKey := computeCacheKey(blockNumber, quorums)
if ics.operatorStateCache != nil {
if val, ok := ics.operatorStateCache.Get(cacheKey); ok {
return val, nil
}
}

pubkeys, sockets, err := ics.getObjects(blockNumber)
if err != nil {
Expand Down Expand Up @@ -73,11 +95,14 @@ func (ics *IndexedChainState) GetIndexedOperatorState(ctx context.Context, block
AggKeys: aggKeys,
}

if ics.operatorStateCache != nil {
ics.operatorStateCache.Add(cacheKey, state)
}

return state, nil
}

func (ics *IndexedChainState) GetIndexedOperators(ctx context.Context, blockNumber uint) (map[core.OperatorID]*core.IndexedOperatorInfo, error) {

pubkeys, sockets, err := ics.getObjects(blockNumber)
if err != nil {
return nil, err
Expand Down Expand Up @@ -138,3 +163,13 @@ func (ics *IndexedChainState) getObjects(blockNumber uint) (*OperatorPubKeys, Op
return pubkeys, sockets, nil

}

// Computes a cache key for the operator state cache. The cache key is a
// combination of the block number and the quorum IDs. Note: the order of the
// quorum IDs matters.
func computeCacheKey(blockNumber uint, quorumIDs []uint8) string {
bytes := make([]byte, 8+len(quorumIDs))
binary.LittleEndian.PutUint64(bytes, uint64(blockNumber))
copy(bytes[8:], quorumIDs)
return string(bytes)
}
82 changes: 82 additions & 0 deletions core/indexer/state_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package indexer_test

import (
"context"
"math/big"
"testing"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/indexer"
coremock "github.com/Layr-Labs/eigenda/core/mock"
indexermock "github.com/Layr-Labs/eigenda/indexer/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type testComponents struct {
ChainState *coremock.ChainDataMock
Indexer *indexermock.MockIndexer
IndexedChainState *indexer.IndexedChainState
}

func TestIndexedOperatorStateCache(t *testing.T) {
c := createTestComponents(t)
pubKeys := &indexer.OperatorPubKeys{}
c.Indexer.On("GetObject", mock.Anything, 0).Return(pubKeys, nil)
sockets := indexer.OperatorSockets{
core.OperatorID{0, 1}: "socket1",
}
c.Indexer.On("GetObject", mock.Anything, 1).Return(sockets, nil)

operatorState := &core.OperatorState{
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: {
core.OperatorID{0}: {
Stake: big.NewInt(100),
Index: 0,
},
},
},
}
c.ChainState.On("GetOperatorState", mock.Anything, uint(100), []core.QuorumID{0}).Return(operatorState, nil)
c.ChainState.On("GetOperatorState", mock.Anything, uint(100), []core.QuorumID{1}).Return(operatorState, nil)
c.ChainState.On("GetOperatorState", mock.Anything, uint(101), []core.QuorumID{0, 1}).Return(operatorState, nil)

ctx := context.Background()
// Get the operator state for block 100 and quorum 0
_, err := c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{0})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 1)

// Get the operator state for block 100 and quorum 0 again
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{0})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 1)

// Get the operator state for block 100 and quorum 1
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(100), []core.QuorumID{1})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 2)

// Get the operator state for block 101 and quorum 0 & 1
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(101), []core.QuorumID{0, 1})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 3)

// Get the operator state for block 101 and quorum 0 & 1 again
_, err = c.IndexedChainState.GetIndexedOperatorState(ctx, uint(101), []core.QuorumID{0, 1})
assert.NoError(t, err)
c.ChainState.AssertNumberOfCalls(t, "GetOperatorState", 3)
}

func createTestComponents(t *testing.T) *testComponents {
chainState := &coremock.ChainDataMock{}
idx := &indexermock.MockIndexer{}
ics, err := indexer.NewIndexedChainState(chainState, idx, 1)
assert.NoError(t, err)
return &testComponents{
ChainState: chainState,
Indexer: idx,
IndexedChainState: ics,
}
}
2 changes: 1 addition & 1 deletion core/indexer/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func mustMakeChainState(env *deploy.Config, store indexer.HeaderStore, logger lo
)
Expect(err).ToNot(HaveOccurred())

chainState, err := indexedstate.NewIndexedChainState(cs, indexer)
chainState, err := indexedstate.NewIndexedChainState(cs, indexer, 0)
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions core/mock/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
}

func (d *ChainDataMock) GetOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) {
_ = d.Called(ctx, blockNumber, quorums)
state := d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, quorums)

return state.OperatorState, nil
Expand Down
27 changes: 18 additions & 9 deletions core/thegraph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
)

const (
EndpointFlagName = "thegraph.endpoint"
BackoffFlagName = "thegraph.backoff"
MaxRetriesFlagName = "thegraph.max_retries"
EndpointFlagName = "thegraph.endpoint"
BackoffFlagName = "thegraph.backoff"
MaxRetriesFlagName = "thegraph.max_retries"
OperatorStateCacheSize = "thegraph.operator_state_cache_size"
)

type Config struct {
Endpoint string // The Graph endpoint
PullInterval time.Duration // The interval to pull data from The Graph
MaxRetries int // The maximum number of retries to pull data from The Graph
Endpoint string // The Graph endpoint
PullInterval time.Duration // The interval to pull data from The Graph
MaxRetries int // The maximum number of retries to pull data from The Graph
OperatorStateCacheSize int // The size of the cache
}

func CLIFlags(envPrefix string) []cli.Flag {
Expand All @@ -39,15 +41,22 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: 5,
EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_MAX_RETRIES"),
},
cli.IntFlag{
Name: OperatorStateCacheSize,
Usage: "The size of the operator state cache in elements (0 to disable)",
Value: 0,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should default be 0 or 32?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the estimated size bytes for 32 entries and how many instances of this struct is expected to be created? If it's small enough it should be fine to enable by default

Copy link
Contributor Author

@dmanc dmanc Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends on the amount of operators but let's assume 200. The order of magnitude is ~1-5mb.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it seems this cache is not really useful:

Where does the cache help?

EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_OPERATOR_STATE_CACHE_SIZE"),
},
}
}

func ReadCLIConfig(ctx *cli.Context) Config {

return Config{
Endpoint: ctx.String(EndpointFlagName),
PullInterval: ctx.Duration(BackoffFlagName),
MaxRetries: ctx.Int(MaxRetriesFlagName),
Endpoint: ctx.String(EndpointFlagName),
PullInterval: ctx.Duration(BackoffFlagName),
MaxRetries: ctx.Int(MaxRetriesFlagName),
OperatorStateCacheSize: ctx.Int(OperatorStateCacheSize),
}

}
53 changes: 45 additions & 8 deletions core/thegraph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package thegraph

import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/shurcooL/graphql"
)

Expand Down Expand Up @@ -79,29 +81,41 @@ type (
core.ChainState
querier GraphQLQuerier

logger logging.Logger
logger logging.Logger
operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
}
)

var _ IndexedChainState = (*indexedChainState)(nil)

func MakeIndexedChainState(config Config, cs core.ChainState, logger logging.Logger) *indexedChainState {

func MakeIndexedChainState(config Config, cs core.ChainState, logger logging.Logger) (*indexedChainState, error) {
logger.Info("Using graph node")
querier := graphql.NewClient(config.Endpoint, nil)

// RetryQuerier is a wrapper around the GraphQLQuerier that retries queries on failure
retryQuerier := NewRetryQuerier(querier, config.PullInterval, config.MaxRetries)

return NewIndexedChainState(cs, retryQuerier, logger)
return NewIndexedChainState(cs, retryQuerier, logger, config.OperatorStateCacheSize)
}

func NewIndexedChainState(cs core.ChainState, querier GraphQLQuerier, logger logging.Logger) *indexedChainState {
func NewIndexedChainState(cs core.ChainState, querier GraphQLQuerier, logger logging.Logger, cacheSize int) (*indexedChainState, error) {
var operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
var err error

if cacheSize > 0 {
operatorStateCache, err = lru.New[string, *core.IndexedOperatorState](cacheSize)
if err != nil {
return nil, err
}
}

return &indexedChainState{
ChainState: cs,
querier: querier,
logger: logger.With("component", "IndexedChainState"),
}

querier: querier,
logger: logger.With("component", "IndexedChainState"),
operatorStateCache: operatorStateCache,
}, nil
}

func (ics *indexedChainState) Start(ctx context.Context) error {
Expand All @@ -124,6 +138,14 @@ func (ics *indexedChainState) Start(ctx context.Context) error {
}

func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.IndexedOperatorState, error) {
// Check if the indexed operator state has been cached
cacheKey := computeCacheKey(blockNumber, quorums)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are using cache then we need to assume that the blockNumber has been finalized already. I believe all users of this function would satisfy that assumption since they're passing in a reference block number.

if ics.operatorStateCache != nil {
if val, ok := ics.operatorStateCache.Get(cacheKey); ok {
return val, nil
}
}

operatorState, err := ics.ChainState.GetOperatorState(ctx, blockNumber, quorums)
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,6 +195,11 @@ func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, block
IndexedOperators: indexedOperators,
AggKeys: aggKeys,
}

if ics.operatorStateCache != nil {
ics.operatorStateCache.Add(cacheKey, state)
}

return state, nil
}

Expand Down Expand Up @@ -365,3 +392,13 @@ func convertIndexedOperatorInfoGqlToIndexedOperatorInfo(operator *IndexedOperato
Socket: string(operator.SocketUpdates[0].Socket),
}, nil
}

// Computes a cache key for the operator state cache. The cache key is a
// combination of the block number and the quorum IDs. Note: the order of the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is a problem but noticed that the cache key is dependent on the order you input the quorum ids.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a unordered set, so you may eliminate the ordering effect by sorting the them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good I'll add sorting.

// quorum IDs matters.
func computeCacheKey(blockNumber uint, quorumIDs []uint8) string {
bytes := make([]byte, 8+len(quorumIDs))
binary.LittleEndian.PutUint64(bytes, uint64(blockNumber))
copy(bytes[8:], quorumIDs)
return string(bytes)
}
3 changes: 2 additions & 1 deletion core/thegraph/state_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func TestIndexerIntegration(t *testing.T) {
tx, err := eth.NewWriter(logger, client, testConfig.EigenDA.OperatorStateRetreiver, testConfig.EigenDA.ServiceManager)
assert.NoError(t, err)

cs := thegraph.NewIndexedChainState(eth.NewChainState(tx, client), graphql.NewClient(graphUrl, nil), logger)
cs, err := thegraph.NewIndexedChainState(eth.NewChainState(tx, client), graphql.NewClient(graphUrl, nil), logger, 0)
assert.NoError(t, err)
time.Sleep(5 * time.Second)

err = cs.Start(context.Background())
Expand Down
Loading
Loading