diff --git a/Dockerfile b/Dockerfile index 66731957..e1ce3d83 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Dependencies Stage -FROM golang:1.22-alpine AS base +FROM golang:1.22.4-alpine AS base LABEL maintainer="Konstantin Makarov " WORKDIR /listener diff --git a/README.md b/README.md index 238dda8b..6034850a 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,9 @@ You can take metrics by specifying an endpoint for Prometheus in the configurati | published_events_total | the total number of published events | `subject`, `table` | | filter_skipped_events_total | the total number of skipped events | `table` | +### Kubernetes +Application initializes a web server (*if a port is specified in the configuration*) with two endpoints +for readiness `/ready` and liveness `/healthz` probes. ## Docker diff --git a/cmd/wal-listener/main.go b/cmd/wal-listener/main.go index 81553edf..6010dda7 100644 --- a/cmd/wal-listener/main.go +++ b/cmd/wal-listener/main.go @@ -5,6 +5,8 @@ import ( "fmt" "log/slog" "os" + "os/signal" + "syscall" scfg "github.com/ihippik/config" "github.com/urfave/cli/v2" @@ -35,6 +37,9 @@ func main() { }, }, Action: func(c *cli.Context) error { + ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + cfg, err := config.InitConfig(c.String("config")) if err != nil { return fmt.Errorf("get config: %w", err) @@ -44,7 +49,7 @@ func main() { return fmt.Errorf("validate config: %w", err) } - if err := scfg.InitSentry(cfg.Monitoring.SentryDSN, version); err != nil { + if err = scfg.InitSentry(cfg.Monitoring.SentryDSN, version); err != nil { return fmt.Errorf("init sentry: %w", err) } @@ -57,7 +62,7 @@ func main() { return fmt.Errorf("pgx connection: %w", err) } - pub, err := factoryPublisher(c.Context, cfg.Publisher, logger) + pub, err := factoryPublisher(ctx, cfg.Publisher, logger) if err != nil { return fmt.Errorf("factory publisher: %w", err) } @@ -78,7 +83,9 @@ func main() { config.NewMetrics(), ) - if 
err := service.Process(c.Context); err != nil { + go service.InitHandlers(ctx) + + if err := service.Process(ctx); err != nil { slog.Error("service process failed", "err", err.Error()) } diff --git a/config/config.go b/config/config.go index b58e2395..e55e327d 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,7 @@ type Config struct { // ListenerCfg path of the listener config. type ListenerCfg struct { SlotName string `valid:"required"` + ServerPort int AckTimeout time.Duration RefreshConnection time.Duration `valid:"required"` HeartbeatInterval time.Duration `valid:"required"` @@ -38,7 +39,7 @@ type ListenerCfg struct { TopicsMap map[string]string } -// PublisherCfg represent configuration for any types publisher. +// PublisherCfg represent configuration for any publisher types. type PublisherCfg struct { Type PublisherType `valid:"required"` Address string diff --git a/config_example.yml b/config_example.yml index 2fa1c758..93389322 100644 --- a/config_example.yml +++ b/config_example.yml @@ -1,4 +1,5 @@ listener: + serverPort: 80 # k8s probes, optional slotName: myslot_1 refreshConnection: 30s heartbeatInterval: 10s diff --git a/go.mod b/go.mod index 20fcdc52..8827c689 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ihippik/wal-listener/v2 -go 1.22 +go 1.22.4 require ( cloud.google.com/go/pubsub v1.37.0 diff --git a/listener/listener.go b/listener/listener.go index f7441cd0..941c2bfc 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -4,9 +4,12 @@ import ( "context" "fmt" "log/slog" + "net/http" "os" "os/signal" + "strconv" "sync" + "sync/atomic" "time" "github.com/jackc/pgx" @@ -62,6 +65,7 @@ type Listener struct { repository repository parser parser lsn uint64 + isAlive atomic.Bool } // NewWalListener create and initialize new service instance. @@ -85,6 +89,84 @@ func NewWalListener( } } +// InitHandlers init web handlers for liveness & readiness k8s probes. 
+//
+// It is a no-op when no server port is configured; otherwise it blocks until
+// ctx is cancelled and then gracefully shuts the HTTP server down.
+func (l *Listener) InitHandlers(ctx context.Context) {
+	const (
+		defaultTimeout  = 500 * time.Millisecond
+		shutdownTimeout = 5 * time.Second
+	)
+
+	if l.cfg.Listener.ServerPort == 0 {
+		l.log.Debug("web server port for probes not specified, skip")
+		return
+	}
+
+	handler := http.NewServeMux()
+	handler.HandleFunc("GET /healthz", l.liveness)
+	handler.HandleFunc("GET /ready", l.readiness)
+
+	addr := ":" + strconv.Itoa(l.cfg.Listener.ServerPort)
+	srv := &http.Server{
+		Addr:         addr,
+		Handler:      handler,
+		ReadTimeout:  defaultTimeout,
+		WriteTimeout: defaultTimeout,
+	}
+
+	go func() {
+		// ListenAndServe always returns a non-nil error; the unwrapped ErrServerClosed
+		// sentinel only reports the Shutdown call below and is not a failure.
+		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			l.log.Error("error starting http listener", "err", err)
+		}
+	}()
+
+	l.log.Debug("web handlers were initialised", slog.String("addr", addr))
+
+	<-ctx.Done()
+
+	// ctx is already cancelled here, so the shutdown deadline needs a fresh context.
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
+	defer cancel()
+
+	if err := srv.Shutdown(shutdownCtx); err != nil {
+		l.log.Error("error stopping http listener", "err", err)
+	}
+}
+
+const contentTypeTextPlain = "text/plain"
+
+// liveness reports whether both the replicator and the repository are alive.
+func (l *Listener) liveness(w http.ResponseWriter, _ *http.Request) {
+	l.writeProbeResponse(w, "liveness", l.replicator.IsAlive() && l.repository.IsAlive())
+}
+
+// readiness reports the outcome of the most recent periodic heartbeat.
+func (l *Listener) readiness(w http.ResponseWriter, _ *http.Request) {
+	l.writeProbeResponse(w, "readiness", l.isAlive.Load())
+}
+
+// writeProbeResponse answers a probe with 200 "ok" when healthy and with
+// 500 "failed" (plus a warning in the log) otherwise.
+func (l *Listener) writeProbeResponse(w http.ResponseWriter, probe string, healthy bool) {
+	respCode, resp := http.StatusOK, "ok"
+
+	if !healthy {
+		respCode, resp = http.StatusInternalServerError, "failed"
+		l.log.Warn(probe + " probe failed")
+	}
+
+	w.Header().Set("Content-Type", contentTypeTextPlain)
+	w.WriteHeader(respCode)
+
+	if _, err := w.Write([]byte(resp)); err != nil {
+		l.log.Error(probe+": error writing response", "err", err)
+	}
+}
+
 // Process is main service entry point.
func (l *Listener) Process(ctx context.Context) error { logger := l.log.With("slot_name", l.cfg.Listener.SlotName) @@ -346,9 +428,12 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { case <-heart.C: if err := l.SendStandbyStatus(); err != nil { l.log.Error("failed to send heartbeat status", "err", err) + l.isAlive.Store(false) + continue } + l.isAlive.Store(true) l.log.Debug("sending periodic heartbeat status") } } @@ -365,7 +450,7 @@ func (l *Listener) SendStandbyStatus() error { standbyStatus.ReplyRequested = 0 - if err := l.replicator.SendStandbyStatus(standbyStatus); err != nil { + if err = l.replicator.SendStandbyStatus(standbyStatus); err != nil { return fmt.Errorf("unable to send StandbyStatus object: %w", err) }