Skip to content

Commit

Permalink
Redundant write on EigenDA failure
Browse files Browse the repository at this point in the history
If EigenDA fails for any reason, we still want to write our commitment to the DA layer if we have caches or fallbacks enabled.

Since certificate is not available at this point, a keccak of the payload has to be used as commitment.
  • Loading branch information
Inkvi committed Jan 29, 2025
1 parent 0e7dcaf commit 07fc191
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 31 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ In order to disperse to the EigenDA network in production, or at high throughput
- [Metrics](#metrics)
- [Flags](#flags)
- [Resources](#resources)


## Deployment Guide

Expand Down Expand Up @@ -170,6 +170,8 @@ In the event that the EigenDA disperser or network is down, the proxy will retur

This behavior is turned on by default, but configurable via the `--eigenda.confirmation-timeout` flag (set to 15 mins by default currently). If a blob is not confirmed within this time, the proxy will return a 503 status code. This should be set long enough to accomodate for the disperser's batching interval (typically 10 minutes), signature gathering, and onchain submission.

This behavior can be modified to write to a secondary storage target when EigenDA write fails by setting the `--store.enable-write-on-eigenda-failure` flag to `true`. This flag only works with OP Stack-based rollups.

## Blob Lifecycle

> Warning: the below diagrams describe EigenDA V2 interactions. EigenDA V1 is very similar, but has slight discrepancies.
Expand Down
76 changes: 76 additions & 0 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,79 @@ func TestProxyReadFallback(t *testing.T) {
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.Standard)
}

// Ensures that when EigenDA write fails and enable-write-on-eigenda-failure is true,
// the data is not written to the secondary storage and the write fails.
func TestProxyRedundantWriteOnEigenDAFailure(t *testing.T) {
if !runIntegrationTests || runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION env var not set")
}

t.Parallel()

// Setup server with S3 as secondary and simulate EigenDA failure
testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Fallback = true
testCfg.UseWriteFallback = true
testCfg.SimulateEigenDAFailure = true

tsConfig := e2e.TestSuiteConfig(testCfg)
ts, kill := e2e.CreateTestSuite(tsConfig)
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)

// Write data when EigenDA is "failing"
expectedBlob := e2e.RandBytes(1_000_000)
t.Log("Setting input data on proxy server...")
blobInfo, err := daClient.SetData(ts.Ctx, expectedBlob)
require.NoError(t, err)

// Try to read data - should succeed because it was written to secondary
t.Log("Getting input data from proxy server...")
actualBlob, err := daClient.GetData(ts.Ctx, blobInfo)
require.NoError(t, err)
require.Equal(t, expectedBlob, actualBlob)

// Verify metrics show secondary write and read
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
}

// Ensures that when EigenDA write fails and enable-write-on-eigenda-failure is false,
// the data is not written to the secondary storage.
func TestProxyRedundantWriteDisabled(t *testing.T) {
if !runIntegrationTests || runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION env var not set")
}

t.Parallel()

// Setup server with S3 as secondary and simulate EigenDA failure
testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Fallback = true
testCfg.UseWriteFallback = false
testCfg.SimulateEigenDAFailure = true

tsConfig := e2e.TestSuiteConfig(testCfg)
ts, kill := e2e.CreateTestSuite(tsConfig)
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)

// Write data when EigenDA is "failing"
expectedBlob := e2e.RandBytes(1_000_000)
t.Log("Setting input data on proxy server...")
blobInfo, err := daClient.SetData(ts.Ctx, expectedBlob)
require.Error(t, err)

// Try to read data - should fail because it was not written to secondary
t.Log("Getting input data from proxy server...")
_, err = daClient.GetData(ts.Ctx, blobInfo)
require.Error(t, err)
}
27 changes: 17 additions & 10 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,23 @@ type Cfg struct {
UseS3Caching bool
UseRedisCaching bool
UseS3Fallback bool
// enable writing to secondary storage on EigenDA failure
UseWriteFallback bool
// simulate EigenDA failure
SimulateEigenDAFailure bool
}

func TestConfig(useMemory bool) *Cfg {
return &Cfg{
UseMemory: useMemory,
Expiration: 14 * 24 * time.Hour,
UseKeccak256ModeS3: false,
UseS3Caching: false,
UseRedisCaching: false,
UseS3Fallback: false,
WriteThreadCount: 0,
UseMemory: useMemory,
Expiration: 14 * 24 * time.Hour,
UseKeccak256ModeS3: false,
UseS3Caching: false,
UseRedisCaching: false,
UseS3Fallback: false,
WriteThreadCount: 0,
UseWriteFallback: false,
SimulateEigenDAFailure: false,
}
}

Expand Down Expand Up @@ -210,10 +216,11 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
},
MemstoreEnabled: testCfg.UseMemory,
MemstoreConfig: memstore.Config{
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
SimulateEigenDAFailure: testCfg.SimulateEigenDAFailure,
},

UseWriteFallback: testCfg.UseWriteFallback,
StorageConfig: store.Config{
AsyncPutWorkers: testCfg.WriteThreadCount,
},
Expand Down
11 changes: 10 additions & 1 deletion flags/eigendaflags/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ var (
EthRPCURLFlagName = withFlagPrefix("eth-rpc")
SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr")
// Flags that are proxy specific, and not used by the eigenda-client
PutRetriesFlagName = withFlagPrefix("put-retries")
PutRetriesFlagName = withFlagPrefix("put-retries")
EnableWriteFallbackFlagName = withFlagPrefix("enable-write-on-eigenda-failure")
)

