Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: polish api service shutdown and top-level context handling #237

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions packages/api/internal/handlers/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"errors"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -52,11 +53,9 @@ type APIStore struct {

var lokiAddress = os.Getenv("LOKI_ADDRESS")

func NewAPIStore() *APIStore {
func NewAPIStore(ctx context.Context) *APIStore {
fmt.Println("Initializing API store")

ctx := context.Background()

tracer := otel.Tracer("api")

logger, err := logging.New(env.IsLocal())
Expand Down Expand Up @@ -133,29 +132,32 @@ func NewAPIStore() *APIStore {
}
}

func (a *APIStore) Close() {
func (a *APIStore) Close() error {
a.templateSpawnCounter.Close()

err := a.analytics.Close()
if err != nil {
a.logger.Errorf("Error closing Analytics\n: %v", err)
errs := []error{}

if err := a.analytics.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing Analytics: %w", err))
}

err = a.posthog.Close()
if err != nil {
a.logger.Errorf("Error closing Posthog client\n: %v", err)
if err := a.posthog.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing Posthog client: %w", err))
}

err = a.orchestrator.Close()
if err != nil {
a.logger.Errorf("Error closing Orchestrator client\n: %v", err)
if err := a.orchestrator.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing Orchestrator client: %w", err))

}
err = a.templateManager.Close()
if err != nil {
a.logger.Errorf("Error closing Template manager client\n: %v", err)
if err := a.templateManager.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing Template manager client: %w", err))
}

if err := a.db.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing database client: %w", err))
}

a.db.Close()
return errors.Join(errs...)
}

// This function wraps sending of an error in the Error format, and
Expand Down
133 changes: 115 additions & 18 deletions packages/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/getkin/kin-openapi/openapi3"
Expand Down Expand Up @@ -36,7 +42,7 @@ const (
defaultPort = 80
)

func NewGinServer(apiStore *handlers.APIStore, swagger *openapi3.T, port int) *http.Server {
func NewGinServer(ctx context.Context, apiStore *handlers.APIStore, swagger *openapi3.T, port int) *http.Server {
// Clear out the servers array in the swagger spec, that skips validating
// that server names match. We don't know how this thing will be run.
swagger.Servers = nil
Expand Down Expand Up @@ -116,47 +122,138 @@ func NewGinServer(apiStore *handlers.APIStore, swagger *openapi3.T, port int) *h
Handler: r,
Addr: fmt.Sprintf("0.0.0.0:%d", port),
ReadHeaderTimeout: maxReadHeaderTimeout,
BaseContext: func(net.Listener) context.Context { return ctx },
}

return s
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background()) // root context
defer cancel()

fmt.Println("Initializing...")
signalCtx, sigCancel := signal.NotifyContext(ctx, syscall.SIGTERM)
defer sigCancel()
// TODO: additional improvements to signal handling/shutdown:
// - provide access to root context in the signal handling
// context so request scoped work can start background tasks
// without needing to make an unattached context.
// - provide mechanism to inform shutdown that background
// work has completed (waitgroup, counter, etc.) to avoid
// exiting early.

port := flag.Int("port", defaultPort, "Port for test HTTP server")
var (
port int
debug string
)
flag.IntVar(&port, "port", defaultPort, "Port for test HTTP server")
flag.StringVar(&debug, "true", "false", "is debug")
flag.Parse()

debug := flag.String("true", "false", "is debug")

if *debug != "true" {
log.Println("Starting API service...")
if debug != "true" {
gin.SetMode(gin.ReleaseMode)
}

swagger, err := api.GetSwagger()
if err != nil {
log.Printf("Error loading swagger spec\n: %v\n", err)
os.Exit(1)
log.Fatalf("Error loading swagger spec:\n%v", err)
}

var cleanupFns []func() error
exitCode := &atomic.Int32{}
// cleanup starts every registered cleanup function that has not run yet,
// waits for all of them to return, and logs how long that took. Each
// function that returns an error is logged and increments exitCode, so the
// process exits non-zero. Slots in cleanupFns are cleared as they are
// started, so calling cleanup again afterwards (e.g. explicitly and then via
// defer) does not run anything twice.
cleanup := func() {
	start := time.Now()
	// Run the operations concurrently so that shutdown does not
	// unintentionally come to depend on registration order.
	wg := &sync.WaitGroup{}
	count := 0
	for idx := range cleanupFns {
		fn := cleanupFns[idx]
		if fn == nil {
			continue
		}

		// Only run each cleanup once (in case cleanup is called
		// explicitly as well as through the defer).
		cleanupFns[idx] = nil

		wg.Add(1)
		count++
		go func() {
			defer wg.Done()
			if err := fn(); err != nil {
				exitCode.Add(1)
				log.Printf("cleanup operation error: %v", err)
			}
		}()
	}
	if count == 0 {
		log.Println("no cleanup operations")
		return
	}
	log.Printf("running %d cleanup operations", count)
	wg.Wait() // NOTE: no timeout; a cleanup function that never returns blocks exit.
	log.Printf("%d cleanup operations completed in %s", count, time.Since(start))
}
defer cleanup()

if !env.IsLocal() {
shutdown := telemetry.InitOTLPExporter(ctx, serviceName, swagger.Info.Version)
defer shutdown(context.TODO())
cleanupFns = append(cleanupFns, func() error {
// shutdown handlers flush buffers upon call and take a context. passing a
// specific context here so that all timeout configuration is in one place.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

return shutdown(ctx)
})
}

// Create an instance of our handler which satisfies the generated interface
apiStore := handlers.NewAPIStore()
defer apiStore.Close()
// (use the outer context rather than the signal handling
// context so it doesn't exit first.)
apiStore := handlers.NewAPIStore(ctx)
cleanupFns = append(cleanupFns, apiStore.Close)

s := NewGinServer(apiStore, swagger, *port)
// pass the signal context so that handlers know when shutdown is happening.
s := NewGinServer(signalCtx, apiStore, swagger, port)

fmt.Printf("Starting server on port %d\n", *port)
// And we serve HTTP until the world ends.
err = s.ListenAndServe()
if err != nil {
log.Printf("server error: %v\n", err)
// Serve in the background so main can wait for either a shutdown signal or
// an unexpected server exit.
go func() {
	log.Printf("http service (%d) starting", port)

	// Serve HTTP until shutdown.
	err := s.ListenAndServe()

	switch {
	case errors.Is(err, http.ErrServerClosed):
		// ListenAndServe returns ErrServerClosed as soon as Shutdown is
		// called, while Shutdown may still be waiting for in-flight
		// handlers. Do NOT cancel the root context here: it is the
		// context passed to Shutdown below (and to the API store), so
		// cancelling it would abort the graceful drain.
		log.Printf("http service (%d) shutdown successfully", port)
		return
	case err != nil:
		exitCode.Add(1)
		log.Printf("http service (%d) encountered error: %v", port, err)
	default:
		// this probably shouldn't happen...
		log.Printf("http service (%d) exited without error", port)
	}

	// The service ended outside of shutdown: cancel the root context so
	// main wakes up and everything else cleans up instead of waiting
	// forever.
	cancel()
}()

// signalCtx is derived from ctx, so when the root context is cancelled both
// cases are ready and either may run; calling Shutdown on a server that has
// already stopped serving is harmless.
select {
case <-signalCtx.Done():
	// Shutdown blocks until all active http handlers have returned.
	if err := s.Shutdown(ctx); err != nil {
		exitCode.Add(1)
		log.Printf("http service (%d) shutdown error: %v", port, err)
	}
case <-ctx.Done():
	log.Printf("http service (%d) shutdown outside of signal", port)
}

// call cleanup explicitly because defers do not run on os.Exit
cleanup()
jakubno marked this conversation as resolved.
Show resolved Hide resolved

// Exit, with appropriate code.
os.Exit(int(exitCode.Load()))
jakubno marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions packages/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func main() {

if !env.IsLocal() {
shutdown := telemetry.InitOTLPExporter(ctx, server.ServiceName, "no")

defer shutdown(context.TODO())
tychoish marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down