Skip to content

Commit

Permalink
Revert concurrency setting and message processing loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum committed Jan 10, 2024
1 parent 613c95d commit ce3927b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 30 deletions.
2 changes: 2 additions & 0 deletions node/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func (n *Node) subscribeToTopics(ctx context.Context) error {
return fmt.Errorf("could not initialize pubsub: %w", err)
}

n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to")

// TODO: If some topics/subscriptions failed, cleanup those already subscribed to.
for _, topicName := range n.cfg.Topics {

Expand Down
4 changes: 3 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type Node struct {
executor blockless.Executor
fstore FStore

subgroups workSubgroups
sema chan struct{}
wg *sync.WaitGroup
subgroups workSubgroups
attributes *attributes.Attestation

rollCall *rollCallQueue
Expand Down Expand Up @@ -76,6 +77,7 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore
executor: cfg.Execute,

wg: &sync.WaitGroup{},
sema: make(chan struct{}, cfg.Concurrency),
subgroups: subgroups,

rollCall: newQueue(rollCallQueueBufferSize),
Expand Down
49 changes: 20 additions & 29 deletions node/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ import (
// Run will start the main loop for the node.
func (n *Node) Run(ctx context.Context) error {

n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to")

err := n.subscribeToTopics(ctx)
if err != nil {
return fmt.Errorf("could not subscribe to topic: %w", err)
return fmt.Errorf("could not subscribe to topics: %w", err)
}

// Sync functions now in case they were removed from the storage.
Expand Down Expand Up @@ -53,17 +51,16 @@ func (n *Node) Run(ctx context.Context) error {

n.log.Info().Uint("concurrency", n.cfg.Concurrency).Msg("starting node main loop")

msgs := make(chan *pubsub.Message, n.cfg.Concurrency)
var topicWorkers sync.WaitGroup
var workers sync.WaitGroup

// Process topic messages - spin up a goroutine for each topic that will feed the main processing loop below.
// No need for locking since we're still single threaded here and these (subscribed) topics will not be touched by other code.
for name, topic := range n.subgroups.topics {

topicWorkers.Add(1)
workers.Add(1)

go func(name string, subscription *pubsub.Subscription) {
defer topicWorkers.Done()
defer workers.Done()

// Message processing loops.
for {
Expand All @@ -83,33 +80,27 @@ func (n *Node) Run(ctx context.Context) error {

n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message")

msgs <- msg
// Try to get a slot for processing the request.
n.sema <- struct{}{}
n.wg.Add(1)

go func(msg *pubsub.Message) {
// Free up slot after we're done.
defer n.wg.Done()
defer func() { <-n.sema }()

err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data)
if err != nil {
n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message")
}
}(msg)
}
}(name, topic.subscription)
}

// Read and process messages.
go func() {
for msg := range msgs {

n.log.Debug().Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("processing message")

n.wg.Add(1)
go func(msg *pubsub.Message) {
defer n.wg.Done()

err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data)
if err != nil {
n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message")
}
}(msg)
}
}()
n.log.Debug().Msg("waiting for workers")

// Waiting for topic workers to stop (context canceled).
topicWorkers.Wait()
// Signal that no new messages will be incoming.
close(msgs)
workers.Wait()

n.log.Debug().Msg("waiting for messages being processed")
n.wg.Wait()
Expand Down

0 comments on commit ce3927b

Please sign in to comment.