From 489305798ea607f86e44ef9dc04eff0df6b6c8b8 Mon Sep 17 00:00:00 2001 From: Mike Holly Date: Fri, 29 Mar 2024 16:40:13 -0700 Subject: [PATCH] Logstream: improve message draining (#3955) The previous PR https://github.com/earthly/earthly/pull/3953 surfaced that not all messages were being written after a user-initiated cancelation. This PR adds a short sleep to allow the messages to be written to the send channel. I tried another approach using channels but gave up after several hours because target & command writers are not reliably closed on cancellation. I don't like adding `time.Sleep` but the alternative will require a lot of effort. --- cmd/earthly/subcmd/build_cmd.go | 2 +- logbus/setup/setup.go | 11 +++++++++-- logbus/ship/ship.go | 5 +++-- logbus/ship/ship_test.go | 4 +--- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/cmd/earthly/subcmd/build_cmd.go b/cmd/earthly/subcmd/build_cmd.go index 87d11a48..1e2dbcaf 100644 --- a/cmd/earthly/subcmd/build_cmd.go +++ b/cmd/earthly/subcmd/build_cmd.go @@ -651,7 +651,7 @@ func (a *Build) ActionBuildImp(cliCtx *cli.Context, flagArgs, nonFlagArgs []stri _, isCI := analytics.DetectCI(a.cli.Flags().EarthlyCIRunner) setup.SetCI(isCI) if doLogstreamUpload { - setup.StartLogStreamer(cliCtx.Context, cloudClient) + setup.StartLogStreamer(cloudClient) } return nil } diff --git a/logbus/setup/setup.go b/logbus/setup/setup.go index d583d25d..096957db 100644 --- a/logbus/setup/setup.go +++ b/logbus/setup/setup.go @@ -4,6 +4,7 @@ import ( "context" "os" "strings" + "time" "github.com/earthly/cloud-api/logstream" "github.com/earthly/earthly/cloud" @@ -98,9 +99,9 @@ func (bs *BusSetup) LogStreamerStarted() bool { // StartLogStreamer starts a LogStreamer for the given build. The // LogStreamer streams logs to the cloud. -func (bs *BusSetup) StartLogStreamer(ctx context.Context, c *cloud.Client) { +func (bs *BusSetup) StartLogStreamer(c *cloud.Client) { bs.LogStreamer = ship.NewLogShipper(c, bs.InitialManifest, bs.verbose) - bs.LogStreamer.Start(ctx) + bs.LogStreamer.Start() bs.Bus.AddSubscriber(bs.LogStreamer) bs.logStreamerStarted = true } @@ -164,6 +165,12 @@ func (bs *BusSetup) Close(ctx context.Context) error { } if bs.LogStreamer != nil { + // At the moment, it's very challenging to detect when all log writers + // are finished. Not all command & target writers are being reliably + // closed on cancellation. This short sleep gives all of the command & + // target writers a chance to finish sending messages. Perhaps this can + // be removed at some point in the future. + time.Sleep(20 * time.Millisecond) bs.LogStreamer.Close() if errs := bs.LogStreamer.Errs(); len(errs) > 0 { multi := &multierror.Error{} diff --git a/logbus/ship/ship.go b/logbus/ship/ship.go index ee4b544c..f9105210 100644 --- a/logbus/ship/ship.go +++ b/logbus/ship/ship.go @@ -50,9 +50,10 @@ func (l *LogShipper) Write(delta *pb.Delta) { } // Start the log streaming process and begin writing logs to the server. -func (l *LogShipper) Start(ctx context.Context) { +func (l *LogShipper) Start() { go func() { - ctx, l.cancel = context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + l.cancel = cancel defer l.cancel() out := bufferedDeltaChan(ctx, l.ch) errCh := l.cl.StreamLogs(ctx, l.man, out) diff --git a/logbus/ship/ship_test.go b/logbus/ship/ship_test.go index ecd05a11..9b4c475b 100644 --- a/logbus/ship/ship_test.go +++ b/logbus/ship/ship_test.go @@ -48,9 +48,7 @@ func TestLogShipper(t *testing.T) { done: make(chan struct{}), } - ctx := context.Background() - - s.Start(ctx) + s.Start() n := 50 for i := 0; i < n; i++ {