diff --git a/cmd/debugger/main.go b/cmd/debugger/main.go index d0d534f6..0ac0e382 100644 --- a/cmd/debugger/main.go +++ b/cmd/debugger/main.go @@ -120,10 +120,13 @@ func main() { GenesisBlockNumber: cfg.GetGenesisBlockNumber(), }, cfg, mds, p, sm, msm, rc, rcq, rps, l, client) - rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ + rpc, err := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, }, mds, rc, rcq, eb, rps, pds, rds, scc, l, cfg) + if err != nil { + l.Sugar().Fatalw("Failed to create rpc server", zap.Error(err)) + } // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) diff --git a/cmd/rpc.go b/cmd/rpc.go index bc11d54f..93b4481e 100644 --- a/cmd/rpc.go +++ b/cmd/rpc.go @@ -109,11 +109,15 @@ var rpcCmd = &cobra.Command{ l.Sugar().Fatalw("Failed to create sidecar client", zap.Error(err)) } - rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ + rpc, err := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, }, mds, rc, rcq, eb, rps, pds, rds, scc, l, cfg) + if err != nil { + l.Sugar().Fatalw("Failed to create rpc server", zap.Error(err)) + } + // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) if err := rpc.Start(ctx, rpcChannel); err != nil { diff --git a/cmd/run.go b/cmd/run.go index 0e961906..51b65448 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -143,10 +143,13 @@ var runCmd = &cobra.Command{ GenesisBlockNumber: cfg.GetGenesisBlockNumber(), }, cfg, mds, p, sm, msm, rc, rcq, rps, l, client) - rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ + rpc, err := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, }, mds, rc, rcq, eb, rps, pds, rds, scc, l, cfg) + if err != nil { + l.Sugar().Fatalw("Failed to create rpc server", zap.Error(err)) + } // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) diff --git a/examples/eventSubscriber/main.go b/examples/eventSubscriber/main.go index bdb4fe16..222d79e2 100644 --- a/examples/eventSubscriber/main.go +++ b/examples/eventSubscriber/main.go @@ -35,6 +35,7 @@ func streamIndexedBlocks(client v1.EventsClient) { panic(err) } + fmt.Printf("Client has connected to server\n") for { resp := &v1.StreamIndexedBlocksResponse{} err := stream.RecvMsg(resp) @@ -77,5 +78,5 @@ func main() { panic(err) } - streamStateChanges(client) + streamIndexedBlocks(client) } diff --git a/go.mod b/go.mod index abadf518..a05fad39 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.7.0 + github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213193904-02e6c9a33dbf github.com/ethereum/go-ethereum v1.14.13 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index bf677853..0abe6f8b 100644 --- a/go.sum +++ b/go.sum @@ -43,12 +43,12 @@ github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1/go.mod h1:Ie8YE3EQkTHqG6/tnUS0He7/UPMkXPo/3OFXwSy0iRo= github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 h1:Blb4AE+jC/vddV71w4/MQAPooM+8EVqv9w2bL4OytgY= github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13/go.mod h1:PD/HoyzZjxDw1tAcZw3yD0yGddo+yhmwQAi+lk298r4= -github.com/Layr-Labs/protocol-apis v1.6.0 h1:1TTi4+t8Kq4YKsji7xa3CPGYf4fHXZZbbpt3GYQWZ78= -github.com/Layr-Labs/protocol-apis v1.6.0/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ= -github.com/Layr-Labs/protocol-apis v1.6.1-0.20250212194100-39ec43be3227 h1:erE0SEuSNrYMf5Hq7cgvDdJTunN3KnkDJgwPngQpWbc= -github.com/Layr-Labs/protocol-apis v1.6.1-0.20250212194100-39ec43be3227/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ= github.com/Layr-Labs/protocol-apis v1.7.0 h1:BYzfF0mLjil3VAq8EEnswJbOL9v0OqfL32TipUryZ2s= github.com/Layr-Labs/protocol-apis v1.7.0/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ= +github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213185609-9608503ad4ed h1:dWv+54pLMoDWTPiVWatfFvyv5tLlkyuK01KBAvW4ieY= +github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213185609-9608503ad4ed/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ= +github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213193904-02e6c9a33dbf h1:0ajrteTXg+E6jrkd9bPJkr8LvUTCpzqXUq7fFVPMGL4= +github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213193904-02e6c9a33dbf/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/eigenState/avsOperators/avsOperators.go b/pkg/eigenState/avsOperators/avsOperators.go index e40d33e7..9adb54b4 100644 --- a/pkg/eigenState/avsOperators/avsOperators.go +++ b/pkg/eigenState/avsOperators/avsOperators.go @@ -17,12 +17,12 @@ import ( ) type AvsOperatorStateChange struct { - Avs string - Operator string - Registered bool - LogIndex uint64 - TransactionHash string - BlockNumber uint64 + Avs string `filter:"true"` + Operator string `filter:"true"` + Registered bool `filter:"true"` + LogIndex uint64 `filter:"true"` + TransactionHash string `filter:"true"` + BlockNumber uint64 `filter:"true"` } // EigenState model for AVS operators that implements IEigenStateModel. diff --git a/pkg/eigenState/operatorShares/operatorShares.go b/pkg/eigenState/operatorShares/operatorShares.go index b540b592..8a671bf2 100644 --- a/pkg/eigenState/operatorShares/operatorShares.go +++ b/pkg/eigenState/operatorShares/operatorShares.go @@ -21,15 +21,15 @@ import ( ) type OperatorShareDeltas struct { - Operator string - Staker string - Strategy string - Shares string - TransactionHash string - LogIndex uint64 - BlockNumber uint64 - BlockTime time.Time - BlockDate string + Operator string `filter:"true"` + Staker string `filter:"true"` + Strategy string `filter:"true"` + Shares string `filter:"true"` + TransactionHash string `filter:"true"` + LogIndex uint64 `filter:"true"` + BlockNumber uint64 `filter:"true"` + BlockTime time.Time `filter:"true"` + BlockDate string `filter:"true"` } func NewSlotID(operator string, strategy string, staker string, transactionHash string, logIndex uint64) types.SlotID { diff --git a/pkg/eigenState/rewardSubmissions/rewardSubmissions.go b/pkg/eigenState/rewardSubmissions/rewardSubmissions.go index db830e79..96280dd6 100644 --- a/pkg/eigenState/rewardSubmissions/rewardSubmissions.go +++ b/pkg/eigenState/rewardSubmissions/rewardSubmissions.go @@ -20,20 +20,20 @@ import ( ) type RewardSubmission struct { - Avs string - RewardHash string - Token string - Amount string - Strategy string - StrategyIndex uint64 - Multiplier string - StartTimestamp *time.Time - EndTimestamp *time.Time - Duration uint64 - BlockNumber uint64 - RewardType string // avs, all_stakers, all_earners - TransactionHash string - LogIndex uint64 + Avs string `filter:"true"` + RewardHash string `filter:"true"` + Token string `filter:"true"` + Amount string `filter:"true"` + Strategy string `filter:"true"` + StrategyIndex uint64 `filter:"true"` + Multiplier string `filter:"true"` + StartTimestamp *time.Time `filter:"true"` + EndTimestamp *time.Time `filter:"true"` + Duration uint64 `filter:"true"` + BlockNumber uint64 `filter:"true"` + RewardType string `filter:"true"` // avs, all_stakers, all_earners + TransactionHash string `filter:"true"` + LogIndex uint64 `filter:"true"` } func NewSlotID(transactionHash string, logIndex uint64, rewardHash string, strategyIndex uint64) types.SlotID { diff --git a/pkg/eigenState/stakerDelegations/stakerDelegations.go b/pkg/eigenState/stakerDelegations/stakerDelegations.go index 5984c864..f269d5c9 100644 --- a/pkg/eigenState/stakerDelegations/stakerDelegations.go +++ b/pkg/eigenState/stakerDelegations/stakerDelegations.go @@ -17,12 +17,12 @@ import ( ) type StakerDelegationChange struct { - Staker string - Operator string - BlockNumber uint64 - Delegated bool - LogIndex uint64 - TransactionHash string + Staker string `filter:"true"` + Operator string `filter:"true"` + BlockNumber uint64 `filter:"true"` + Delegated bool `filter:"true"` + LogIndex uint64 `filter:"true"` + TransactionHash string `filter:"true"` } type StakerDelegationsModel struct { diff --git a/pkg/eigenState/stakerShares/stakerShares.go b/pkg/eigenState/stakerShares/stakerShares.go index b539a7c4..757d79cb 100644 --- a/pkg/eigenState/stakerShares/stakerShares.go +++ b/pkg/eigenState/stakerShares/stakerShares.go @@ -25,16 +25,16 @@ import ( // Table staker_share_deltas type StakerShareDeltas struct { - Staker string - Strategy string - Shares string - StrategyIndex uint64 - TransactionHash string - LogIndex uint64 - BlockTime time.Time - BlockDate string - BlockNumber uint64 - WithdrawalRootString string `gorm:"-"` + Staker string `filter:"true"` + Strategy string `filter:"true"` + Shares string `filter:"true"` + StrategyIndex uint64 `filter:"true"` + TransactionHash string `filter:"true"` + LogIndex uint64 `filter:"true"` + BlockTime time.Time `filter:"true"` + BlockDate string `filter:"true"` + BlockNumber uint64 `filter:"true"` + WithdrawalRootString string `gorm:"-" filter:"true"` } func NewSlotID(transactionHash string, logIndex uint64, staker string, strategy string, strategyIndex uint64) types.SlotID { diff --git a/pkg/eigenState/types/tables.go b/pkg/eigenState/types/tables.go index d9bb30cc..d1613d62 100644 --- a/pkg/eigenState/types/tables.go +++ b/pkg/eigenState/types/tables.go @@ -5,16 +5,16 @@ import ( ) type SubmittedDistributionRoot struct { - Root string - BlockNumber uint64 - RootIndex uint64 - RewardsCalculationEnd time.Time - RewardsCalculationEndUnit string - ActivatedAt time.Time - ActivatedAtUnit string - CreatedAtBlockNumber uint64 - LogIndex uint64 - TransactionHash string + Root string `filter:"true"` + BlockNumber uint64 `filter:"true"` + RootIndex uint64 `filter:"true"` + RewardsCalculationEnd time.Time `filter:"true"` + RewardsCalculationEndUnit string `filter:"true"` + ActivatedAt time.Time `filter:"true"` + ActivatedAtUnit string `filter:"true"` + CreatedAtBlockNumber uint64 `filter:"true"` + LogIndex uint64 `filter:"true"` + TransactionHash string `filter:"true"` } func (sdr *SubmittedDistributionRoot) GetSnapshotDate() string { @@ -22,8 +22,8 @@ func (sdr *SubmittedDistributionRoot) GetSnapshotDate() string { } type DisabledDistributionRoot struct { - RootIndex uint64 - BlockNumber uint64 - LogIndex uint64 - TransactionHash string + RootIndex uint64 `filter:"true"` + BlockNumber uint64 `filter:"true"` + LogIndex uint64 `filter:"true"` + TransactionHash string `filter:"true"` } diff --git a/pkg/eventFilter/condition.go b/pkg/eventFilter/condition.go new file mode 100644 index 00000000..ec63cd6f --- /dev/null +++ b/pkg/eventFilter/condition.go @@ -0,0 +1,197 @@ +package eventFilter + +import ( + "encoding/json" + "fmt" +) + +// Condition represents a single filter condition +type Condition struct { + Field string `json:"-"` + Op Operator `json:"-"` + Value interface{} `json:"-"` + EntType EntityType `json:"-"` +} + +func (c *Condition) Type() string { + return "condition" +} + +func (c *Condition) ToFilterJSON() (FilterJSON, error) { + return FilterJSON{ + Type: c.Type(), + Field: c.Field, + Operator: c.Op.String(), + Value: c.Value, + EntType: c.EntType.String(), + }, nil +} + +func (c *Condition) MarshalJSON() ([]byte, error) { + fj, err := c.ToFilterJSON() + if err != nil { + return nil, err + } + return json.Marshal(fj) +} + +func (c *Condition) UnmarshalJSON(data []byte) error { + var f FilterJSON + if err := json.Unmarshal(data, &f); err != nil { + return err + } + + c.Field = f.Field + c.Value = f.Value + c.EntType = EntityType(f.EntType) + c.Op = ParseOperator(f.Operator) + + return nil +} + +func (c *Condition) Evaluate(entity interface{}, registry *FilterableRegistry) (bool, error) { + // Validate entity type + if err := registry.ValidateEntityType(c.EntType, entity); err != nil { + return false, err + } + + fieldValue, err := registry.GetFilterableField(entity, c.Field) + if err != nil { + return false, err + } + + return compare(fieldValue, c.Op, c.Value) +} + +// And combines multiple filters with AND logic +type And struct { + Filters []Filter +} + +func (a *And) ToFilterJSON() (FilterJSON, error) { + filters := make([]FilterJSON, len(a.Filters)) + for i, f := range a.Filters { + fj, err := f.ToFilterJSON() + if err != nil { + return FilterJSON{}, err + } + filters[i] = fj + } + + return FilterJSON{ + Type: a.Type(), + Filters: filters, + }, nil +} + +func (a *And) MarshalJSON() ([]byte, error) { + fj, err := a.ToFilterJSON() + if err != nil { + return nil, err + } + return json.Marshal(fj) +} + +func (a *And) UnmarshalJSON(data []byte) error { + var filterJSON FilterJSON + if err := json.Unmarshal(data, &filterJSON); err != nil { + return err + } + + filters := make([]Filter, len(filterJSON.Filters)) + for i, f := range filterJSON.Filters { + filter, err := ParseFilter(f) + if err != nil { + return err + } + filters[i] = filter + } + + a.Filters = filters + return nil +} + +func (a *And) Evaluate(entity interface{}, registry *FilterableRegistry) (bool, error) { + for _, filter := range a.Filters { + result, err := filter.Evaluate(entity, registry) + fmt.Printf("And eval: [] [] %+v\n", result) + if err != nil { + return false, err + } + if !result { + return false, nil + } + } + return true, nil +} + +func (a *And) Type() string { + return "and" +} + +// Or combines multiple filters with OR logic +type Or struct { + Filters []Filter +} + +func (o *Or) ToFilterJSON() (FilterJSON, error) { + filters := make([]FilterJSON, len(o.Filters)) + for i, f := range o.Filters { + fj, err := f.ToFilterJSON() + if err != nil { + return FilterJSON{}, err + } + filters[i] = fj + } + + return FilterJSON{ + Type: o.Type(), + Filters: filters, + }, nil +} + +func (o *Or) MarshalJSON() ([]byte, error) { + fj, err := o.ToFilterJSON() + if err != nil { + return nil, err + } + return json.Marshal(fj) +} + +func (o *Or) UnmarshalJSON(data []byte) error { + var filterJSON FilterJSON + if err := json.Unmarshal(data, &filterJSON); err != nil { + return err + } + + filters := make([]Filter, len(filterJSON.Filters)) + for i, f := range filterJSON.Filters { + filter, err := ParseFilter(f) + if err != nil { + return err + } + filters[i] = filter + } + + o.Filters = filters + return nil +} + +func (o *Or) Type() string { + return Condition_OR +} + +func (o *Or) Evaluate(entity interface{}, registry *FilterableRegistry) (bool, error) { + fmt.Printf("Or list length: %+v\n", len(o.Filters)) + for _, filter := range o.Filters { + result, err := filter.Evaluate(entity, registry) + fmt.Printf("Or eval: err: %+v - %+v\n", err, result) + if err != nil { + return false, err + } + if result { + return true, nil + } + } + return false, nil +} diff --git a/pkg/eventFilter/eventTypeRegistry/registry.go b/pkg/eventFilter/eventTypeRegistry/registry.go new file mode 100644 index 00000000..838ca907 --- /dev/null +++ b/pkg/eventFilter/eventTypeRegistry/registry.go @@ -0,0 +1,48 @@ +package eventTypeRegistry + +import ( + "github.com/Layr-Labs/sidecar/pkg/eigenState/avsOperators" + "github.com/Layr-Labs/sidecar/pkg/eigenState/operatorShares" + "github.com/Layr-Labs/sidecar/pkg/eigenState/rewardSubmissions" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stakerDelegations" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stakerShares" + "github.com/Layr-Labs/sidecar/pkg/eigenState/types" + "github.com/Layr-Labs/sidecar/pkg/eventFilter" + "github.com/Layr-Labs/sidecar/pkg/storage" +) + +func BuildFilterableEventRegistry() (*eventFilter.FilterableRegistry, error) { + reg := eventFilter.NewFilterableRegistry() + if err := reg.RegisterType(&storage.Block{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&storage.Transaction{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&storage.TransactionLog{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&avsOperators.AvsOperatorStateChange{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&types.DisabledDistributionRoot{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&operatorShares.OperatorShareDeltas{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&rewardSubmissions.RewardSubmission{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&stakerDelegations.StakerDelegationChange{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&stakerShares.StakerShareDeltas{}); err != nil { + return nil, err + } + if err := reg.RegisterType(&types.SubmittedDistributionRoot{}); err != nil { + return nil, err + } + + return reg, nil +} diff --git a/pkg/eventFilter/filter.go b/pkg/eventFilter/filter.go new file mode 100644 index 00000000..dd02c0b3 --- /dev/null +++ b/pkg/eventFilter/filter.go @@ -0,0 +1,286 @@ +package eventFilter + +import ( + "fmt" + "reflect" + "strings" +) + +const ( + Condition_OR = "or" + Condition_AND = "and" + Condition_Condition = "condition" +) + +type Filterable interface { + GetFilterableField(field string) (interface{}, error) + GetFilterableFields() []FilterableField +} + +type FilterableField struct { + Name string + Type reflect.Type + Value func(interface{}) (interface{}, error) +} + +// Operator represents comparison operations +type Operator int + +const ( + Equals Operator = iota + NotEquals + GreaterThan + LessThan + GreaterEqual + LessEqual + Contains + NotContains +) + +func (o Operator) String() string { + switch o { + case Equals: + return "eq" + case NotEquals: + return "ne" + case GreaterThan: + return "gt" + case LessThan: + return "lt" + case GreaterEqual: + return "gte" + case LessEqual: + return "lte" + case Contains: + return "contains" + case NotContains: + return "notContains" + default: + return "unknown" + } +} + +func ParseOperator(s string) Operator { + switch strings.ToLower(s) { + case "eq": + return Equals + case "ne": + return NotEquals + case "gt": + return GreaterThan + case "lt": + return LessThan + case "gte": + return GreaterEqual + case "lte": + return LessEqual + case "contains": + return Contains + case "notcontains": + return NotContains + default: + return Equals // default to Equals, could also return error + } +} + +// Filter is the interface that all filter types must implement +type Filter interface { + Evaluate(entity interface{}, registry *FilterableRegistry) (bool, error) + Type() string + ToFilterJSON() (FilterJSON, error) +} + +// FilterJSON represents the JSON structure for any filter type +type FilterJSON struct { + Type string `json:"type"` + Field string `json:"field,omitempty"` + Operator string `json:"operator,omitempty"` + Value interface{} `json:"value,omitempty"` + Filters []FilterJSON `json:"filters,omitempty"` + EntType string `json:"entityType,omitempty"` +} + +func compare(fieldValue interface{}, op Operator, filterValue interface{}) (bool, error) { + switch op { + case Equals: + return equals(fieldValue, filterValue) + case NotEquals: + eq, err := equals(fieldValue, filterValue) + return !eq, err + case GreaterThan: + return compareOrdered(fieldValue, filterValue, true, false) + case LessThan: + return compareOrdered(fieldValue, filterValue, false, false) + case GreaterEqual: + return compareOrdered(fieldValue, filterValue, true, true) + case LessEqual: + return compareOrdered(fieldValue, filterValue, false, true) + case Contains: + return contains(fieldValue, filterValue) + case NotContains: + c, err := contains(fieldValue, filterValue) + return !c, err + default: + return false, fmt.Errorf("unsupported operator: %v", op) + } +} + +func equals(a, b interface{}) (bool, error) { + switch v := a.(type) { + case uint64: + bVal, ok := b.(uint64) + if !ok { + return false, fmt.Errorf("type mismatch: %T != %T", a, b) + } + return v == bVal, nil + case string: + bVal, ok := b.(string) + if !ok { + return false, fmt.Errorf("type mismatch: %T != %T", a, b) + } + return v == bVal, nil + default: + return false, fmt.Errorf("unsupported type for equals comparison: %T", a) + } +} + +func compareOrdered(fieldValue, filterValue interface{}, greater, equal bool) (bool, error) { + va := reflect.ValueOf(fieldValue) + + if va.String() == "string" { + bVal, ok := filterValue.(string) + if !ok { + return false, fmt.Errorf("type mismatch: %T != %T", fieldValue, filterValue) + } + comp := strings.Compare(fieldValue.(string), bVal) + if equal && comp == 0 { + return true, nil + } + if greater { + return comp > 0, nil + } + return comp < 0, nil + } + + if isNumeric(va) { + res, err := compareNumerics(fieldValue, filterValue) + + // if fieldValue >= filterValue + if greater && equal { + return res >= 0, err + } + + // if fieldValue > fiterValue + if greater && !equal { + return res > 0, err + } + + // if fieldValue < filterValue + if !greater && !equal { + return res < 0, err + } + + if !greater && equal { + return res <= 0, err + } + // if fieldValue == filterValue + return res == 0, err + } + + return false, fmt.Errorf("unsupported type for ordered comparison: %T", fieldValue) +} + +func contains(a, b interface{}) (bool, error) { + str, ok := a.(string) + if !ok { + return false, fmt.Errorf("contains operator requires string field, got: %T", a) + } + + pattern, ok := b.(string) + if !ok { + return false, fmt.Errorf("contains operator requires string pattern, got: %T", b) + } + + return strings.Contains(str, pattern), nil +} + +// filterToJSON converts a Filter to its JSON representation +// +//nolint:unused +func filterToJSON(f Filter) (FilterJSON, error) { + switch v := f.(type) { + case *Condition: + return FilterJSON{ + Type: v.Type(), + Field: v.Field, + Operator: v.Op.String(), + Value: v.Value, + EntType: v.EntType.String(), + }, nil + case *And: + filters := make([]FilterJSON, len(v.Filters)) + for i, f := range v.Filters { + var err error + filter, err := filterToJSON(f) + if err != nil { + return FilterJSON{}, err + } + filters[i] = filter + } + return FilterJSON{ + Type: v.Type(), + Filters: filters, + }, nil + case *Or: + filters := make([]FilterJSON, len(v.Filters)) + for i, f := range v.Filters { + var err error + filter, err := filterToJSON(f) + if err != nil { + return FilterJSON{}, err + } + filters[i] = filter + } + return FilterJSON{ + Type: v.Type(), + Filters: filters, + }, nil + default: + return FilterJSON{}, fmt.Errorf("unknown filter type: %T", f) + } +} + +// ParseFilter creates a Filter from a FilterJSON structure +func ParseFilter(f FilterJSON) (Filter, error) { + switch f.Type { + case Condition_Condition: + return &Condition{ + Field: f.Field, + Op: ParseOperator(f.Operator), + Value: f.Value, + EntType: EntityType(f.EntType), + }, nil + case Condition_AND: + filters := make([]Filter, len(f.Filters)) + for i, filter := range f.Filters { + var err error + filters[i], err = ParseFilter(filter) + if err != nil { + return nil, err + } + } + return &And{Filters: filters}, nil + case Condition_OR: + filters := make([]Filter, len(f.Filters)) + for i, filter := range f.Filters { + var err error + filters[i], err = ParseFilter(filter) + if err != nil { + return nil, err + } + } + return &Or{Filters: filters}, nil + default: + return nil, fmt.Errorf("unknown filter type: %s", f.Type) + } +} diff --git a/pkg/eventFilter/filterBuilder.go b/pkg/eventFilter/filterBuilder.go new file mode 100644 index 00000000..cfb9a986 --- /dev/null +++ b/pkg/eventFilter/filterBuilder.go @@ -0,0 +1,40 @@ +package eventFilter + +type FilterBuilder struct { + entType EntityType +} + +func NewFilterBuilder(entType EntityType) *FilterBuilder { + return &FilterBuilder{ + entType: entType, + } +} + +func (b *FilterBuilder) Condition(field string, op Operator, value interface{}) Filter { + return &Condition{ + Field: field, + Op: op, + Value: value, + EntType: b.entType, + } +} + +func (b *FilterBuilder) And(conditions ...Filter) Filter { + if len(conditions) == 0 { + return nil + } + if len(conditions) == 1 { + return conditions[0] + } + return &And{Filters: conditions} +} + +func (b *FilterBuilder) Or(conditions ...Filter) Filter { + if len(conditions) == 0 { + return nil + } + if len(conditions) == 1 { + return conditions[0] + } + return &Or{Filters: conditions} +} diff --git a/pkg/eventFilter/filter_test.go b/pkg/eventFilter/filter_test.go new file mode 100644 index 00000000..6fe9cb55 --- /dev/null +++ b/pkg/eventFilter/filter_test.go @@ -0,0 +1,430 @@ +package eventFilter + +import ( + "encoding/json" + "fmt" + "github.com/Layr-Labs/sidecar/pkg/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +// Example of adding a new filterable type +type CustomEvent struct { + ID string `filter:"true"` + Count uint64 `filter:"true"` + Metadata string // Not filterable +} + +func createRegistry() *FilterableRegistry { + reg := NewFilterableRegistry() + _ = reg.RegisterType(&storage.Transaction{}) + _ = reg.RegisterType(&storage.Block{}) + _ = reg.RegisterType(&storage.TransactionLog{}) + _ = reg.RegisterType(&CustomEvent{}) + return reg +} + +func TestConditionEvaluation(t *testing.T) { + reg := createRegistry() + + tests := []struct { + name string + filter *Condition + entity interface{} + expected bool + hasError bool + }{ + { + name: "equals_match", + filter: &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + entity: &storage.Block{ + Number: 1000, + Hash: "0x123", + }, + expected: true, + hasError: false, + }, + { + name: "equals_no_match", + filter: &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + entity: &storage.Block{ + Number: 1001, + Hash: "0x123", + }, + expected: false, + hasError: false, + }, + { + name: "greater_than_match", + filter: &Condition{ + Field: "Number", + Op: GreaterThan, + Value: uint64(1000), + EntType: "Block", + }, + entity: &storage.Block{ + Number: 1001, + Hash: "0x123", + }, + expected: true, + hasError: false, + }, + { + name: "invalid_entity_type", + filter: &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + entity: &storage.Transaction{}, // Wrong type + expected: false, + hasError: true, + }, + { + name: "invalid_field", + filter: &Condition{ + Field: "NonexistentField", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + entity: &storage.Block{ + Number: 1000, + Hash: "0x123", + }, + expected: false, + hasError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := tt.filter.Evaluate(tt.entity, reg) + if tt.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +} + +func TestLogicalOperations(t *testing.T) { + reg := createRegistry() + + block := &storage.Block{ + Number: 1000, + Hash: "0x123", + } + + tests := []struct { + name string + filter Filter + expected bool + hasError bool + }{ + { + name: "and_both_true", + filter: &And{ + Filters: []Filter{ + &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + &Condition{ + Field: "Hash", + Op: Equals, + Value: "0x123", + EntType: "Block", + }, + }, + }, + expected: true, + hasError: false, + }, + { + name: "and_one_false", + filter: &And{ + Filters: []Filter{ + &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + &Condition{ + Field: "Hash", + Op: Equals, + Value: "0x456", + EntType: "Block", + }, + }, + }, + expected: false, + hasError: false, + }, + { + name: "or_one_true", + filter: &Or{ + Filters: []Filter{ + &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(1000), + EntType: "Block", + }, + &Condition{ + Field: "Hash", + Op: Equals, + Value: "0x456", + EntType: "Block", + }, + }, + }, + expected: true, + hasError: false, + }, + { + name: "or_all_false", + filter: &Or{ + Filters: []Filter{ + &Condition{ + Field: "Number", + Op: Equals, + Value: uint64(999), + EntType: "Block", + }, + &Condition{ + Field: "Hash", + Op: Equals, + Value: "0x456", + EntType: "Block", + }, + }, + }, + expected: false, + hasError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := tt.filter.Evaluate(block, reg) + if tt.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +} + +func TestJSONSerialization(t *testing.T) { + reg := createRegistry() + + original := &And{ + Filters: []Filter{ + &Condition{ + Field: "BlockNumber", + Op: GreaterThan, + Value: 1000, + EntType: "Transaction", + }, + &Or{ + Filters: []Filter{ + &Condition{ + Field: "FromAddress", + Op: Equals, + Value: "0x123", + EntType: "Transaction", + }, + &Condition{ + Field: "ToAddress", + Op: Equals, + Value: "0x456", + EntType: "Transaction", + }, + }, + }, + }, + } + + // Marshal to JSON + jsonData, err := json.MarshalIndent(original, "", "\t") + require.NoError(t, err) + + // Unmarshal back to filter + var reconstructed And + err = json.Unmarshal(jsonData, &reconstructed) + require.NoError(t, err) + + // Test both filters against the same transaction + tx := &storage.Transaction{ + BlockNumber: 1500, + FromAddress: "0x123", + } + + originalResult, err := original.Evaluate(tx, reg) + require.NoError(t, err) + + reconstructedResult, err := reconstructed.Evaluate(tx, reg) + require.NoError(t, err) + + assert.Equal(t, originalResult, reconstructedResult) +} + +func TestFilterBuilder(t *testing.T) { + reg := createRegistry() + + tests := []struct { + name string + builder func() Filter + tx *storage.Transaction + expected bool + hasError bool + }{ + { + name: "simple_condition", + builder: func() Filter { + builder := NewFilterBuilder("Transaction") + return builder.Condition("BlockNumber", GreaterThan, uint64(1000)) + }, + tx: &storage.Transaction{ + BlockNumber: 1500, + }, + expected: true, + hasError: false, + }, + { + name: "and_conditions", + builder: func() Filter { + builder := NewFilterBuilder("Transaction") + return builder.And( + builder.Condition("BlockNumber", GreaterThan, uint64(1000)), + builder.Condition("FromAddress", Equals, "0x123"), + ) + }, + tx: &storage.Transaction{ + BlockNumber: 1500, + FromAddress: "0x123", + }, + expected: true, + hasError: false, + }, + { + name: "with_group", + builder: func() Filter { + builder := NewFilterBuilder("Transaction") + return builder.And( + builder.Condition("BlockNumber", GreaterThan, uint64(1000)), + builder.Or( + builder.Condition("FromAddress", Equals, "0x123"), + builder.Condition("ToAddress", Equals, "0x456"), + ), + ) + }, + tx: &storage.Transaction{ + BlockNumber: 1500, + FromAddress: "0x123", + }, + expected: true, + hasError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter := tt.builder() + result, err := filter.Evaluate(tt.tx, reg) + if tt.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + filterJson, _ := json.MarshalIndent(filter, "", "\t") + fmt.Printf("Filter: %v\n", string(filterJson)) + fmt.Printf("Result: %v\n", result) + assert.Equal(t, tt.expected, result, tt.name) + } + }) + } +} + +func TestOperatorString(t *testing.T) { + tests := []struct { + operator Operator + expected string + }{ + {Equals, "eq"}, + {NotEquals, "ne"}, + {GreaterThan, "gt"}, + {LessThan, "lt"}, + {GreaterEqual, "gte"}, + {LessEqual, "lte"}, + {Contains, "contains"}, + {NotContains, "notContains"}, + {Operator(999), "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.operator.String()) + }) + } +} + +func TestParseOperator(t *testing.T) { + tests := []struct { + input string + expected Operator + }{ + {"eq", Equals}, + {"ne", NotEquals}, + {"gt", GreaterThan}, + {"lt", LessThan}, + {"gte", GreaterEqual}, + {"lte", LessEqual}, + {"contains", Contains}, + {"notContains", NotContains}, + {"invalid", Equals}, // Default case + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + assert.Equal(t, tt.expected, ParseOperator(tt.input)) + }) + } +} + +func TestEntityTypeString(t *testing.T) { + tests := []struct { + entityType EntityType + expected string + }{ + {"Block", "Block"}, + {"Transaction", "Transaction"}, + {"TransactionLog", "TransactionLog"}, + {EntityType("Foo"), "Foo"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.entityType.String()) + }) + } +} diff --git a/pkg/eventFilter/filterableTypeRegistry.go b/pkg/eventFilter/filterableTypeRegistry.go new file mode 100644 index 00000000..dcf3bad0 --- /dev/null +++ b/pkg/eventFilter/filterableTypeRegistry.go @@ -0,0 +1,106 @@ +package eventFilter + +import ( + "fmt" + "reflect" + "strings" +) + +type EntityType string + +func (e EntityType) String() string { + return string(e) +} + +func ParseEntityType(s string) EntityType { + return EntityType(strings.ToLower(s)) +} + +type FilterableRegistry struct { + // Use reflect.Type as key since it uniquely identifies a type + fields map[reflect.Type][]FilterableField +} + +// NewFilterableRegistry creates a new registry +func NewFilterableRegistry() *FilterableRegistry { + return &FilterableRegistry{ + fields: make(map[reflect.Type][]FilterableField), + } +} + +// RegisterType analyzes a type and registers its filterable fields +func (r *FilterableRegistry) RegisterType(example interface{}) error { + t := reflect.TypeOf(example) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + var fields []FilterableField + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + if field.Tag.Get("filter") == "true" { + idx := i + fields = append(fields, FilterableField{ + Name: field.Name, + Type: field.Type, + Value: func(e interface{}) (interface{}, error) { + v := reflect.ValueOf(e) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + return v.Field(idx).Interface(), nil + }, + }) + } + } + + r.fields[t] = fields + return nil +} + +func (r *FilterableRegistry) ValidateEntityType(entityType EntityType, entity interface{}) error { + actualType := reflect.TypeOf(entity) + if actualType.Kind() == reflect.Ptr { + actualType = actualType.Elem() + } + + if string(entityType) != actualType.Name() { + return fmt.Errorf("expected %s type, got %s", entityType, actualType.Name()) + } + + // Also verify the type is registered + if _, ok := r.fields[actualType]; !ok { + return fmt.Errorf("type %s is not registered", actualType.Name()) + } + + return nil +} + +// GetFilterableFields returns the filterable fields for a given type +func (r *FilterableRegistry) GetFilterableFields(entity interface{}) ([]FilterableField, error) { + t := reflect.TypeOf(entity) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + fields, ok := r.fields[t] + if !ok { + return nil, fmt.Errorf("type %v not registered", t) + } + return fields, nil +} + +// GetFilterableField returns a specific field value +func (r *FilterableRegistry) GetFilterableField(entity interface{}, fieldName string) (interface{}, error) { + fields, err := r.GetFilterableFields(entity) + if err != nil { + return nil, err + } + + for _, field := range fields { + if field.Name == fieldName { + return field.Value(entity) + } + } + return nil, fmt.Errorf("field %s not found", fieldName) +} diff --git a/pkg/eventFilter/numbers.go b/pkg/eventFilter/numbers.go new file mode 100644 index 00000000..c2e85a40 --- /dev/null +++ b/pkg/eventFilter/numbers.go @@ -0,0 +1,52 @@ +package eventFilter + +import ( + "fmt" + "math/big" + "reflect" +) + +// compareNumerics compares two numeric values and returns -1, 0, or 1 if a < b, a == b, or a > b respectively. +// Rather than trying to deal with upcasting or downcasting numbers, we convert both to big.Float and compare those +// since big.Float can handle arbitrary precision. +func compareNumerics(a, b interface{}) (int, error) { + // Convert both to big.Float + fa := new(big.Float) + fb := new(big.Float) + + // Set values from interface{} + switch v := a.(type) { + case int, int8, int16, int32, int64: + fa.SetInt64(reflect.ValueOf(v).Int()) + case uint, uint8, uint16, uint32, uint64: + fa.SetUint64(reflect.ValueOf(v).Uint()) + case float32, float64: + fa.SetFloat64(reflect.ValueOf(v).Float()) + default: + return 0, fmt.Errorf("unsupported type for a: %T", a) + } + + switch v := b.(type) { + case int, int8, int16, int32, int64: + fb.SetInt64(reflect.ValueOf(v).Int()) + case uint, uint8, uint16, uint32, uint64: + fb.SetUint64(reflect.ValueOf(v).Uint()) + case float32, float64: + fb.SetFloat64(reflect.ValueOf(v).Float()) + default: + return 0, fmt.Errorf("unsupported type for b: %T", b) + } + + return fa.Cmp(fb), nil +} + +func isNumeric(v reflect.Value) bool { + switch v.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, + reflect.Float32, reflect.Float64: + return true + default: + return false + } +} diff --git a/pkg/rpcServer/eventHandlers.go b/pkg/rpcServer/eventHandlers.go index 5c0bf588..707ef6fe 100644 --- a/pkg/rpcServer/eventHandlers.go +++ b/pkg/rpcServer/eventHandlers.go @@ -2,11 +2,14 @@ package rpcServer import ( "context" + "encoding/json" + "fmt" v1EigenState "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/eigenState" v1EthereumTypes "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/ethereumTypes" v1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/events" "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" "github.com/Layr-Labs/sidecar/pkg/eventBus/eventBusTypes" + "github.com/Layr-Labs/sidecar/pkg/eventFilter" "github.com/Layr-Labs/sidecar/pkg/storage" "github.com/google/uuid" "go.uber.org/zap" @@ -15,7 +18,11 @@ import ( "io" ) -func (rpc *RpcServer) subscribeToBlocks(ctx context.Context, requestId string, handleBlock func(interface{}) error) error { +func (rpc *RpcServer) subscribeToBlocks( + ctx context.Context, + requestId string, + handleBlock func(interface{}) error, +) error { consumer := &eventBusTypes.Consumer{ Id: eventBusTypes.ConsumerId(requestId), Context: ctx, @@ -39,6 +46,22 @@ func (rpc *RpcServer) subscribeToBlocks(ctx context.Context, requestId string, h } } +func filterEigenStateChanges(changes map[string][]interface{}, filter *eventFilter.And, filterRegistry *eventFilter.FilterableRegistry) (map[string][]interface{}, error) { + for modelName, modelChanges := range changes { + for i, change := range modelChanges { + match, err := filter.Evaluate(change, filterRegistry) + if err != nil { + return nil, err + } + if !match { + // remove the item from the list + changes[modelName] = append(changes[modelName][:i], changes[modelName][i+1:]...) + } + } + } + return changes, nil +} + func (rpc *RpcServer) StreamEigenStateChanges(request *v1.StreamEigenStateChangesRequest, g grpc.ServerStreamingServer[v1.StreamEigenStateChangesResponse]) error { // Since this rpc sidecar is not processing blocks, we need to connect to the primary sidecar to get the events if !rpc.globalConfig.SidecarPrimaryConfig.IsPrimary { @@ -68,8 +91,27 @@ func (rpc *RpcServer) StreamEigenStateChanges(request *v1.StreamEigenStateChange return err } + var filter *eventFilter.And + filterString := request.GetStateChangeFilter() + if filterString != "" { + err := json.Unmarshal([]byte(filterString), &filter) + if err != nil { + rpc.Logger.Sugar().Errorw("Failed to unmarshal filter", + zap.Error(err), + ) + return err + } + } + err = rpc.subscribeToBlocks(g.Context(), requestId.String(), func(data interface{}) error { blockProcessedData := data.(*eventBusTypes.BlockProcessedData) + + if filter != nil { + blockProcessedData.CommittedState, err = filterEigenStateChanges(blockProcessedData.CommittedState, filter, rpc.filterRegistry) + if err != nil { + return err + } + } changes, err := rpc.parseCommittedChanges(blockProcessedData.CommittedState) if err != nil { return err @@ -110,22 +152,62 @@ func (rpc *RpcServer) StreamIndexedBlocks(request *v1.StreamIndexedBlocksRequest rpc.Logger.Error("Failed to generate request ID", zap.Error(err)) return err } + onlyBlocksWithData := request.GetOnlyBlocksWithData() + + var stateChangesFilter *eventFilter.And + var blockFilter *eventFilter.And + + filters := request.GetFilters() + if filters != nil { + stateChangeFilterStr := filters.GetStateChangeFilter() + if stateChangeFilterStr != "" { + err := json.Unmarshal([]byte(stateChangeFilterStr), &stateChangesFilter) + if err != nil { + rpc.Logger.Sugar().Errorw("Failed to unmarshal state changes filter", + zap.Error(err), + ) + return err + } + } + blockFilterStr := filters.GetBlockFilter() + if blockFilterStr != "" { + err := json.Unmarshal([]byte(blockFilterStr), &blockFilter) + if err != nil { + rpc.Logger.Sugar().Errorw("Failed to unmarshal block filter", + zap.Error(err), + ) + return err + } + } + } err = rpc.subscribeToBlocks(g.Context(), requestId.String(), func(data interface{}) error { rpc.Logger.Debug("Received block", zap.Any("data", data)) blockProcessedData := data.(*eventBusTypes.BlockProcessedData) - resp, err := rpc.buildBlockResponse(blockProcessedData, request.IncludeStateChanges) - if err != nil { - return err - } + if (onlyBlocksWithData && processedBlockHasData(blockProcessedData)) || !onlyBlocksWithData { + if stateChangesFilter != nil { + blockProcessedData.CommittedState, err = filterEigenStateChanges(blockProcessedData.CommittedState, stateChangesFilter, rpc.filterRegistry) + if err != nil { + return err + } + } + fmt.Printf("blockProcessedData.Block.Number: %v\n", blockProcessedData.Block.Number) - return g.SendMsg(resp) + resp, err := rpc.buildBlockResponse(blockProcessedData, request.GetIncludeStateChanges()) + if err != nil { + return err + } + + return g.SendMsg(resp) + } + return nil }) - if err != nil { - return err - } - return nil + return err +} + +func processedBlockHasData(block *eventBusTypes.BlockProcessedData) bool { + return len(block.Transactions) > 0 || len(block.Logs) > 0 || len(block.CommittedState) > 0 } func convertTransactionLogToEventTypeTransaction(log *storage.TransactionLog) *v1EthereumTypes.TransactionLog { diff --git a/pkg/rpcServer/server.go b/pkg/rpcServer/server.go index a98c9090..44b14e5d 100644 --- a/pkg/rpcServer/server.go +++ b/pkg/rpcServer/server.go @@ -13,6 +13,8 @@ import ( "github.com/Layr-Labs/sidecar/internal/logger" sidecarClient "github.com/Layr-Labs/sidecar/pkg/clients/sidecar" "github.com/Layr-Labs/sidecar/pkg/eventBus/eventBusTypes" + "github.com/Layr-Labs/sidecar/pkg/eventFilter" + "github.com/Layr-Labs/sidecar/pkg/eventFilter/eventTypeRegistry" "github.com/Layr-Labs/sidecar/pkg/proofs" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" @@ -48,6 +50,7 @@ type RpcServer struct { rewardsDataService *rewardsDataService.RewardsDataService globalConfig *config.Config sidecarClient *sidecarClient.SidecarClient + filterRegistry *eventFilter.FilterableRegistry } func NewRpcServer( @@ -62,7 +65,7 @@ func NewRpcServer( scc *sidecarClient.SidecarClient, l *zap.Logger, cfg *config.Config, -) *RpcServer { +) (*RpcServer, error) { server := &RpcServer{ rpcConfig: config, blockStore: bs, @@ -77,7 +80,14 @@ func NewRpcServer( sidecarClient: scc, } - return server + reg, err := eventTypeRegistry.BuildFilterableEventRegistry() + if err != nil { + l.Sugar().Errorw("Failed to build filterable event registry", zap.Error(err)) + return nil, err + } + server.filterRegistry = reg + + return server, nil } func (s *RpcServer) registerHandlers(ctx context.Context, grpcServer *grpc.Server, mux *runtime.ServeMux) error { diff --git a/pkg/sidecar/blockIndexer.go b/pkg/sidecar/blockIndexer.go index 5d58bc4f..0a33abf1 100644 --- a/pkg/sidecar/blockIndexer.go +++ b/pkg/sidecar/blockIndexer.go @@ -289,6 +289,7 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { if batchEndBlock > int64(tip) { batchEndBlock = int64(tip) } + if err := s.Pipeline.RunForBlockBatch(ctx, uint64(currentBlock), uint64(batchEndBlock), true); err != nil { s.Logger.Sugar().Errorw("Failed to run pipeline for block batch", zap.Error(err), diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 4bb22a92..ed2c1ace 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -26,9 +26,9 @@ type BlockStore interface { // Tables. type Block struct { - Number uint64 - Hash string - ParentHash string + Number uint64 `filter:"true"` + Hash string `filter:"true"` + ParentHash string `filter:"true"` BlockTime time.Time CreatedAt time.Time UpdatedAt time.Time @@ -36,12 +36,12 @@ type Block struct { } type Transaction struct { - BlockNumber uint64 - TransactionHash string - TransactionIndex uint64 - FromAddress string - ToAddress string - ContractAddress string + BlockNumber uint64 `filter:"true"` + TransactionHash string `filter:"true"` + TransactionIndex uint64 `filter:"true"` + FromAddress string `filter:"true"` + ToAddress string `filter:"true"` + ContractAddress string `filter:"true"` BytecodeHash string CreatedAt time.Time UpdatedAt time.Time @@ -49,13 +49,13 @@ type Transaction struct { } type TransactionLog struct { - TransactionHash string - TransactionIndex uint64 - BlockNumber uint64 - Address string + TransactionHash string `filter:"true"` + TransactionIndex uint64 `filter:"true"` + BlockNumber uint64 `filter:"true"` + Address string `filter:"true"` Arguments string - EventName string - LogIndex uint64 + EventName string `filter:"true"` + LogIndex uint64 `filter:"true"` OutputData string CreatedAt time.Time UpdatedAt time.Time