Skip to content

Commit

Permalink
traces ingestion V2
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 21, 2024
1 parent e432362 commit c52cf38
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 1 deletion.
1 change: 1 addition & 0 deletions exporter/qrynexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

- `dsn` (required): Clickhouse's dsn.
- `clustered_clickhouse` (required): true if clickhouse cluster is used
- `traces_distributed_export_v2`: use the improved ingestion algorithm for traces. Data ingestion is less performant but more evenly distributed

# Example:
## Simple Trace Data
Expand Down
2 changes: 2 additions & 0 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

// DSN is the ClickHouse server Data Source Name.
Expand Down
58 changes: 58 additions & 0 deletions exporter/qrynexporter/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,40 @@ var (
}
)

// TracesV2InputSQL builds the INSERT header for the v2 traces table.
// When clustered is true the "_dist" distributed table variant is targeted.
func TracesV2InputSQL(clustered bool) string {
	table := "tempo_traces"
	if clustered {
		table += "_dist"
	}
	return fmt.Sprintf(`INSERT INTO %s (
oid,
trace_id,
span_id,
parent_id,
name,
timestamp_ns,
duration_ns,
service_name,
payload_type,
payload)`, table)
}

// TracesTagsV2InputSQL builds the INSERT header for the v2 trace-attributes
// (tags) table. When clustered is true the "_dist" variant is targeted.
func TracesTagsV2InputSQL(clustered bool) string {
	table := "tempo_traces_attrs_gin"
	if clustered {
		table += "_dist"
	}
	return fmt.Sprintf(`INSERT INTO %s (
oid,
date,
key,
val,
trace_id,
span_id,
timestamp_ns,
duration)`, table)
}

// Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
// We need to align with the schema here.
//
Expand Down Expand Up @@ -90,6 +124,30 @@ type Trace struct {
Tags [][]string `ch:"tags"`
}

// TraceV2 is the row model for the v2 tempo_traces table; field order and
// `ch` tags must line up with the column list in TracesV2InputSQL.
// Unlike the v1 Trace model, the trace/span/parent IDs are stored as raw
// binary (fixed-width, zero-padded) rather than hex strings.
type TraceV2 struct {
	OID         string `ch:"oid"`
	TraceID     []byte `ch:"trace_id"`
	SpanID      []byte `ch:"span_id"`
	ParentID    []byte `ch:"parent_id"`
	Name        string `ch:"name"`
	TimestampNs int64  `ch:"timestamp_ns"`
	DurationNs  int64  `ch:"duration_ns"`
	ServiceName string `ch:"service_name"`
	PayloadType int8   `ch:"payload_type"`
	Payload     string `ch:"payload"`
}

// TraceTagsV2 is the row model for the v2 tempo_traces_attrs_gin table;
// field order and `ch` tags must line up with TracesTagsV2InputSQL.
// NOTE: DurationNs deliberately maps to the column named "duration"
// (not "duration_ns") — that is the column name in the attrs table schema.
type TraceTagsV2 struct {
	OID         string    `ch:"oid"`
	Date        time.Time `ch:"date"`
	Key         string    `ch:"key"`
	Val         string    `ch:"val"`
	TraceID     []byte    `ch:"trace_id"`
	SpanID      []byte    `ch:"span_id"`
	TimestampNs int64     `ch:"timestamp_ns"`
	DurationNs  int64     `ch:"duration"`
}

// Sample represent sample data
// `CREATE TABLE IF NOT EXISTS samples_v3
// (
Expand Down
27 changes: 26 additions & 1 deletion exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type tracesExporter struct {

db clickhouse.Conn
cluster bool
v2 bool
}

// newTracesExporter returns a SpanWriter for the database
Expand All @@ -67,6 +68,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings)
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2,
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
Expand Down Expand Up @@ -164,7 +166,14 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) {

func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error {
isCluster := ctx.Value("cluster").(bool)
batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
var batch driver.Batch
var err error
if e.v2 {
batch, err = e.prepareBatchV2(ctx)
} else {
batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
}

if err != nil {
return err
}
Expand All @@ -187,6 +196,22 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans
return nil
}

// prepareBatchV2 opens two ClickHouse batches — the main v2 traces batch and
// the trace-attributes sub-batch — and wraps them in a batchV2 so callers can
// treat the pair as a single driver.Batch (filled, sent, and aborted together).
func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) {
	batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster))
	if err != nil {
		return nil, fmt.Errorf("preparing traces batch: %w", err)
	}
	subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster))
	if err != nil {
		// Best-effort cleanup of the first batch; its abort error is
		// secondary to the prepare failure we report.
		_ = batch.Abort()
		return nil, fmt.Errorf("preparing trace attributes batch: %w", err)
	}
	return &batchV2{
		Batch:    batch,
		subBatch: subBatch,
	}, nil
}

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
_ctx := context.WithValue(ctx, "cluster", e.cluster)
Expand Down
102 changes: 102 additions & 0 deletions exporter/qrynexporter/traces_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package qrynexporter

