diff --git a/pkg/indexer/receiver/receiver.go b/pkg/indexer/receiver/receiver.go index 71bb756..212d11d 100644 --- a/pkg/indexer/receiver/receiver.go +++ b/pkg/indexer/receiver/receiver.go @@ -21,6 +21,38 @@ type Result struct { Block Block Traces []starknet.Trace StateUpdate starknetData.StateUpdate + + mx *sync.Mutex +} + +func NewResult() Result { + return Result{ + mx: new(sync.Mutex), + } +} + +func (r *Result) setBlock(block Block) { + r.mx.Lock() + { + r.Block = block + } + r.mx.Unlock() +} + +func (r *Result) setTraces(traces []starknet.Trace) { + r.mx.Lock() + { + r.Traces = traces + } + r.mx.Unlock() +} + +func (r *Result) setStateUpdates(stateUpdate starknetData.StateUpdate) { + r.mx.Lock() + { + r.StateUpdate = stateUpdate + } + r.mx.Unlock() } // Receiver - @@ -100,100 +132,16 @@ func (r *Receiver) worker(ctx context.Context, height uint64) { var ( result Result wg sync.WaitGroup - mx = new(sync.Mutex) ) wg.Add(1) - go func(mx *sync.Mutex, wg *sync.WaitGroup) { - defer wg.Done() - - for { - select { - case <-ctx.Done(): - return - default: - } - - response, err := r.api.GetBlock(ctx, blockId) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - r.log.Err(err).Uint64("height", height).Msg("get block request") - time.Sleep(time.Second) - continue - } - mx.Lock() - { - result.Block = response - } - mx.Unlock() - break - } - }(mx, &wg) + go r.getBlock(ctx, blockId, &result, &wg) wg.Add(1) - go func(mx *sync.Mutex, wg *sync.WaitGroup) { - defer wg.Done() - - api := r.api - for { - select { - case <-ctx.Done(): - return - default: - } - - response, err := api.TraceBlock(ctx, blockId) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - r.log.Err(err).Uint64("height", height).Msg("get block traces request") - time.Sleep(time.Second) - r.log.Warn().Msg("trying fallback node...") - api = r.fallbackAPI - continue - } - - mx.Lock() - { - result.Traces = response - } - mx.Unlock() - break - } - }(mx, &wg) + go r.traceBlock(ctx, blockId, &result, &wg) wg.Add(1) - go func(mx *sync.Mutex, wg *sync.WaitGroup) { - defer wg.Done() - - for { - select { - case <-ctx.Done(): - return - default: - } - - response, err := r.getStateUpdate(ctx, blockId) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - r.log.Err(err).Uint64("height", height).Msg("state update request") - time.Sleep(time.Second) - continue - } - - mx.Lock() - { - result.StateUpdate = response - } - mx.Unlock() - break - } - }(mx, &wg) + go r.receiveStateUpdate(ctx, blockId, &result, &wg) wg.Wait() @@ -261,11 +209,11 @@ func (r *Receiver) QueueSize() int { return r.pool.QueueSize() } -func (r *Receiver) getStateUpdate(ctx context.Context, blockId starknetData.BlockID) (starknetData.StateUpdate, error) { +func (r *Receiver) getStateUpdate(ctx context.Context, api API, blockId starknetData.BlockID) (starknetData.StateUpdate, error) { requestCtx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() - return r.api.GetStateUpdate(requestCtx, blockId) + return api.GetStateUpdate(requestCtx, blockId) } func (r *Receiver) Clear() { @@ -278,3 +226,90 @@ func (r *Receiver) Clear() { delete(r.processing, key) } } + +func (r *Receiver) getBlock(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) { + defer wg.Done() + + api := r.api + for { + select { + case <-ctx.Done(): + return + default: + } + + response, err := api.GetBlock(ctx, blockId) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.log.Err(err).Uint64("height", *blockId.Number).Msg("get block request") + if r.fallbackAPI != nil { + r.log.Warn().Msg("trying fallback node...") + api = r.fallbackAPI + } + time.Sleep(time.Second) + continue + } + result.setBlock(response) + break + } +} + +func (r *Receiver) traceBlock(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) { + defer wg.Done() + + api := r.api + for { + select { + case <-ctx.Done(): + return + default: + } + + response, err := api.TraceBlock(ctx, blockId) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.log.Err(err).Uint64("height", *blockId.Number).Msg("get block traces request") + if r.fallbackAPI != nil { + r.log.Warn().Msg("trying fallback node...") + api = r.fallbackAPI + } + time.Sleep(time.Second) + continue + } + result.setTraces(response) + break + } +} + +func (r *Receiver) receiveStateUpdate(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) { + defer wg.Done() + + api := r.api + for { + select { + case <-ctx.Done(): + return + default: + } + + response, err := r.getStateUpdate(ctx, api, blockId) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.log.Err(err).Uint64("height", *blockId.Number).Msg("state update request") + if r.fallbackAPI != nil { + r.log.Warn().Msg("trying fallback node...") + api = r.fallbackAPI + } + time.Sleep(time.Second) + continue + } + result.setStateUpdates(response) + break + } +}