From cd790e8a73eba76976e8d3957b9d706721e76f48 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Tue, 4 Feb 2025 08:36:13 -0800 Subject: [PATCH] Operator signing computing (#1206) --- core/mock/writer.go | 12 + disperser/dataapi/docs/v2/V2_docs.go | 103 ++++ disperser/dataapi/docs/v2/V2_swagger.json | 103 ++++ disperser/dataapi/docs/v2/V2_swagger.yaml | 71 +++ disperser/dataapi/nonsigner_utils.go | 17 + disperser/dataapi/operator_handler.go | 4 + disperser/dataapi/subgraph/mock/api.go | 27 +- disperser/dataapi/v2/server_v2.go | 440 +++++++++++++++- disperser/dataapi/v2/server_v2_test.go | 605 +++++++++++++++++++++- 9 files changed, 1370 insertions(+), 12 deletions(-) diff --git a/core/mock/writer.go b/core/mock/writer.go index 0e3191eab5..4a173e4a2f 100644 --- a/core/mock/writer.go +++ b/core/mock/writer.go @@ -89,6 +89,9 @@ func (t *MockWriter) GetOperatorStakes(ctx context.Context, operatorId core.Oper func (t *MockWriter) GetOperatorStakesForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakes, error) { args := t.Called() result := args.Get(0) + if fn, ok := result.(func([]core.QuorumID, uint32) core.OperatorStakes); ok { + return fn(quorums, blockNumber), args.Error(1) + } return result.(core.OperatorStakes), args.Error(1) } @@ -128,18 +131,27 @@ func (t *MockWriter) OperatorAddressToID(ctx context.Context, address gethcommon func (t *MockWriter) BatchOperatorIDToAddress(ctx context.Context, operatorIds []core.OperatorID) ([]gethcommon.Address, error) { args := t.Called() result := args.Get(0) + if fn, ok := result.(func([]core.OperatorID) []gethcommon.Address); ok { + return fn(operatorIds), args.Error(1) + } return result.([]gethcommon.Address), args.Error(1) } func (t *MockWriter) BatchOperatorAddressToID(ctx context.Context, addresses []gethcommon.Address) ([]core.OperatorID, error) { args := t.Called() result := args.Get(0) + if fn, ok := result.(func([]gethcommon.Address) []core.OperatorID); ok { + return fn(addresses), args.Error(1) + } return result.([]core.OperatorID), args.Error(1) } func (t *MockWriter) GetQuorumBitmapForOperatorsAtBlockNumber(ctx context.Context, operatorIds []core.OperatorID, blockNumber uint32) ([]*big.Int, error) { args := t.Called() result := args.Get(0) + if fn, ok := result.(func([]core.OperatorID, uint32) []*big.Int); ok { + return fn(operatorIds, blockNumber), args.Error(1) + } return result.([]*big.Int), args.Error(1) } diff --git a/disperser/dataapi/docs/v2/V2_docs.go b/disperser/dataapi/docs/v2/V2_docs.go index d72b5582b1..cac50119b1 100644 --- a/disperser/dataapi/docs/v2/V2_docs.go +++ b/disperser/dataapi/docs/v2/V2_docs.go @@ -501,6 +501,69 @@ const docTemplateV2 = `{ } } }, + "/operators/signing-info": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Operators" + ], + "summary": "Fetch operators signing info", + "parameters": [ + { + "type": "string", + "description": "Fetch operators signing info up to the end time (ISO 8601 format: 2006-01-02T15:04:05Z) [default: now]", + "name": "end", + "in": "query" + }, + { + "type": "integer", + "description": "Fetch operators signing info starting from an interval (in seconds) before the end time [default: 3600]", + "name": "interval", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of quorum IDs to fetch signing info for [default: 0,1]", + "name": "quorums", + "in": "query" + }, + { + "type": "boolean", + "description": "Whether to only return operators with signing rate less than 100% [default: false]", + "name": "nonsigner_only", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/v2.OperatorsSigningInfoResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/v2.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/v2.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/v2.ErrorResponse" + } + } + } + } + }, "/operators/stake": { "get": { "produces": [ @@ -1251,6 +1314,35 @@ const docTemplateV2 = `{ } } }, + "v2.OperatorSigningInfo": { + "type": "object", + "properties": { + "operator_address": { + "type": "string" + }, + "operator_id": { + "type": "string" + }, + "quorum_id": { + "type": "integer" + }, + "signing_percentage": { + "type": "number" + }, + "stake_percentage": { + "type": "number" + }, + "total_batches": { + "type": "integer" + }, + "total_responsible_batches": { + "type": "integer" + }, + "total_unsigned_batches": { + "type": "integer" + } + } + }, "v2.OperatorStake": { "type": "object", "properties": { @@ -1268,6 +1360,17 @@ const docTemplateV2 = `{ } } }, + "v2.OperatorsSigningInfoResponse": { + "type": "object", + "properties": { + "operator_signing_info": { + "type": "array", + "items": { + "$ref": "#/definitions/v2.OperatorSigningInfo" + } + } + } + }, "v2.OperatorsStakeResponse": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/v2/V2_swagger.json b/disperser/dataapi/docs/v2/V2_swagger.json index 83481830b3..7bf8fc5d48 100644 --- a/disperser/dataapi/docs/v2/V2_swagger.json +++ b/disperser/dataapi/docs/v2/V2_swagger.json @@ -498,6 +498,69 @@ } } }, + "/operators/signing-info": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Operators" + ], + "summary": "Fetch operators signing info", + "parameters": [ + { + "type": "string", + "description": "Fetch operators signing info up to the end time (ISO 8601 format: 2006-01-02T15:04:05Z) [default: now]", + "name": "end", + "in": "query" + }, + { + "type": "integer", + "description": "Fetch operators signing info starting from an interval (in seconds) before the end time [default: 3600]", + "name": "interval", + "in": "query" + }, + { + "type": "string", + "description": "Comma separated list of quorum IDs to fetch signing info for [default: 0,1]", + "name": "quorums", + "in": "query" + }, + { + "type": "boolean", + "description": "Whether to only return operators with signing rate less than 100% [default: false]", + "name": "nonsigner_only", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/v2.OperatorsSigningInfoResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/v2.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/v2.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/v2.ErrorResponse" + } + } + } + } + }, "/operators/stake": { "get": { "produces": [ @@ -1248,6 +1311,35 @@ } } }, + "v2.OperatorSigningInfo": { + "type": "object", + "properties": { + "operator_address": { + "type": "string" + }, + "operator_id": { + "type": "string" + }, + "quorum_id": { + "type": "integer" + }, + "signing_percentage": { + "type": "number" + }, + "stake_percentage": { + "type": "number" + }, + "total_batches": { + "type": "integer" + }, + "total_responsible_batches": { + "type": "integer" + }, + "total_unsigned_batches": { + "type": "integer" + } + } + }, "v2.OperatorStake": { "type": "object", "properties": { @@ -1265,6 +1357,17 @@ } } }, + "v2.OperatorsSigningInfoResponse": { + "type": "object", + "properties": { + "operator_signing_info": { + "type": "array", + "items": { + "$ref": "#/definitions/v2.OperatorSigningInfo" + } + } + } + }, "v2.OperatorsStakeResponse": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/v2/V2_swagger.yaml b/disperser/dataapi/docs/v2/V2_swagger.yaml index a49e818720..c2c5e3a90e 100644 --- a/disperser/dataapi/docs/v2/V2_swagger.yaml +++ b/disperser/dataapi/docs/v2/V2_swagger.yaml @@ -449,6 +449,25 @@ definitions: v2_dispersal_status: type: string type: object + v2.OperatorSigningInfo: + properties: + operator_address: + type: string + operator_id: + type: string + quorum_id: + type: integer + signing_percentage: + type: number + stake_percentage: + type: number + total_batches: + type: integer + total_responsible_batches: + type: integer + total_unsigned_batches: + type: integer + type: object v2.OperatorStake: properties: operator_id: @@ -460,6 +479,13 @@ definitions: stake_percentage: type: number type: object + v2.OperatorsSigningInfoResponse: + properties: + operator_signing_info: + items: + $ref: '#/definitions/v2.OperatorSigningInfo' + type: array + type: object v2.OperatorsStakeResponse: properties: stake_ranked_operators: @@ -846,6 +872,51 @@ paths: summary: Operator node reachability check tags: - Operators + /operators/signing-info: + get: + parameters: + - description: 'Fetch operators signing info up to the end time (ISO 8601 format: + 2006-01-02T15:04:05Z) [default: now]' + in: query + name: end + type: string + - description: 'Fetch operators signing info starting from an interval (in seconds) + before the end time [default: 3600]' + in: query + name: interval + type: integer + - description: 'Comma separated list of quorum IDs to fetch signing info for + [default: 0,1]' + in: query + name: quorums + type: string + - description: 'Whether to only return operators with signing rate less than + 100% [default: false]' + in: query + name: nonsigner_only + type: boolean + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/v2.OperatorsSigningInfoResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/v2.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/v2.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/v2.ErrorResponse' + summary: Fetch operators signing info + tags: + - Operators /operators/stake: get: parameters: diff --git a/disperser/dataapi/nonsigner_utils.go b/disperser/dataapi/nonsigner_utils.go index 0a8ef2217e..cc15491202 100644 --- a/disperser/dataapi/nonsigner_utils.go +++ b/disperser/dataapi/nonsigner_utils.go @@ -3,6 +3,8 @@ package dataapi import ( "fmt" "sort" + + corev2 "github.com/Layr-Labs/eigenda/core/v2" ) // NumBatchesAtBlock represents the number of batches at current block. @@ -224,6 +226,21 @@ func CreateQuorumBatchMap(batches []*BatchNonSigningInfo) map[uint8]map[uint32]i return quorumBatchMap } +// CreateQuorumBatchMapV2 returns quorumBatchMap, where quorumBatchMap[q][b] means the number of +// batches at block b that have dispersed to quorum q. +func CreateQuorumBatchMapV2(attestations []*corev2.Attestation) map[uint8]map[uint32]int { + quorumBatchMap := make(map[uint8]map[uint32]int) + for _, at := range attestations { + for _, q := range at.QuorumNumbers { + if _, ok := quorumBatchMap[q]; !ok { + quorumBatchMap[q] = make(map[uint32]int) + } + quorumBatchMap[q][uint32(at.ReferenceBlockNumber)]++ + } + } + return quorumBatchMap +} + // CreatQuorumBatches returns quorumBatches, where quorumBatches[q] is a list of // QuorumBatches in ascending order by block number. func CreatQuorumBatches(quorumBatchMap map[uint8]map[uint32]int) map[uint8]*QuorumBatches { diff --git a/disperser/dataapi/operator_handler.go b/disperser/dataapi/operator_handler.go index 36a2cedb50..200bdbe67a 100644 --- a/disperser/dataapi/operator_handler.go +++ b/disperser/dataapi/operator_handler.go @@ -46,6 +46,10 @@ func NewOperatorList() *OperatorList { } } +func (o *OperatorList) GetOperatorIds() []core.OperatorID { + return o.operatorIds +} + func (o *OperatorList) Add(id core.OperatorID, address string) { if _, exists := o.idToAddress[id]; exists { return diff --git a/disperser/dataapi/subgraph/mock/api.go b/disperser/dataapi/subgraph/mock/api.go index 1721198db6..de078f42d8 100644 --- a/disperser/dataapi/subgraph/mock/api.go +++ b/disperser/dataapi/subgraph/mock/api.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "slices" + "strconv" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" "github.com/stretchr/testify/mock" @@ -148,7 +149,18 @@ func (m *MockSubgraphApi) QueryOperatorAddedToQuorum(ctx context.Context, startB value = args.Get(0).([]*subgraph.OperatorQuorum) } - return value, args.Error(1) + result := make([]*subgraph.OperatorQuorum, 0) + for _, oq := range value { + blockNum, err := strconv.ParseUint(string(oq.BlockNumber), 10, 64) + if err != nil { + return nil, err + } + if blockNum >= uint64(startBlock) && blockNum <= uint64(endBlock) { + result = append(result, oq) + } + } + + return result, args.Error(1) } func (m *MockSubgraphApi) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*subgraph.OperatorQuorum, error) { @@ -159,7 +171,18 @@ func (m *MockSubgraphApi) QueryOperatorRemovedFromQuorum(ctx context.Context, st value = args.Get(0).([]*subgraph.OperatorQuorum) } - return value, args.Error(1) + result := make([]*subgraph.OperatorQuorum, 0) + for _, oq := range value { + blockNum, err := strconv.ParseUint(string(oq.BlockNumber), 10, 64) + if err != nil { + return nil, err + } + if blockNum >= uint64(startBlock) && blockNum <= uint64(endBlock) { + result = append(result, oq) + } + } + + return result, args.Error(1) } func (m *MockSubgraphApi) QueryOperatorEjectionsGteBlockTimestamp(ctx context.Context, blockTimestamp uint64, first uint, skip uint) ([]*subgraph.OperatorEjection, error) { diff --git a/disperser/dataapi/v2/server_v2.go b/disperser/dataapi/v2/server_v2.go index 777f295769..d3e19a67f0 100644 --- a/disperser/dataapi/v2/server_v2.go +++ b/disperser/dataapi/v2/server_v2.go @@ -5,10 +5,12 @@ import ( "encoding/hex" "errors" "fmt" + "math" "math/big" "net/http" "os" "os/signal" + "sort" "strconv" "strings" "syscall" @@ -22,6 +24,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi" docsv2 "github.com/Layr-Labs/eigenda/disperser/dataapi/docs/v2" "github.com/Layr-Labs/eigensdk-go/logging" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/gin-contrib/cors" "github.com/gin-contrib/logger" "github.com/gin-gonic/gin" @@ -43,6 +46,9 @@ const ( // range or "limit" param. maxNumBatchesPerBatchFeedResponse = 1000 + // The quorum IDs that are allowed to query for signing info are [0, maxQuorumIDAllowed] + maxQuorumIDAllowed = 2 + cacheControlParam = "Cache-Control" maxFeedBlobAge = 300 // this is completely static maxOperatorsStakeAge = 300 // not expect the stake change to happen frequently @@ -109,6 +115,20 @@ type ( AvgThroughput float64 `json:"avg_throughput"` } + OperatorSigningInfo struct { + OperatorId string `json:"operator_id"` + OperatorAddress string `json:"operator_address"` + QuorumId uint8 `json:"quorum_id"` + TotalUnsignedBatches int `json:"total_unsigned_batches"` + TotalResponsibleBatches int `json:"total_responsible_batches"` + TotalBatches int `json:"total_batches"` + SigningPercentage float64 `json:"signing_percentage"` + StakePercentage float64 `json:"stake_percentage"` + } + OperatorsSigningInfoResponse struct { + OperatorSigningInfo []*OperatorSigningInfo `json:"operator_signing_info"` + } + OperatorStake struct { QuorumId string `json:"quorum_id"` OperatorId string `json:"operator_id"` @@ -251,7 +271,7 @@ func (s *ServerV2) Start() error { } operators := v2.Group("/operators") { - operators.GET("/nonsigners", s.FetchNonSingers) + operators.GET("/signing-info", s.FetchOperatorSigningInfo) operators.GET("/stake", s.FetchOperatorsStake) operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo) operators.GET("/reachability", s.CheckOperatorsReachability) @@ -385,7 +405,7 @@ func (s *ServerV2) FetchBlobFeedHandler(c *gin.Context) { } if endTime.Before(oldestTime) { s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedHandler") - invalidParamsErrorResponse(c, fmt.Errorf("end time cannot be more than 14 days ago from now, found: %s", c.Query("end"))) + invalidParamsErrorResponse(c, fmt.Errorf("end time cannot be more than 14 days in the past, found: %s", c.Query("end"))) return } } @@ -429,6 +449,9 @@ func (s *ServerV2) FetchBlobFeedHandler(c *gin.Context) { } startTime := endTime.Add(-time.Duration(interval) * time.Second) + if startTime.Before(oldestTime) { + startTime = oldestTime + } startCursor := blobstore.BlobFeedCursor{ RequestedAt: uint64(startTime.UnixNano()), } @@ -591,6 +614,128 @@ func (s *ServerV2) FetchBlobInclusionInfoHandler(c *gin.Context) { c.JSON(http.StatusOK, response) } +// FetchOperatorSigningInfo godoc +// +// @Summary Fetch operators signing info +// @Tags Operators +// @Produce json +// @Param end query string false "Fetch operators signing info up to the end time (ISO 8601 format: 2006-01-02T15:04:05Z) [default: now]" +// @Param interval query int false "Fetch operators signing info starting from an interval (in seconds) before the end time [default: 3600]" +// @Param quorums query string false "Comma separated list of quorum IDs to fetch signing info for [default: 0,1]" +// @Param nonsigner_only query boolean false "Whether to only return operators with signing rate less than 100% [default: false]" +// @Success 200 {object} OperatorsSigningInfoResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators/signing-info [get] +func (s *ServerV2) FetchOperatorSigningInfo(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchOperatorSigningInfo", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + var err error + + now := time.Now() + oldestTime := now.Add(-maxBlobAge) + + endTime := now + if c.Query("end") != "" { + endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorSigningInfo") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse end param: %w", err)) + return + } + if endTime.Before(oldestTime) { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorSigningInfo") + invalidParamsErrorResponse( + c, fmt.Errorf("end time cannot be more than 14 days in the past, found: %s", c.Query("end")), + ) + return + } + } + + interval := 3600 + if c.Query("interval") != "" { + interval, err = strconv.Atoi(c.Query("interval")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorSigningInfo") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse interval param: %w", err)) + return + } + if interval <= 0 { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorSigningInfo") + invalidParamsErrorResponse(c, fmt.Errorf("interval must be greater than 0, found: %d", interval)) + return + } + } + + quorumStr := "0,1" + if c.Query("quorums") != "" { + quorumStr = c.Query("quorums") + } + quorums := strings.Split(quorumStr, ",") + quorumsSeen := make(map[uint8]struct{}, 0) + for _, idStr := range quorums { + id, err := strconv.Atoi(idStr) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorSigningInfo") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse the provided quorum: %s", quorumStr)) + return + } + if id < 0 || id > maxQuorumIDAllowed { + s.metrics.IncrementInvalidArgRequestNum("FetchOperatorSigningInfo") + invalidParamsErrorResponse( + c, fmt.Errorf("the quorumID must be in range [0, %d], found: %d", maxQuorumIDAllowed, id), + ) + return + } + quorumsSeen[uint8(id)] = struct{}{} + } + quorumIds := make([]uint8, 0, len(quorumsSeen)) + for q := range quorumsSeen { + quorumIds = append(quorumIds, q) + } + + nonsignerOnly := false + if c.Query("nonsigner_only") != "" { + nonsignerOnlyStr := c.Query("nonsigner_only") + nonsignerOnly, err = strconv.ParseBool(nonsignerOnlyStr) + if err != nil { + invalidParamsErrorResponse(c, errors.New("the nonsigner_only param must be \"true\" or \"false\"")) + return + } + } + + startTime := endTime.Add(-time.Duration(interval) * time.Second) + if startTime.Before(oldestTime) { + startTime = oldestTime + } + + attestations, err := s.blobMetadataStore.GetAttestationByAttestedAt( + c.Request.Context(), uint64(startTime.UnixNano())+1, uint64(endTime.UnixNano()), -1, + ) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchOperatorSigningInfo") + errorResponse(c, fmt.Errorf("failed to fetch attestation feed from blob metadata store: %w", err)) + return + } + + signingInfo, err := s.computeOperatorsSigningInfo(c.Request.Context(), attestations, quorumIds, nonsignerOnly) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchOperatorSigningInfo") + errorResponse(c, fmt.Errorf("failed to compute the operators signing info: %w", err)) + return + } + response := OperatorsSigningInfoResponse{ + OperatorSigningInfo: signingInfo, + } + + s.metrics.IncrementSuccessfulRequestNum("FetchOperatorSigningInfo") + c.JSON(http.StatusOK, response) +} + // FetchBatchFeedHandler godoc // // @Summary Fetch batch feed @@ -625,7 +770,7 @@ func (s *ServerV2) FetchBatchFeedHandler(c *gin.Context) { } if endTime.Before(oldestTime) { s.metrics.IncrementInvalidArgRequestNum("FetchBatchFeedHandler") - invalidParamsErrorResponse(c, fmt.Errorf("end time cannot be more than 14 days ago from now, found: %s", c.Query("end"))) + invalidParamsErrorResponse(c, fmt.Errorf("end time cannot be more than 14 days in the past, found: %s", c.Query("end"))) return } } @@ -880,10 +1025,6 @@ func (s *ServerV2) CheckOperatorsReachability(c *gin.Context) { c.JSON(http.StatusOK, portCheckResponse) } -func (s *ServerV2) FetchNonSingers(c *gin.Context) { - errorResponse(c, errors.New("FetchNonSingers unimplemented")) -} - // FetchMetricsSummaryHandler godoc // // @Summary Fetch metrics summary @@ -969,3 +1110,288 @@ func (s *ServerV2) FetchMetricsThroughputTimeseriesHandler(c *gin.Context) { c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxThroughputAge)) c.JSON(http.StatusOK, ths) } + +func (s *ServerV2) computeOperatorsSigningInfo( + ctx context.Context, + attestations []*corev2.Attestation, + quorumIDs []uint8, + nonsignerOnly bool, +) ([]*OperatorSigningInfo, error) { + if len(attestations) == 0 { + return nil, errors.New("no attestations to compute signing info") + } + + // Compute the block number range [startBlock, endBlock] (both inclusive) when the + // attestations have happened. + startBlock := attestations[0].ReferenceBlockNumber + endBlock := attestations[0].ReferenceBlockNumber + for i := range attestations { + if startBlock > attestations[i].ReferenceBlockNumber { + startBlock = attestations[i].ReferenceBlockNumber + } + if endBlock < attestations[i].ReferenceBlockNumber { + endBlock = attestations[i].ReferenceBlockNumber + } + } + + // Get quorum change events in range [startBlock+1, endBlock]. + // We don't need the events at startBlock because we'll fetch all active operators and + // quorums at startBlock. + operatorQuorumEvents, err := s.subgraphClient.QueryOperatorQuorumEvent(ctx, uint32(startBlock+1), uint32(endBlock)) + if err != nil { + return nil, err + } + + // Get operators of interest to compute signing info, which includes: + // - operators that were active at startBlock + // - operators that joined after startBlock + operatorList, err := s.getOperatorsOfInterest( + ctx, startBlock, endBlock, quorumIDs, operatorQuorumEvents, + ) + if err != nil { + return nil, err + } + + // Create operators' quorum intervals: OperatorQuorumIntervals[op][q] is a sequence of + // increasing and non-overlapping block intervals during which the operator "op" is + // registered in quorum "q". + operatorQuorumIntervals, _, err := s.operatorHandler.CreateOperatorQuorumIntervals( + ctx, operatorList, operatorQuorumEvents, uint32(startBlock), uint32(endBlock), + ) + if err != nil { + return nil, err + } + + // Compute num batches failed, where numFailed[op][q] is the number of batches + // failed to sign for quorum "q" by operator "op". + numFailed := computeNumFailed(attestations, operatorQuorumIntervals) + + // Compute num batches responsible, where numResponsible[op][q] is the number of batches + // that operator "op" are responsible for in quorum "q". + numResponsible := computeNumResponsible(attestations, operatorQuorumIntervals) + + totalNumBatchesPerQuorum := computeTotalNumBatchesPerQuorum(attestations) + + state, err := s.chainState.GetOperatorState(ctx, uint(endBlock), quorumIDs) + if err != nil { + return nil, err + } + signingInfo := make([]*OperatorSigningInfo, 0) + for _, op := range operatorList.GetOperatorIds() { + for _, q := range quorumIDs { + operatorId := op.Hex() + + numShouldHaveSigned := 0 + if num, exist := safeAccess(numResponsible, operatorId, q); exist { + numShouldHaveSigned = num + } + // The operator op received no batch that it should sign. + if numShouldHaveSigned == 0 { + continue + } + + numFailedToSign := 0 + if num, exist := safeAccess(numFailed, operatorId, q); exist { + numFailedToSign = num + } + + if nonsignerOnly && numFailedToSign == 0 { + continue + } + + operatorAddress, ok := operatorList.GetAddress(operatorId) + if !ok { + // This should never happen (becuase OperatorList ensures the 1:1 mapping + // between ID and address), but we don't fail the entire request, just + // mark internal error for the address field to signal the issue. + operatorAddress = "Unexpected internal error" + s.logger.Error("Internal error: failed to find address for operatorId", "operatorId", operatorId) + } + + // Signing percentage with 2 decimal (e.g. 95.75, which means 95.75%) + signingPercentage := math.Round( + (float64(numShouldHaveSigned-numFailedToSign)/float64(numShouldHaveSigned))*100*100, + ) / 100 + + stakePercentage := float64(0) + if stake, ok := state.Operators[q][op]; ok { + totalStake := new(big.Float).SetInt(state.Totals[q].Stake) + stakePercentage, _ = new(big.Float).Quo( + new(big.Float).SetInt(stake.Stake), + totalStake).Float64() + } + + si := &OperatorSigningInfo{ + OperatorId: operatorId, + OperatorAddress: operatorAddress, + QuorumId: q, + TotalUnsignedBatches: numFailedToSign, + TotalResponsibleBatches: numShouldHaveSigned, + TotalBatches: totalNumBatchesPerQuorum[q], + SigningPercentage: signingPercentage, + StakePercentage: stakePercentage, + } + signingInfo = append(signingInfo, si) + } + } + + // Sort by descending order of signing rate and then ascending order of . + sort.Slice(signingInfo, func(i, j int) bool { + if signingInfo[i].SigningPercentage == signingInfo[j].SigningPercentage { + if signingInfo[i].OperatorId == signingInfo[j].OperatorId { + return signingInfo[i].QuorumId < signingInfo[j].QuorumId + } + return signingInfo[i].OperatorId < signingInfo[j].OperatorId + } + return signingInfo[i].SigningPercentage > signingInfo[j].SigningPercentage + }) + + return signingInfo, nil +} + +// getOperatorsOfInterest returns operators that we want to compute signing info for. +// +// This contains two parts: +// - the operators that were active at the startBlock +// - the operators that joined after startBlock +func (s *ServerV2) getOperatorsOfInterest( + ctx context.Context, + startBlock, endBlock uint64, + quorumIDs []uint8, + operatorQuorumEvents *dataapi.OperatorQuorumEvents, +) (*dataapi.OperatorList, error) { + operatorList := dataapi.NewOperatorList() + + // The first part: active operators at startBlock + operatorsByQuorum, err := s.chainReader.GetOperatorStakesForQuorums(ctx, quorumIDs, uint32(startBlock)) + if err != nil { + return nil, err + } + operatorsSeen := make(map[core.OperatorID]struct{}, 0) + for _, ops := range operatorsByQuorum { + for _, op := range ops { + operatorsSeen[op.OperatorID] = struct{}{} + } + } + operatorIDs := make([]core.OperatorID, 0) + for id := range operatorsSeen { + operatorIDs = append(operatorIDs, id) + } + // Get the address for the operators. + // operatorAddresses[i] is the address for operatorIDs[i]. + operatorAddresses, err := s.chainReader.BatchOperatorIDToAddress(ctx, operatorIDs) + if err != nil { + return nil, err + } + for i := range operatorIDs { + operatorList.Add(operatorIDs[i], operatorAddresses[i].Hex()) + } + + // The second part: new operators after startBlock. + newAddresses := make(map[string]struct{}, 0) + for op := range operatorQuorumEvents.AddedToQuorum { + if _, exist := operatorList.GetID(op); !exist { + newAddresses[op] = struct{}{} + } + } + for op := range operatorQuorumEvents.RemovedFromQuorum { + if _, exist := operatorList.GetID(op); !exist { + newAddresses[op] = struct{}{} + } + } + addresses := make([]gethcommon.Address, 0, len(newAddresses)) + for addr := range newAddresses { + addresses = append(addresses, gethcommon.HexToAddress(addr)) + } + operatorIds, err := s.chainReader.BatchOperatorAddressToID(ctx, addresses) + if err != nil { + return nil, err + } + // We merge the new operators observed in AddedToQuorum and RemovedFromQuorum + // into the operator set. + for i := 0; i < len(operatorIds); i++ { + operatorList.Add(operatorIds[i], addresses[i].Hex()) + } + + return operatorList, nil +} + +func computeNumFailed( + attestations []*corev2.Attestation, + operatorQuorumIntervals dataapi.OperatorQuorumIntervals, +) map[string]map[uint8]int { + numFailed := make(map[string]map[uint8]int) + for _, at := range attestations { + for _, pubkey := range at.NonSignerPubKeys { + op := pubkey.GetOperatorID().Hex() + // Note: avg number of quorums per operator is a small number, so use brute + // force here (otherwise, we can create a map to make it more efficient) + for _, operatorQuorum := range operatorQuorumIntervals.GetQuorums( + op, + uint32(at.ReferenceBlockNumber), + ) { + for _, batchQuorum := range at.QuorumNumbers { + if operatorQuorum == batchQuorum { + if _, ok := numFailed[op]; !ok { + numFailed[op] = make(map[uint8]int) + } + numFailed[op][operatorQuorum]++ + break + } + } + } + } + } + return numFailed +} + +func computeNumResponsible( + attestations []*corev2.Attestation, + operatorQuorumIntervals dataapi.OperatorQuorumIntervals, +) map[string]map[uint8]int { + // Create quorumBatches, where quorumBatches[q].AccuBatches is the total number of + // batches in block interval [startBlock, b] for quorum "q". + quorumBatches := dataapi.CreatQuorumBatches(dataapi.CreateQuorumBatchMapV2(attestations)) + + numResponsible := make(map[string]map[uint8]int) + for op, val := range operatorQuorumIntervals { + if _, ok := numResponsible[op]; !ok { + numResponsible[op] = make(map[uint8]int) + } + for q, intervals := range val { + numBatches := 0 + if _, ok := quorumBatches[q]; ok { + for _, interval := range intervals { + numBatches += dataapi.ComputeNumBatches( + quorumBatches[q], interval.StartBlock, interval.EndBlock, + ) + } + } + numResponsible[op][q] = numBatches + } + } + + return numResponsible +} + +func computeTotalNumBatchesPerQuorum(attestations []*corev2.Attestation) map[uint8]int { + numBatchesPerQuorum := make(map[uint8]int) + for _, at := range attestations { + for _, q := range at.QuorumNumbers { + numBatchesPerQuorum[q]++ + } + } + return numBatchesPerQuorum +} + +func safeAccess(data map[string]map[uint8]int, i string, j uint8) (int, bool) { + innerMap, ok := data[i] + if !ok { + return 0, false // Key i does not exist + } + val, ok := innerMap[j] + if !ok { + return 0, false // Key j does not exist in the inner map + } + return val, true +} diff --git a/disperser/dataapi/v2/server_v2_test.go b/disperser/dataapi/v2/server_v2_test.go index 7e7fa251e8..2f5aaa6768 100644 --- a/disperser/dataapi/v2/server_v2_test.go +++ b/disperser/dataapi/v2/server_v2_test.go @@ -3,6 +3,7 @@ package v2_test import ( "bytes" "context" + "crypto/ecdsa" "crypto/rand" _ "embed" "encoding/hex" @@ -19,6 +20,7 @@ import ( "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" "github.com/Layr-Labs/eigenda/common/testutils" "github.com/Layr-Labs/eigenda/core" @@ -35,9 +37,11 @@ import ( serverv2 "github.com/Layr-Labs/eigenda/disperser/dataapi/v2" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/consensys/gnark-crypto/ecc/bn254" "github.com/consensys/gnark-crypto/ecc/bn254/fp" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/ory/dockertest/v3" @@ -57,6 +61,8 @@ var ( //go:embed testdata/prometheus-resp-avg-throughput.json mockPrometheusRespAvgThroughput string + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) blobMetadataStore *blobstorev2.BlobMetadataStore testDataApiServerV2 *serverv2.ServerV2 @@ -68,6 +74,8 @@ var ( dockertestResource *dockertest.Resource deployLocalStack bool + dynamoClient dynamodb.Client + mockLogger = testutils.GetLogger() blobstore = inmem.NewBlobStore() mockPrometheusApi = &prommock.MockPrometheusApi{} @@ -202,7 +210,6 @@ func setup(m *testing.M) { SecretAccessKey: "localstack", EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), } - metadataTableName := fmt.Sprintf("test-BlobMetadata-%v", uuid.New()) _, err := test_utils.CreateTable(context.Background(), cfg, metadataTableName, blobstorev2.GenerateTableSchema(metadataTableName, 10, 10)) if err != nil { teardown() @@ -210,7 +217,7 @@ func setup(m *testing.M) { } // Create BlobMetadataStore - dynamoClient, err := dynamodb.NewClient(cfg, logger) + dynamoClient, err = dynamodb.NewClient(cfg, logger) if err != nil { teardown() panic("failed to create dynamodb client: " + err.Error()) @@ -306,12 +313,27 @@ func checkBlobKeyEqual(t *testing.T, blobKey corev2.BlobKey, blobHeader *corev2. assert.Equal(t, blobKey, bk) } +func checkOperatorSigningInfoEqual(t *testing.T, actual, expected *serverv2.OperatorSigningInfo) { + assert.Equal(t, expected.OperatorId, actual.OperatorId) + assert.Equal(t, expected.OperatorAddress, actual.OperatorAddress) + assert.Equal(t, expected.QuorumId, actual.QuorumId) + assert.Equal(t, expected.TotalUnsignedBatches, actual.TotalUnsignedBatches) + assert.Equal(t, expected.TotalResponsibleBatches, actual.TotalResponsibleBatches) + assert.Equal(t, expected.TotalBatches, actual.TotalBatches) +} + func checkPaginationToken(t *testing.T, token string, requestedAt uint64, blobKey corev2.BlobKey) { cursor, err := new(blobstorev2.BlobFeedCursor).FromCursorKey(token) require.NoError(t, err) assert.True(t, cursor.Equal(requestedAt, &blobKey)) } +func deleteItems(t *testing.T, keys []commondynamodb.Key) { + failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) + assert.NoError(t, err) + assert.Len(t, failed, 0) +} + func TestFetchBlobHandlerV2(t *testing.T) { r := setUpRouter() @@ -642,6 +664,11 @@ func TestFetchBatchHandlerV2(t *testing.T) { } err = blobMetadataStore.PutAttestation(context.Background(), attestation) require.NoError(t, err) + dk := commondynamodb.Key{ + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + batchHeaderHash}, + "SK": &types.AttributeValueMemberS{Value: "Attestation"}, + } + defer deleteItems(t, []commondynamodb.Key{dk}) r.GET("/v2/batches/:batch_header_hash", testDataApiServerV2.FetchBatchHandler) @@ -666,11 +693,14 @@ func TestFetchBatchFeedHandler(t *testing.T) { nanoSecsPerBatch := uint64(time.Minute.Nanoseconds()) // 1 batch per minute attestedAt := make([]uint64, numBatches) batchHeaders := make([]*corev2.BatchHeader, numBatches) + dynamoKeys := make([]commondynamodb.Key, numBatches) for i := 0; i < numBatches; i++ { batchHeaders[i] = &corev2.BatchHeader{ BatchRoot: [32]byte{1, 2, byte(i)}, ReferenceBlockNumber: uint64(i + 1), } + bhh, err := batchHeaders[i].Hash() + require.NoError(t, err) keyPair, err := core.GenRandomBlsKeys() assert.NoError(t, err) apk := keyPair.GetPubKeyG2() @@ -697,8 +727,13 @@ func TestFetchBatchFeedHandler(t *testing.T) { }, } err = blobMetadataStore.PutAttestation(ctx, attestation) - assert.NoError(t, err) + require.NoError(t, err) + dynamoKeys[i] = commondynamodb.Key{ + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, + "SK": &types.AttributeValueMemberS{Value: "Attestation"}, + } } + defer deleteItems(t, dynamoKeys) r.GET("/v2/batches/feed", testDataApiServerV2.FetchBatchFeedHandler) @@ -771,6 +806,533 @@ func TestFetchBatchFeedHandler(t *testing.T) { }) } +func TestFetchOperatorSigningInfo(t *testing.T) { + r := setUpRouter() + ctx := context.Background() + + /* + Test data setup + + Column definitions: + - Batch: Batch number + - AttestedAt: Timestamp of attestation (sortkey of this table) + - RefBlockNum: Reference block number + - Quorums: Quorum numbers used by the batch + - Nonsigners: Operators that didn't sign for the batch + - Active operators: Mapping of operator ID to their quorum assignments at the block + + Data: + +-------+------------+-------------+---------+------------+------------------------+ + | Batch | AttestedAt | RefBlockNum | Quorums | Nonsigners | Active operators | + +-------+------------+-------------+---------+------------+------------------------+ + | 1 | A | 1 | 0,1 | 3 | 1: {2} | + | | | | | | 2: {0,1} | + | | | | | | 3: {0,1} | + +-------+------------+-------------+---------+------------+------------------------+ + | 2 | B | 3 | 1 | 4 | 1: {2} | + | | | | | | 2: {0,1} | + | | | | | | 3: {0,1} | + | | | | | | 4: {0,1} | + | | | | | | 5: {0} | + +-------+------------+-------------+---------+------------+------------------------+ + | 3 | C | 2 | 0 | 3 | 1: {2} | + | | | | | | 2: {0,1} | + | | | | | | 3: {0,1} | + | | | | | | 4: {0,1} | + +-------+------------+-------------+---------+------------+------------------------+ + | 4 | D | 2 | 0,1 | None | 1: {2} | + | | | | | | 2: {0,1} | + | | | | | | 3: {0,1} | + | | | | | | 4: {0,1} | + +-------+------------+-------------+---------+------------+------------------------+ + | 5 | E | 4 | 0,1 | 3,5 | 1: {2} | + | | | | | | 2: {0,1} | + | | | | | | 3: {0,1} | + | | | | | | 5: {0} | + +-------+------------+-------------+---------+------------+------------------------+ + | 6 | F | 5 | 0 | 5 | 1: {2} | + | | | | | | 2: {0,1} | + | | | | | | 3: {0,1} | + | | | | | | 5: {0} | + +-------+------------+-------------+---------+------------+------------------------+ + */ + + // Create test operators + // Note: the operator numbered 1-5 in the above tables are corresponding to the + // operatorIds[0], ..., operatorIds[4] here + numOperators := 5 + operatorIds := make([]core.OperatorID, numOperators) + operatorAddresses := make([]gethcommon.Address, numOperators) + operatorG1s := make([]*core.G1Point, numOperators) + operatorIDToAddr := make(map[string]gethcommon.Address) + operatorAddrToID := make(map[string]core.OperatorID) + for i := 0; i < numOperators; i++ { + operatorG1s[i] = core.NewG1Point(big.NewInt(int64(i)), big.NewInt(int64(i+1))) + operatorIds[i] = operatorG1s[i].GetOperatorID() + privateKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + require.NoError(t, err) + publicKey := privateKey.Public().(*ecdsa.PublicKey) + operatorAddresses[i] = crypto.PubkeyToAddress(*publicKey) + + operatorIDToAddr[operatorIds[i].Hex()] = operatorAddresses[i] + operatorAddrToID[operatorAddresses[i].Hex()] = operatorIds[i] + } + + // Mocking using a map function so we can always maintain the ID and address mapping + // defined above, ie. operatorIds[i] <-> operatorAddresses[i] + mockTx.On("BatchOperatorIDToAddress").Return( + func(ids []core.OperatorID) []gethcommon.Address { + result := make([]gethcommon.Address, len(ids)) + for i, id := range ids { + result[i] = operatorIDToAddr[id.Hex()] + } + return result + }, + nil, + ) + mockTx.On("BatchOperatorAddressToID").Return( + func(addrs []gethcommon.Address) []core.OperatorID { + result := make([]core.OperatorID, len(addrs)) + for i, addr := range addrs { + result[i] = operatorAddrToID[addr.Hex()] + } + return result + }, + nil, + ) + + // Mocking using a map function so we can always maintain the ID and address mapping + // defined above, ie. operatorIds[i] <-> operatorAddresses[i] + // We prepare data at two blocks (1 and 4) as they will be hit by queries below + operatorIntialQuorumsByBlock := map[uint32]map[core.OperatorID]*big.Int{ + 1: map[core.OperatorID]*big.Int{ + operatorIds[0]: big.NewInt(4), // quorum 2 + operatorIds[1]: big.NewInt(3), // quorum 0,1 + operatorIds[2]: big.NewInt(3), // quorum 0,1 + operatorIds[3]: big.NewInt(0), // no quorum + operatorIds[4]: big.NewInt(0), // no quorum + }, + 4: map[core.OperatorID]*big.Int{ + operatorIds[0]: big.NewInt(4), // quorum 2 + operatorIds[1]: big.NewInt(3), // quorum 0,1 + operatorIds[2]: big.NewInt(3), // quorum 0,1 + operatorIds[3]: big.NewInt(0), // no quorum + operatorIds[4]: big.NewInt(1), // quorum 0 + }, + } + mockTx.On("GetQuorumBitmapForOperatorsAtBlockNumber").Return( + func(ids []core.OperatorID, blockNum uint32) []*big.Int { + bitmaps := make([]*big.Int, len(ids)) + for i, id := range ids { + bitmaps[i] = operatorIntialQuorumsByBlock[blockNum][id] + } + return bitmaps + }, + nil, + ) + + // We prepare data at two blocks (1 and 4) as they will be hit by queries below + operatorStakesByBlock := map[uint32]core.OperatorStakes{ + 1: core.OperatorStakes{ + 0: { + 0: { + OperatorID: operatorIds[1], + Stake: big.NewInt(2), + }, + 1: { + OperatorID: operatorIds[2], + Stake: big.NewInt(2), + }, + }, + 1: { + 0: { + OperatorID: operatorIds[1], + Stake: big.NewInt(2), + }, + 1: { + OperatorID: operatorIds[2], + Stake: big.NewInt(2), + }, + }, + 2: { + 1: { + OperatorID: operatorIds[0], + Stake: big.NewInt(2), + }, + }, + }, + 4: core.OperatorStakes{ + 0: { + 0: { + OperatorID: operatorIds[1], + Stake: big.NewInt(2), + }, + 1: { + OperatorID: operatorIds[2], + Stake: big.NewInt(2), + }, + 2: { + OperatorID: operatorIds[4], + Stake: big.NewInt(2), + }, + }, + 1: { + 0: { + OperatorID: operatorIds[1], + Stake: big.NewInt(2), + }, + 1: { + OperatorID: operatorIds[2], + Stake: big.NewInt(2), + }, + }, + 2: { + 1: { + OperatorID: operatorIds[0], + Stake: big.NewInt(2), + }, + }, + }, + } + mockTx.On("GetOperatorStakesForQuorums").Return( + func(quorums []core.QuorumID, blockNum uint32) core.OperatorStakes { + return operatorStakesByBlock[blockNum] + }, + nil, + ) + + // operatorIds[3], operatorIds[4] were not active at the first block, but were added to + // quorums after startBlock (see the above table). + operatorAddedToQuorum := []*subgraph.OperatorQuorum{ + { + Operator: graphql.String(operatorAddresses[3].Hex()), + QuorumNumbers: "0x0001", + BlockNumber: "2", + BlockTimestamp: "1702666070", + }, + { + Operator: graphql.String(operatorAddresses[4].Hex()), + QuorumNumbers: "0x00", + BlockNumber: "3", + BlockTimestamp: "1702666070", + }, + } + operatorRemovedFromQuorum := []*subgraph.OperatorQuorum{ + { + Operator: graphql.String(operatorAddresses[3].Hex()), + QuorumNumbers: "0x0001", + BlockNumber: "4", + BlockTimestamp: "1702666058", + }, + } + mockSubgraphApi.On("QueryOperatorAddedToQuorum").Return(operatorAddedToQuorum, nil) + mockSubgraphApi.On("QueryOperatorRemovedFromQuorum").Return(operatorRemovedFromQuorum, nil) + + // Create a timeline of test batches + // See the above table for the choices of reference block number, quorums and nonsigners + // for each batch + numBatches := 6 + now := uint64(time.Now().UnixNano()) + firstBatchTime := now - uint64(32*time.Minute.Nanoseconds()) + nanoSecsPerBatch := uint64(5 * time.Minute.Nanoseconds()) // 1 batch per 5 minutes + attestedAt := make([]uint64, numBatches) + for i := 0; i < numBatches; i++ { + attestedAt[i] = firstBatchTime + uint64(i)*nanoSecsPerBatch + } + referenceBlockNum := []uint64{1, 3, 2, 2, 4, 5} + quorums := [][]uint8{{0, 1}, {1}, {0}, {0, 1}, {0, 1}, {0}} + nonsigners := [][]*core.G1Point{ + {operatorG1s[2]}, + {operatorG1s[3]}, + {operatorG1s[2]}, + {}, + {operatorG1s[2], operatorG1s[4]}, + {operatorG1s[4]}, + } + dynamoKeys := make([]commondynamodb.Key, numBatches) + for i := 0; i < numBatches; i++ { + attestation := createAttestation(t, referenceBlockNum[i], attestedAt[i], nonsigners[i], quorums[i]) + err := blobMetadataStore.PutAttestation(ctx, attestation) + require.NoError(t, err) + bhh, err := attestation.BatchHeader.Hash() + require.NoError(t, err) + dynamoKeys[i] = commondynamodb.Key{ + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, + "SK": &types.AttributeValueMemberS{Value: "Attestation"}, + } + } + defer deleteItems(t, dynamoKeys) + + /* + Resulting Operator SigningInfo (for block range [1, 5]) + + Column definitions: + - : Operator ID and quorum pair + - Total responsible: Total number of batches the operator was responsible for + - Total nonsigning: Number of batches where operator did not sign + - Signing rate: Percentage of batches signed by + + Data: + +------------------+-------------------+------------------+--------------+ + | | Total responsible | Total nonsigning | Signing rate | + +------------------+-------------------+------------------+--------------+ + | <2, 0> | 5 | 0 | 100% | + +------------------+-------------------+------------------+--------------+ + | <2, 1> | 4 | 0 | 100% | + +------------------+-------------------+------------------+--------------+ + | <3, 0> | 5 | 3 | 40% | + +------------------+-------------------+------------------+--------------+ + | <3, 1> | 4 | 2 | 50% | + +------------------+-------------------+------------------+--------------+ + | <4, 0> | 2 | 0 | 100% | + +------------------+-------------------+------------------+--------------+ + | <4, 1> | 2 | 1 | 50% | + +------------------+-------------------+------------------+--------------+ + | <5, 0> | 2 | 2 | 0% | + +------------------+-------------------+------------------+--------------+ + */ + + r.GET("/v2/operators/signing-info", testDataApiServerV2.FetchOperatorSigningInfo) + + t.Run("invalid params", func(t *testing.T) { + reqUrls := []string{ + "/v2/operators/signing-info?interval=abc", + "/v2/operators/signing-info?interval=-1", + "/v2/operators/signing-info?end=2006-01-02T15:04:05", + "/v2/operators/signing-info?end=2006-01-02T15:04:05Z", + "/v2/operators/signing-info?quorums=-1", + "/v2/operators/signing-info?quorums=abc", + "/v2/operators/signing-info?quorums=10000000", + "/v2/operators/signing-info?quorums=-1", + "/v2/operators/signing-info?nonsigner_only=-1", + "/v2/operators/signing-info?nonsigner_only=deadbeef", + } + for _, url := range reqUrls { + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, url, nil) + r.ServeHTTP(w, req) + require.Equal(t, http.StatusBadRequest, w.Result().StatusCode) + } + }) + + t.Run("default params", func(t *testing.T) { + w := executeRequest(t, r, http.MethodGet, "/v2/operators/signing-info") + response := decodeResponseBody[serverv2.OperatorsSigningInfoResponse](t, w) + osi := response.OperatorSigningInfo + require.Equal(t, 7, len(osi)) + checkOperatorSigningInfoEqual(t, osi[0], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[3].Hex(), + OperatorAddress: operatorAddresses[3].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 0, + TotalResponsibleBatches: 2, + TotalBatches: 5, + }) + checkOperatorSigningInfoEqual(t, osi[1], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[1].Hex(), + OperatorAddress: operatorAddresses[1].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 0, + TotalResponsibleBatches: 5, + TotalBatches: 5, + }) + checkOperatorSigningInfoEqual(t, osi[2], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[1].Hex(), + OperatorAddress: operatorAddresses[1].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 0, + TotalResponsibleBatches: 4, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[3], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[3].Hex(), + OperatorAddress: operatorAddresses[3].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 1, + TotalResponsibleBatches: 2, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[4], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 2, + TotalResponsibleBatches: 4, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[5], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 3, + TotalResponsibleBatches: 5, + TotalBatches: 5, + }) + checkOperatorSigningInfoEqual(t, osi[6], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[4].Hex(), + OperatorAddress: operatorAddresses[4].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 2, + TotalResponsibleBatches: 2, + TotalBatches: 5, + }) + }) + + t.Run("nonsigner only", func(t *testing.T) { + w := executeRequest(t, r, http.MethodGet, "/v2/operators/signing-info?nonsigner_only=true") + response := decodeResponseBody[serverv2.OperatorsSigningInfoResponse](t, w) + osi := response.OperatorSigningInfo + require.Equal(t, 4, len(osi)) + checkOperatorSigningInfoEqual(t, osi[0], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[3].Hex(), + OperatorAddress: operatorAddresses[3].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 1, + TotalResponsibleBatches: 2, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[1], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 2, + TotalResponsibleBatches: 4, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[2], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 3, + TotalResponsibleBatches: 5, + TotalBatches: 5, + }) + checkOperatorSigningInfoEqual(t, osi[3], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[4].Hex(), + OperatorAddress: operatorAddresses[4].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 2, + TotalResponsibleBatches: 2, + TotalBatches: 5, + }) + }) + + t.Run("quorum 1 only", func(t *testing.T) { + w := executeRequest(t, r, http.MethodGet, "/v2/operators/signing-info?quorums=1") + response := decodeResponseBody[serverv2.OperatorsSigningInfoResponse](t, w) + osi := response.OperatorSigningInfo + require.Equal(t, 3, len(osi)) + checkOperatorSigningInfoEqual(t, osi[0], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[1].Hex(), + OperatorAddress: operatorAddresses[1].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 0, + TotalResponsibleBatches: 4, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[1], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[3].Hex(), + OperatorAddress: operatorAddresses[3].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 1, + TotalResponsibleBatches: 2, + TotalBatches: 4, + }) + checkOperatorSigningInfoEqual(t, osi[2], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 2, + TotalResponsibleBatches: 4, + TotalBatches: 4, + }) + }) + + t.Run("custom time range", func(t *testing.T) { + // We query 800 seconds before "now", it should hit the last 2 batches (block 4, 5) + // in the setup table: + // + // +-------+------------+-------------+---------+------------+------------------------+ + // | Batch | AttestedAt | RefBlockNum | Quorums | Nonsigners | Active operators | + // +-------+------------+-------------+---------+------------+------------------------+ + // | 5 | 5 | 4 | 0,1 | 3,5 | 1: {2} | + // | | | | | | 2: {0,1} | + // | | | | | | 3: {0,1} | + // | | | | | | 5: {0} | + // +-------+------------+-------------+---------+------------+------------------------+ + // | 6 | 6 | 5 | 0 | 5 | 1: {2} | + // | | | | | | 2: {0,1} | + // | | | | | | 3: {0,1} | + // | | | | | | 5: {0} | + // +-------+------------+-------------+---------+------------+------------------------+ + // + // which results in: + // + // +------------------+-------------------+------------------+--------------+ + // | | Total responsible | Total nonsigning | Signing rate | + // +------------------+-------------------+------------------+--------------+ + // | <2, 0> | 2 | 0 | 100% | + // +------------------+-------------------+------------------+--------------+ + // | <2, 1> | 1 | 0 | 100% | + // +------------------+-------------------+------------------+--------------+ + // | <3, 0> | 2 | 1 | 50% | + // +------------------+-------------------+------------------+--------------+ + // | <3, 1> | 1 | 1 | 0% | + // +------------------+-------------------+------------------+--------------+ + // | <5, 0> | 2 | 2 | 0% | + // +------------------+-------------------+------------------+--------------+ + + tm := time.Unix(0, int64(now)+1).UTC() + endTime := tm.Format("2006-01-02T15:04:05.999999999Z") + reqUrl := fmt.Sprintf("/v2/operators/signing-info?end=%s&interval=1000", endTime) + w := executeRequest(t, r, http.MethodGet, reqUrl) + response := decodeResponseBody[serverv2.OperatorsSigningInfoResponse](t, w) + osi := response.OperatorSigningInfo + require.Equal(t, 5, len(osi)) + checkOperatorSigningInfoEqual(t, osi[0], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[1].Hex(), + OperatorAddress: operatorAddresses[1].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 0, + TotalResponsibleBatches: 2, + TotalBatches: 2, + }) + checkOperatorSigningInfoEqual(t, osi[1], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[1].Hex(), + OperatorAddress: operatorAddresses[1].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 0, + TotalResponsibleBatches: 1, + TotalBatches: 1, + }) + checkOperatorSigningInfoEqual(t, osi[2], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 1, + TotalResponsibleBatches: 2, + TotalBatches: 2, + }) + checkOperatorSigningInfoEqual(t, osi[3], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[4].Hex(), + OperatorAddress: operatorAddresses[4].Hex(), + QuorumId: 0, + TotalUnsignedBatches: 2, + TotalResponsibleBatches: 2, + TotalBatches: 2, + }) + checkOperatorSigningInfoEqual(t, osi[4], &serverv2.OperatorSigningInfo{ + OperatorId: operatorIds[2].Hex(), + OperatorAddress: operatorAddresses[2].Hex(), + QuorumId: 1, + TotalUnsignedBatches: 1, + TotalResponsibleBatches: 1, + TotalBatches: 1, + }) + }) + +} + func TestCheckOperatorsReachability(t *testing.T) { r := setUpRouter() @@ -968,3 +1530,40 @@ func TestFetchMetricsThroughputTimeseriesHandler(t *testing.T) { assert.Equal(t, uint64(1701292920), response[0].Timestamp) assert.Equal(t, float64(3.503022666666651e+07), totalThroughput) } + +func createAttestation( + t *testing.T, + refBlockNumber uint64, + attestedAt uint64, + nonsigners []*core.G1Point, + quorums []uint8, +) *corev2.Attestation { + br := make([]byte, 32) + _, err := rand.Read(br) + require.NoError(t, err) + batchHeader := &corev2.BatchHeader{ + BatchRoot: ([32]byte)(br), + ReferenceBlockNumber: refBlockNumber, + } + keyPair, err := core.GenRandomBlsKeys() + assert.NoError(t, err) + apk := keyPair.GetPubKeyG2() + return &corev2.Attestation{ + BatchHeader: batchHeader, + AttestedAt: attestedAt, + NonSignerPubKeys: nonsigners, + APKG2: apk, + QuorumAPKs: map[uint8]*core.G1Point{ + 0: core.NewG1Point(big.NewInt(5), big.NewInt(6)), + 1: core.NewG1Point(big.NewInt(7), big.NewInt(8)), + }, + Sigma: &core.Signature{ + G1Point: core.NewG1Point(big.NewInt(9), big.NewInt(10)), + }, + QuorumNumbers: quorums, + QuorumResults: map[uint8]uint8{ + 0: 100, + 1: 80, + }, + } +}