diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 7daa99c..f22c2c9 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,9 +8,18 @@ # Configuration options: +- `dsn` (required): Data Source Name for ClickHouse. + - Example: `tcp://localhost:9000/qryn` + +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a ClickHouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (required): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data. Only takes effect when `clustered_clickhouse` is `true`. This can improve performance but may increase client-side resource usage. -- `dsn` (required): Clickhouse's dsn. -- `clustered_clickhouse` (required): true if clickhouse cluster is used # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index c71651b..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,9 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. + ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` + ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` // DSN is the ClickHouse server Data Source Name. 
diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index fed1546..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -57,6 +57,40 @@ var ( } ) 
// TracesV2InputSQL returns the column-list prefix of the INSERT statement
// used for the tempo_traces table. When clustered is true the table name
// gains a "_dist" suffix.
func TracesV2InputSQL(clustered bool) string {
	table := "tempo_traces"
	if clustered {
		table += "_dist"
	}
	return fmt.Sprintf(`INSERT INTO %s (
  oid,
  trace_id,
  span_id,
  parent_id,
  name,
  timestamp_ns,
  duration_ns,
  service_name,
  payload_type,
  payload)`, table)
}

// TracesTagsV2InputSQL returns the column-list prefix of the INSERT statement
// used for the tempo_traces_attrs_gin table. When clustered is true the table
// name gains a "_dist" suffix.
func TracesTagsV2InputSQL(clustered bool) string {
	table := "tempo_traces_attrs_gin"
	if clustered {
		table += "_dist"
	}
	return fmt.Sprintf(`INSERT INTO %s (
  oid,
  date,
  key,
  val,
  trace_id,
  span_id,
  timestamp_ns,
  duration)`, table)
}
// unhexAndPad decodes the hexadecimal string s and normalises the result to
// exactly size bytes.
//
// Values shorter than size are left-padded with zero bytes. Values longer
// than size keep only their trailing size bytes, mirroring the right-aligned
// layout produced by the padding branch.
//
// An error is returned when s is not valid hexadecimal. size is expected to
// be non-negative.
func unhexAndPad(s string, size int) ([]byte, error) {
	decoded, err := hex.DecodeString(s)
	if err != nil {
		return nil, fmt.Errorf("decoding hex id %q: %w", s, err)
	}
	if len(decoded) < size {
		// Right-align the decoded bytes so the zero padding sits in front.
		padded := make([]byte, size)
		copy(padded[size-len(decoded):], decoded)
		return padded, nil
	}
	// Slice from the tail: the previous start index, size-len(decoded), is
	// negative for oversized input and panics at runtime.
	return decoded[len(decoded)-size:], nil
}
prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { + batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) + if err != nil { + return nil, err + } + subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster)) + if err != nil { + batch.Abort() + return nil, err + } + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, + }, nil +} + // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -362,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -379,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(),