From 4f82b70ee2503ed3d6d35145189866ec9ea71ecc Mon Sep 17 00:00:00 2001 From: Mike Holly Date: Sat, 21 Oct 2023 11:18:22 -0700 Subject: [PATCH] Logstream: fixes flaky test & improves error handling (#3415) Addresses: https://github.com/earthly/earthly/issues/3381 This test was essentially stuck in an infinite retry loop as the backoff time was always 0. This PR includes some revised error handling and test tweaks to fix the retry issue. The error handling changes ensure errors are always reported when the CLI exits. --- cloud/logstream.go | 60 +++++++++++++++++++++++----------------- cloud/logstream_test.go | 39 ++++++++++++++++---------- logbus/ship/ship.go | 8 +++--- logbus/ship/ship_test.go | 26 ++++++++++------- 4 files changed, 78 insertions(+), 55 deletions(-) diff --git a/cloud/logstream.go b/cloud/logstream.go index ab1d631c..dcefc897 100644 --- a/cloud/logstream.go +++ b/cloud/logstream.go @@ -24,39 +24,47 @@ func (s *StreamError) Error() string { return s.Err.Error() } -func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error { +func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error { if man.GetResumeToken() == "" { man.ResumeToken = stringutil.RandomAlphanumeric(40) } - var ( - errs []error - retry bool - counts []int - last *pb.Delta - ) - for { - first := []*pb.Delta{firstDelta(man, retry)} - if last != nil { - first = append(first, last) - } + errCh := make(chan error) + go func() { + defer close(errCh) var ( - err error - sendCount int + retry bool + counts []int + last *pb.Delta ) - sendCount, last, err = c.streamLogsAttempt(ctx, man.GetBuildId(), first, ch) - if err != nil { - recoverable := recoverableError(err) - errs = append(errs, &StreamError{Err: err, Recoverable: recoverable}) - if recoverable { - retry = true - counts = append(counts, sendCount) - time.Sleep(calcBackoff(c.logstreamBackoff, counts)) - continue + for { + first := []*pb.Delta{firstDelta(man, retry)} + if last != nil { + first = append(first, last) + } + var ( + err error + sendCount int + ) + sendCount, last, err = c.streamLogsAttempt(ctx, man.GetBuildId(), first, ch) + if err != nil { + recoverable := recoverableError(err) + errCh <- &StreamError{Err: err, Recoverable: recoverable} + if recoverable { + retry = true + counts = append(counts, sendCount) + select { + case <-time.After(calcBackoff(c.logstreamBackoff, counts)): + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + continue + } } + break } - break - } - return errs + }() + return errCh } // calcBackoff calculates the time to wait before attempting another stream. The diff --git a/cloud/logstream_test.go b/cloud/logstream_test.go index 1305d8c0..569ff685 100644 --- a/cloud/logstream_test.go +++ b/cloud/logstream_test.go @@ -95,17 +95,19 @@ func TestStreamLogs(t *testing.T) { }, } - cl := &Client{logstream: testClient} + cl := &Client{ + logstream: testClient, + logstreamBackoff: 10 * time.Millisecond, + } - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() man := &pb.RunManifest{ BuildId: uuid.NewString(), } ch := make(chan *pb.Delta) - errsCh := make(chan []error) - go func() { for i := 0; i < 10; i++ { ch <- logDelta("log") @@ -113,11 +115,13 @@ func TestStreamLogs(t *testing.T) { close(ch) }() - go func() { - errsCh <- cl.StreamLogs(ctx, man, ch) - }() + errCh := cl.StreamLogs(ctx, man, ch) + + var errs []error + for err := range errCh { + errs = append(errs, err) + } - errs := <-errsCh require.Empty(t, errs) require.Equal(t, 12, stream.calls["Send"], "expected 10 Sends plus first manifest & EOF (12)") require.Equal(t, 1, stream.calls["Recv"], "expected 1 Recv") @@ -134,16 +138,19 @@ func TestStreamLogsResume(t *testing.T) { }, } - cl := &Client{logstream: testClient} + cl := &Client{ + logstream: testClient, + logstreamBackoff: 10 * time.Millisecond, + } - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() man := &pb.RunManifest{ BuildId: uuid.NewString(), } ch := make(chan *pb.Delta) - errsCh := make(chan []error) go func() { for i := 0; i < 15; i++ { @@ -158,11 +165,13 @@ func TestStreamLogsResume(t *testing.T) { close(ch) }() - go func() { - errsCh <- cl.StreamLogs(ctx, man, ch) - }() + errCh := cl.StreamLogs(ctx, man, ch) + + var errs []error + for err := range errCh { + errs = append(errs, err) + } - errs := <-errsCh require.Len(t, errs, 1) // This is the second stream. diff --git a/logbus/ship/ship.go b/logbus/ship/ship.go index 28b2d830..f5cabf4b 100644 --- a/logbus/ship/ship.go +++ b/logbus/ship/ship.go @@ -12,7 +12,7 @@ import ( ) type streamer interface { - StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error + StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error } // LogShipper subscribes to the Log Bus & streams log entries up to the remote @@ -56,10 +56,10 @@ func (l *LogShipper) Start(ctx context.Context) { ctx, l.cancel = context.WithCancel(ctx) defer l.cancel() out := bufferedDeltaChan(ctx, l.ch) - errs := l.cl.StreamLogs(ctx, l.man, out) - if len(errs) > 0 { + errCh := l.cl.StreamLogs(ctx, l.man, out) + for err := range errCh { l.mu.Lock() - l.errs = errs + l.errs = append(l.errs, err) l.mu.Unlock() } l.done <- struct{}{} diff --git a/logbus/ship/ship_test.go b/logbus/ship/ship_test.go index 9380c852..ecd05a11 100644 --- a/logbus/ship/ship_test.go +++ b/logbus/ship/ship_test.go @@ -13,18 +13,24 @@ type testClient struct { count int } -func (t *testClient) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error { - for { - select { - case _, ok := <-ch: - if !ok { - return nil +func (t *testClient) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error { + errCh := make(chan error) + go func() { + defer close(errCh) + for { + select { + case _, ok := <-ch: + if !ok { + return + } + t.count++ + case <-ctx.Done(): + errCh <- ctx.Err() + return } - t.count++ - case <-ctx.Done(): - return []error{ctx.Err()} } - } + }() + return errCh } func TestLogShipper(t *testing.T) {