diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 7daa99c..35527da 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -11,6 +11,7 @@ - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used +- `traces_distributed_export_v2`: use improved ingestion algorithm for traces. Data ingestion is less performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index c71651b..4075e71 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,8 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` + TracesDistributedExportV2 bool `mapstructure:"traces_distributed_export_v2"` + ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` // DSN is the ClickHouse server Data Source Name. diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index fed1546..34dce51 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -57,6 +57,40 @@ var ( } ) +func TracesV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces%s ( + oid, + trace_id, + span_id, + parent_id, + name, + timestamp_ns, + duration_ns, + service_name, + payload_type, + payload)`, dist) +} + +func TracesTagsV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s ( + oid, + date, + key, + val, + trace_id, + span_id, + timestamp_ns, + duration)`, dist) +} + // Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js // We need to align with the schema here. 
// @@ -90,6 +124,30 @@ type Trace struct { Tags [][]string `ch:"tags"` } +type TraceV2 struct { + OID string `ch:"oid"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + ParentID []byte `ch:"parent_id"` + Name string `ch:"name"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration_ns"` + ServiceName string `ch:"service_name"` + PayloadType int8 `ch:"payload_type"` + Payload string `ch:"payload"` +} + +type TraceTagsV2 struct { + OID string `ch:"oid"` + Date time.Time `ch:"date"` + Key string `ch:"key"` + Val string `ch:"val"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration"` +} + // Sample represent sample data // `CREATE TABLE IF NOT EXISTS samples_v3 // ( diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index 86dec6d..e33979c 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -50,6 +50,7 @@ type tracesExporter struct { db clickhouse.Conn cluster bool + v2 bool } // newTracesExporter returns a SpanWriter for the database @@ -67,6 +68,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, + v2: cfg.ClusteredClickhouse && cfg.TracesDistributedExportV2, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -164,7 +166,14 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { isCluster := ctx.Value("cluster").(bool) - batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + var batch driver.Batch + var err error + if e.v2 { + batch, err = e.prepareBatchV2(ctx) + } else { + batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + } + if err != 
nil { return err } @@ -187,6 +196,22 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } +func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { + batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) + if err != nil { + return nil, err + } + subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster)) + if err != nil { + batch.Abort() + return nil, err + } + return &batchV2{ + Batch: batch, + subBatch: subBatch, + }, nil +} + // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { _ctx := context.WithValue(ctx, "cluster", e.cluster) diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/traces_v2.go new file mode 100644 index 0000000..37da8a9 --- /dev/null +++ b/exporter/qrynexporter/traces_v2.go @@ -0,0 +1,102 @@ +package qrynexporter + +import ( + "encoding/hex" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "time" +) + +type batchV2 struct { + driver.Batch + subBatch driver.Batch +} + +func (b *batchV2) AppendStruct(data any) error { + _data, ok := data.(*Trace) + if !ok { + return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + } + binTraceId, err := unhexAndPad(_data.TraceID, 16) + if err != nil { + return err + } + binParentID, err := unhexAndPad(_data.ParentID, 8) + if err != nil { + return err + } + binSpanID, err := unhexAndPad(_data.SpanID, 8) + if err != nil { + return err + } + trace := &TraceV2{ + OID: "0", + TraceID: binTraceId, + SpanID: binSpanID, + ParentID: binParentID, + Name: _data.Name, + TimestampNs: _data.TimestampNs, + DurationNs: _data.DurationNs, + ServiceName: _data.ServiceName, + PayloadType: _data.PayloadType, + Payload: _data.Payload, + } + err = b.Batch.AppendStruct(trace) + if err != nil { + return err + } + for _, tag := range _data.Tags { + attr := &TraceTagsV2{ + OID: "0", + Date: 
time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), + Key: tag[0], + Val: tag[1], + TraceID: binTraceId, + SpanID: binSpanID, + TimestampNs: _data.TimestampNs, + DurationNs: _data.DurationNs, + } + err = b.subBatch.AppendStruct(attr) + if err != nil { + return err + } + } + return nil +} + +func (b *batchV2) Abort() error { + var errs [2]error + errs[0] = b.Batch.Abort() + errs[1] = b.subBatch.Abort() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func (b *batchV2) Send() error { + var errs [2]error + errs[0] = b.Batch.Send() + errs[1] = b.subBatch.Send() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func unhexAndPad(s string, size int) ([]byte, error) { + bStr, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + if len(bStr) < size { + res := make([]byte, size) + copy(res[size-len(bStr):], bStr) + return res, nil + } + // keep the trailing `size` bytes; the original `bStr[size-len(bStr):]` + // used a negative index and panicked whenever len(bStr) > size + return bStr[len(bStr)-size:], nil +}