import (
"encoding/hex"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"time"
)

// batchV2 bundles the main v2 traces batch with the trace-attributes
// sub-batch so both are appended to, sent, and aborted as one unit.
// It satisfies driver.Batch via the embedded Batch; AppendStruct, Send,
// and Abort are overridden to keep the two batches in lockstep.
type batchV2 struct {
	driver.Batch
	subBatch driver.Batch
}

// AppendStruct converts a v1 *Trace into the v2 row models and appends them:
// the span row goes to the embedded traces batch and each tag pair goes to
// the attributes sub-batch. Only *Trace values are accepted.
func (b *batchV2) AppendStruct(data any) error {
	_data, ok := data.(*Trace)
	if !ok {
		return fmt.Errorf("invalid data type, expected *Trace, got %T", data)
	}
	// The v1 model carries hex-encoded IDs; the v2 schema stores them as
	// fixed-width binary, so decode and left-pad (or truncate) here.
	binTraceId, err := unhexAndPad(_data.TraceID, 16)
	if err != nil {
		return err
	}
	binParentID, err := unhexAndPad(_data.ParentID, 8)
	if err != nil {
		return err
	}
	binSpanID, err := unhexAndPad(_data.SpanID, 8)
	if err != nil {
		return err
	}
	trace := &TraceV2{
		OID:         "0",
		TraceID:     binTraceId,
		SpanID:      binSpanID,
		ParentID:    binParentID,
		Name:        _data.Name,
		TimestampNs: _data.TimestampNs,
		DurationNs:  _data.DurationNs,
		ServiceName: _data.ServiceName,
		PayloadType: _data.PayloadType,
		Payload:     _data.Payload,
	}
	if err = b.Batch.AppendStruct(trace); err != nil {
		return err
	}
	for _, tag := range _data.Tags {
		// Guard against malformed tag tuples instead of panicking on a
		// short slice; tags are expected as [key, value] pairs.
		if len(tag) < 2 {
			return fmt.Errorf("invalid tag %v: expected [key value] pair", tag)
		}
		attr := &TraceTagsV2{
			OID: "0",
			// Truncating to 24h buckets the span into its calendar day
			// (relative to the epoch, i.e. UTC day boundaries).
			Date:        time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24),
			Key:         tag[0],
			Val:         tag[1],
			TraceID:     binTraceId,
			SpanID:      binSpanID,
			TimestampNs: _data.TimestampNs,
			DurationNs:  _data.DurationNs,
		}
		if err = b.subBatch.AppendStruct(attr); err != nil {
			return err
		}
	}
	return nil
}

// Abort aborts both underlying batches. Both aborts always run so neither
// batch is leaked; the main batch's error takes precedence when both fail.
func (b *batchV2) Abort() error {
	mainErr := b.Batch.Abort()
	subErr := b.subBatch.Abort()
	if mainErr != nil {
		return mainErr
	}
	return subErr
}

// Send flushes both underlying batches. Both sends always run — even when
// the first fails — mirroring the original all-or-report-first semantics;
// the main batch's error takes precedence when both fail.
func (b *batchV2) Send() error {
	mainErr := b.Batch.Send()
	subErr := b.subBatch.Send()
	if mainErr != nil {
		return mainErr
	}
	return subErr
}

// unhexAndPad hex-decodes s and normalizes the result to exactly size bytes:
// shorter values are left-padded with zeros, longer values keep their
// trailing (least-significant) size bytes.
//
// Fixes a slice-bounds bug in the original: when the decoded value was
// longer than size it computed bStr[size-len(bStr):] — a negative index,
// which panics at runtime. The correct lower bound is len(bStr)-size.
func unhexAndPad(s string, size int) ([]byte, error) {
	bStr, err := hex.DecodeString(s)
	if err != nil {
		return nil, err
	}
	if len(bStr) < size {
		res := make([]byte, size)
		copy(res[size-len(bStr):], bStr)
		return res, nil
	}
	// len(bStr) >= size: keep the last size bytes (no-op when equal).
	return bStr[len(bStr)-size:], nil
}

0 comments on commit c52cf38

Please sign in to comment.