diff --git a/cloud/logstream.go b/cloud/logstream.go index ab1d631c..dcefc897 100644 --- a/cloud/logstream.go +++ b/cloud/logstream.go @@ -24,39 +24,47 @@ func (s *StreamError) Error() string { return s.Err.Error() } -func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error { +func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error { if man.GetResumeToken() == "" { man.ResumeToken = stringutil.RandomAlphanumeric(40) } - var ( - errs []error - retry bool - counts []int - last *pb.Delta - ) - for { - first := []*pb.Delta{firstDelta(man, retry)} - if last != nil { - first = append(first, last) - } + errCh := make(chan error) + go func() { + defer close(errCh) var ( - err error - sendCount int + retry bool + counts []int + last *pb.Delta ) - sendCount, last, err = c.streamLogsAttempt(ctx, man.GetBuildId(), first, ch) - if err != nil { - recoverable := recoverableError(err) - errs = append(errs, &StreamError{Err: err, Recoverable: recoverable}) - if recoverable { - retry = true - counts = append(counts, sendCount) - time.Sleep(calcBackoff(c.logstreamBackoff, counts)) - continue + for { + first := []*pb.Delta{firstDelta(man, retry)} + if last != nil { + first = append(first, last) + } + var ( + err error + sendCount int + ) + sendCount, last, err = c.streamLogsAttempt(ctx, man.GetBuildId(), first, ch) + if err != nil { + recoverable := recoverableError(err) + errCh <- &StreamError{Err: err, Recoverable: recoverable} + if recoverable { + retry = true + counts = append(counts, sendCount) + select { + case <-time.After(calcBackoff(c.logstreamBackoff, counts)): + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + continue + } } + break } - break - } - return errs + }() + return errCh } // calcBackoff calculates the time to wait before attempting another stream. The diff --git a/cloud/logstream_test.go b/cloud/logstream_test.go index 1305d8c0..569ff685 100644 --- a/cloud/logstream_test.go +++ b/cloud/logstream_test.go @@ -95,17 +95,19 @@ func TestStreamLogs(t *testing.T) { }, } - cl := &Client{logstream: testClient} + cl := &Client{ + logstream: testClient, + logstreamBackoff: 10 * time.Millisecond, + } - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() man := &pb.RunManifest{ BuildId: uuid.NewString(), } ch := make(chan *pb.Delta) - errsCh := make(chan []error) - go func() { for i := 0; i < 10; i++ { ch <- logDelta("log") @@ -113,11 +115,13 @@ func TestStreamLogs(t *testing.T) { close(ch) }() - go func() { - errsCh <- cl.StreamLogs(ctx, man, ch) - }() + errCh := cl.StreamLogs(ctx, man, ch) + + var errs []error + for err := range errCh { + errs = append(errs, err) + } - errs := <-errsCh require.Empty(t, errs) require.Equal(t, 12, stream.calls["Send"], "expected 10 Sends plus first manifest & EOF (12)") require.Equal(t, 1, stream.calls["Recv"], "expected 1 Recv") @@ -134,16 +138,19 @@ func TestStreamLogsResume(t *testing.T) { }, } - cl := &Client{logstream: testClient} + cl := &Client{ + logstream: testClient, + logstreamBackoff: 10 * time.Millisecond, + } - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() man := &pb.RunManifest{ BuildId: uuid.NewString(), } ch := make(chan *pb.Delta) - errsCh := make(chan []error) go func() { for i := 0; i < 15; i++ { @@ -158,11 +165,13 @@ func TestStreamLogsResume(t *testing.T) { close(ch) }() - go func() { - errsCh <- cl.StreamLogs(ctx, man, ch) - }() + errCh := cl.StreamLogs(ctx, man, ch) + + var errs []error + for err := range errCh { + errs = append(errs, err) + } - errs := <-errsCh require.Len(t, errs, 1) // This is the second stream. diff --git a/logbus/ship/ship.go b/logbus/ship/ship.go index 28b2d830..f5cabf4b 100644 --- a/logbus/ship/ship.go +++ b/logbus/ship/ship.go @@ -12,7 +12,7 @@ import ( ) type streamer interface { - StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error + StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error } // LogShipper subscribes to the Log Bus & streams log entries up to the remote @@ -56,10 +56,10 @@ func (l *LogShipper) Start(ctx context.Context) { ctx, l.cancel = context.WithCancel(ctx) defer l.cancel() out := bufferedDeltaChan(ctx, l.ch) - errs := l.cl.StreamLogs(ctx, l.man, out) - if len(errs) > 0 { + errCh := l.cl.StreamLogs(ctx, l.man, out) + for err := range errCh { l.mu.Lock() - l.errs = errs + l.errs = append(l.errs, err) l.mu.Unlock() } l.done <- struct{}{} diff --git a/logbus/ship/ship_test.go b/logbus/ship/ship_test.go index 9380c852..ecd05a11 100644 --- a/logbus/ship/ship_test.go +++ b/logbus/ship/ship_test.go @@ -13,18 +13,24 @@ type testClient struct { count int } -func (t *testClient) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error { - for { - select { - case _, ok := <-ch: - if !ok { - return nil +func (t *testClient) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error { + errCh := make(chan error) + go func() { + defer close(errCh) + for { + select { + case _, ok := <-ch: + if !ok { + return + } + t.count++ + case <-ctx.Done(): + errCh <- ctx.Err() + return } - t.count++ - case <-ctx.Done(): - return []error{ctx.Err()} } - } + }() + return errCh } func TestLogShipper(t *testing.T) {