From 5b43ad4488349e4ac3bf2564bde1932def73520f Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Fri, 5 Jan 2024 22:25:20 +0300 Subject: [PATCH] feat(promrw): request writer Ref: #283 --- cmd/promrw/main.go | 131 ++++++++++++++++++++++++++++++++++++++++ cmd/promrw/main_test.go | 37 ++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 cmd/promrw/main.go create mode 100644 cmd/promrw/main_test.go diff --git a/cmd/promrw/main.go b/cmd/promrw/main.go new file mode 100644 index 00000000..8d62aff2 --- /dev/null +++ b/cmd/promrw/main.go @@ -0,0 +1,131 @@ +// Binary promrw implements prometheusremotewrite receiver that can record +// requests or send them to specified target. +package main + +import ( + "context" + "encoding/gob" + "flag" + "fmt" + "io" + "net/http" + "os" + "sync/atomic" + "time" + + "github.com/go-faster/errors" + "github.com/go-faster/sdk/app" + "github.com/klauspost/compress/zstd" + "go.uber.org/multierr" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +func disableTelemetry() { + for _, v := range []string{ + "OTEL_METRICS_EXPORTER", + "OTEL_LOGS_EXPORTER", + "OTEL_TRACES_EXPORTER", + } { + _ = os.Setenv(v, "none") + } +} + +func main() { + disableTelemetry() + var arg struct { + Listen bool + Addr string + Data string + Duration time.Duration + } + flag.BoolVar(&arg.Listen, "listen", false, "Listen mode") + flag.StringVar(&arg.Addr, "addr", ":8080", "Address") + flag.StringVar(&arg.Data, "f", "rw.gob.zstd", "Data file") + flag.DurationVar(&arg.Duration, "d", time.Minute, "Duration in seconds of recorded data") + flag.Parse() + + if arg.Listen { + app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) (rerr error) { + f, err := os.Create(arg.Data) + if err != nil { + return errors.Wrap(err, "create file") + } + defer func() { + if err := f.Close(); err != nil { + rerr = multierr.Append(rerr, errors.Wrap(err, "close file")) + } else { + lg.Info("Saved", zap.String("file", arg.Data)) + } + }() + w, err := zstd.NewWriter(f) + if err != nil { + return errors.Wrap(err, "create encoder") + } + defer func() { + if err := w.Close(); err != nil { + rerr = multierr.Append(rerr, errors.Wrap(err, "close encoder")) + } + }() + e := gob.NewEncoder(w) + ctx, cancel := context.WithCancel(ctx) + var start atomic.Pointer[time.Time] + srv := &http.Server{ + Addr: arg.Addr, + ReadHeaderTimeout: time.Second, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data, err := io.ReadAll(r.Body) + if err != nil { + lg.Error("read", zap.Error(err)) + return + } + now := time.Now() + start.CompareAndSwap(nil, &now) + duration := now.Sub(*start.Load()) + if duration > arg.Duration { + cancel() + w.WriteHeader(http.StatusAccepted) + return + } + if err := e.Encode(data); err != nil { + lg.Error("Write", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + cancel() + return + } + w.WriteHeader(http.StatusAccepted) + }), + } + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + ticker := time.NewTicker(time.Second * 2) + defer ticker.Stop() + for { + select { + case now := <-ticker.C: + if v := start.Load(); v != nil { + duration := now.Sub(*v) + fmt.Printf("d=%s of %s\n", duration.Round(time.Second), arg.Duration) + } else { + fmt.Println(`d="not started"`) + } + case <-ctx.Done(): + return nil + } + } + }) + g.Go(func() error { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return errors.Wrap(err, "listen and serve") + } + return nil + }) + g.Go(func() error { + <-ctx.Done() + stopCtx := context.Background() + return srv.Shutdown(stopCtx) + }) + return g.Wait() + }) + } +} diff --git a/cmd/promrw/main_test.go b/cmd/promrw/main_test.go new file mode 100644 index 00000000..8f62c7c6 --- /dev/null +++ b/cmd/promrw/main_test.go @@ -0,0 +1,37 @@ +package main + +import ( + "bytes" + "encoding/gob" + "testing" + + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" +) + +func TestProtobufReadWrite(t *testing.T) { + req := &prompb.WriteRequest{ + Metadata: []prompb.MetricMetadata{ + { + Type: prompb.MetricMetadata_GAUGE, + Help: "Help", + }, + }, + } + out := new(bytes.Buffer) + w := gob.NewEncoder(out) + for i := 0; i < 10; i++ { + data, err := req.Marshal() + require.NoError(t, err) + require.NoError(t, w.Encode(data)) + } + + r := gob.NewDecoder(out) + for i := 0; i < 10; i++ { + var data []byte + require.NoError(t, r.Decode(&data)) + var got prompb.WriteRequest + require.NoError(t, got.Unmarshal(data)) + require.Equal(t, req.Metadata, got.Metadata) + } +}