diff --git a/exporter/qrynexporter/factory.go b/exporter/qrynexporter/factory.go index e7d8f26..fff354a 100644 --- a/exporter/qrynexporter/factory.go +++ b/exporter/qrynexporter/factory.go @@ -52,18 +52,18 @@ func createDefaultConfig() component.Config { // Traces are directly insert into clickhouse. func createTracesExporter( ctx context.Context, - params exporter.CreateSettings, + set exporter.CreateSettings, cfg component.Config, ) (exporter.Traces, error) { c := cfg.(*Config) - oce, err := newTracesExporter(params.Logger, c) + oce, err := newTracesExporter(set.Logger, c, &set) if err != nil { return nil, fmt.Errorf("cannot configure qryn traces exporter: %w", err) } return exporterhelper.NewTracesExporter( ctx, - params, + set, cfg, oce.pushTraceData, exporterhelper.WithShutdown(oce.Shutdown), @@ -81,7 +81,7 @@ func createLogsExporter( cfg component.Config, ) (exporter.Logs, error) { c := cfg.(*Config) - exporter, err := newLogsExporter(set.Logger, c) + exporter, err := newLogsExporter(set.Logger, c, &set) if err != nil { return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err) } @@ -106,7 +106,7 @@ func createMetricsExporter( cfg component.Config, ) (exporter.Metrics, error) { c := cfg.(*Config) - exporter, err := newMetricsExporter(set.Logger, c) + exporter, err := newMetricsExporter(set.Logger, c, &set) if err != nil { return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err) } diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index ff9a790..8a5bf50 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -10,8 +10,10 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/go-logfmt/logfmt" "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -30,6 +32,7 @@ const ( type logsExporter struct { logger *zap.Logger + meter metric.Meter db clickhouse.Conn @@ -39,7 +42,7 @@ type logsExporter struct { cluster bool } -func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { +func newLogsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*logsExporter, error) { opts, err := clickhouse.ParseDSN(cfg.DSN) if err != nil { return nil, err @@ -48,14 +51,20 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { if err != nil { return nil, err } - return &logsExporter{ + exp := &logsExporter{ logger: logger, + meter: set.MeterProvider.Meter(typeStr), db: db, format: cfg.Logs.Format, attributeLabels: cfg.Logs.AttributeLabels, resourceLabels: cfg.Logs.ResourceLabels, cluster: cfg.ClusteredClickhouse, - }, nil + } + if err := initMetrics(exp.meter); err != nil { + exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) + return exp, err + } + return exp, nil } // Shutdown will shutdown the exporter. @@ -433,11 +442,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) + e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } - + otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeLogs))) e.logger.Debug("pushLogsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String())) - return nil } diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index 5c9ae49..08da224 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -13,9 +13,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/value" + "go.opentelemetry.io/collector/exporter" conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -28,6 +30,7 @@ const ( type metricsExporter struct { logger *zap.Logger + meter metric.Meter db clickhouse.Conn @@ -35,7 +38,7 @@ type metricsExporter struct { cluster bool } -func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) { +func newMetricsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*metricsExporter, error) { opts, err := clickhouse.ParseDSN(cfg.DSN) if err != nil { return nil, err @@ -44,12 +47,18 @@ func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, erro if err != nil { return nil, err } - return &metricsExporter{ + exp := &metricsExporter{ logger: logger, + meter: set.MeterProvider.Meter(typeStr), db: db, namespace: cfg.Metrics.Namespace, cluster: cfg.ClusteredClickhouse, - }, nil + } + if err := initMetrics(exp.meter); err != nil { + exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) + return exp, err + } + return exp, nil } // Shutdown will shutdown the exporter. @@ -483,11 +492,12 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) + e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } - + otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeMetrics))) e.logger.Debug("pushMetricsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String())) - return nil } diff --git a/exporter/qrynexporter/self_metrics.go b/exporter/qrynexporter/self_metrics.go new file mode 100644 index 0000000..ee054ab --- /dev/null +++ b/exporter/qrynexporter/self_metrics.go @@ -0,0 +1,43 @@ +package qrynexporter + +import ( + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + prefix = "exporter_qryn_" + + errorCodeError = "1" + errorCodeSuccess = "" + + dataTypeLogs = "logs" + dataTypeMetrics = "metrics" + dataTypeTraces = "traces" +) + +var ( + otelcolExporterQrynBatchInsertDurationMillis metric.Int64Histogram +) + +func initMetrics(meter metric.Meter) error { + var err error + if otelcolExporterQrynBatchInsertDurationMillis, err = meter.Int64Histogram( + fmt.Sprint(prefix, "batch_insert_duration_millis"), + metric.WithDescription("Qryn exporter batch insert duration in millis"), + metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), + ); err != nil { + return err + } + return nil +} + +func newOtelcolAttrSetBatch(errorCode string, dataType string) *attribute.Set { + s := attribute.NewSet( + attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}, + attribute.KeyValue{Key: "data_type", Value: attribute.StringValue(dataType)}, + ) + return &s +} diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index 0b47bc4..689d514 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -23,9 +23,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + "go.opentelemetry.io/otel/metric" commonv1 "go.opentelemetry.io/proto/otlp/common/v1" tracev1 "go.opentelemetry.io/proto/otlp/trace/v1" "go.uber.org/zap" @@ -44,13 +46,14 @@ var delegate = &protojson.MarshalOptions{ // tracesExporter for writing spans to ClickHouse type tracesExporter struct { logger *zap.Logger + meter metric.Meter db clickhouse.Conn cluster bool } // newTracesExporter returns a SpanWriter for the database -func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) { +func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*tracesExporter, error) { opts, err := clickhouse.ParseDSN(cfg.DSN) if err != nil { return nil, err @@ -59,11 +62,17 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) if err != nil { return nil, err } - return &tracesExporter{ + exp := &tracesExporter{ logger: logger, + meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, - }, nil + } + if err := initMetrics(exp.meter); err != nil { + exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) + return exp, err + } + return exp, nil } func (e *tracesExporter) exportScopeSpans(serviceName string, @@ -183,8 +192,11 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er _ctx := context.WithValue(ctx, "cluster", e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { + otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) + e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } + otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeTraces))) e.logger.Debug("pushTraceData", zap.Int("spanCount", td.SpanCount()), zap.String("cost", time.Since(start).String())) return nil }