Skip to content

Commit

Permalink
Encoder API robustness audit/fix (#1236)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Feb 7, 2025
1 parent 51455bf commit aaaee47
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
5 changes: 5 additions & 0 deletions disperser/apiserver/get_blob_status_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/Layr-Labs/eigenda/api"
Expand All @@ -30,6 +31,10 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu

metadata, err := s.blobMetadataStore.GetBlobMetadata(ctx, blobKey)
if err != nil {
if strings.Contains(err.Error(), "metadata not found") {
s.logger.Info("blob metadata not found", "err", err, "blobKey", blobKey.Hex())
return nil, api.NewErrorNotFound(fmt.Sprintf("blob metadata not found for blob key: %s", blobKey.Hex()))
}
s.logger.Warn("failed to get blob metadata", "err", err, "blobKey", blobKey.Hex())
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob metadata: %s", err.Error()))
}
Expand Down
39 changes: 23 additions & 16 deletions disperser/encoder/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/Layr-Labs/eigenda/api"
"github.com/Layr-Labs/eigenda/common/healthcheck"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -109,6 +110,11 @@ func (s *EncoderServerV2) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequ
s.metrics.ObserveLatency("total", time.Since(totalStart))
}()

// Validate request first
blobKey, encodingParams, err := s.validateAndParseRequest(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
blobSize := req.GetBlobSize()
sizeBucket := common.BlobSizeBucket(int(blobSize))

Expand All @@ -122,7 +128,7 @@ func (s *EncoderServerV2) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequ
default:
s.metrics.IncrementRateLimitedBlobRequestNum(int(blobSize))
s.logger.Warn("rate limiting as request queue is full", "requestQueueSize", s.config.RequestQueueSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
return nil, errors.New("too many requests")
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("request queue is full, max queue size: %d", s.config.RequestQueueSize))
}

// Limit the number of concurrent requests
Expand All @@ -134,7 +140,7 @@ func (s *EncoderServerV2) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequ
}

s.metrics.ObserveLatency("queuing", time.Since(totalStart))
reply, err := s.handleEncodingToChunkStore(ctx, req)
reply, err := s.handleEncodingToChunkStore(ctx, blobKey, encodingParams)
if err != nil {
s.metrics.IncrementFailedBlobRequestNum(int(blobSize))
} else {
Expand All @@ -144,13 +150,7 @@ func (s *EncoderServerV2) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequ
return reply, err
}

func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
// Validate request first
blobKey, encodingParams, err := s.validateAndParseRequest(req)
if err != nil {
return nil, err
}

func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams) (*pb.EncodeBlobReply, error) {
s.logger.Info("Preparing to encode", "blobKey", blobKey.Hex(), "encodingParams", encodingParams)

// Check if the blob has already been encoded
Expand Down Expand Up @@ -209,29 +209,36 @@ func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (co
)

if req == nil {
return blobKey, params, status.Error(codes.InvalidArgument, "request cannot be nil")
return blobKey, params, errors.New("request cannot be nil")
}

if req.BlobKey == nil {
return blobKey, params, status.Error(codes.InvalidArgument, "blob key cannot be nil")
return blobKey, params, errors.New("blob key cannot be nil")
}

if req.EncodingParams == nil {
return blobKey, params, status.Error(codes.InvalidArgument, "encoding parameters cannot be nil")
return blobKey, params, errors.New("encoding parameters cannot be nil")
}

// Since these are uint32 in the proto, we only need to check for positive values
if req.EncodingParams.ChunkLength == 0 {
return blobKey, params, status.Error(codes.InvalidArgument, "chunk length must be greater than zero")
return blobKey, params, errors.New("chunk length must be greater than zero")
}
if req.EncodingParams.ChunkLength&(req.EncodingParams.ChunkLength-1) != 0 {
return blobKey, params, errors.New("chunk length must be power of 2")
}

if req.EncodingParams.NumChunks == 0 {
return blobKey, params, status.Error(codes.InvalidArgument, "number of chunks must be greater than zero")
return blobKey, params, errors.New("number of chunks must be greater than zero")
}

if req.BlobSize == 0 || uint64(encoding.GetBlobLength(uint(req.BlobSize))) > req.EncodingParams.ChunkLength*req.EncodingParams.NumChunks {
return blobKey, params, errors.New("blob size is invalid")
}

blobKey, err := corev2.BytesToBlobKey(req.BlobKey)
if err != nil {
return blobKey, params, status.Errorf(codes.InvalidArgument, "invalid blob key: %v", err)
return blobKey, params, fmt.Errorf("invalid blob key: %v", err)
}

// Convert proto EncodingParams to our domain type
Expand All @@ -242,7 +249,7 @@ func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (co

err = encoding.ValidateEncodingParams(params, s.prover.GetSRSOrder())
if err != nil {
return blobKey, params, status.Errorf(codes.InvalidArgument, "invalid encoding parameters: %v", err)
return blobKey, params, fmt.Errorf("invalid encoding parameters: %v", err)
}

return blobKey, params, nil
Expand Down
1 change: 1 addition & 0 deletions disperser/encoder/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestEncodeBlob(t *testing.T) {
ChunkLength: uint64(chunkLength),
NumChunks: uint64(numChunks),
},
BlobSize: uint64(blobSize),
}

expectedUploadCalls := 1
Expand Down

0 comments on commit aaaee47

Please sign in to comment.