Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/perf testing #728

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 90 additions & 17 deletions ci/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,53 @@ package main
import (
"fmt"
"path"
"time"
)

func (m *Ci) Etoe(test Optional[string]) *Container {
image := m.Build().ContainerImage("").
WithExposedPort(10000).
WithMountedFile("/etc/openmeter/config.yaml", dag.Host().File(path.Join(root(), "e2e", "config.yaml"))).
WithServiceBinding("kafka", dag.Kafka(KafkaOpts{Version: kafkaVersion}).Service()).
WithServiceBinding("clickhouse", clickhouse())
const (
	// perfTestDefaultConfig is the OpenMeter config file name (looked up
	// under perf/configs) used by Perf when no config name is provided.
	perfTestDefaultConfig = "dagger.config.yaml"
)

api := image.
WithExposedPort(8080).
WithExec([]string{"openmeter", "--config", "/etc/openmeter/config.yaml"}).
AsService()
// Perf runs the k6-based comparative performance tests against a locally
// containerized OpenMeter stack.
//
// configName selects a config file under perf/configs; it defaults to
// perfTestDefaultConfig when unset. The returned container executes the
// bundled k6 test suite via run-all.sh.
//
// Note: the stray, non-compiling lines left over from the old Etoe body
// (an undefined `image` chain) have been removed from this span.
func (m *Ci) Perf(configName Optional[string]) *Container {
	localStack := NewLocalStack(m.Source, m.Source.File(path.Join("perf", "configs", configName.GetOr(perfTestDefaultConfig))))

	// Build the k6 test bundle with Node 20 and pnpm.
	testBuilder := dag.Container().
		From("node:20-alpine").
		WithMountedDirectory("/mnt/src", m.Source).
		WithWorkdir("/mnt/src/perf/k6").
		WithExec([]string{"pnpm", "install"}).
		WithExec([]string{"pnpm", "build"})

	k6Container := dag.Container().
		From("grafana/k6:latest").
		WithDirectory("/tests/dist", testBuilder.Directory("/mnt/src/perf/k6/dist")).
		WithFile("/tests/run-all.sh", m.Source.File("perf/k6/run-all.sh")).
		WithWorkdir("/tests").
		WithServiceBinding("api", localStack.Api).
		WithServiceBinding("clickhouse", localStack.Clickhouse).
		WithEnvVariable("CLICKHOUSE_BASE_URL", "http://clickhouse:8123").
		WithEnvVariable("OPENMETER_BASE_URL", "http://api:8888").
		WithEnvVariable("OPENMETER_TELEMETRY_URL", "http://api:10000").
		WithEntrypoint([]string{"/bin/ash", "-c"})

	// Seed the system with benthos-generated events.
	// NOTE(review): this container's value is discarded; with Dagger's lazy
	// evaluation a container that is never wired into the returned pipeline
	// is likely never executed — confirm, and if so chain it in (e.g. make
	// k6Container depend on it, or force evaluation with Sync).
	dag.Container().
		From("jeffail/benthos:latest").
		WithMountedDirectory("/mnt/src", m.Source).
		WithServiceBinding("api", localStack.Api).
		WithEnvVariable("OPENMETER_BASE_URL", "http://api:8888").
		WithEnvVariable("SEEDER_COUNT", "100").
		WithExec([]string{"-c", "/mnt/src/perf/configs/seed.benthos.yaml"})

	// TODO: add querying clickhouse to get the number of records instead
	// NOTE(review): this sleep happens while the pipeline is being
	// *constructed*, not between the seeding and test steps at execution
	// time — it probably does not provide the intended settling delay.
	time.Sleep(10 * time.Second)

	return k6Container.WithExec([]string{"./run-all.sh"})
}

func (m *Ci) Etoe(test Optional[string]) *Container {
localStack := NewLocalStack(m.Source, m.Source.File("e2e/config.yaml"))

args := []string{"go", "test", "-v"}

Expand All @@ -35,13 +63,58 @@ func (m *Ci) Etoe(test Optional[string]) *Container {
Container: goModule().
WithSource(m.Source).
Container().
WithServiceBinding("api", api).
WithServiceBinding("sink-worker", sinkWorker).
WithServiceBinding("api", localStack.Api).
WithServiceBinding("sink-worker", localStack.SinkWorker).
WithEnvVariable("OPENMETER_ADDRESS", "http://api:8080"),
}).
Exec(args)
}

// AppStack holds handles to a locally running OpenMeter stack: the
// application services (API server and sink worker) plus the backing
// infrastructure services they are bound to.
type AppStack struct {
	Api        *Service // OpenMeter API server (exposes port 8080)
	SinkWorker *Service // sink worker; bound to redis and to Api
	Clickhouse *Service // ClickHouse backing store
	Redis      *Service // Redis — presumably used for sink dedupe; confirm against config
	Kafka      *Service // Kafka ingest broker
}

// NewLocalStack spins up a complete local OpenMeter stack: the backing
// services (Kafka, ClickHouse, Redis), an API server, and a sink worker,
// all built from source with omConfig mounted as the OpenMeter config.
// Handles to every service are returned in an AppStack.
func NewLocalStack(source *Directory, omConfig *File) *AppStack {
	const configPath = "/etc/openmeter/config.yaml"

	builder := &Build{Source: source}

	// Backing infrastructure services.
	kafkaSvc := dag.Kafka(KafkaOpts{Version: kafkaVersion}).Service()
	clickhouseSvc := clickhouse()
	redisSvc := redis()

	// Shared application image with the config and infra bindings attached.
	base := builder.ContainerImage("").
		WithExposedPort(10000).
		WithMountedFile(configPath, omConfig).
		WithServiceBinding("kafka", kafkaSvc).
		WithServiceBinding("clickhouse", clickhouseSvc)

	apiSvc := base.
		WithExposedPort(8080).
		WithExec([]string{"openmeter", "--config", configPath}).
		AsService()

	// Binding the API into the sink worker makes sure the API starts first.
	workerSvc := base.
		WithServiceBinding("redis", redisSvc).
		WithServiceBinding("api", apiSvc).
		WithExec([]string{"openmeter-sink-worker", "--config", configPath}).
		AsService()

	return &AppStack{
		Api:        apiSvc,
		SinkWorker: workerSvc,
		Clickhouse: clickhouseSvc,
		Redis:      redisSvc,
		Kafka:      kafkaSvc,
	}
}

