Skip to content

Commit

Permalink
erigon-lib: move event notifier and observers to erigon-lib (#13307)
Browse files Browse the repository at this point in the history
moving `EventNotifier` and `Observers` structs from
`polygon/polygoncommon` pkg to `erigon-lib/event` pkg since those will
be used in other packages outside of polygon (e.g. using in `shutter`
pkg as part of #13306)
  • Loading branch information
taratorio authored Jan 2, 2025
1 parent 7316b09 commit ef62be7
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,44 @@
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package polygoncommon
package event

import (
"context"
"sync"
"sync/atomic"
)

// EventNotifier notifies waiters about an event.
// Notifier notifies waiters about an event.
// It supports a single "producer" and multiple waiters.
// A producer can set the event state to "signaled" or "non-signaled".
// Waiters can wait for the "signaled" event state.
type EventNotifier struct {
type Notifier struct {
mutex sync.Mutex
cond *sync.Cond
hasEvent atomic.Bool
}

func NewEventNotifier() *EventNotifier {
instance := &EventNotifier{}
func NewNotifier() *Notifier {
instance := &Notifier{}
instance.cond = sync.NewCond(&instance.mutex)
return instance
}

// Reset to the "non-signaled" state.
func (en *EventNotifier) Reset() {
func (en *Notifier) Reset() {
en.hasEvent.Store(false)
}

// SetAndBroadcast sets the "signaled" state and notifies all waiters.
func (en *EventNotifier) SetAndBroadcast() {
func (en *Notifier) SetAndBroadcast() {
en.hasEvent.Store(true)
en.cond.Broadcast()
}

// Wait for the "signaled" state.
// If the event is already "signaled" it returns immediately.
func (en *EventNotifier) Wait(ctx context.Context) error {
func (en *Notifier) Wait(ctx context.Context) error {
waitCtx, waitCancel := context.WithCancel(ctx)
defer waitCancel()

Expand All @@ -74,7 +74,7 @@ func (en *EventNotifier) Wait(ctx context.Context) error {
<-waitCtx.Done()

// if the parent context is done, force the waiting goroutine to exit
// this might lead to spurious wake ups for other waiters,
// this might lead to spurious wake-ups for other waiters,
// but it is ok due to the waiting loop conditions
en.cond.Broadcast()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package polygoncommon
package event

import (
"sync"
Expand Down
15 changes: 7 additions & 8 deletions polygon/heimdall/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@ import (
"fmt"
"time"

libcommon "github.com/erigontech/erigon-lib/common"
commonerrors "github.com/erigontech/erigon-lib/common/errors"
"github.com/erigontech/erigon-lib/common/generics"
"github.com/erigontech/erigon-lib/event"
"github.com/erigontech/erigon-lib/log/v3"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon/polygon/polygoncommon"
)

type Scraper[TEntity Entity] struct {
name string
store EntityStore[TEntity]
fetcher entityFetcher[TEntity]
pollDelay time.Duration
observers *polygoncommon.Observers[[]TEntity]
syncEvent *polygoncommon.EventNotifier
observers *event.Observers[[]TEntity]
syncEvent *event.Notifier
transientErrors []error
logger log.Logger
}
Expand All @@ -54,8 +53,8 @@ func NewScraper[TEntity Entity](
store: store,
fetcher: fetcher,
pollDelay: pollDelay,
observers: polygoncommon.NewObservers[[]TEntity](),
syncEvent: polygoncommon.NewEventNotifier(),
observers: event.NewObservers[[]TEntity](),
syncEvent: event.NewNotifier(),
transientErrors: transientErrors,
logger: logger,
}
Expand Down Expand Up @@ -146,7 +145,7 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error {
return ctx.Err()
}

func (s *Scraper[TEntity]) RegisterObserver(observer func([]TEntity)) polygoncommon.UnregisterFunc {
func (s *Scraper[TEntity]) RegisterObserver(observer func([]TEntity)) event.UnregisterFunc {
return s.observers.Register(observer)
}

Expand Down
8 changes: 4 additions & 4 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
"golang.org/x/sync/errgroup"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/event"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/bor/valset"
"github.com/erigontech/erigon/polygon/polygoncommon"
)

type ServiceConfig struct {
Expand Down Expand Up @@ -222,7 +222,7 @@ func (s *Service) Producers(ctx context.Context, blockNum uint64) (*valset.Valid
return s.reader.Producers(ctx, blockNum)
}

func (s *Service) RegisterMilestoneObserver(callback func(*Milestone), opts ...ObserverOption) polygoncommon.UnregisterFunc {
func (s *Service) RegisterMilestoneObserver(callback func(*Milestone), opts ...ObserverOption) event.UnregisterFunc {
options := NewObserverOptions(opts...)
return s.milestoneScraper.RegisterObserver(func(entities []*Milestone) {
for _, entity := range libcommon.SliceTakeLast(entities, options.eventsLimit) {
Expand All @@ -231,7 +231,7 @@ func (s *Service) RegisterMilestoneObserver(callback func(*Milestone), opts ...O
})
}

func (s *Service) RegisterCheckpointObserver(callback func(*Checkpoint), opts ...ObserverOption) polygoncommon.UnregisterFunc {
func (s *Service) RegisterCheckpointObserver(callback func(*Checkpoint), opts ...ObserverOption) event.UnregisterFunc {
options := NewObserverOptions(opts...)
return s.checkpointScraper.RegisterObserver(func(entities []*Checkpoint) {
for _, entity := range libcommon.SliceTakeLast(entities, options.eventsLimit) {
Expand All @@ -240,7 +240,7 @@ func (s *Service) RegisterCheckpointObserver(callback func(*Checkpoint), opts ..
})
}

func (s *Service) RegisterSpanObserver(callback func(*Span), opts ...ObserverOption) polygoncommon.UnregisterFunc {
func (s *Service) RegisterSpanObserver(callback func(*Span), opts ...ObserverOption) event.UnregisterFunc {
options := NewObserverOptions(opts...)
return s.spanScraper.RegisterObserver(func(entities []*Span) {
for _, entity := range libcommon.SliceTakeLast(entities, options.eventsLimit) {
Expand Down
39 changes: 19 additions & 20 deletions polygon/p2p/message_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (

"google.golang.org/grpc"

"github.com/erigontech/erigon-lib/log/v3"

"github.com/erigontech/erigon-lib/event"
"github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/p2p/sentry"
"github.com/erigontech/erigon-lib/rlp"
"github.com/erigontech/erigon/eth/protocols/eth"
"github.com/erigontech/erigon/polygon/polygoncommon"
)

type DecodedInboundMessage[TPacket any] struct {
Expand All @@ -38,7 +37,7 @@ type DecodedInboundMessage[TPacket any] struct {
PeerId *PeerId
}

type UnregisterFunc = polygoncommon.UnregisterFunc
type UnregisterFunc = event.UnregisterFunc

func NewMessageListener(
logger log.Logger,
Expand All @@ -51,11 +50,11 @@ func NewMessageListener(
sentryClient: sentryClient,
statusDataFactory: statusDataFactory,
peerPenalizer: peerPenalizer,
newBlockObservers: polygoncommon.NewObservers[*DecodedInboundMessage[*eth.NewBlockPacket]](),
newBlockHashesObservers: polygoncommon.NewObservers[*DecodedInboundMessage[*eth.NewBlockHashesPacket]](),
blockHeadersObservers: polygoncommon.NewObservers[*DecodedInboundMessage[*eth.BlockHeadersPacket66]](),
blockBodiesObservers: polygoncommon.NewObservers[*DecodedInboundMessage[*eth.BlockBodiesPacket66]](),
peerEventObservers: polygoncommon.NewObservers[*sentryproto.PeerEvent](),
newBlockObservers: event.NewObservers[*DecodedInboundMessage[*eth.NewBlockPacket]](),
newBlockHashesObservers: event.NewObservers[*DecodedInboundMessage[*eth.NewBlockHashesPacket]](),
blockHeadersObservers: event.NewObservers[*DecodedInboundMessage[*eth.BlockHeadersPacket66]](),
blockBodiesObservers: event.NewObservers[*DecodedInboundMessage[*eth.BlockBodiesPacket66]](),
peerEventObservers: event.NewObservers[*sentryproto.PeerEvent](),
}
}

Expand All @@ -64,11 +63,11 @@ type MessageListener struct {
sentryClient sentryproto.SentryClient
statusDataFactory sentry.StatusDataFactory
peerPenalizer *PeerPenalizer
newBlockObservers *polygoncommon.Observers[*DecodedInboundMessage[*eth.NewBlockPacket]]
newBlockHashesObservers *polygoncommon.Observers[*DecodedInboundMessage[*eth.NewBlockHashesPacket]]
blockHeadersObservers *polygoncommon.Observers[*DecodedInboundMessage[*eth.BlockHeadersPacket66]]
blockBodiesObservers *polygoncommon.Observers[*DecodedInboundMessage[*eth.BlockBodiesPacket66]]
peerEventObservers *polygoncommon.Observers[*sentryproto.PeerEvent]
newBlockObservers *event.Observers[*DecodedInboundMessage[*eth.NewBlockPacket]]
newBlockHashesObservers *event.Observers[*DecodedInboundMessage[*eth.NewBlockHashesPacket]]
blockHeadersObservers *event.Observers[*DecodedInboundMessage[*eth.BlockHeadersPacket66]]
blockBodiesObservers *event.Observers[*DecodedInboundMessage[*eth.BlockBodiesPacket66]]
peerEventObservers *event.Observers[*sentryproto.PeerEvent]
stopWg sync.WaitGroup
}

Expand Down Expand Up @@ -98,23 +97,23 @@ func (ml *MessageListener) Run(ctx context.Context) error {
return ctx.Err()
}

func (ml *MessageListener) RegisterNewBlockObserver(observer polygoncommon.Observer[*DecodedInboundMessage[*eth.NewBlockPacket]]) UnregisterFunc {
func (ml *MessageListener) RegisterNewBlockObserver(observer event.Observer[*DecodedInboundMessage[*eth.NewBlockPacket]]) UnregisterFunc {
return ml.newBlockObservers.Register(observer)
}

func (ml *MessageListener) RegisterNewBlockHashesObserver(observer polygoncommon.Observer[*DecodedInboundMessage[*eth.NewBlockHashesPacket]]) UnregisterFunc {
func (ml *MessageListener) RegisterNewBlockHashesObserver(observer event.Observer[*DecodedInboundMessage[*eth.NewBlockHashesPacket]]) UnregisterFunc {
return ml.newBlockHashesObservers.Register(observer)
}

func (ml *MessageListener) RegisterBlockHeadersObserver(observer polygoncommon.Observer[*DecodedInboundMessage[*eth.BlockHeadersPacket66]]) UnregisterFunc {
func (ml *MessageListener) RegisterBlockHeadersObserver(observer event.Observer[*DecodedInboundMessage[*eth.BlockHeadersPacket66]]) UnregisterFunc {
return ml.blockHeadersObservers.Register(observer)
}

func (ml *MessageListener) RegisterBlockBodiesObserver(observer polygoncommon.Observer[*DecodedInboundMessage[*eth.BlockBodiesPacket66]]) UnregisterFunc {
func (ml *MessageListener) RegisterBlockBodiesObserver(observer event.Observer[*DecodedInboundMessage[*eth.BlockBodiesPacket66]]) UnregisterFunc {
return ml.blockBodiesObservers.Register(observer)
}

func (ml *MessageListener) RegisterPeerEventObserver(observer polygoncommon.Observer[*sentryproto.PeerEvent]) UnregisterFunc {
func (ml *MessageListener) RegisterPeerEventObserver(observer event.Observer[*sentryproto.PeerEvent]) UnregisterFunc {
return ml.peerEventObservers.Register(observer)
}

Expand Down Expand Up @@ -193,7 +192,7 @@ func notifyInboundMessageObservers[TPacket any](
ctx context.Context,
logger log.Logger,
peerPenalizer *PeerPenalizer,
observers *polygoncommon.Observers[*DecodedInboundMessage[TPacket]],
observers *event.Observers[*DecodedInboundMessage[TPacket]],
message *sentryproto.InboundMessage,
) error {
peerId := PeerIdFromH512(message.PeerId)
Expand Down
8 changes: 4 additions & 4 deletions polygon/p2p/peer_event_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package p2p

import (
"github.com/erigontech/erigon-lib/event"
"github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
"github.com/erigontech/erigon/eth/protocols/eth"
"github.com/erigontech/erigon/polygon/polygoncommon"
)

//go:generate mockgen -typed=true -source=./peer_event_registrar.go -destination=./peer_event_registrar_mock.go -package=p2p
type peerEventRegistrar interface {
RegisterPeerEventObserver(observer polygoncommon.Observer[*sentryproto.PeerEvent]) UnregisterFunc
RegisterNewBlockObserver(observer polygoncommon.Observer[*DecodedInboundMessage[*eth.NewBlockPacket]]) UnregisterFunc
RegisterNewBlockHashesObserver(observer polygoncommon.Observer[*DecodedInboundMessage[*eth.NewBlockHashesPacket]]) UnregisterFunc
RegisterPeerEventObserver(observer event.Observer[*sentryproto.PeerEvent]) UnregisterFunc
RegisterNewBlockObserver(observer event.Observer[*DecodedInboundMessage[*eth.NewBlockPacket]]) UnregisterFunc
RegisterNewBlockHashesObserver(observer event.Observer[*DecodedInboundMessage[*eth.NewBlockHashesPacket]]) UnregisterFunc
}
21 changes: 10 additions & 11 deletions polygon/p2p/peer_event_registrar_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ef62be7

Please sign in to comment.