Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve rpc errors logging #6362

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Open
56 changes: 31 additions & 25 deletions circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ type FunctorCallStatus struct {
Err error
}

func (cr CommandResult) Result() []any {
func (cr *CommandResult) Result() []any {
return cr.res
}

func (cr CommandResult) Error() error {
func (cr *CommandResult) Error() error {
return cr.err
}
func (cr CommandResult) Cancelled() bool {

func (cr *CommandResult) Cancelled() bool {
return cr.cancelled
}

func (cr CommandResult) FunctorCallStatuses() []FunctorCallStatus {
func (cr *CommandResult) FunctorCallStatuses() []FunctorCallStatus {
return cr.functorCallStatuses
}

func (cr *CommandResult) addCallStatus(circuitName string, err error) {
func (cr *CommandResult) addCallStatus(providerName string, err error) {
cr.functorCallStatuses = append(cr.functorCallStatuses, FunctorCallStatus{
Name: circuitName,
Name: providerName,
Timestamp: time.Now(),
Err: err,
})
Expand All @@ -62,8 +63,8 @@ func NewCommand(ctx context.Context, functors []*Functor) *Command {
}
}

func (cmd *Command) Add(ftor *Functor) {
cmd.functors = append(cmd.functors, ftor)
func (cmd *Command) Add(functor *Functor) {
cmd.functors = append(cmd.functors, functor)
}

func (cmd *Command) IsEmpty() bool {
Expand Down Expand Up @@ -94,14 +95,19 @@ func NewCircuitBreaker(config Config) *CircuitBreaker {
}

type Functor struct {
exec FallbackFunc
circuitName string
exec FallbackFunc
circuitName string
providerName string
}

func NewFunctor(exec FallbackFunc, circuitName string) *Functor {
// NewFunctor creates a new Functor with the provided FallbackFunc, circuitName and providerName.
// The circuitName is the name of the circuit to be used by the Functor. If the circuitName is empty,
// or there is only one Functor in the Command, the command will be executed without a circuit.
func NewFunctor(exec FallbackFunc, circuitName, providerName string) *Functor {
return &Functor{
exec: exec,
circuitName: circuitName,
exec: exec,
circuitName: circuitName,
providerName: providerName,
}
}

Expand All @@ -115,7 +121,7 @@ func accumulateCommandError(result CommandResult, circuitName string, err error)
return result
}

// Executes the command in its circuit if set.
// Execute the command in its circuit if set.
// If the command's circuit is not configured, the circuit of the CircuitBreaker is used.
// This is a blocking function.
func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
Expand All @@ -137,19 +143,20 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {

var err error
circuitName := f.circuitName
providerName := f.providerName
if cb.circuitNameHandler != nil {
circuitName = cb.circuitNameHandler(circuitName)
}

// if last command, execute without circuit
if i == len(cmd.functors)-1 {
if i == len(cmd.functors)-1 || circuitName == "" {
res, execErr := f.exec()
err = execErr
if err == nil {
result.res = res
result.err = nil
}
result.addCallStatus(circuitName, err)
result.addCallStatus(f.providerName, err)
} else {
if hystrix.GetCircuitSettings()[circuitName] == nil {
hystrix.ConfigureCommand(circuitName, hystrix.CommandConfig{
Expand All @@ -168,10 +175,10 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
result.res = res
result.err = nil
}
result.addCallStatus(circuitName, err)
result.addCallStatus(f.providerName, err)

// If the command has been cancelled, we don't count
// the error towars breaking the circuit, and then we break
// the error towards breaking the circuit, and then we break
if cmd.cancel {
result = accumulateCommandError(result, circuitName, err)
result.cancelled = true
Expand All @@ -187,25 +194,24 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
break
}

result = accumulateCommandError(result, circuitName, err)

// Lets abuse every provider with the same amount of MaxConcurrentRequests,
result = accumulateCommandError(result, providerName, err)
// Let's abuse every provider with the same amount of MaxConcurrentRequests,
// keep iterating even in case of ErrMaxConcurrency error
}
return result
}

func (c *CircuitBreaker) SetOverrideCircuitNameHandler(f func(string) string) {
c.circuitNameHandler = f
func (cb *CircuitBreaker) SetOverrideCircuitNameHandler(f func(string) string) {
cb.circuitNameHandler = f
}

// Expects a circuit to exist because a new circuit is always closed.
// Call CircuitExists to check if a circuit exists.
// IsCircuitOpen Expects a circuit to exist because a new circuit is always closed.
func IsCircuitOpen(circuitName string) bool {
circuit, wasCreated, _ := hystrix.GetCircuit(circuitName)
return !wasCreated && circuit.IsOpen()
}

// CircuitExists checks if a circuit exists.
func CircuitExists(circuitName string) bool {
_, wasCreated, _ := hystrix.GetCircuit(circuitName)
return !wasCreated
Expand Down
57 changes: 28 additions & 29 deletions circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCircuitBreaker_ExecuteSuccessSingle(t *testing.T) {
cmd := NewCommand(context.TODO(), []*Functor{
NewFunctor(func() ([]interface{}, error) {
return []any{expectedResult}, nil
}, circuitName)},
}, circuitName, "")},
)

result := cb.Execute(cmd)
Expand All @@ -53,13 +53,13 @@ func TestCircuitBreaker_ExecuteMultipleFallbacksFail(t *testing.T) {
NewFunctor(func() ([]interface{}, error) {
time.Sleep(100 * time.Millisecond) // will cause hystrix: timeout
return []any{success}, nil
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
return nil, errSecProvFailed
}, circuitName+"2"),
}, circuitName+"2", ""),
NewFunctor(func() ([]interface{}, error) {
return nil, errThirdProvFailed
}, circuitName+"3"),
}, circuitName+"3", ""),
})

result := cb.Execute(cmd)
Expand Down Expand Up @@ -87,13 +87,13 @@ func TestCircuitBreaker_ExecuteMultipleFallbacksFailButLastSuccessStress(t *test
cmd := NewCommand(context.TODO(), []*Functor{
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 1 failed")
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}, circuitName+"2"),
}, circuitName+"2", ""),
NewFunctor(func() ([]interface{}, error) {
return []any{expectedResult}, nil
}, circuitName+"3"),
}, circuitName+"3", ""),
},
)

