Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: clickhouse stat scraper #84

Merged
merged 6 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/ghcr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- id: tag_bump
name: Bump version and push tag
uses: anothrNick/[email protected]
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DEFAULT_BUMP: patch
RELEASE_BRANCHES: main
PRERELEASE: true
- name: Log in to the Container registry
uses: docker/[email protected]
with:
Expand All @@ -29,6 +37,7 @@ jobs:
ghcr.io/metrico/qryn-otel-collector
tags: |
latest
${{ steps.tag_bump.outputs.new_tag }}
- name: Build and push
uses: docker/[email protected]
with:
Expand Down
2 changes: 2 additions & 0 deletions cmd/otel-collector/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ import (

"github.com/metrico/otel-collector/exporter/clickhouseprofileexporter"
"github.com/metrico/otel-collector/exporter/qrynexporter"
"github.com/metrico/otel-collector/receiver/chstatsreceiver"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver"
)

Expand Down Expand Up @@ -300,6 +301,7 @@ func components() (otelcol.Factories, error) {
zipkinreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
pyroscopereceiver.NewFactory(),
chstatsreceiver.NewFactory(),
}
for _, rcv := range factories.Receivers {
receivers = append(receivers, rcv)
Expand Down
59 changes: 59 additions & 0 deletions receiver/chstatsreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Pyroscope Receiver

| Status | |
| ------------------------ |---------|
| Stability | [beta] |
| Supported pipeline types | metrics |

The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports metrics from ClickHouse databases. It uses the ClickHouse Go client library to connect to the database and execute SQL queries to retrieve metrics data. The module is designed to be highly configurable, allowing users to specify the database connection details, the SQL queries to execute, and the metrics to export.

## Configuration

- `dsn`: sets the Data Source Name (DSN) for the ClickHouse database.
The DSN is a string that contains the necessary information to connect to the database,
such as the host, port, and database name
- `queries`: list of the SQL queries that the receiver will execute against the database to retrieve metrics data.
The queries are specified as a list of strings.
- `timeout`: amount of time between two consecutive stats requests iterations.
The timeout is specified as the duration value like `20s`, `1m`, etc.

## Clickhouse Queries

Each clickhouse query should return two fields:
- labels as array of Tuple(String, String)
- value Float64

Labels should have the `__name__` label with the name of the metric.

For example
```sql
SELECT
[('__name__', 'some_metric'), ('label2', 'val2')]::Array(Tuple(String,String)),
2::Float64
```

## Example

```yaml
receivers:
chstatsreceiver:
dsn: clickhouse://localhost:9000
queries:
- |
SELECT [
('__name__', 'clickhouse_bytes_on_disk'), ('db', database), ('disk', disk_name), ('host', hostname())
],
sum(bytes_on_disk)::Float64
FROM system.parts
WHERE (active = 1) AND (database NOT IN ('system', '_system'))
GROUP BY database, disk_name
exporters:
prometheusremotewrite:
endpoint: http://localhost:3100/prom/remote/write
timeout: 30s
service:
pipelines:
metrics:
receivers: [chstatsreceiver]
exporters: [prometheusremotewrite]
```
33 changes: 33 additions & 0 deletions receiver/chstatsreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package chstatsreceiver

import (
"fmt"
"net/url"
"time"

"go.opentelemetry.io/collector/component"
)

// Represents the receiver config within the collector's config.yaml
type Config struct {
DSN string `mapstructure:"dsn"`
Timeout time.Duration `mapstructure:"timeout"`
Queries []string `mapstructure:"queries"`
}

var _ component.Config = (*Config)(nil)

// Checks that the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.Timeout < 15*time.Second {
return fmt.Errorf("timeout must be at least 15 seconds")
}
chDSN, err := url.Parse(cfg.DSN)
if err != nil {
return fmt.Errorf("invalid dsn: %w", err)
}
if chDSN.Scheme != "clickhouse" {
return fmt.Errorf("invalid dsn: scheme should be clickhouse://")
}
return nil
}
38 changes: 38 additions & 0 deletions receiver/chstatsreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package chstatsreceiver

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
typeStr = "chstatsreceiver"
defaultTimeout = 15 * time.Second
)

