Skip to content

Commit

Permalink
feat: metrics on meterer usage (#1212)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen authored Feb 5, 2025
1 parent 85a25f2 commit b4a1f21
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 48 deletions.
36 changes: 18 additions & 18 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,34 @@ func (m *Meterer) Start(ctx context.Context) {

// MeterRequest validates a blob header and adds it to the meterer's state
// TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare)
func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) error {
func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) (uint32, error) {
accountID := gethcommon.HexToAddress(header.AccountID)
symbolsCharged := m.SymbolsCharged(numSymbols)
m.logger.Info("Validating incoming request's payment metadata", "paymentMetadata", header, "numSymbols", numSymbols, "quorumNumbers", quorumNumbers)
// Validate against the payment method
if header.CumulativePayment.Sign() == 0 {
reservation, err := m.ChainPaymentState.GetReservedPaymentByAccount(ctx, accountID)
if err != nil {
return fmt.Errorf("failed to get active reservation by account: %w", err)
return 0, fmt.Errorf("failed to get active reservation by account: %w", err)
}
if err := m.ServeReservationRequest(ctx, header, reservation, numSymbols, quorumNumbers); err != nil {
return fmt.Errorf("invalid reservation: %w", err)
if err := m.ServeReservationRequest(ctx, header, reservation, symbolsCharged, quorumNumbers); err != nil {
return 0, fmt.Errorf("invalid reservation: %w", err)
}
} else {
onDemandPayment, err := m.ChainPaymentState.GetOnDemandPaymentByAccount(ctx, accountID)
if err != nil {
return fmt.Errorf("failed to get on-demand payment by account: %w", err)
return 0, fmt.Errorf("failed to get on-demand payment by account: %w", err)
}
if err := m.ServeOnDemandRequest(ctx, header, onDemandPayment, numSymbols, quorumNumbers); err != nil {
return fmt.Errorf("invalid on-demand request: %w", err)
if err := m.ServeOnDemandRequest(ctx, header, onDemandPayment, symbolsCharged, quorumNumbers); err != nil {
return 0, fmt.Errorf("invalid on-demand request: %w", err)
}
}

return nil
return symbolsCharged, nil
}

// ServeReservationRequest handles the rate limiting logic for incoming requests
func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, numSymbols uint, quorumNumbers []uint8) error {
func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, symbolsCharged uint32, quorumNumbers []uint8) error {
m.logger.Info("Recording and validating reservation usage", "header", header, "reservation", reservation)
if !reservation.IsActive(uint64(time.Now().Unix())) {
return fmt.Errorf("reservation not active")
Expand All @@ -112,7 +113,7 @@ func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.Payme
}

// Update bin usage atomically and check against reservation's data rate as the bin limit
if err := m.IncrementBinUsage(ctx, header, reservation, numSymbols); err != nil {
if err := m.IncrementBinUsage(ctx, header, reservation, symbolsCharged); err != nil {
return fmt.Errorf("bin overflows: %w", err)
}

Expand Down Expand Up @@ -151,8 +152,7 @@ func (m *Meterer) ValidateReservationPeriod(header core.PaymentMetadata, reserva
}

// IncrementBinUsage increments the bin usage atomically and checks for overflow
func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, numSymbols uint) error {
symbolsCharged := m.SymbolsCharged(numSymbols)
func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ReservedPayment, symbolsCharged uint32) error {
newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.ReservationPeriod), uint64(symbolsCharged))
if err != nil {
return fmt.Errorf("failed to increment bin usage: %w", err)
Expand Down Expand Up @@ -188,7 +188,7 @@ func GetReservationPeriod(timestamp uint64, binInterval uint32) uint32 {
// ServeOnDemandRequest handles the rate limiting logic for incoming requests
// On-demand requests doesn't have additional quorum settings and should only be
// allowed by ETH and EIGEN quorums
func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint, headerQuorums []uint8) error {
func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, symbolsCharged uint32, headerQuorums []uint8) error {
m.logger.Info("Recording and validating on-demand usage", "header", header, "onDemandPayment", onDemandPayment)
quorumNumbers, err := m.ChainPaymentState.GetOnDemandQuorumNumbers(ctx)
if err != nil {
Expand All @@ -198,14 +198,13 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM
if err := m.ValidateQuorum(headerQuorums, quorumNumbers); err != nil {
return fmt.Errorf("invalid quorum for On-Demand Request: %w", err)
}
// update blob header to use the miniumum chargeable size
symbolsCharged := m.SymbolsCharged(numSymbols)

err = m.OffchainStore.AddOnDemandPayment(ctx, header, symbolsCharged)
if err != nil {
return fmt.Errorf("failed to update cumulative payment: %w", err)
}
// Validate payments attached
err = m.ValidatePayment(ctx, header, onDemandPayment, numSymbols)
err = m.ValidatePayment(ctx, header, onDemandPayment, symbolsCharged)
if err != nil {
// No tolerance for incorrect payment amounts; no rollbacks
return fmt.Errorf("invalid on-demand payment: %w", err)
Expand All @@ -231,7 +230,7 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM
// prevPmt + PaymentMetadata.numSymbols * m.FixedFeePerByte
// <= PaymentMetadata.CumulativePayment
// <= nextPmt - nextPmtnumSymbols * m.FixedFeePerByte > nextPmt
func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint) error {
func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, symbolsCharged uint32) error {
if header.CumulativePayment.Cmp(onDemandPayment.CumulativePayment) > 0 {
return fmt.Errorf("request claims a cumulative payment greater than the on-chain deposit")
}
Expand All @@ -241,7 +240,7 @@ func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetada
return fmt.Errorf("failed to get relevant on-demand records: %w", err)
}
// the current request must increment cumulative payment by a magnitude sufficient to cover the blob size
if prevPmt.Add(prevPmt, m.PaymentCharged(numSymbols)).Cmp(header.CumulativePayment) > 0 {
if prevPmt.Add(prevPmt, m.PaymentCharged(uint(symbolsCharged))).Cmp(header.CumulativePayment) > 0 {
return fmt.Errorf("insufficient cumulative payment increment")
}
// the current request must not break the payment magnitude for the next payment if the two requests were delivered out-of-order
Expand All @@ -254,6 +253,7 @@ func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetada

// PaymentCharged returns the chargeable price for a given data length
func (m *Meterer) PaymentCharged(numSymbols uint) *big.Int {
// symbolsCharged == m.SymbolsCharged(numSymbols) if numSymbols is already a multiple of MinNumSymbols
symbolsCharged := big.NewInt(int64(m.SymbolsCharged(numSymbols)))
pricePerSymbol := big.NewInt(int64(m.ChainPaymentState.GetPricePerSymbol()))
return symbolsCharged.Mul(symbolsCharged, pricePerSymbol)
Expand Down
43 changes: 23 additions & 20 deletions core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,16 @@ func TestMetererReservations(t *testing.T) {

// test invalid quorom ID
header := createPaymentHeader(1, big.NewInt(0), accountID1)
err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
_, err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "quorum number mismatch")

// overwhelming bin overflow for empty bins
header = createPaymentHeader(reservationPeriod-1, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, 10, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 10, quoromNumbers)
assert.NoError(t, err)
// overwhelming bin overflow for empty bins
header = createPaymentHeader(reservationPeriod-1, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers)
assert.ErrorContains(t, err, "overflow usage exceeds bin limit")

// test non-existent account
Expand All @@ -217,41 +217,42 @@ func TestMetererReservations(t *testing.T) {
}
header = createPaymentHeader(1, big.NewInt(0), crypto.PubkeyToAddress(unregisteredUser.PublicKey))
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
_, err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "failed to get active reservation by account: reservation not found")

// test inactive reservation
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID3)
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0})
_, err = mt.MeterRequest(ctx, *header, 1000, []uint8{0})
assert.ErrorContains(t, err, "reservation not active")

// test invalid reservation period
header = createPaymentHeader(reservationPeriod-3, big.NewInt(0), accountID1)
err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers)
assert.ErrorContains(t, err, "invalid reservation period for reservation")

// test bin usage metering
symbolLength := uint(20)
requiredLength := uint(21) // 21 should be charged for length of 20 since minNumSymbols is 3
for i := 0; i < 9; i++ {
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quoromNumbers)
symbolsCharged, err := mt.MeterRequest(ctx, *header, symbolLength, quoromNumbers)
assert.NoError(t, err)
item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{
"AccountID": &types.AttributeValueMemberS{Value: accountID2.Hex()},
"ReservationPeriod": &types.AttributeValueMemberN{Value: strconv.Itoa(int(reservationPeriod))},
})
assert.NoError(t, err)
assert.Equal(t, uint32(requiredLength), symbolsCharged)
assert.Equal(t, accountID2.Hex(), item["AccountID"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, strconv.Itoa(int(reservationPeriod)), item["ReservationPeriod"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, strconv.Itoa((i+1)*int(requiredLength)), item["BinUsage"].(*types.AttributeValueMemberN).Value)

}
// first over flow is allowed
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
symbolsCharged, err := mt.MeterRequest(ctx, *header, 25, quoromNumbers)
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 25, quoromNumbers)
assert.NoError(t, err)
assert.Equal(t, uint32(27), symbolsCharged)
overflowedReservationPeriod := reservationPeriod + 2
item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{
"AccountID": &types.AttributeValueMemberS{Value: accountID2.Hex()},
Expand All @@ -266,7 +267,7 @@ func TestMetererReservations(t *testing.T) {
// second over flow
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1, quoromNumbers)
_, err = mt.MeterRequest(ctx, *header, 1, quoromNumbers)
assert.ErrorContains(t, err, "bin has already been filled")
}

Expand All @@ -293,17 +294,17 @@ func TestMetererOnDemand(t *testing.T) {
}
header := createPaymentHeader(reservationPeriod, big.NewInt(2), crypto.PubkeyToAddress(unregisteredUser.PublicKey))
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
assert.ErrorContains(t, err, "failed to get on-demand payment by account: payment not found")

// test invalid quorom ID
header = createPaymentHeader(reservationPeriod, big.NewInt(2), accountID1)
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
_, err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "invalid quorum for On-Demand Request")

