From e35202dac01a14f5787d1c7b4dd0b00923e19938 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:14:43 +0800 Subject: [PATCH] refactor: improve the code structure and documentation of qrynexporter. Rename the batchV2 struct to traceWithTagsBatch to improve code readability. Update the names of struct fields to make them more descriptive. Rename the traces_v2.go file to trace_batch_processor.go. Use a custom contextKey type in the pushTraceData function to resolve the SA1029 warning. Optimize README.md to provide more detailed configuration instructions. These changes are aimed at improving code quality, maintainability, and documentation clarity. --- exporter/qrynexporter/README.md | 14 +++-- exporter/qrynexporter/config.go | 3 +- exporter/qrynexporter/logs.go | 2 +- exporter/qrynexporter/metrics.go | 2 +- exporter/qrynexporter/schema.go | 10 ++-- ...{traces_v2.go => trace_batch_processor.go} | 51 ++++++++++--------- exporter/qrynexporter/traces.go | 41 ++++++++------- 7 files changed, 68 insertions(+), 55 deletions(-) rename exporter/qrynexporter/{traces_v2.go => trace_batch_processor.go} (61%) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 35527da..a844bb0 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,10 +8,18 @@ # Configuration options: +- `dsn` (required): Data Source Name for Clickhouse. + - Example: `tcp://localhost:9000/?database=cloki` + +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (optional): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. -- `dsn` (required): Clickhouse's dsn. 
-- `clustered_clickhouse` (required): true if clickhouse cluster is used -- `traces_distributed_export_v2`: use improved ingestion algorythm for traces. Data ingestion is sess performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 4075e71..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,7 +30,8 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` - TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"` + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. + ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md 
pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index 34dce51..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string { // // ) Engine=Null -// Trace represent trace model -type Trace struct { +// TraceInput represents the trace model +type TraceInput struct { TraceID string `ch:"trace_id"` SpanID string `ch:"span_id"` ParentID string `ch:"parent_id"` @@ -124,7 +124,7 @@ type Trace struct { Tags [][]string `ch:"tags"` } -type TraceV2 struct { +type TempoTrace struct { OID string `ch:"oid"` TraceID []byte `ch:"trace_id"` SpanID []byte `ch:"span_id"` @@ -137,7 +137,7 @@ type TraceV2 struct { Payload string `ch:"payload"` } -type TraceTagsV2 struct { +type TempoTraceTag struct { OID string `ch:"oid"` Date time.Time `ch:"date"` Key string `ch:"key"` diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/trace_batch_processor.go similarity index 61% rename from exporter/qrynexporter/traces_v2.go rename to exporter/qrynexporter/trace_batch_processor.go index 37da8a9..6fcde08 100644 --- a/exporter/qrynexporter/traces_v2.go +++ b/exporter/qrynexporter/trace_batch_processor.go @@ -3,60 +3,61 @@ package qrynexporter import ( 
"encoding/hex" "fmt" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) -type batchV2 struct { +type traceWithTagsBatch struct { driver.Batch - subBatch driver.Batch + tagsBatch driver.Batch } -func (b *batchV2) AppendStruct(data any) error { - _data, ok := data.(*Trace) +func (b *traceWithTagsBatch) AppendStruct(v any) error { + ti, ok := v.(*TraceInput) if !ok { - return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + return fmt.Errorf("invalid data type, expected *Trace, got %T", v) } - binTraceId, err := unhexAndPad(_data.TraceID, 16) + binTraceId, err := unhexAndPad(ti.TraceID, 16) if err != nil { return err } - binParentID, err := unhexAndPad(_data.ParentID, 8) + binParentID, err := unhexAndPad(ti.ParentID, 8) if err != nil { return err } - binSpanID, err := unhexAndPad(_data.SpanID, 8) + binSpanID, err := unhexAndPad(ti.SpanID, 8) if err != nil { return err } - trace := &TraceV2{ + trace := &TempoTrace{ OID: "0", TraceID: binTraceId, SpanID: binSpanID, ParentID: binParentID, - Name: _data.Name, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, - ServiceName: _data.ServiceName, - PayloadType: _data.PayloadType, - Payload: _data.Payload, + Name: ti.Name, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + ServiceName: ti.ServiceName, + PayloadType: ti.PayloadType, + Payload: ti.Payload, } err = b.Batch.AppendStruct(trace) if err != nil { return err } - for _, tag := range _data.Tags { - attr := &TraceTagsV2{ + for _, tag := range ti.Tags { + attr := &TempoTraceTag{ OID: "0", Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), Key: tag[0], Val: tag[1], TraceID: binTraceId, SpanID: binSpanID, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, } - err = b.subBatch.AppendStruct(attr) + err = b.tagsBatch.AppendStruct(attr) if err != nil { return err } @@ -64,10 +65,10 @@ 
func (b *batchV2) AppendStruct(data any) error { return nil } -func (b *batchV2) Abort() error { +func (b *traceWithTagsBatch) Abort() error { var errs [2]error errs[0] = b.Batch.Abort() - errs[1] = b.subBatch.Abort() + errs[1] = b.tagsBatch.Abort() for _, err := range errs { if err != nil { return err @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error { return nil } -func (b *batchV2) Send() error { +func (b *traceWithTagsBatch) Send() error { var errs [2]error errs[0] = b.Batch.Send() - errs[1] = b.subBatch.Send() + errs[1] = b.tagsBatch.Send() for _, err := range errs { if err != nil { return err diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index e33979c..fd56330 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -34,8 +34,11 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +type contextKey string + const ( - spanLinkDataFormat = "%s|%s|%s|%s|%d" + spanLinkDataFormat = "%s|%s|%s|%s|%d" + clusterKey contextKey = "cluster" ) var delegate = &protojson.MarshalOptions{ @@ -48,9 +51,9 @@ type tracesExporter struct { logger *zap.Logger meter metric.Meter - db clickhouse.Conn - cluster bool - v2 bool + db clickhouse.Conn + cluster bool + clientSide bool } // newTracesExporter returns a SpanWriter for the database @@ -64,11 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ - logger: logger, - meter: set.MeterProvider.Meter(typeStr), - db: db, - cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2, + logger: logger, + meter: set.MeterProvider.Meter(typeStr), + db: db, + cluster: cfg.ClusteredClickhouse, + clientSide: cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -165,11 +168,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) 
{ } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - isCluster := ctx.Value("cluster").(bool) + isCluster := ctx.Value(clusterKey).(bool) var batch driver.Batch var err error - if e.v2 { - batch, err = e.prepareBatchV2(ctx) + if e.clientSide { + batch, err = e.prepareBatchClientSide(ctx) } else { batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) } @@ -196,7 +199,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } -func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { +func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) if err != nil { return nil, err @@ -206,15 +209,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro batch.Abort() return nil, err } - return &batchV2{ - Batch: batch, - subBatch: subBatch, + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, }, nil } // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -387,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags 
map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -404,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(),