Expand All @@ -120,15 +120,15 @@ func TestCircuitBreaker_ExecuteSwitchToWorkingProviderOnVolumeThresholdReached(t
NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, errors.New("provider 1 failed")
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
prov2Called++
return nil, errors.New("provider 2 failed")
}, circuitName+"2"),
}, circuitName+"2", ""),
NewFunctor(func() ([]interface{}, error) {
prov3Called++
return []any{expectedResult}, nil
}, circuitName+"3"),
}, circuitName+"3", ""),
})

result := cb.Execute(cmd)
Expand Down Expand Up @@ -160,11 +160,11 @@ func TestCircuitBreaker_ExecuteHealthCheckOnWindowTimeout(t *testing.T) {
NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, errors.New("provider 1 failed")
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
prov2Called++
return []any{expectedResult}, nil
}, circuitName+"2"),
}, circuitName+"2", ""),
})

result := cb.Execute(cmd)
Expand All @@ -183,11 +183,11 @@ func TestCircuitBreaker_ExecuteHealthCheckOnWindowTimeout(t *testing.T) {
NewFunctor(func() ([]interface{}, error) {
prov1Called++
return []any{expectedResult}, nil // Now it is working
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
prov2Called++
return []any{expectedResult}, nil
}, circuitName+"2"),
}, circuitName+"2", ""),
})
result := cb.Execute(cmd)
require.NoError(t, result.Error())
Expand All @@ -212,19 +212,18 @@ func TestCircuitBreaker_CommandCancel(t *testing.T) {
prov1Called++
cmd.Cancel()
return nil, expectedErr
}, circuitName+"1"))
}, circuitName+"1", ""))
cmd.Add(NewFunctor(func() ([]interface{}, error) {
prov2Called++
return nil, errors.New("provider 2 failed")
}, circuitName+"2"))
}, circuitName+"2", ""))

result := cb.Execute(cmd)
require.True(t, errors.Is(result.Error(), expectedErr))
require.True(t, result.Cancelled())

assert.Equal(t, 1, prov1Called)
assert.Equal(t, 0, prov2Called)

}

func TestCircuitBreaker_EmptyOrNilCommand(t *testing.T) {
Expand All @@ -247,11 +246,11 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) {
// We add it twice as otherwise it's only used for the fallback
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, nil
}, existCircuit))
}, existCircuit, ""))

cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, nil
}, existCircuit))
}, existCircuit, ""))
_ = cb.Execute(cmd)
require.True(t, CircuitExists(existCircuit))
require.False(t, IsCircuitOpen(existCircuit))
Expand All @@ -276,10 +275,10 @@ func TestCircuitBreaker_Fallback(t *testing.T) {
cmd := NewCommand(ctx, nil)
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, expectedErr
}, circuitName+"1"))
}, circuitName+"1", ""))
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}, circuitName+"2"))
}, circuitName+"2", ""))

result := cb.Execute(cmd)
require.NotNil(t, result.Error())
Expand All @@ -297,7 +296,7 @@ func TestCircuitBreaker_Fallback(t *testing.T) {
cmd.Add(NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, expectedErr
}, circuitName+"1"))
}, circuitName+"1", ""))