// test insufficient cumulative payment
header = createPaymentHeader(reservationPeriod, big.NewInt(1), accountID1)
err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers)
assert.ErrorContains(t, err, "insufficient cumulative payment increment")
// No rollback after meter request
result, err := dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{
Expand All @@ -318,36 +319,38 @@ func TestMetererOnDemand(t *testing.T) {
priceCharged := mt.PaymentCharged(symbolLength)
assert.Equal(t, big.NewInt(int64(102*mt.ChainPaymentState.GetPricePerSymbol())), priceCharged)
header = createPaymentHeader(reservationPeriod, priceCharged, accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
symbolsCharged, err := mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.NoError(t, err)
assert.Equal(t, uint32(102), symbolsCharged)
header = createPaymentHeader(reservationPeriod, priceCharged, accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.ErrorContains(t, err, "exact payment already exists")

// test valid payments
for i := 1; i < 9; i++ {
header = createPaymentHeader(reservationPeriod, new(big.Int).Mul(priceCharged, big.NewInt(int64(i+1))), accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
symbolsCharged, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.NoError(t, err)
assert.Equal(t, uint32(102), symbolsCharged)
}

// test cumulative payment on-chain constraint
header = createPaymentHeader(reservationPeriod, big.NewInt(2023), accountID2)
err = mt.MeterRequest(ctx, *header, 1, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1, quorumNumbers)
assert.ErrorContains(t, err, "invalid on-demand payment: request claims a cumulative payment greater than the on-chain deposit")

// test insufficient increment in cumulative payment
previousCumulativePayment := priceCharged.Mul(priceCharged, big.NewInt(9))
symbolLength = uint(2)
priceCharged = mt.PaymentCharged(symbolLength)
header = createPaymentHeader(reservationPeriod, big.NewInt(0).Add(previousCumulativePayment, big.NewInt(0).Sub(priceCharged, big.NewInt(1))), accountID2)
err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers)
assert.ErrorContains(t, err, "invalid on-demand payment: insufficient cumulative payment increment")
previousCumulativePayment = big.NewInt(0).Add(previousCumulativePayment, priceCharged)

// test cannot insert cumulative payment in out of order
header = createPaymentHeader(reservationPeriod, mt.PaymentCharged(50), accountID2)
err = mt.MeterRequest(ctx, *header, 50, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 50, quorumNumbers)
assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants")

numPrevRecords := 12
Expand All @@ -359,7 +362,7 @@ func TestMetererOnDemand(t *testing.T) {
assert.Equal(t, numPrevRecords, len(result))
// test failed global rate limit (previously payment recorded: 2, global limit: 1009)
header = createPaymentHeader(reservationPeriod, big.NewInt(0).Add(previousCumulativePayment, mt.PaymentCharged(1010)), accountID1)
err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers)
_, err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers)
assert.ErrorContains(t, err, "failed global rate limiting")
// Correct rollback
result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{
Expand Down
9 changes: 4 additions & 5 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
if onchainState == nil {
return nil, api.NewErrorInternal("onchain state is nil")
}
if err := s.validateDispersalRequest(ctx, req, onchainState); err != nil {
if err := s.validateDispersalRequest(req, onchainState); err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate the request: %v", err))
}

Expand All @@ -40,9 +40,8 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
finishedValidation := time.Now()
s.metrics.reportValidateDispersalRequestLatency(finishedValidation.Sub(start))

s.metrics.reportDisperseBlobSize(len(req.GetBlob()))

blob := req.GetBlob()
s.metrics.reportDisperseBlobSize(len(blob))
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to parse the blob header proto: %v", err))
Expand Down Expand Up @@ -118,16 +117,16 @@ func (s *DispersalServerV2) checkPaymentMeter(ctx context.Context, req *pb.Dispe
CumulativePayment: cumulativePayment,
}

err = s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
symbolsCharged, err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
if err != nil {
return api.NewErrorResourceExhausted(err.Error())
}
s.metrics.reportDisperseMeteredBytes(int(symbolsCharged) * encoding.BYTES_PER_SYMBOL)

return nil
}

func (s *DispersalServerV2) validateDispersalRequest(
ctx context.Context,
req *pb.DisperseBlobRequest,
onchainState *OnchainState) error {

Expand Down
23 changes: 19 additions & 4 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type metricsV2 struct {
getBlobCommitmentLatency *prometheus.SummaryVec
getPaymentStateLatency *prometheus.SummaryVec
disperseBlobLatency *prometheus.SummaryVec
disperseBlobSize *prometheus.GaugeVec
disperseBlobSize *prometheus.CounterVec
disperseBlobMeteredBytes *prometheus.CounterVec
validateDispersalRequestLatency *prometheus.SummaryVec
storeBlobLatency *prometheus.SummaryVec
getBlobStatusLatency *prometheus.SummaryVec
Expand Down Expand Up @@ -79,15 +80,24 @@ func newAPIServerV2Metrics(registry *prometheus.Registry, metricsConfig disperse
[]string{},
)

disperseBlobSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
disperseBlobSize := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "disperse_blob_size_bytes",
Help: "The size of the blob in bytes.",
},
[]string{},
)

disperseBlobMeteredBytes := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "disperse_blob_metered_bytes",
Help: "The number of bytes charged for the blob.",
},
[]string{},
)

validateDispersalRequestLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Expand Down Expand Up @@ -124,6 +134,7 @@ func newAPIServerV2Metrics(registry *prometheus.Registry, metricsConfig disperse
getPaymentStateLatency: getPaymentStateLatency,
disperseBlobLatency: disperseBlobLatency,
disperseBlobSize: disperseBlobSize,
disperseBlobMeteredBytes: disperseBlobMeteredBytes,
validateDispersalRequestLatency: validateDispersalRequestLatency,
storeBlobLatency: storeBlobLatency,
getBlobStatusLatency: getBlobStatusLatency,
Expand Down Expand Up @@ -162,7 +173,11 @@ func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) {
}

func (m *metricsV2) reportDisperseBlobSize(size int) {
m.disperseBlobSize.WithLabelValues().Set(float64(size))
m.disperseBlobSize.WithLabelValues().Add(float64(size))
}

func (m *metricsV2) reportDisperseMeteredBytes(usageInBytes int) {
m.disperseBlobMeteredBytes.WithLabelValues().Add(float64(usageInBytes))
}

func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) {
Expand Down
Loading

0 comments on commit b4a1f21

Please sign in to comment.