Skip to content

Commit

Permalink
Merge pull request #112 from metrico/feature/ch-logs-receiver
Browse files Browse the repository at this point in the history
Feature/ch logs receiver
  • Loading branch information
akvlad authored Nov 28, 2024
2 parents fe02616 + b1a6a3c commit ad25f5d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 24 deletions.
57 changes: 50 additions & 7 deletions receiver/chstatsreceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
# Clickhouse Statistics Receiver

| Status | |
| ------------------------ |---------|
| Stability | [beta] |
| Supported pipeline types | metrics |
| Status | |
| ------------------------ |----------------|
| Stability | [beta] |
| Supported pipeline types | metrics, logs |

The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports
metrics and logs from ClickHouse databases.

It uses the ClickHouse Go client library to connect to the database and execute SQL queries
to retrieve metrics and log data.

The module is designed to be highly configurable, allowing users to specify the database connection details,
the SQL queries to execute, and the metrics and logs to export.

The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports metrics from ClickHouse databases. It uses the ClickHouse Go client library to connect to the database and execute SQL queries to retrieve metrics data. The module is designed to be highly configurable, allowing users to specify the database connection details, the SQL queries to execute, and the metrics to export.

## Configuration

- `dsn`: sets the Data Source Name (DSN) for the ClickHouse database.
The DSN is a string that contains the necessary information to connect to the database,
such as the host, port, and database name
- `type`: specifies the type of data to collect. Valid values are:
- `"metrics"`: collect metrics data (default if not specified)
- `"logs"`: collect log data
- `queries`: list of the SQL queries that the receiver will execute against the database to retrieve metrics data.
The queries are specified as a list of strings.
- `timeout`: amount of time between two consecutive stats requests iterations.
The timeout is specified as the duration value like `20s`, `1m`, etc.

## Clickhouse Queries

