Skip to content

Commit

Permalink
Logstream: improve message draining (#3955)
Browse files Browse the repository at this point in the history
The previous PR earthly/earthly#3953 surfaced
that not all messages were being written after a user-initiated
cancelation. This PR adds a short sleep to allow the messages to be
written to the send channel. I tried another approach using channels but
gave up after several hours because target & command writers are not
reliably closed on cancellation. I don't like adding `time.Sleep` but
the alternative will require a lot of effort.
  • Loading branch information
mikejholly authored Mar 29, 2024
1 parent 50caafb commit 4893057
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/earthly/subcmd/build_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func (a *Build) ActionBuildImp(cliCtx *cli.Context, flagArgs, nonFlagArgs []stri
_, isCI := analytics.DetectCI(a.cli.Flags().EarthlyCIRunner)
setup.SetCI(isCI)
if doLogstreamUpload {
setup.StartLogStreamer(cliCtx.Context, cloudClient)
setup.StartLogStreamer(cloudClient)
}
return nil
}
Expand Down
11 changes: 9 additions & 2 deletions logbus/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"strings"
"time"

"github.com/earthly/cloud-api/logstream"
"github.com/earthly/earthly/cloud"
Expand Down Expand Up @@ -98,9 +99,9 @@ func (bs *BusSetup) LogStreamerStarted() bool {

// StartLogStreamer starts a LogStreamer for the given build. The
// LogStreamer streams logs to the cloud.
func (bs *BusSetup) StartLogStreamer(ctx context.Context, c *cloud.Client) {
func (bs *BusSetup) StartLogStreamer(c *cloud.Client) {
bs.LogStreamer = ship.NewLogShipper(c, bs.InitialManifest, bs.verbose)
bs.LogStreamer.Start(ctx)
bs.LogStreamer.Start()
bs.Bus.AddSubscriber(bs.LogStreamer)
bs.logStreamerStarted = true
}
Expand Down Expand Up @@ -164,6 +165,12 @@ func (bs *BusSetup) Close(ctx context.Context) error {
}

if bs.LogStreamer != nil {
// At the moment, it's very challenging to detect when all log writers
// are finished. Not all command & target writers are being reliably
// closed on cancellation. This short sleep gives all of the command &
// target writers a chance to finish sending messages. Perhaps this can
// be removed at some point in the future.
time.Sleep(20 * time.Millisecond)
bs.LogStreamer.Close()
if errs := bs.LogStreamer.Errs(); len(errs) > 0 {
multi := &multierror.Error{}
Expand Down
5 changes: 3 additions & 2 deletions logbus/ship/ship.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ func (l *LogShipper) Write(delta *pb.Delta) {
}

// Start the log streaming process and begin writing logs to the server.
func (l *LogShipper) Start(ctx context.Context) {
func (l *LogShipper) Start() {
go func() {
ctx, l.cancel = context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
l.cancel = cancel
defer l.cancel()
out := bufferedDeltaChan(ctx, l.ch)
errCh := l.cl.StreamLogs(ctx, l.man, out)
Expand Down
4 changes: 1 addition & 3 deletions logbus/ship/ship_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ func TestLogShipper(t *testing.T) {
done: make(chan struct{}),
}

ctx := context.Background()

s.Start(ctx)
s.Start()

n := 50
for i := 0; i < n; i++ {
Expand Down

0 comments on commit 4893057

Please sign in to comment.