func createDefaultConfig() component.Config {
return &Config{
DSN: "",
Timeout: defaultTimeout,
Queries: []string{},
}
}

func createMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
return &chReceiver{
cfg: cfg.(*Config),
logger: set.Logger,
consumer: consumer,
}, nil
}

func NewFactory() receiver.Factory {
return receiver.NewFactory(
component.MustNewType(typeStr),
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha))
}
143 changes: 143 additions & 0 deletions receiver/chstatsreceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package chstatsreceiver

import (
"bytes"
"context"
"fmt"
"text/template"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type chReceiver struct {
cfg *Config
db clickhouse.Conn
consumer consumer.Metrics
templates []*template.Template
logger *zap.Logger
cancel context.CancelFunc
ticker *time.Ticker
}

func (r *chReceiver) Start(ctx context.Context, _ component.Host) error {
opts, err := clickhouse.ParseDSN(r.cfg.DSN)
if err != nil {
return err
}
db, err := clickhouse.Open(opts)
if err != nil {
return err
}
r.db = db
r.templates = make([]*template.Template, len(r.cfg.Queries))
for i, query := range r.cfg.Queries {
r.templates[i], err = template.New(fmt.Sprintf("tpl-%d", i)).Parse(query)
if err != nil {
return err
}
}

_ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

r.ticker = time.NewTicker(r.cfg.Timeout)

go r.mainLoop(_ctx)
return nil
}

func (r *chReceiver) mainLoop(ctx context.Context) {
for {
r.logger.Info("tick start")
select {
case <-ctx.Done():
fmt.Println("tick stop")
return
case <-r.ticker.C:
err := r.GetMetrics(ctx)
if err != nil {
r.logger.Error("failed to get metrics", zap.Error(err))
}
}
r.logger.Info("tick end")
}
}

func (r *chReceiver) GetMetrics(ctx context.Context) error {
for _, tpl := range r.templates {
err := r.getMetricsTemplate(ctx, tpl)
if err != nil {
return err
}
}
return nil
}

func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Template) error {
queryBuf := bytes.Buffer{}
params := map[string]any{
"timestamp_ns": time.Now().UnixNano(),
"timestamp_ms": time.Now().UnixMilli(),
"timestamp_s": time.Now().Unix(),
}
err := tpl.Execute(&queryBuf, params)
wrapErr := func(err error) error {
return fmt.Errorf("failed to execute. Query: %s; error: %w", queryBuf.String(), err)
}
if err != nil {
return wrapErr(err)
}
rows, err := r.db.Query(ctx, queryBuf.String())
if err != nil {
return wrapErr(err)
}
defer rows.Close()
for rows.Next() {
var (
labels [][]string
value float64
)
err = rows.Scan(&labels, &value)
if err != nil {
return wrapErr(err)
}
metrics := pmetric.NewMetrics()
res := metrics.ResourceMetrics().AppendEmpty()
res.Resource().Attributes()
metric := res.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
data := metric.SetEmptyGauge().DataPoints().AppendEmpty()
for _, label := range labels {
if label[0] == "__name__" {
metric.SetName(label[1])
continue
}
data.Attributes().PutStr(label[0], label[1])
}
data.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
data.SetDoubleValue(value)
select {
case <-ctx.Done():
return nil
default:
err = r.consumer.ConsumeMetrics(ctx, metrics)
if err != nil {
return wrapErr(err)
}
}
}
return nil
}

func (r *chReceiver) Shutdown(_ context.Context) error {
fmt.Println("shutting down")
r.cancel()
r.ticker.Stop()
_ = r.db.Close()
return nil
}
Loading