func withFlagPrefix(s string) string {
Expand Down Expand Up @@ -163,6 +164,14 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETRIES")},
Category: category,
},
&cli.BoolFlag{
Name: EnableWriteFallbackFlagName,
Usage: "Enable writing to secondary storage when EigenDA write fails. " +
"DANGER: incompatible with Nitro stack!!! Default is false.",
Value: false,
EnvVars: []string{withEnvPrefix(envPrefix, "WRITE_ON_EIGENDA_FAILURE")},
Category: category,
},
}
}

Expand Down
24 changes: 13 additions & 11 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import (
)

type Config struct {
EdaClientConfig clients.EigenDAClientConfig
MemstoreConfig memstore.Config
StorageConfig store.Config
VerifierConfig verify.Config
PutRetries uint
EdaClientConfig clients.EigenDAClientConfig
MemstoreConfig memstore.Config
StorageConfig store.Config
VerifierConfig verify.Config
PutRetries uint
UseWriteFallback bool

MemstoreEnabled bool
}
Expand All @@ -28,12 +29,13 @@ type Config struct {
func ReadConfig(ctx *cli.Context) Config {
edaClientConfig := eigendaflags.ReadConfig(ctx)
return Config{
EdaClientConfig: edaClientConfig,
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),
PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
StorageConfig: store.ReadConfig(ctx),
EdaClientConfig: edaClientConfig,
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),
PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName),
UseWriteFallback: ctx.Bool(eigendaflags.EnableWriteFallbackFlagName),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
StorageConfig: store.ReadConfig(ctx),
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,5 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
}

log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
return store.NewManager(eigenDA, s3Store, log, secondary)
return store.NewManager(eigenDA, s3Store, log, secondary, cfg.EigenDAConfig.UseWriteFallback)
}
13 changes: 13 additions & 0 deletions store/generated_key/memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
// artificial latency added for memstore backend to mimic eigenda's latency
PutLatency time.Duration
GetLatency time.Duration
// SimulateEigenDAFailure forces Put operations to fail, simulating EigenDA failures
SimulateEigenDAFailure bool
}

/*
Expand Down Expand Up @@ -108,6 +110,12 @@ func (e *MemStore) pruneExpired() {
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.config.GetLatency)
e.reads++

// Simulate EigenDA failure if configured
if e.config.SimulateEigenDAFailure {
return nil, fmt.Errorf("simulated EigenDA failure")
}

e.RLock()
defer e.RUnlock()

Expand Down Expand Up @@ -147,6 +155,11 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
e.Lock()
defer e.Unlock()

// Simulate EigenDA failure if configured
if e.config.SimulateEigenDAFailure {
return nil, fmt.Errorf("simulated EigenDA failure")
}

commitment, err := e.verifier.Commit(encodedVal)
if err != nil {
return nil, err
Expand Down
33 changes: 27 additions & 6 deletions store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -27,16 +28,18 @@ type Manager struct {

// secondary storage backends (caching and fallbacks)
secondary ISecondary
// redundant write flag
useWriteFallback bool
}

// NewManager ... Init
func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l log.Logger,
secondary ISecondary) (IManager, error) {
func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l log.Logger, secondary ISecondary, useWriteFallback bool) (IManager, error) {
return &Manager{
log: l,
eigenda: eigenda,
s3: s3,
secondary: secondary,
log: l,
eigenda: eigenda,
s3: s3,
secondary: secondary,
useWriteFallback: useWriteFallback,
}, nil
}

Expand Down Expand Up @@ -124,6 +127,24 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, key, v
}

if err != nil {
log.Error("Failed to write to EigenDA backend", "err", err)

// don't do redundant write to hide the misuse/misconfiguration of the proxy
if errors.Is(err, common.ErrProxyOversizedBlob) {
return nil, err
}

// write to EigenDA failed, which shouldn't happen if the backend is functioning properly
// use the payload as the key to keep the op-batcher alive
if m.useWriteFallback && m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() {
redundantErr := m.secondary.HandleRedundantWrites(ctx, value, value)
if redundantErr != nil {
log.Error("Failed to write to redundant backends", "err", redundantErr)
return nil, redundantErr
}

return crypto.Keccak256(value), nil
}
return nil, err
}

Expand Down
20 changes: 19 additions & 1 deletion store/secondary.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package store

import (
"bytes"
"context"
"errors"
"net/http"
"sync"

"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/metrics"
verifypackage "github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"

"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -151,6 +154,14 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by
}

key := crypto.Keccak256(commitment)

// check if key is an RLP encoded certificate, if not, assume it's a cache key
var cert verifypackage.Certificate
err := rlp.DecodeBytes(commitment, &cert)
if err != nil {
key = commitment
}

for _, src := range sources {
cb := sm.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodGet)
data, err := src.Get(ctx, key)
Expand All @@ -168,7 +179,14 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by

// verify cert:data using provided verification function
sm.verifyLock.Lock()
err = verify(ctx, commitment, data)

if bytes.Equal(key, commitment) {
err = src.Verify(ctx, commitment, data)
} else {
// verify cert:data using EigenDA verification checks
err = verify(ctx, commitment, data)
}

if err != nil {
cb(Failed)
log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
Expand Down

0 comments on commit 07fc191

Please sign in to comment.