Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

healthcheck writer #1687

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/kolide/launcher/ee/localserver"
kolidelog "github.com/kolide/launcher/ee/log/osquerylogs"
"github.com/kolide/launcher/ee/powereventwatcher"
"github.com/kolide/launcher/ee/restartservice"
"github.com/kolide/launcher/ee/tuf"
"github.com/kolide/launcher/pkg/augeas"
"github.com/kolide/launcher/pkg/backoff"
Expand Down Expand Up @@ -271,6 +272,17 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
go checkpointer.Once(ctx)
runGroup.Add("logcheckpoint", checkpointer.Run, checkpointer.Interrupt)

healthCheckStore, err := restartservice.OpenWriter(ctx, k.RootDirectory())
if err != nil { // log an error but don't stop runLauncher from continuing
slogger.Log(ctx, slog.LevelError,
"could not init health check result writer store, history will be absent for this run",
"err", err,
)
} else {
healthchecker := checkups.NewHealthChecker(slogger, k, healthCheckStore)
runGroup.Add("healthchecker", healthchecker.Run, healthchecker.Interrupt)
}

// Create a channel for signals
sigChannel := make(chan os.Signal, 1)

Expand Down
8 changes: 7 additions & 1 deletion ee/agent/storage/sqlite/keyvalue_store_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
type storeName int

const (
StartupSettingsStore storeName = iota
StartupSettingsStore storeName = iota
HealthCheckStore storeName = 1
RestartServiceLogStore storeName = 2
)

