Skip to content

Commit

Permalink
perf(otelbench): add promql query sender
Browse files Browse the repository at this point in the history
Ref: #283
  • Loading branch information
ernado committed Jan 7, 2024
1 parent 2e062a6 commit 74fa703
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 2 deletions.
28 changes: 28 additions & 0 deletions cmd/otelbench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Binary otelbench implements benchmarking suite for oteldb.
package main

import (
"context"
"os"
"os/signal"

"github.com/spf13/cobra"
)

func main() {
rootCmd := &cobra.Command{
Use: "otelbench",
Short: "otelbench is a benchmarking suite for querying oteldb",

SilenceUsage: true,
}
rootCmd.AddCommand(
newPromQLCommand(),
)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
if err := rootCmd.ExecuteContext(ctx); err != nil {
_, _ = os.Stderr.WriteString(err.Error() + "\n")
os.Exit(1)
}
}
167 changes: 167 additions & 0 deletions cmd/otelbench/promql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"time"

"github.com/go-faster/errors"
"github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"

"github.com/go-faster/oteldb/internal/promapi"
"github.com/go-faster/oteldb/internal/promproxy"
)

type PromQL struct {
Addr string
Input string
RequestTimeout time.Duration

client *promapi.Client
}

func (p *PromQL) Setup() error {
var err error
p.client, err = promapi.NewClient(p.Addr)
if err != nil {
return errors.Wrap(err, "create client")
}
return nil
}

func toPrometheusTimestamp(t time.Time) promapi.PrometheusTimestamp {
return promapi.PrometheusTimestamp(strconv.FormatInt(t.Unix(), 10))
}

func (p *PromQL) sendRangeQuery(ctx context.Context, q promproxy.RangeQuery) error {
if _, err := p.client.GetQueryRange(ctx, promapi.GetQueryRangeParams{
Query: q.Query,
Step: strconv.Itoa(q.Step),
Start: toPrometheusTimestamp(q.Start),
End: toPrometheusTimestamp(q.End),
}); err != nil {
return errors.Wrap(err, "get query range")
}
return nil
}

func toOptPrometheusTimestamp(t promproxy.OptDateTime) promapi.OptPrometheusTimestamp {
if !t.IsSet() {
return promapi.OptPrometheusTimestamp{}
}
return promapi.NewOptPrometheusTimestamp(toPrometheusTimestamp(t.Value))
}

func (p *PromQL) sendInstantQuery(ctx context.Context, q promproxy.InstantQuery) error {
if _, err := p.client.GetQuery(ctx, promapi.GetQueryParams{
Query: q.Query,
Time: toOptPrometheusTimestamp(q.Time),
}); err != nil {
return errors.Wrap(err, "get query")
}
return nil
}

func (p *PromQL) sendSeriesQuery(ctx context.Context, query promproxy.SeriesQuery) error {
if _, err := p.client.GetSeries(ctx, promapi.GetSeriesParams{
Start: toOptPrometheusTimestamp(query.Start),
End: toOptPrometheusTimestamp(query.End),
Match: query.Matchers,
}); err != nil {
return errors.Wrap(err, "get series")
}
return nil
}

func (p *PromQL) send(ctx context.Context, q promproxy.Query) error {
ctx, cancel := context.WithTimeout(ctx, p.RequestTimeout)
defer cancel()
switch q.Type {
case promproxy.InstantQueryQuery:
return p.sendInstantQuery(ctx, q.InstantQuery)
case promproxy.RangeQueryQuery:
return p.sendRangeQuery(ctx, q.RangeQuery)
case promproxy.SeriesQueryQuery:
return p.sendSeriesQuery(ctx, q.SeriesQuery)
default:
return errors.Errorf("unknown query type %q", q.Type)
}
}

func (p *PromQL) each(ctx context.Context, fn func(ctx context.Context, q promproxy.Query) error) error {
f, err := os.Open(p.Input)
if err != nil {
return errors.Wrap(err, "read")
}
defer func() {
_ = f.Close()
}()
d := json.NewDecoder(f)
for {
var q promproxy.Query
if err := d.Decode(&q); err != nil {
if errors.Is(err, io.EOF) {
break
}
return errors.Wrap(err, "decode query")
}
if err := fn(ctx, q); err != nil {
return errors.Wrap(err, "callback")
}
}
return nil
}

func (p *PromQL) Run(ctx context.Context) error {
fmt.Println("sending", p.Input, "to", p.Addr)
var total int64
if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) error {
total++
return nil
}); err != nil {
return errors.Wrap(err, "count total")
}
pb := progressbar.Default(total)
start := time.Now()
if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) error {
if err := p.send(ctx, q); err != nil {
return errors.Wrap(err, "send")
}
if err := pb.Add(1); err != nil {
return errors.Wrap(err, "update progress bar")
}
return nil
}); err != nil {
_ = pb.Exit()
return errors.Wrap(err, "send")
}
if err := pb.Finish(); err != nil {
return errors.Wrap(err, "finish progress bar")
}
fmt.Println("done in", time.Since(start).Round(time.Millisecond))
return nil
}

func newPromQLCommand() *cobra.Command {
p := &PromQL{}
cmd := &cobra.Command{
Use: "promql",
Short: "Run promql queries",
RunE: func(cmd *cobra.Command, args []string) error {
if err := p.Setup(); err != nil {
return errors.Wrap(err, "setup")
}
return p.Run(cmd.Context())
},
}
f := cmd.Flags()
f.StringVar(&p.Addr, "addr", "http://localhost:9090", "Prometheus address")
f.StringVarP(&p.Input, "input", "i", "queries.jsonl", "Input file")
f.DurationVar(&p.RequestTimeout, "request-timeout", time.Second*10, "Request timeout")
return cmd
}
2 changes: 1 addition & 1 deletion cmd/oteldb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func loadConfig(name string) (cfg Config, _ error) {
return cfg, nil
}

// Config is a oteldb config.
// Config is the oteldb config.
type Config struct {
DSN string `json:"dsn" yaml:"dsn"`
TTL time.Duration `json:"ttl" yaml:"ttl"`
Expand Down
5 changes: 4 additions & 1 deletion dev/local/ch-bench-read/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ services:
build:
context: ../../../
dockerfile: Dockerfile
command:
- --config=/etc/otel/cfg.yml
volumes:
- ./oteldb.yml:/etc/otel/cfg.yml:ro
environment:
- OTELDB_STORAGE=ch
- CH_DSN=clickhouse://clickhouse:9000
- OTEL_LOG_LEVEL=info
- OTEL_METRICS_EXPORTER=none
- OTEL_LOGS_EXPORTER=none
Expand Down
20 changes: 20 additions & 0 deletions dev/local/ch-bench-read/oteldb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
prometheus:
bind: 0.0.0.0:9090
max_samples: 1_000_000
timeout: 1m
enable_negative_offset: true
health_check:
bind: 0.0.0.0:13133
otelcol:
receivers:
prometheusremotewrite:
endpoint: 0.0.0.0:19291
time_threshold: 100_000 # hours
exporters:
oteldbexporter:
dsn: clickhouse://clickhouse:9000
service:
pipelines:
metrics:
receivers: [prometheusremotewrite]
exporters: [oteldbexporter]

0 comments on commit 74fa703

Please sign in to comment.