diff --git a/node/message.go b/node/message.go index 03bd2186..d2f2d17e 100644 --- a/node/message.go +++ b/node/message.go @@ -21,6 +21,8 @@ func (n *Node) subscribeToTopics(ctx context.Context) error { return fmt.Errorf("could not initialize pubsub: %w", err) } + n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to") + // TODO: If some topics/subscriptions failed, cleanup those already subscribed to. for _, topicName := range n.cfg.Topics { diff --git a/node/node.go b/node/node.go index bf44cd95..50d08a0f 100644 --- a/node/node.go +++ b/node/node.go @@ -31,8 +31,9 @@ type Node struct { executor blockless.Executor fstore FStore - subgroups workSubgroups + sema chan struct{} wg *sync.WaitGroup + subgroups workSubgroups attributes *attributes.Attestation rollCall *rollCallQueue @@ -76,6 +77,7 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore executor: cfg.Execute, wg: &sync.WaitGroup{}, + sema: make(chan struct{}, cfg.Concurrency), subgroups: subgroups, rollCall: newQueue(rollCallQueueBufferSize), diff --git a/node/run.go b/node/run.go index d0bfa47c..94f33212 100644 --- a/node/run.go +++ b/node/run.go @@ -17,11 +17,9 @@ import ( // Run will start the main loop for the node. func (n *Node) Run(ctx context.Context) error { - n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to") - err := n.subscribeToTopics(ctx) if err != nil { - return fmt.Errorf("could not subscribe to topic: %w", err) + return fmt.Errorf("could not subscribe to topics: %w", err) } // Sync functions now in case they were removed from the storage. @@ -53,17 +51,16 @@ func (n *Node) Run(ctx context.Context) error { n.log.Info().Uint("concurrency", n.cfg.Concurrency).Msg("starting node main loop") - msgs := make(chan *pubsub.Message, n.cfg.Concurrency) - var topicWorkers sync.WaitGroup + var workers sync.WaitGroup // Process topic messages - spin up a goroutine for each topic that will feed the main processing loop below. // No need for locking since we're still single threaded here and these (subscribed) topics will not be touched by other code. for name, topic := range n.subgroups.topics { - topicWorkers.Add(1) + workers.Add(1) go func(name string, subscription *pubsub.Subscription) { - defer topicWorkers.Done() + defer workers.Done() // Message processing loops. for { @@ -83,33 +80,27 @@ func (n *Node) Run(ctx context.Context) error { n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message") - msgs <- msg + // Try to get a slot for processing the request. + n.sema <- struct{}{} + n.wg.Add(1) + + go func(msg *pubsub.Message) { + // Free up slot after we're done. + defer n.wg.Done() + defer func() { <-n.sema }() + + err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) + if err != nil { + n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message") + } + }(msg) } }(name, topic.subscription) } - // Read and process messages. - go func() { - for msg := range msgs { - - n.log.Debug().Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("processing message") - - n.wg.Add(1) - go func(msg *pubsub.Message) { - defer n.wg.Done() - - err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) - if err != nil { - n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message") - } - }(msg) - } - }() + n.log.Debug().Msg("waiting for workers") - // Waiting for topic workers to stop (context canceled). - topicWorkers.Wait() - // Signal that no new messages will be incoming. - close(msgs) + workers.Wait() n.log.Debug().Msg("waiting for messages being processed") n.wg.Wait()