Skip to content

Commit

Permalink
Added Adapter module
Browse files Browse the repository at this point in the history
  • Loading branch information
k-karuna committed Dec 19, 2024
1 parent 6aa2dff commit 46937ab
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 16 deletions.
6 changes: 3 additions & 3 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package indexer
import (
"bytes"
"context"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver"
sqdRcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -54,7 +54,7 @@ type Indexer struct {
state *state
idGenerator *generator.IdGenerator
receiver *receiver.Receiver
sqdReceiver *sqd_receiver.Receiver
sqdReceiver *sqdRcvr.Receiver
statusChecker *statusChecker
rollbackManager models.Rollback

Expand Down Expand Up @@ -98,7 +98,7 @@ func New(

switch cfg.Datasource {
case "subsquid":
sqdReceiver, err := sqd_receiver.New(
sqdReceiver, err := sqdRcvr.New(
cfg,
datasource,
cfg.StartLevel,
Expand Down
40 changes: 40 additions & 0 deletions pkg/indexer/subsquid/adapter/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package adapter

import (
"context"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
)

type Adapter struct {
modules.BaseModule
}

var _ modules.Module = (*Adapter)(nil)

const (
InputName = "blocks"
OutputName = "parsed_blocks"
StopOutput = "stop"
)

func New() Adapter {
m := Adapter{
BaseModule: modules.New("sqd adapter"),
}
m.CreateInputWithCapacity(InputName, 128)
m.CreateOutput(OutputName)
m.CreateOutput(StopOutput)

return m
}

func (a *Adapter) Start(ctx context.Context) {
a.Log.Info().Msg("starting...")
a.G.GoCtx(ctx, a.listen)
}

func (a *Adapter) Close() error {
a.Log.Info().Msg("closing...")
a.G.Wait()
return nil
}
36 changes: 36 additions & 0 deletions pkg/indexer/subsquid/adapter/listen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package adapter

import (
"context"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api"
)

func (a *Adapter) listen(ctx context.Context) {
a.Log.Info().Msg("module started")

input := a.MustInput(InputName)

for {
select {
case <-ctx.Done():
return
case msg, ok := <-input.Listen():
if !ok {
a.Log.Warn().Msg("can't read message from input, it was drained and closed")
a.MustOutput(StopOutput).Push(struct{}{})
return
}

block, ok := msg.(*api.SqdBlockResponse)

if !ok {
a.Log.Warn().Msgf("invalid message type: %T", msg)
continue
}

a.Log.Info().
Uint64("level", block.Header.Number).
Msg("received block")
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ type Fields struct {
}

type BlockField struct {
Timestamp bool `json:"timestamp"`
Timestamp bool `json:"timestamp"`
ParentHash bool `json:"parentHash,omitempty"`
Status bool `json:"status,omitempty"`
NewRoot bool `json:"newRoot,omitempty"`
SequencerAddress bool `json:"sequencerAddress,omitempty"`
}

type StateUpdateField struct {
Expand Down Expand Up @@ -98,7 +102,11 @@ func NewRequest(fromLevel uint64, toLevel uint64) *Request {
IncludeAllBlocks: true,
Fields: Fields{
Block: BlockField{
Timestamp: true,
ParentHash: true,
Status: true,
NewRoot: true,
Timestamp: true,
SequencerAddress: true,
},
StateUpdate: StateUpdateField{
NewRoot: true,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package sqd_receiver
package receiver

import (
"context"
rcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api"
api2 "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api"
"github.com/dipdup-io/workerpool"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/pkg/errors"
Expand All @@ -29,11 +29,11 @@ const (

type Receiver struct {
modules.BaseModule
api *api.Subsquid
api *api2.Subsquid
startLevel uint64
level uint64
threadsCount int
blocks chan *api.SqdBlockResponse
blocks chan *api2.SqdBlockResponse
getIndexerHeight GetIndexerHeight
pool *workerpool.Pool[BlocksToWorker]
processing map[uint64]struct{}
Expand All @@ -57,12 +57,12 @@ func New(cfg config.Config,
}

receiver := &Receiver{
BaseModule: modules.New("subsquid receiver"),
BaseModule: modules.New("sqd receiver"),
startLevel: startLevel,
getIndexerHeight: getIndexerHeight,
threadsCount: threadsCount,
api: api.NewSubsquid(dsCfg),
blocks: make(chan *api.SqdBlockResponse, cfg.ThreadsCount*10),
api: api2.NewSubsquid(dsCfg),
blocks: make(chan *api2.SqdBlockResponse, cfg.ThreadsCount*10),
processing: make(map[uint64]struct{}),
processingMx: new(sync.Mutex),
timeout: time.Duration(cfg.Timeout) * time.Second,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package sqd_receiver
package receiver

import (
"context"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api"
"github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api"
)

func (r *Receiver) sequencer(ctx context.Context) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sqd_receiver
package receiver

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sqd_receiver
package receiver

import (
"context"
Expand Down

0 comments on commit 46937ab

Please sign in to comment.