Added splitting of block ranges into smaller chunks for download; finalized working sequencer

k-karuna committed Dec 19, 2024
1 parent 03a4417 commit b9395a2
Showing 8 changed files with 45 additions and 131 deletions.
36 changes: 0 additions & 36 deletions pkg/indexer/sequencer/listen.go

This file was deleted.

47 changes: 0 additions & 47 deletions pkg/indexer/sequencer/sequencer.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/indexer/sqd_receiver/api/api.go
@@ -35,13 +35,13 @@ func (s *Subsquid) GetWorkerUrl(_ context.Context, startLevel uint64) (string, e
return response.Body().AsString()
}

-func (s *Subsquid) GetBlocks(_ context.Context, from uint64, workerUrl string) ([]*SqdBlockResponse, error) {
+func (s *Subsquid) GetBlocks(_ context.Context, from, to uint64, workerUrl string) ([]*SqdBlockResponse, error) {
var workerClient = fastshot.NewClient(workerUrl).
Build()

response, err := workerClient.POST("").
Header().AddContentType(mime.JSON).
-Body().AsJSON(NewRequest(from)).
+Body().AsJSON(NewRequest(from, to)).
Send()

if err != nil {
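For orientation, this is roughly what the bounded GetBlocks request amounts to on the wire. The sketch below is standalone and not part of the commit: it uses net/http instead of the fastshot client shown above, carries only the range-related fields (the real payload also includes the Fields section), and the worker URL is hypothetical.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// rangedRequest mirrors only the range-related fields of the indexer's Request type.
type rangedRequest struct {
	Type             string `json:"type"`
	FromBlock        uint64 `json:"fromBlock"`
	ToBlock          uint64 `json:"toBlock,omitempty"`
	IncludeAllBlocks bool   `json:"includeAllBlocks"`
}

// getBlocks POSTs a bounded block range to a SQD worker URL and returns the raw response body.
func getBlocks(workerURL string, from, to uint64) ([]byte, error) {
	body, err := json.Marshal(rangedRequest{
		Type:             "starknet",
		FromBlock:        from,
		ToBlock:          to,
		IncludeAllBlocks: true,
	})
	if err != nil {
		return nil, err
	}
	resp, err := http.Post(workerURL, "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	// Hypothetical worker URL; in the indexer it comes from GetWorkerUrl.
	raw, err := getBlocks("https://example-worker.invalid", 100, 299)
	fmt.Println(len(raw), err)
}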
4 changes: 3 additions & 1 deletion pkg/indexer/sqd_receiver/api/request.go
@@ -3,6 +3,7 @@ package api
type Request struct {
Type string `json:"type"`
FromBlock uint64 `json:"fromBlock"`
+ToBlock uint64 `json:"toBlock,omitempty"`
IncludeAllBlocks bool `json:"includeAllBlocks"`
Fields Fields `json:"fields,omitempty"`
StateUpdates []map[string]any `json:"stateUpdates,omitempty"`
@@ -89,10 +90,11 @@ type TransactionWithTrace struct {
Events bool `json:"events"`
}

-func NewRequest(fromLevel uint64) *Request {
+func NewRequest(fromLevel uint64, toLevel uint64) *Request {
return &Request{
Type: "starknet",
FromBlock: fromLevel,
+ToBlock: toLevel,
IncludeAllBlocks: true,
Fields: Fields{
Block: BlockField{
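A quick standalone check of what the omitempty tag on ToBlock does (only the two range fields are mirrored here, not the project's full Request type): a zero upper bound drops the field entirely, so a request without an upper bound keeps the old shape.

package main

import (
	"encoding/json"
	"fmt"
)

// blockRange mirrors only the range fields of Request, for illustration.
type blockRange struct {
	FromBlock uint64 `json:"fromBlock"`
	ToBlock   uint64 `json:"toBlock,omitempty"`
}

func main() {
	bounded, _ := json.Marshal(blockRange{FromBlock: 100, ToBlock: 299})
	unbounded, _ := json.Marshal(blockRange{FromBlock: 100})

	fmt.Println(string(bounded))   // {"fromBlock":100,"toBlock":299}
	fmt.Println(string(unbounded)) // {"fromBlock":100}
}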
24 changes: 23 additions & 1 deletion pkg/indexer/sqd_receiver/receiver.go
@@ -119,7 +119,7 @@ func (r *Receiver) checkQueue(ctx context.Context) bool {
case <-ctx.Done():
return true
default:
-time.Sleep(time.Millisecond * 100)
+time.Sleep(time.Millisecond * 10)
}
}

@@ -187,6 +187,28 @@ func (r *Receiver) GetSqdWorkerRanges(ctx context.Context, fromLevel, height uin
return result, nil
}

+func (r *Receiver) SplitWorkerRanger(workerRanges []BlocksToWorker) []BlocksToWorker {
+var result []BlocksToWorker
+batchSize := uint64(200)
+
+for _, worker := range workerRanges {
+for start := worker.From; start <= worker.To; start += batchSize {
+end := start + batchSize - 1
+if end > worker.To {
+end = worker.To
+}
+
+result = append(result, BlocksToWorker{
+From: start,
+To: end,
+WorkerURL: worker.WorkerURL,
+})
+}
+}
+
+return result
+}
+
func (r *Receiver) Level() uint64 {
r.mx.RLock()
defer r.mx.RUnlock()
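To illustrate the new splitting, here is a standalone mirror of SplitWorkerRanger with a local stand-in for BlocksToWorker (not the commit's code): a 450-block assignment becomes three chunks of at most 200 blocks, all pointing at the same worker.

package main

import "fmt"

// BlocksToWorker is a local stand-in for the receiver's type.
type BlocksToWorker struct {
	From, To  uint64
	WorkerURL string
}

// splitWorkerRanges mirrors Receiver.SplitWorkerRanger: each assigned range is
// cut into consecutive chunks of at most batchSize blocks.
func splitWorkerRanges(workerRanges []BlocksToWorker, batchSize uint64) []BlocksToWorker {
	var result []BlocksToWorker
	for _, worker := range workerRanges {
		for start := worker.From; start <= worker.To; start += batchSize {
			end := start + batchSize - 1
			if end > worker.To {
				end = worker.To
			}
			result = append(result, BlocksToWorker{From: start, To: end, WorkerURL: worker.WorkerURL})
		}
	}
	return result
}

func main() {
	ranges := []BlocksToWorker{{From: 0, To: 449, WorkerURL: "https://worker-1.invalid"}}
	for _, r := range splitWorkerRanges(ranges, 200) {
		fmt.Printf("%d-%d -> %s\n", r.From, r.To, r.WorkerURL)
	}
	// Prints:
	// 0-199 -> https://worker-1.invalid
	// 200-399 -> https://worker-1.invalid
	// 400-449 -> https://worker-1.invalid
}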
8 changes: 0 additions & 8 deletions pkg/indexer/sqd_receiver/sequencer.go
@@ -25,15 +25,7 @@ func (r *Receiver) sequencer(ctx context.Context) {
b, ok := orderedBlocks[currentBlock]
for ok {
r.MustOutput(OutputName).Push(b)
-r.Log.Info().
-Uint64("ID", b.Header.Number).
-Msg("sended block")
-
r.setLevel(currentBlock)
-r.Log.Debug().
-Uint64("height", currentBlock).
-Msg("put in order block")
-
delete(orderedBlocks, currentBlock)
currentBlock += 1

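The sequencer loop that remains after this cleanup is an ordered-drain pattern: blocks arriving out of order sit in a map, and whenever the next expected height is present it is pushed downstream and the cursor advances. A compact standalone sketch of that idea (string stand-ins instead of real block values, not the commit's code):

package main

import "fmt"

// drainInOrder pushes buffered items downstream strictly in height order,
// starting from currentBlock, and returns the next height still missing.
func drainInOrder(buffered map[uint64]string, currentBlock uint64, push func(string)) uint64 {
	for {
		b, ok := buffered[currentBlock]
		if !ok {
			return currentBlock
		}
		push(b)
		delete(buffered, currentBlock)
		currentBlock++
	}
}

func main() {
	buffered := map[uint64]string{10: "block 10", 11: "block 11", 13: "block 13"}
	next := drainInOrder(buffered, 10, func(b string) { fmt.Println("pushed", b) })
	fmt.Println("waiting for", next) // waiting for 12; block 13 stays buffered
}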
11 changes: 1 addition & 10 deletions pkg/indexer/sqd_receiver/sync.go
@@ -16,11 +16,6 @@ func (r *Receiver) sync(ctx context.Context) {
Uint64("indexer_height", r.getIndexerHeight()).
Uint64("node_height", head).
Msg("rollback detected by block height")

// todo: makeRollback
//if err := f.indexer.makeRollback(ctx, head); err != nil {
// return errors.Wrap(err, "makeRollback")
//}
}

r.log.Info().
@@ -41,14 +36,10 @@ func (r *Receiver) sync(ctx context.Context) {
return
}

-for _, blockRange := range blocksToWorker {
+for _, blockRange := range r.SplitWorkerRanger(blocksToWorker) {
select {
case <-ctx.Done():
return
-// todo: f.indexer.rollback
-//case <-f.indexer.rollback:
-// log.Info().Msg("stop receiving blocks")
-// return nil
default:
if r.checkQueue(ctx) {
return
42 changes: 16 additions & 26 deletions pkg/indexer/sqd_receiver/worker.go
@@ -2,49 +2,39 @@ package sqd_receiver

import (
"context"
"github.com/pkg/errors"
"time"
"github.com/rs/zerolog/log"
)

func (r *Receiver) worker(ctx context.Context, blockRange BlocksToWorker) {
-r.log.Info().
-Str("URL", blockRange.WorkerURL).
-Msg("worker handling sqd worker...")
-
from := blockRange.From
-for {
+var allBlocksDownloaded bool
+
+for !allBlocksDownloaded {
select {
case <-ctx.Done():
return
-// todo: indexer.rollback
-//case <-f.indexer.rollback:
-// log.Info().Msg("stop receiving blocks")
-// return
default:
-blocks, err := r.api.GetBlocks(ctx, from, blockRange.WorkerURL)
+blocks, err := r.api.GetBlocks(ctx, from, blockRange.To, blockRange.WorkerURL)
if err != nil {
-if errors.Is(err, context.Canceled) {
-return
-}
-time.Sleep(time.Second)
-continue
+log.Err(err).
+Uint64("fromLevel", from).
+Uint64("toLevel", blockRange.To).
+Str("worker url", blockRange.WorkerURL).
+Msg("loading blocks error")
+return
}

lastBlock := blocks[len(blocks)-1]
-if lastBlock.Header.Number == blockRange.To {
-break
-}
-from = lastBlock.Header.Number + 1
-
for _, block := range blocks {
r.blocks <- block
}

-r.log.Info().
-Uint64("From", blocks[0].Header.Number).
-Uint64("To", lastBlock.Header.Number).
-Msg("worker received blocks")
+if lastBlock.Header.Number == blockRange.To {
+allBlocksDownloaded = true
+} else {
+from = lastBlock.Header.Number + 1
+}
}
}

}
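The reworked worker is a plain paging loop: request [from, blockRange.To], push whatever arrived, continue from the last received height, and stop once the upper bound is reached (bailing out on error instead of retrying). A standalone sketch of that control flow, with a stubbed fetch in place of api.GetBlocks and a print in place of the blocks channel:

package main

import "fmt"

// fetchPage stands in for api.GetBlocks, which may return only part of the
// requested range (hence the paging). Here it returns at most pageSize heights.
func fetchPage(from, to, pageSize uint64) []uint64 {
	var page []uint64
	for h := from; h <= to && uint64(len(page)) < pageSize; h++ {
		page = append(page, h)
	}
	return page
}

// downloadRange pages through [from, to] until the last received height
// reaches the upper bound, pushing every block to sink.
func downloadRange(from, to uint64, sink func(uint64)) {
	allBlocksDownloaded := false
	for !allBlocksDownloaded {
		blocks := fetchPage(from, to, 3) // tiny page size just to show the paging
		last := blocks[len(blocks)-1]
		for _, b := range blocks {
			sink(b)
		}
		if last == to {
			allBlocksDownloaded = true
		} else {
			from = last + 1
		}
	}
}

func main() {
	downloadRange(100, 107, func(b uint64) { fmt.Println("received", b) })
}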
