From df6ca957017ced1a900cfa5fb82505e2a891a36a Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 10 Jan 2025 14:12:52 -0500 Subject: [PATCH 1/6] api: polish api service shutdown and top-level context handling --- packages/api/internal/handlers/store.go | 36 ++++--- packages/api/main.go | 135 +++++++++++++++++++++--- packages/shared/pkg/db/client.go | 4 +- packages/shared/pkg/telemetry/otel.go | 16 +-- 4 files changed, 144 insertions(+), 47 deletions(-) diff --git a/packages/api/internal/handlers/store.go b/packages/api/internal/handlers/store.go index 71aece9b1..e42992c44 100644 --- a/packages/api/internal/handlers/store.go +++ b/packages/api/internal/handlers/store.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "errors" "fmt" "net/http" "os" @@ -52,11 +53,9 @@ type APIStore struct { var lokiAddress = os.Getenv("LOKI_ADDRESS") -func NewAPIStore() *APIStore { +func NewAPIStore(ctx context.Context) *APIStore { fmt.Println("Initializing API store") - ctx := context.Background() - tracer := otel.Tracer("api") logger, err := logging.New(env.IsLocal()) @@ -133,29 +132,32 @@ func NewAPIStore() *APIStore { } } -func (a *APIStore) Close() { +func (a *APIStore) Close() error { a.templateSpawnCounter.Close() - err := a.analytics.Close() - if err != nil { - a.logger.Errorf("Error closing Analytics\n: %v", err) + errs := []error{} + + if err := a.analytics.Close(); err != nil { + errs = append(errs, fmt.Errorf("closing Analytics: %w", err)) } - err = a.posthog.Close() - if err != nil { - a.logger.Errorf("Error closing Posthog client\n: %v", err) + if err := a.posthog.Close(); err != nil { + errs = append(errs, fmt.Errorf("closing Posthog client: %w", err)) } - err = a.orchestrator.Close() - if err != nil { - a.logger.Errorf("Error closing Orchestrator client\n: %v", err) + if err := a.orchestrator.Close(); err != nil { + errs = append(errs, fmt.Errorf("closing Orchestrator client: %w", err)) + } - err = a.templateManager.Close() - if err != nil { - 
a.logger.Errorf("Error closing Template manager client\n: %v", err) + if err := a.templateManager.Close(); err != nil { + errs = append(errs, fmt.Errorf("closing Template manager client: %w", err)) + } + + if err := a.db.Close(); err != nil { + errs = append(errs, fmt.Errorf("closing database client: %w", err)) } - a.db.Close() + return errors.Join(errs...) } // This function wraps sending of an error in the Error format, and diff --git a/packages/api/main.go b/packages/api/main.go index 8496a4803..76c218ca4 100644 --- a/packages/api/main.go +++ b/packages/api/main.go @@ -1,11 +1,18 @@ package main import ( + "context" + "errors" "flag" "fmt" "log" + "net" "net/http" "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" "time" "github.com/getkin/kin-openapi/openapi3" @@ -34,7 +41,7 @@ const ( defaultPort = 80 ) -func NewGinServer(apiStore *handlers.APIStore, swagger *openapi3.T, port int) *http.Server { +func NewGinServer(ctx context.Context, apiStore *handlers.APIStore, swagger *openapi3.T, port int) *http.Server { // Clear out the servers array in the swagger spec, that skips validating // that server names match. We don't know how this thing will be run. 
swagger.Servers = nil @@ -114,44 +121,138 @@ func NewGinServer(apiStore *handlers.APIStore, swagger *openapi3.T, port int) *h Handler: r, Addr: fmt.Sprintf("0.0.0.0:%d", port), ReadHeaderTimeout: maxReadHeaderTimeout, + BaseContext: func(net.Listener) context.Context { return ctx }, } return s } func main() { - fmt.Println("Initializing...") + ctx, cancel := context.WithCancel(context.Background()) // root context + defer cancel() - port := flag.Int("port", defaultPort, "Port for test HTTP server") - flag.Parse() + signalCtx, sigCancel := signal.NotifyContext(ctx, syscall.SIGTERM) + defer sigCancel() + // TODO: additional improvements to signal handling/shutdown: + // - provide access to root context in the signal handling + // context so request scoped work can start background tasks + // without needing to make an unattached context. + // - provide mechanism to inform shutdown that background + // work has completed (waitgroup, counter, etc.) to avoid + // exiting early. - debug := flag.String("true", "false", "is debug") + var ( + port int + debug string + ) + flag.IntVar(&port, "port", defaultPort, "Port for test HTTP server") + flag.StringVar(&debug, "true", "false", "is debug") + flag.Parse() - if *debug != "true" { + log.Println("Starting API service...") + if debug != "true" { gin.SetMode(gin.ReleaseMode) } swagger, err := api.GetSwagger() if err != nil { - log.Printf("Error loading swagger spec\n: %v\n", err) - os.Exit(1) + log.Fatalf("Error loading swagger spec:\n%v", err) } + var cleanupFns []func() error + exitCode := &atomic.Int32{} + cleanup := func() { + start := time.Now() + // doing shutdown in parallel to avoid + // unintentionally: creating shutdown ordering + // effects. 
+ wg := &sync.WaitGroup{} + count := 0 + for idx := range cleanupFns { + if cleanup := cleanupFns[idx]; cleanup != nil { + wg.Add(1) + count++ + go func() { + defer wg.Done() + if err := cleanup(); err != nil { + exitCode.Add(1) + log.Printf("cleanup operation error: %v", err) + } + }() + + // only run each cleanup once (in case cleanup is called + // explicitly.) + cleanupFns[idx] = nil + } + } + if count == 0 { + log.Println("no cleanup operations") + return + } + log.Printf("running %d cleanup operations", count) + wg.Wait() // this doesn't have a timeout + log.Printf("%d cleanup operations compleated in %s", count, time.Since(start)) + } + defer cleanup() + if !env.IsLocal() { shutdown := telemetry.InitOTLPExporter(serviceName, swagger.Info.Version) - defer shutdown() + cleanupFns = append(cleanupFns, func() error { + // shutdown handlers flush buffers upon call and take a context. passing a + // specific context here so that all timeout configuration is in one place. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + return shutdown(ctx) + }) } // Create an instance of our handler which satisfies the generated interface - apiStore := handlers.NewAPIStore() - defer apiStore.Close() + // (use the outer context rather than the signal handling + // context so it doesn't exit first.) + apiStore := handlers.NewAPIStore(ctx) + cleanupFns = append(cleanupFns, apiStore.Close) - s := NewGinServer(apiStore, swagger, *port) + // pass the signal context so that handlers know when shutdown is happening. + s := NewGinServer(signalCtx, apiStore, swagger, port) - fmt.Printf("Starting server on port %d\n", *port) - // And we serve HTTP until the world ends. - err = s.ListenAndServe() - if err != nil { - log.Printf("server error: %v\n", err) + go func() { + log.Printf("http service (%d) starting", port) + + // Serve HTTP until shutdown. 
+ err := s.ListenAndServe() + + switch { + case errors.Is(err, http.ErrServerClosed): + log.Printf("http service (%d) shutdown successfully", port) + case err != nil: + exitCode.Add(2) + log.Printf("http service (%d) encountered error: %v", port, err) + default: + // this probably shouldn't happen... + log.Printf("http service (%d) exited without error", port) + } + + // cancel the parent context, in case the service + // ended outside of shutdown, (we want everything else + // to cleanup, and not to get stuck waiting). + cancel() + }() + + select { + case <-signalCtx.Done(): + // shutdown blocks until all active http handlers have returned. + if err := s.Shutdown(ctx); err != nil { + exitCode.Add(1) + log.Printf("http service (%d) shutdown error: %v", port, err) + } + case <-ctx.Done(): + log.Println("http service (%d) shutdown outside of signal", port) } + + // call cleanup explicitly because defers do not run on os.Exit + cleanup() + + // Exit, with appropriate code. + os.Exit(int(exitCode.Load())) } diff --git a/packages/shared/pkg/db/client.go b/packages/shared/pkg/db/client.go index ae2b2994b..5ee5a88c2 100644 --- a/packages/shared/pkg/db/client.go +++ b/packages/shared/pkg/db/client.go @@ -36,6 +36,6 @@ func NewClient() (*DB, error) { return &DB{Client: client}, nil } -func (db *DB) Close() { - _ = db.Client.Close() +func (db *DB) Close() error { + return db.Client.Close() } diff --git a/packages/shared/pkg/telemetry/otel.go b/packages/shared/pkg/telemetry/otel.go index e3a3544e0..b799748a8 100644 --- a/packages/shared/pkg/telemetry/otel.go +++ b/packages/shared/pkg/telemetry/otel.go @@ -33,7 +33,7 @@ type client struct { } // InitOTLPExporter initializes an OTLP exporter, and configures the corresponding trace providers. 
-func InitOTLPExporter(serviceName, serviceVersion string) func() { +func InitOTLPExporter(ctx context.Context, serviceName, serviceVersion string) func(ctx context.Context) error { attributes := []attribute.KeyValue{ semconv.ServiceName(serviceName), semconv.ServiceVersion(serviceVersion), @@ -46,8 +46,6 @@ func InitOTLPExporter(serviceName, serviceVersion string) func() { attributes = append(attributes, semconv.HostName(hostname)) } - ctx := context.Background() - res, err := resource.New( ctx, resource.WithSchemaURL(semconv.SchemaURL), @@ -126,23 +124,19 @@ func InitOTLPExporter(serviceName, serviceVersion string) func() { }() // Shutdown will flush any remaining spans and shut down the exporter. - return func() { - otelClient.close() - } + return otelClient.close } -func (c *client) close() { - ctx := context.Background() - +func (c *client) close(ctx context.Context) error { if c.tracerProvider != nil { if err := c.tracerProvider.Shutdown(ctx); err != nil { - log.Fatal(err) + return err } } if c.meterProvider != nil { if err := c.meterProvider.Shutdown(ctx); err != nil { - log.Fatal(err) + return err } } } From 9a2a89faa63f9ffcdb56462bae9e8f0dced026b4 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 17 Jan 2025 16:07:56 -0500 Subject: [PATCH 2/6] fix: adapt to changes in function signatures --- packages/api/main.go | 2 +- packages/shared/pkg/telemetry/otel.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/api/main.go b/packages/api/main.go index d42cf9c76..baa1d7441 100644 --- a/packages/api/main.go +++ b/packages/api/main.go @@ -197,7 +197,7 @@ func main() { defer cleanup() if !env.IsLocal() { - shutdown := telemetry.InitOTLPExporter(serviceName, swagger.Info.Version) + shutdown := telemetry.InitOTLPExporter(ctx, serviceName, swagger.Info.Version) cleanupFns = append(cleanupFns, func() error { // shutdown handlers flush buffers upon call and take a context. 
passing a // specific context here so that all timeout configuration is in one place. diff --git a/packages/shared/pkg/telemetry/otel.go b/packages/shared/pkg/telemetry/otel.go index b799748a8..5d654d5e5 100644 --- a/packages/shared/pkg/telemetry/otel.go +++ b/packages/shared/pkg/telemetry/otel.go @@ -139,4 +139,6 @@ func (c *client) close(ctx context.Context) error { return err } } + + return nil } From 431a4cc638362527f7495695135a43398ab3d4c8 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 17 Jan 2025 17:08:03 -0500 Subject: [PATCH 3/6] fix: additional binaries for new shared code --- packages/orchestrator/main.go | 8 ++++++-- packages/template-manager/main.go | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 3329b8304..05aadd81a 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "log" @@ -14,14 +15,17 @@ import ( const defaultPort = 5008 func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + port := flag.Int("port", defaultPort, "orchestrator server port") flag.Parse() if !env.IsLocal() { - shutdown := telemetry.InitOTLPExporter(server.ServiceName, "no") + shutdown := telemetry.InitOTLPExporter(ctx, server.ServiceName, "no") - defer shutdown() + defer shutdown(context.TODO()) } lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) diff --git a/packages/template-manager/main.go b/packages/template-manager/main.go index 4b717dc37..9b06b94f2 100644 --- a/packages/template-manager/main.go +++ b/packages/template-manager/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "log" @@ -19,6 +20,9 @@ const ( ) func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testFlag := flag.String("test", "", "run tests") templateID := flag.String("template", "", "template id") buildID := 
flag.String("build", "", "build id") @@ -41,8 +45,8 @@ func main() { } if !env.IsLocal() { - shutdown := telemetry.InitOTLPExporter(constants.ServiceName, "no") - defer shutdown() + shutdown := telemetry.InitOTLPExporter(ctx, constants.ServiceName, "no") + defer shutdown(context.TODO()) } lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) From dd94dc6acc1f67a29aafa3fdfe666a15af62e7c2 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 21 Jan 2025 13:12:03 -0500 Subject: [PATCH 4/6] fix: avoid unexpected double exit code increment --- packages/api/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/main.go b/packages/api/main.go index baa1d7441..82eefa705 100644 --- a/packages/api/main.go +++ b/packages/api/main.go @@ -227,7 +227,7 @@ func main() { case errors.Is(err, http.ErrServerClosed): log.Printf("http service (%d) shutdown successfully", port) case err != nil: - exitCode.Add(2) + exitCode.Add(1) log.Printf("http service (%d) encountered error: %v", port, err) default: // this probably shouldn't happen... 
From b01ae447f615e983af83fadd5ff43e27361c81bd Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 24 Jan 2025 08:59:27 -0500 Subject: [PATCH 5/6] whitespace --- packages/orchestrator/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 05aadd81a..6f9b5ca63 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -24,7 +24,6 @@ func main() { if !env.IsLocal() { shutdown := telemetry.InitOTLPExporter(ctx, server.ServiceName, "no") - defer shutdown(context.TODO()) } From 5e138f963c2f8c67b90a4aeed1b0ba82016ce9fe Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 28 Jan 2025 11:42:37 -0500 Subject: [PATCH 6/6] fix: also handle sigint --- packages/api/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/main.go b/packages/api/main.go index 82eefa705..e16316d4e 100644 --- a/packages/api/main.go +++ b/packages/api/main.go @@ -132,7 +132,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) // root context defer cancel() - signalCtx, sigCancel := signal.NotifyContext(ctx, syscall.SIGTERM) + signalCtx, sigCancel := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGINT) defer sigCancel() // TODO: additional improvements to signal handling/shutdown: // - provide access to root context in the signal handling