Skip to content

Commit

Permalink
Logstream: fixes flaky test & improves error handling (#3415)
Browse files Browse the repository at this point in the history
Addresses: earthly/earthly#3381

This test was essentially stuck in an infinite retry loop because the backoff
time was always 0. This PR revises the error handling and adjusts the tests
to fix the retry issue. The error-handling changes ensure that errors are
always reported when the CLI exits.
  • Loading branch information
mikejholly authored Oct 21, 2023
1 parent a3069ce commit 4f82b70
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 55 deletions.
60 changes: 34 additions & 26 deletions cloud/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,47 @@ func (s *StreamError) Error() string {
return s.Err.Error()
}

func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error {
func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error {
if man.GetResumeToken() == "" {
man.ResumeToken = stringutil.RandomAlphanumeric(40)
}
var (
errs []error
retry bool
counts []int
last *pb.Delta
)
for {
first := []*pb.Delta{firstDelta(man, retry)}
if last != nil {
first = append(first, last)
}
errCh := make(chan error)
go func() {
defer close(errCh)
var (
err error
sendCount int
retry bool
counts []int
last *pb.Delta
)
sendCount, last, err = c.streamLogsAttempt(ctx, man.GetBuildId(), first, ch)
if err != nil {
recoverable := recoverableError(err)
errs = append(errs, &StreamError{Err: err, Recoverable: recoverable})
if recoverable {
retry = true
counts = append(counts, sendCount)
time.Sleep(calcBackoff(c.logstreamBackoff, counts))
continue
for {
first := []*pb.Delta{firstDelta(man, retry)}
if last != nil {
first = append(first, last)
}
var (
err error
sendCount int
)
sendCount, last, err = c.streamLogsAttempt(ctx, man.GetBuildId(), first, ch)
if err != nil {
recoverable := recoverableError(err)
errCh <- &StreamError{Err: err, Recoverable: recoverable}
if recoverable {
retry = true
counts = append(counts, sendCount)
select {
case <-time.After(calcBackoff(c.logstreamBackoff, counts)):
case <-ctx.Done():
errCh <- ctx.Err()
return
}
continue
}
}
break
}
break
}
return errs
}()
return errCh
}

// calcBackoff calculates the time to wait before attempting another stream. The
Expand Down
39 changes: 24 additions & 15 deletions cloud/logstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,33 @@ func TestStreamLogs(t *testing.T) {
},
}

cl := &Client{logstream: testClient}
cl := &Client{
logstream: testClient,
logstreamBackoff: 10 * time.Millisecond,
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

man := &pb.RunManifest{
BuildId: uuid.NewString(),
}

ch := make(chan *pb.Delta)
errsCh := make(chan []error)

go func() {
for i := 0; i < 10; i++ {
ch <- logDelta("log")
}
close(ch)
}()

go func() {
errsCh <- cl.StreamLogs(ctx, man, ch)
}()
errCh := cl.StreamLogs(ctx, man, ch)

var errs []error
for err := range errCh {
errs = append(errs, err)
}

errs := <-errsCh
require.Empty(t, errs)
require.Equal(t, 12, stream.calls["Send"], "expected 10 Sends plus first manifest & EOF (12)")
require.Equal(t, 1, stream.calls["Recv"], "expected 1 Recv")
Expand All @@ -134,16 +138,19 @@ func TestStreamLogsResume(t *testing.T) {
},
}

cl := &Client{logstream: testClient}
cl := &Client{
logstream: testClient,
logstreamBackoff: 10 * time.Millisecond,
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

man := &pb.RunManifest{
BuildId: uuid.NewString(),
}

ch := make(chan *pb.Delta)
errsCh := make(chan []error)

go func() {
for i := 0; i < 15; i++ {
Expand All @@ -158,11 +165,13 @@ func TestStreamLogsResume(t *testing.T) {
close(ch)
}()

go func() {
errsCh <- cl.StreamLogs(ctx, man, ch)
}()
errCh := cl.StreamLogs(ctx, man, ch)

var errs []error
for err := range errCh {
errs = append(errs, err)
}

errs := <-errsCh
require.Len(t, errs, 1)

// This is the second stream.
Expand Down
8 changes: 4 additions & 4 deletions logbus/ship/ship.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type streamer interface {
StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error
StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error
}

// LogShipper subscribes to the Log Bus & streams log entries up to the remote
Expand Down Expand Up @@ -56,10 +56,10 @@ func (l *LogShipper) Start(ctx context.Context) {
ctx, l.cancel = context.WithCancel(ctx)
defer l.cancel()
out := bufferedDeltaChan(ctx, l.ch)
errs := l.cl.StreamLogs(ctx, l.man, out)
if len(errs) > 0 {
errCh := l.cl.StreamLogs(ctx, l.man, out)
for err := range errCh {
l.mu.Lock()
l.errs = errs
l.errs = append(l.errs, err)
l.mu.Unlock()
}
l.done <- struct{}{}
Expand Down
26 changes: 16 additions & 10 deletions logbus/ship/ship_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ type testClient struct {
count int
}

func (t *testClient) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) []error {
for {
select {
case _, ok := <-ch:
if !ok {
return nil
func (t *testClient) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta) <-chan error {
errCh := make(chan error)
go func() {
defer close(errCh)
for {
select {
case _, ok := <-ch:
if !ok {
return
}
t.count++
case <-ctx.Done():
errCh <- ctx.Err()
return
}
t.count++
case <-ctx.Done():
return []error{ctx.Err()}
}
}
}()
return errCh
}

func TestLogShipper(t *testing.T) {
Expand Down

0 comments on commit 4f82b70

Please sign in to comment.