### For Metrics (type: "metrics")
Each clickhouse query should return two fields:
- labels as array of Tuple(String, String)
- value Float64
Expand All @@ -32,12 +44,27 @@ SELECT
2::Float64
```

### For Logs (type: "logs")
Queries for logs should return two fields:
- labels as array of Tuple(String, String)
- message String

The receiver will automatically convert the query results into log records.

```sql
SELECT
[('level', 'debug'), ('label2', 'val2')]::Array(Tuple(String,String)),
'log line to send'
```

## Example

```yaml
receivers:
chstatsreceiver:
chstatsreceiver/metrics:
dsn: clickhouse://localhost:9000
type: metrics
timeout: 30s
queries:
- |
SELECT [
Expand All @@ -47,13 +74,29 @@ receivers:
FROM system.parts
WHERE (active = 1) AND (database NOT IN ('system', '_system'))
GROUP BY database, disk_name
chstatsreceiver/logs:
dsn: clickhouse://localhost:9000
type: logs
timeout: 1m
queries:
- |
SELECT
[('job', 'clickhouse_query_logs')],
format('id={} query={}', query_id, query)
FROM system.query_id
WHERE event_time > now() - INTERVAL 1 MINUTE
exporters:
prometheusremotewrite:
endpoint: http://localhost:3100/prom/remote/write
timeout: 30s
loki:
endpoint: http://localhost:3100/loki/api/v1/push
service:
pipelines:
metrics:
receivers: [chstatsreceiver]
receivers: [chstatsreceiver/metrics]
exporters: [prometheusremotewrite]
logs:
receivers: [chstatsreceiver/logs]
exporters: [loki]
```
6 changes: 6 additions & 0 deletions receiver/chstatsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ import (
"go.opentelemetry.io/collector/component"
)

const (
RCV_TYPE_METRICS = "metrics"
RCV_TYPE_LOGS = "logs"
)

// Represents the receiver config within the collector's config.yaml
type Config struct {
DSN string `mapstructure:"dsn"`
Type string `mapstructure:"type"`
Timeout time.Duration `mapstructure:"timeout"`
Queries []string `mapstructure:"queries"`
}
Expand Down
17 changes: 13 additions & 4 deletions receiver/chstatsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ func createDefaultConfig() component.Config {

func createMetricsReceiver(_ context.Context, set receiver.Settings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
return &chReceiver{
cfg: cfg.(*Config),
logger: set.Logger,
consumer: consumer,
cfg: cfg.(*Config),
logger: set.Logger,
metricsConsumer: consumer,
}, nil
}

func createLogsReceiver(_ context.Context, set receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
return &chReceiver{
cfg: cfg.(*Config),
logger: set.Logger,
logsConsumer: consumer,
}, nil
}

func NewFactory() receiver.Factory {
return receiver.NewFactory(
component.MustNewType(typeStr),
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha))
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha),
receiver.WithLogs(createLogsReceiver, component.StabilityLevelAlpha))
}
88 changes: 75 additions & 13 deletions receiver/chstatsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"text/template"
"time"

"go.opentelemetry.io/collector/pdata/plog"
"golang.org/x/sync/errgroup"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -16,13 +19,14 @@ import (
)

type chReceiver struct {
cfg *Config
db clickhouse.Conn
consumer consumer.Metrics
templates []*template.Template
logger *zap.Logger
cancel context.CancelFunc
ticker *time.Ticker
cfg *Config
db clickhouse.Conn
metricsConsumer consumer.Metrics
logsConsumer consumer.Logs
templates []*template.Template
logger *zap.Logger
cancel context.CancelFunc
ticker *time.Ticker
}

func (r *chReceiver) Start(ctx context.Context, _ component.Host) error {
Expand Down Expand Up @@ -70,13 +74,21 @@ func (r *chReceiver) mainLoop(ctx context.Context) {
}

func (r *chReceiver) GetMetrics(ctx context.Context) error {
g := errgroup.Group{}
for _, tpl := range r.templates {
err := r.getMetricsTemplate(ctx, tpl)
if err != nil {
return err
}
_tpl := tpl
g.Go(func() error {
switch r.cfg.Type {
case RCV_TYPE_METRICS:
case "":
return r.getMetricsTemplate(ctx, _tpl)
case RCV_TYPE_LOGS:
return r.getLogsTemplate(ctx, _tpl)
}
return nil
})
}
return nil
return g.Wait()
}

func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Template) error {
Expand Down Expand Up @@ -125,7 +137,57 @@ func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Templ
case <-ctx.Done():
return nil
default:
err = r.consumer.ConsumeMetrics(ctx, metrics)
err = r.metricsConsumer.ConsumeMetrics(ctx, metrics)
if err != nil {
return wrapErr(err)
}
}
}
return nil
}

func (r *chReceiver) getLogsTemplate(ctx context.Context, tpl *template.Template) error {
queryBuf := bytes.Buffer{}
params := map[string]any{
"timestamp_ns": time.Now().UnixNano(),
"timestamp_ms": time.Now().UnixMilli(),
"timestamp_s": time.Now().Unix(),
}
err := tpl.Execute(&queryBuf, params)
wrapErr := func(err error) error {
return fmt.Errorf("failed to execute. Query: %s; error: %w", queryBuf.String(), err)
}
if err != nil {
return wrapErr(err)
}
rows, err := r.db.Query(ctx, queryBuf.String())
if err != nil {
return wrapErr(err)
}
defer rows.Close()
for rows.Next() {
var (
labels [][]string
value string
)
err = rows.Scan(&labels, &value)
if err != nil {
return wrapErr(err)
}
logs := plog.NewLogs()
res := logs.ResourceLogs().AppendEmpty()
res.Resource().Attributes()
log := res.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
for _, label := range labels {
log.Attributes().PutStr(label[0], label[1])
}
log.Body().SetStr(value)
log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
select {
case <-ctx.Done():
return nil
default:
err = r.logsConsumer.ConsumeLogs(ctx, logs)
if err != nil {
return wrapErr(err)
}
Expand Down

0 comments on commit ad25f5d

Please sign in to comment.