func clickhouse() *Service {
return dag.Container().
From(fmt.Sprintf("clickhouse/clickhouse-server:%s-alpine", clickhouseVersion)).
Expand Down
15 changes: 15 additions & 0 deletions docs/perftest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Running Performance Tests

Performance testing & benchmarking is a complicated topic that's hard to get right with synthetic environments.

We have some comparative benchmarks written in k6 under `perf/k6`. The measured numbers are highly dependent on the environment and the hardware they are run on. The numbers are not absolute, but they can give you a rough idea of performance.

## Running the tests

As the test results are highly dependent on the environment, the tests can be run separately from any environment setup automation to allow for more controlled testing and further experimentation. Running locally, the environment setup would consist of:

1. Running the OpenMeter stack. For the tests to work the meters defined in `perf/configs/config.json` must exist. The simplest way to achieve this is to run OpenMeter with that config file.
2. Seeding OpenMeter. `perf/configs/seed.benthos.yaml` generates data so that it matches the defined meters. Feel free to alter `SEEDER_COUNT` to your liking; the default is 1M.
3. Running the tests. Check the `perf/k6` folder for the different options on how to run them.

Alternatively you can run the tests against a containerised configuration with dagger (`dagger call perf`).
3 changes: 3 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@

dagger
licensei

# perf testing
k6
];

env = {
Expand Down
43 changes: 43 additions & 0 deletions perf/configs/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
ingest:
kafka:
broker: localhost:29092

aggregation:
clickhouse:
address: localhost:9000

sink:
minCommitCount: 1000
maxCommitWait: 10s
namespaceRefetch: 1s

meters:
- slug: grouped_count_meter
description: Grouped Count Meter
eventType: etype
aggregation: COUNT
groupBy:
group1: $.group1
group2: $.group2

- slug: grouped_sum_meter
description: Grouped Sum Meter
eventType: etype
aggregation: SUM
valueProperty: $.value
groupBy:
group1: $.group1
group2: $.group2

- slug: grouped_avg_meter
description: Grouped Average Meter
eventType: etype
aggregation: AVG
valueProperty: $.value
groupBy:
group1: $.group1
group2: $.group2

portal:
enabled: true
tokenSecret: this-isnt-secure
55 changes: 55 additions & 0 deletions perf/configs/dagger.config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
address: 0.0.0.0:8080

telemetry:
address: 0.0.0.0:10000

ingest:
kafka:
broker: kafka:9092

aggregation:
clickhouse:
address: clickhouse:9000

sink:
minCommitCount: 1000
maxCommitWait: 1s
namespaceRefetch: 1s
dedupe:
enabled: true
driver: redis
config:
expiration: 10m
database: 1
address: redis:6379

meters:
- slug: grouped_count_meter
description: Grouped Count Meter
eventType: etype
aggregation: COUNT
groupBy:
group1: $.group1
group2: $.group2

- slug: grouped_sum_meter
description: Grouped Sum Meter
eventType: etype
aggregation: SUM
valueProperty: $.value
groupBy:
group1: $.group1
group2: $.group2

- slug: grouped_avg_meter
description: Grouped Average Meter
eventType: etype
aggregation: AVG
valueProperty: $.value
groupBy:
group1: $.group1
group2: $.group2

portal:
enabled: false
tokenSecret: this-isnt-secure
54 changes: 54 additions & 0 deletions perf/configs/seed.benthos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
input:
generate:
count: ${SEEDER_COUNT:1000000}
interval: "${SEEDER_INTERVAL:0ms}"
# batch_size: 1
mapping: |
let max_subjects = ${SEEDER_MAX_SUBJECTS:5}

let source = "seeder"
let group1_values = ["group1_value1", "group1_value2", "group1_value3"]
let group2_values = ["group2_value1", "group2_value2", "group2_value3"]


let subject = "subject-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24)).ts_format()

let event_type = "etype"
let group1 = $group1_values.index(random_int(seed: timestamp_unix_nano()) % $group1_values.length())
let group2 = $group2_values.index(random_int(seed: timestamp_unix_nano()) % $group2_values.length())

let value = random_int(seed: timestamp_unix_nano(), max: 1000)

root = {
"id": uuid_v4(),
"specversion": "1.0",
"type": $event_type,
"source": $source,
"subject": $subject,
"time": $time,
"data": {
"value": $value,
"group1": $group1,
"group2": $group2,
},
}

output:
switch:
cases:
- check: ""
continue: true
output:
http_client:
url: ${OPENMETER_BASE_URL:http://127.0.0.1:8888}/api/v1/events
verb: POST
headers:
Content-Type: application/cloudevents+json
Authorization: "Bearer ${OPENMETER_TOKEN:}"
max_in_flight: 64

- check: '"${SEEDER_LOG:false}" == "true"'
output:
stdout:
codec: lines
4 changes: 4 additions & 0 deletions perf/k6/.babelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"presets": ["@babel/env", "@babel/typescript"],
"plugins": ["@babel/proposal-class-properties", "@babel/proposal-object-rest-spread"]
}
Loading
Loading