result := cb.Execute(cmd)
require.True(t, errors.Is(result.Error(), expectedErr))
Expand All @@ -310,7 +309,7 @@ func TestCircuitBreaker_SuccessCallStatus(t *testing.T) {

functor := NewFunctor(func() ([]any, error) {
return []any{"success"}, nil
}, "successCircuit")
}, "successCircuit", "")

cmd := NewCommand(context.Background(), []*Functor{functor})

Expand All @@ -337,7 +336,7 @@ func TestCircuitBreaker_ErrorCallStatus(t *testing.T) {
expectedError := errors.New("functor error")
functor := NewFunctor(func() ([]any, error) {
return nil, expectedError
}, "errorCircuit")
}, "errorCircuit", "")

cmd := NewCommand(context.Background(), []*Functor{functor})

Expand All @@ -364,7 +363,7 @@ func TestCircuitBreaker_CancelledResult(t *testing.T) {
functor := NewFunctor(func() ([]any, error) {
time.Sleep(500 * time.Millisecond)
return []any{"should not be returned"}, nil
}, "cancelCircuit")
}, "cancelCircuit", "")

cmd := NewCommand(context.Background(), []*Functor{functor})
cmd.Cancel()
Expand All @@ -388,11 +387,11 @@ func TestCircuitBreaker_MultipleFunctorsResult(t *testing.T) {

functor1 := NewFunctor(func() ([]any, error) {
return nil, errors.New("functor1 error")
}, "circuit1")
}, "circuit1", "")

functor2 := NewFunctor(func() ([]any, error) {
return []any{"success from functor2"}, nil
}, "circuit2")
}, "circuit2", "")

cmd := NewCommand(context.Background(), []*Functor{functor1, functor2})

Expand Down Expand Up @@ -424,11 +423,11 @@ func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
failingFunctor := NewFunctor(func() ([]any, error) {
time.Sleep(20 * time.Millisecond)
return nil, errors.New("should time out")
}, "circuitName")
}, "circuitName", "")

successFunctor := NewFunctor(func() ([]any, error) {
return []any{"success without circuit"}, nil
}, "circuitName")
}, "circuitName", "")

cmd := NewCommand(context.Background(), []*Functor{failingFunctor, successFunctor})

Expand Down
6 changes: 6 additions & 0 deletions healthmanager/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (a *Aggregator) Update(providerStatus rpcstatus.ProviderStatus) {
// Update existing provider status or add a new provider.
if ps, exists := a.providerStatuses[providerStatus.Name]; exists {
ps.Status = providerStatus.Status
ps.ChainID = providerStatus.ChainID
Copy link
Contributor

@dlipicar dlipicar Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this layer supposed to be chainID-agnostic? (as in, we don't deal with chainIDs with market providers, but we do once we go into a more specific layer of RPC/collectibles providers)

if providerStatus.Status == rpcstatus.StatusUp {
ps.LastSuccessAt = providerStatus.LastSuccessAt
} else if providerStatus.Status == rpcstatus.StatusDown {
Expand All @@ -53,6 +54,7 @@ func (a *Aggregator) Update(providerStatus rpcstatus.ProviderStatus) {
} else {
a.providerStatuses[providerStatus.Name] = &rpcstatus.ProviderStatus{
Name: providerStatus.Name,
ChainID: providerStatus.ChainID,
LastSuccessAt: providerStatus.LastSuccessAt,
LastErrorAt: providerStatus.LastErrorAt,
LastError: providerStatus.LastError,
Expand Down Expand Up @@ -81,6 +83,8 @@ func (a *Aggregator) ComputeAggregatedStatus() rpcstatus.ProviderStatus {
var lastError error
anyUp := false
anyUnknown := false
// chainid
var chainID uint64 = 0

for _, ps := range a.providerStatuses {
switch ps.Status {
Expand All @@ -97,10 +101,12 @@ func (a *Aggregator) ComputeAggregatedStatus() rpcstatus.ProviderStatus {
lastError = ps.LastError
}
}
chainID = ps.ChainID
}

aggregatedStatus := rpcstatus.ProviderStatus{
Name: a.name,
ChainID: chainID,
LastSuccessAt: lastSuccessAt,
LastErrorAt: lastErrorAt,
LastError: lastError,
Expand Down
4 changes: 2 additions & 2 deletions healthmanager/blockchain_health_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type BlockchainHealthManager struct {
aggregator *aggregator.Aggregator
subscriptionManager *SubscriptionManager

providers map[uint64]*ProvidersHealthManager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
providers map[uint64]*ProvidersHealthManager // ChainID to its providers health manager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
lastStatus *BlockchainStatus
wg sync.WaitGroup
}
Expand Down
Loading
Loading