Skip to content

Commit c705d01

Browse files
authored
Fix Broadcaster interface mismatch to pubsubBroadcaster (#246)
1 parent 00bb231 commit c705d01

File tree

3 files changed

+10
-6
lines changed

3 files changed

+10
-6
lines changed

crdt.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ var (
6060
// all replicas and to retrieve payloads broadcasted.
6161
type Broadcaster interface {
6262
// Send payload to other replicas.
63-
Broadcast([]byte) error
63+
Broadcast(context.Context, []byte) error
6464
// Obtain the next payload received from the network.
65-
Next() ([]byte, error)
65+
Next(context.Context) ([]byte, error)
6666
}
6767

6868
// A SessionDAGService is a Sessions-enabled DAGService. This type of DAG-Service
@@ -351,7 +351,7 @@ func (store *Datastore) handleNext(ctx context.Context) {
351351
default:
352352
}
353353

354-
data, err := store.broadcaster.Next()
354+
data, err := store.broadcaster.Next(ctx)
355355
if err != nil {
356356
if err == ErrNoMoreBroadcast || ctx.Err() != nil {
357357
return
@@ -1301,7 +1301,7 @@ func (store *Datastore) broadcast(ctx context.Context, cids []cid.Cid) error {
13011301
return err
13021302
}
13031303

1304-
err = store.broadcaster.Broadcast(bcastBytes)
1304+
err = store.broadcaster.Broadcast(ctx, bcastBytes)
13051305
if err != nil {
13061306
return errors.Wrapf(err, "error broadcasting %s", cids)
13071307
}

crdt_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func newBroadcasters(t testing.TB, n int) ([]*mockBroadcaster, context.CancelFun
120120
return broadcasters, cancel
121121
}
122122

123-
func (mb *mockBroadcaster) Broadcast(data []byte) error {
123+
func (mb *mockBroadcaster) Broadcast(ctx context.Context, data []byte) error {
124124
var wg sync.WaitGroup
125125

126126
randg := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -155,10 +155,12 @@ func (mb *mockBroadcaster) Broadcast(data []byte) error {
155155
return nil
156156
}
157157

158-
func (mb *mockBroadcaster) Next() ([]byte, error) {
158+
func (mb *mockBroadcaster) Next(ctx context.Context) ([]byte, error) {
159159
select {
160160
case data := <-mb.myChan:
161161
return data, nil
162+
case <-ctx.Done():
163+
return nil, ErrNoMoreBroadcast
162164
case <-mb.ctx.Done():
163165
return nil, ErrNoMoreBroadcast
164166
}

pubsub_broadcaster.go

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
pubsub "github.com/libp2p/go-libp2p-pubsub"
88
)
99

10+
var _ Broadcaster = (*PubSubBroadcaster)(nil)
11+
1012
// PubSubBroadcaster implements a Broadcaster using libp2p PubSub.
1113
type PubSubBroadcaster struct {
1214
ctx context.Context

0 commit comments

Comments
 (0)