// String translates the exported int constant to the actual name of the
Expand All @@ -30,6 +32,10 @@ func (s storeName) String() string {
switch s {
case StartupSettingsStore:
return "startup_settings"
case HealthCheckStore:
return "health_check_results"
case RestartServiceLogStore:
return "restart_service_logs"
}

return ""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the health_check_results table; IF EXISTS makes this a no-op when the table is absent.
DROP TABLE IF EXISTS health_check_results;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- health_check_results holds one row per recorded health check entry.
CREATE TABLE IF NOT EXISTS health_check_results (
    -- integer timestamp supplied by the writer; required
    timestamp INT NOT NULL,
    -- result payload stored as text; nullable
    results TEXT
);
78 changes: 78 additions & 0 deletions ee/agent/storage/sqlite/sql_store_sqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package agentsqlite

import (
"context"
"database/sql"
"errors"
"fmt"

_ "modernc.org/sqlite"
)

// FetchResults returns the `results` column of every row in the store's table.
// The query has no ORDER BY, so rows come back in whatever order the database
// yields them. The timestamp column is scanned but not returned.
//
// An error is returned when the store (or its connection) is nil, when the
// query fails, when a row cannot be scanned, or when iteration stops early.
// In the failure cases the rows collected so far are returned with the error.
func (s *sqliteStore) FetchResults(ctx context.Context) ([][]byte, error) {
	results := make([][]byte, 0)

	if s == nil || s.conn == nil {
		return results, errors.New("store is nil")
	}

	// It's fine to interpolate the table name into the query because we allowlist via `storeName` type
	query := fmt.Sprintf(`SELECT timestamp, results FROM %s;`, s.tableName)
	rows, err := s.conn.QueryContext(ctx, query)
	if err != nil {
		return results, err
	}

	defer rows.Close()

	for rows.Next() {
		var timestamp int64
		var result string
		if err := rows.Scan(&timestamp, &result); err != nil {
			return results, err
		}
		results = append(results, []byte(result))
	}

	// rows.Next returns false both at the end of the result set and when
	// iteration fails (e.g. the context is cancelled mid-read). Without this
	// check a truncated result set would be reported as a success.
	if err := rows.Err(); err != nil {
		return results, fmt.Errorf("iterating over rows from %s: %w", s.tableName, err)
	}

	return results, nil
}

// FetchLatestResult returns the `results` column of the row with the greatest
// timestamp in the store's table. When the table has no rows it returns an
// empty slice and a nil error. An error is returned when the store (or its
// connection) is nil, or when the query or scan fails.
func (s *sqliteStore) FetchLatestResult(ctx context.Context) ([]byte, error) {
	if s == nil || s.conn == nil {
		return []byte{}, errors.New("store is nil")
	}

	// It's fine to interpolate the table name into the query because we allowlist via `storeName` type
	query := fmt.Sprintf(`SELECT timestamp, results FROM %s ORDER BY timestamp DESC LIMIT 1;`, s.tableName)
	var timestamp int64
	var result string

	err := s.conn.QueryRowContext(ctx, query).Scan(&timestamp, &result)
	switch {
	// errors.Is (rather than ==) so a wrapped sql.ErrNoRows is still
	// recognized as "no rows yet" instead of surfacing as a failure.
	case errors.Is(err, sql.ErrNoRows):
		return []byte{}, nil
	case err != nil:
		return []byte{}, err
	default:
		return []byte(result), nil
	}
}

// AddResult inserts a single row containing the given timestamp and result
// (stored as text) into the store's table. It returns an error when the store
// (or its connection) is nil, when the store was opened read-only, or when the
// insert fails.
func (s *sqliteStore) AddResult(ctx context.Context, timestamp int64, result []byte) error {
	if s == nil || s.conn == nil {
		return errors.New("store is nil")
	}

	if s.readOnly {
		return errors.New("cannot perform AddResult with RO connection")
	}

	// It's fine to interpolate the table name into the query because we allowlist via `storeName` type
	insertSql := fmt.Sprintf(`INSERT INTO %s (timestamp, results) VALUES (?, ?);`, s.tableName)

	// Use the context-aware variant so the caller's cancellation/deadline is
	// honored, matching the QueryContext/QueryRowContext calls on this store.
	if _, err := s.conn.ExecContext(ctx, insertSql, timestamp, string(result)); err != nil {
		return fmt.Errorf("inserting into %s: %w", s.tableName, err)
	}

	return nil
}
6 changes: 5 additions & 1 deletion ee/agent/types/keyvalue_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ type GetterSetter interface {
Setter
}

// Closer is implemented by stores that hold a resource which must be released.
// It is embedded by the other store interfaces that need a Close method.
type Closer interface {
	// Close releases the underlying resource, returning any error encountered.
	Close() error
}

// GetterCloser extends the Getter interface with a Close method.
type GetterCloser interface {
Getter
Close() error
Closer
}

// GetterUpdaterCloser groups the Get, Update, and Close methods.
Expand Down
24 changes: 24 additions & 0 deletions ee/agent/types/sql_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package types

import (
"context"
)

// ResultFetcher is an interface for querying a single text field from a structured data store.
// This is intentionally vague for potential future re-use, allowing the caller to unmarshal
// the returned bytes as needed.
// This was initially intended to support the sqlite health_check_results table.
type ResultFetcher interface {
	// FetchResults retrieves the result field of all rows.
	FetchResults(ctx context.Context) ([][]byte, error)
	// FetchLatestResult retrieves the most recent result based on the timestamp column.
	FetchLatestResult(ctx context.Context) ([]byte, error)
	Closer
}

// ResultSetter is the write-side counterpart to ResultFetcher: it appends
// timestamped, already-marshalled result entries to a structured data store.
type ResultSetter interface {
	// AddResult persists a marshalled result entry alongside the provided unix timestamp.
	AddResult(ctx context.Context, timestamp int64, result []byte) error
	Closer
}

// TODO add rotation interface to cap limit on health check results
6 changes: 5 additions & 1 deletion ee/debug/checkups/checkups.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const (
doctorSupported targetBits = 1 << iota
flareSupported
logSupported
// note that in the future a failing checkup that is healthCheckSupported should
// result in a launcher restart from our watchdog service. ensure that this makes
// sense for a checkup before adding the healthCheckSupported bits
healthCheckSupported
)

//const checkupFor iota
Expand All @@ -94,7 +98,7 @@ func checkupsFor(k types.Knapsack, target targetBits) []checkupInt {
{&Platform{}, doctorSupported | flareSupported | logSupported},
{&Version{k: k}, doctorSupported | flareSupported | logSupported},
{&hostInfoCheckup{k: k}, doctorSupported | flareSupported | logSupported},
{&Processes{}, doctorSupported | flareSupported},
{&Processes{}, doctorSupported | flareSupported | healthCheckSupported},
{&RootDirectory{k: k}, doctorSupported | flareSupported},
{&Connectivity{k: k}, doctorSupported | flareSupported | logSupported},
{&Logs{k: k}, doctorSupported | flareSupported},
Expand Down
111 changes: 111 additions & 0 deletions ee/debug/checkups/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package checkups

import (
"context"
"encoding/json"
"io"
"log/slog"
"strings"
"time"

"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/ee/restartservice"
)

type (
	// healthChecker periodically runs the checkups flagged healthCheckSupported
	// and records their statuses through writer.
	healthChecker struct {
		// slogger is the logger used for all healthchecker messages.
		slogger *slog.Logger
		// knapsack is handed to checkupsFor when selecting checkups to run.
		knapsack types.Knapsack
		// interrupt carries the shutdown signal from Interrupt to Run.
		interrupt chan struct{}
		// interrupted records that Interrupt has already been called once.
		// NOTE(review): plain bool with no synchronization — confirm Interrupt
		// is never invoked concurrently.
		interrupted bool
		// writer persists the marshalled results of each run.
		writer *restartservice.HealthCheckWriter
	}
)

// NewHealthChecker assembles a healthChecker around the given knapsack and
// result writer. The supplied logger is tagged with component=healthchecker,
// and the interrupt channel is given a buffer of one.
func NewHealthChecker(slogger *slog.Logger, k types.Knapsack, writer *restartservice.HealthCheckWriter) *healthChecker {
	hc := new(healthChecker)

	hc.slogger = slogger.With("component", "healthchecker")
	hc.knapsack = k
	hc.interrupt = make(chan struct{}, 1)
	hc.writer = writer

	return hc
}

// Run executes the health check loop: one pass immediately, then another pass
// on every 30-minute tick, until an interrupt is received. The recorded
// history is meant for general debugging and for the watchdog service to
// observe unhealthy states and respond accordingly. It always returns nil.
func (c *healthChecker) Run() error {
	const interval = 30 * time.Minute

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// First pass happens right away rather than waiting a full interval.
	c.Once(context.TODO())

	for {
		select {
		case <-c.interrupt:
			c.slogger.Log(context.TODO(), slog.LevelDebug,
				"interrupt received, exiting execute loop",
			)
			return nil
		case <-ticker.C:
			c.Once(context.TODO())
		}
	}
}

// Interrupt signals Run to stop. Only the first call sends on the interrupt
// channel; any later calls return immediately. The error argument is ignored.
func (c *healthChecker) Interrupt(_ error) {
	if !c.interrupted {
		// Mark before signalling so repeat calls skip the send.
		c.interrupted = true
		c.interrupt <- struct{}{}
	}
}

// Once performs a single health check pass: it runs every checkup selected by
// healthCheckSupported, collects each status keyed by the normalized checkup
// name, marshals the map to JSON, and hands it to the writer with the unix
// time captured before the checkups ran. Failures to encode or persist are
// logged as warnings and not returned.
func (c *healthChecker) Once(ctx context.Context) {
	supported := checkupsFor(c.knapsack, healthCheckSupported)
	statuses := make(map[string]Status)
	startedAt := time.Now().Unix()

	for _, chk := range supported {
		// NOTE(review): any value returned by Run is discarded here — confirm that is intended.
		chk.Run(ctx, io.Discard)

		name := normalizeCheckupName(chk.Name())
		statuses[name] = chk.Status()

		if chk.Status() != Failing {
			continue
		}

		// Emit the full checkup data only for failures, to aid debugging.
		c.slogger.Log(ctx, slog.LevelWarn,
			"detected health check failure",
			"checkup", name,
			"data", chk.Data(),
		)
	}

	payload, err := json.Marshal(statuses)
	if err != nil {
		c.slogger.Log(ctx, slog.LevelWarn,
			"failure encoding health check results",
			"err", err,
		)
		return
	}

	if err = c.writer.AddHealthCheckResult(ctx, startedAt, payload); err != nil {
		c.slogger.Log(ctx, slog.LevelWarn,
			"failure writing out health check results",
			"err", err,
		)
	}
}

// normalizeCheckupName lower-cases name and replaces every space with an
// underscore, e.g. "Root Directory" becomes "root_directory".
func normalizeCheckupName(name string) string {
	lowered := strings.ToLower(name)
	return strings.ReplaceAll(lowered, " ", "_")
}
28 changes: 28 additions & 0 deletions ee/restartservice/health_check_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package restartservice

import (
"context"
"fmt"

agentsqlite "github.com/kolide/launcher/ee/agent/storage/sqlite"
"github.com/kolide/launcher/ee/agent/types"
)

type (
	// healthCheckReader wraps a types.ResultFetcher store; OpenReader
	// constructs it and Close releases the store.
	healthCheckReader struct {
		// store is the backing result store, obtained via agentsqlite.OpenRO.
		store types.ResultFetcher
	}
)

// OpenReader opens the health check store under rootDirectory via
// agentsqlite.OpenRO and wraps it in a healthCheckReader. The open error, if
// any, is wrapped with the root directory for context.
func OpenReader(ctx context.Context, rootDirectory string) (*healthCheckReader, error) {
	roStore, openErr := agentsqlite.OpenRO(ctx, rootDirectory, agentsqlite.HealthCheckStore)
	if openErr != nil {
		return nil, fmt.Errorf("opening healthcheck db in %s: %w", rootDirectory, openErr)
	}

	reader := &healthCheckReader{
		store: roStore,
	}

	return reader, nil
}

// Close closes the reader's underlying store and returns its error, if any.
func (r *healthCheckReader) Close() error {
	closeErr := r.store.Close()
	return closeErr
}
50 changes: 50 additions & 0 deletions ee/restartservice/health_check_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package restartservice

import (
"context"
"errors"
"fmt"

agentsqlite "github.com/kolide/launcher/ee/agent/storage/sqlite"
"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/pkg/traces"
)

// HealthCheckWriter persists health check results by delegating to a wrapped
// types.ResultSetter store. Construct one with OpenWriter.
type HealthCheckWriter struct {
	// store is the backing result store that AddHealthCheckResult writes to.
	store types.ResultSetter
}

// OpenWriter opens the health check store under rootDirectory via
// agentsqlite.OpenRW and returns a HealthCheckWriter backed by it. The call is
// wrapped in a trace span; an open failure is returned wrapped with the root
// directory for context.
func OpenWriter(ctx context.Context, rootDirectory string) (*HealthCheckWriter, error) {
	ctx, span := traces.StartSpan(ctx)
	defer span.End()

	rwStore, openErr := agentsqlite.OpenRW(ctx, rootDirectory, agentsqlite.HealthCheckStore)
	if openErr != nil {
		return nil, fmt.Errorf("opening healthcheck db in %s: %w", rootDirectory, openErr)
	}

	return &HealthCheckWriter{store: rwStore}, nil
}

// AddHealthCheckResult stores value under the given timestamp by delegating to
// the wrapped store's AddResult. It returns an error when the writer or its
// store is nil, or when the store rejects the write.
func (hw *HealthCheckWriter) AddHealthCheckResult(ctx context.Context, timestamp int64, value []byte) error {
	if hw == nil || hw.store == nil {
		return errors.New("store is nil")
	}

	addErr := hw.store.AddResult(ctx, timestamp, value)
	if addErr == nil {
		return nil
	}

	return fmt.Errorf("adding healthcheck result: %w", addErr)
}

// Close closes the writer's underlying store and returns its error, if any.
// Calling Close on a nil writer, or on a writer with no store, is a no-op that
// returns nil rather than panicking.
func (hw *HealthCheckWriter) Close() error {
	// Same guard AddHealthCheckResult uses; there is nothing to release here.
	if hw == nil || hw.store == nil {
		return nil
	}

	return hw.store.Close()
}
Loading