Skip to content

Commit

Permalink
qrynexporter: add insert duration hist
Browse files Browse the repository at this point in the history
  • Loading branch information
tomershafir committed Mar 20, 2024
1 parent a7399e2 commit b47bbfe
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 19 deletions.
2 changes: 1 addition & 1 deletion config/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ processors:

exporters:
qryn:
dsn: tcp://clickhouse-server:9000/qryn?username=qryn&password=demo
dsn: clickhouse://0.0.0.0:9000/qryn
timeout: 10s
sending_queue:
queue_size: 100
Expand Down
10 changes: 5 additions & 5 deletions exporter/qrynexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ func createDefaultConfig() component.Config {
// Traces are directly insert into clickhouse.
func createTracesExporter(
ctx context.Context,
params exporter.CreateSettings,
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
c := cfg.(*Config)
oce, err := newTracesExporter(params.Logger, c)
oce, err := newTracesExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn traces exporter: %w", err)
}

return exporterhelper.NewTracesExporter(
ctx,
params,
set,
cfg,
oce.pushTraceData,
exporterhelper.WithShutdown(oce.Shutdown),
Expand All @@ -81,7 +81,7 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newLogsExporter(set.Logger, c)
exporter, err := newLogsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -106,7 +106,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
c := cfg.(*Config)
exporter, err := newMetricsExporter(set.Logger, c)
exporter, err := newMetricsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand Down
20 changes: 15 additions & 5 deletions exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-logfmt/logfmt"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -30,6 +32,7 @@ const (

type logsExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn

Expand All @@ -39,7 +42,7 @@ type logsExporter struct {
cluster bool
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
func newLogsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*logsExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -48,14 +51,20 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
if err != nil {
return nil, err
}
return &logsExporter{
exp := &logsExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
format: cfg.Logs.Format,
attributeLabels: cfg.Logs.AttributeLabels,
resourceLabels: cfg.Logs.ResourceLabels,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Shutdown will shutdown the exporter.
Expand Down Expand Up @@ -433,11 +442,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}

otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeLogs)))
e.logger.Debug("pushLogsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

Expand Down
20 changes: 15 additions & 5 deletions exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/value"
"go.opentelemetry.io/collector/exporter"
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -28,14 +30,15 @@ const (

type metricsExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn

namespace string
cluster bool
}

func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) {
func newMetricsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*metricsExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -44,12 +47,18 @@ func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, erro
if err != nil {
return nil, err
}
return &metricsExporter{
exp := &metricsExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
namespace: cfg.Metrics.Namespace,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Shutdown will shutdown the exporter.
Expand Down Expand Up @@ -483,11 +492,12 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}

otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeMetrics)))
e.logger.Debug("pushMetricsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

Expand Down
43 changes: 43 additions & 0 deletions exporter/qrynexporter/self_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package qrynexporter

import (
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
prefix = "exporter_qryn_"

errorCodeError = "1"
errorCodeSuccess = ""

dataTypeLogs = "logs"
dataTypeMetrics = "metrics"
dataTypeTraces = "traces"
)

var (
otelcolExporterQrynBatchInsertDurationMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
var err error
if otelcolExporterQrynBatchInsertDurationMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "batch_insert_duration_millis"),
metric.WithDescription("Qryn exporter batch insert duration in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
return nil
}

func newOtelcolAttrSetBatch(errorCode string, dataType string) *attribute.Set {
s := attribute.NewSet(
attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)},
attribute.KeyValue{Key: "data_type", Value: attribute.StringValue(dataType)},
)
return &s
}
18 changes: 15 additions & 3 deletions exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.opentelemetry.io/otel/metric"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
Expand All @@ -44,13 +46,14 @@ var delegate = &protojson.MarshalOptions{
// tracesExporter for writing spans to ClickHouse
type tracesExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn
cluster bool
}

// newTracesExporter returns a SpanWriter for the database
func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) {
func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*tracesExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -59,11 +62,17 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error)
if err != nil {
return nil, err
}
return &tracesExporter{
exp := &tracesExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

func (e *tracesExporter) exportScopeSpans(serviceName string,
Expand Down Expand Up @@ -183,8 +192,11 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
_ctx := context.WithValue(ctx, "cluster", e.cluster)
start := time.Now()
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeTraces)))
e.logger.Debug("pushTraceData", zap.Int("spanCount", td.SpanCount()), zap.String("cost", time.Since(start).String()))
return nil
}
Expand Down

0 comments on commit b47bbfe

Please sign in to comment.