diff --git a/stream_test.go b/stream_test.go index eb01ee3..7503372 100644 --- a/stream_test.go +++ b/stream_test.go @@ -99,6 +99,15 @@ type testReadWriter struct { serverBuffer bytes.Buffer } +func (rw *testReadWriter) maybeBroadcastEmpty() { + if rw.serverBuffer.Len() == 0 { + select { + case rw.readEmptyChan <- struct{}{}: + default: + } + } +} + func (rw *testReadWriter) Read(buf []byte) (int, error) { if rw.queuedReadError != nil { err := rw.queuedReadError @@ -112,14 +121,10 @@ func (rw *testReadWriter) Read(buf []byte) (int, error) { if err == io.EOF { err = nil } + rw.maybeBroadcastEmpty() return s, err } - select { - case rw.readEmptyChan <- struct{}{}: - default: - } - // Read from server. We're either waiting for this whole test to // finish or for data to come in from the server buffer. We expect // only one read to be happening at once. @@ -130,6 +135,7 @@ func (rw *testReadWriter) Read(buf []byte) (int, error) { if err == io.EOF { err = nil } + rw.maybeBroadcastEmpty() return s, err case <-rw.exiting: return 0, io.EOF