Skip to content

Commit

Permalink
refactor: improve the code structure and documentation of qrynexporter.
Browse files Browse the repository at this point in the history
Rename the batchV2 struct to traceWithTagsBatch to improve code readability.
Update the names of struct fields to make them more descriptive.
Rename the traces_v2.go file to trace_batch_processor.go.
Use a custom contextKey type in the pushTraceData function to resolve the SA1029 warning.
Optimize README.md to provide more detailed configuration instructions.
These changes are aimed at improving code quality, maintainability, and documentation clarity.
  • Loading branch information
Cluas committed Oct 21, 2024
1 parent c52cf38 commit e35202d
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 55 deletions.
14 changes: 11 additions & 3 deletions exporter/qrynexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@


# Configuration options:
- `dsn` (required): Data Source Name for Clickhouse.
- Example: `tcp://localhost:9000/?database=cloki`

- `clustered_clickhouse` (required):
- Type: boolean
- Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`.

- `client_side_trace_processing` (required):
- Type: boolean
- Default: `true`
- Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage.

- `dsn` (required): Clickhouse's dsn.
- `clustered_clickhouse` (required): true if clickhouse cluster is used
- `traces_distributed_export_v2`: use improved ingestion algorithm for traces. Data ingestion is less performant but more evenly distributed

# Example:
## Simple Trace Data
Expand Down
3 changes: 2 additions & 1 deletion exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"`
// ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side.
ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
10 changes: 5 additions & 5 deletions exporter/qrynexporter/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracesInputSQL = func(clustered bool) string {
tracesInputSQL = func(_ bool) string {
return `INSERT INTO traces_input (
trace_id,
span_id,
Expand Down Expand Up @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string {
//
// ) Engine=Null

// Trace represent trace model
type Trace struct {
// TraceInput represent trace model
type TraceInput struct {
TraceID string `ch:"trace_id"`
SpanID string `ch:"span_id"`
ParentID string `ch:"parent_id"`
Expand All @@ -124,7 +124,7 @@ type Trace struct {
Tags [][]string `ch:"tags"`
}

type TraceV2 struct {
type TempoTrace struct {
OID string `ch:"oid"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
Expand All @@ -137,7 +137,7 @@ type TraceV2 struct {
Payload string `ch:"payload"`
}

type TraceTagsV2 struct {
type TempoTraceTag struct {
OID string `ch:"oid"`
Date time.Time `ch:"date"`
Key string `ch:"key"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,72 @@ package qrynexporter
import (
"encoding/hex"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type batchV2 struct {
type traceWithTagsBatch struct {
driver.Batch
subBatch driver.Batch
tagsBatch driver.Batch
}

func (b *batchV2) AppendStruct(data any) error {
_data, ok := data.(*Trace)
func (b *traceWithTagsBatch) AppendStruct(v any) error {
ti, ok := v.(*TraceInput)
if !ok {
return fmt.Errorf("invalid data type, expected *Trace, got %T", data)
return fmt.Errorf("invalid data type, expected *Trace, got %T", v)
}
binTraceId, err := unhexAndPad(_data.TraceID, 16)
binTraceId, err := unhexAndPad(ti.TraceID, 16)
if err != nil {
return err
}
binParentID, err := unhexAndPad(_data.ParentID, 8)
binParentID, err := unhexAndPad(ti.ParentID, 8)
if err != nil {
return err
}
binSpanID, err := unhexAndPad(_data.SpanID, 8)
binSpanID, err := unhexAndPad(ti.SpanID, 8)
if err != nil {
return err
}
trace := &TraceV2{
trace := &TempoTrace{
OID: "0",
TraceID: binTraceId,
SpanID: binSpanID,
ParentID: binParentID,
Name: _data.Name,
TimestampNs: _data.TimestampNs,
DurationNs: _data.DurationNs,
ServiceName: _data.ServiceName,
PayloadType: _data.PayloadType,
Payload: _data.Payload,
Name: ti.Name,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
ServiceName: ti.ServiceName,
PayloadType: ti.PayloadType,
Payload: ti.Payload,
}
err = b.Batch.AppendStruct(trace)
if err != nil {
return err
}
for _, tag := range _data.Tags {
attr := &TraceTagsV2{
for _, tag := range ti.Tags {
attr := &TempoTraceTag{
OID: "0",
Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24),
Key: tag[0],
Val: tag[1],
TraceID: binTraceId,
SpanID: binSpanID,
TimestampNs: _data.TimestampNs,
DurationNs: _data.DurationNs,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
}
err = b.subBatch.AppendStruct(attr)
err = b.tagsBatch.AppendStruct(attr)
if err != nil {
return err
}
}
return nil
}

func (b *batchV2) Abort() error {
func (b *traceWithTagsBatch) Abort() error {
var errs [2]error
errs[0] = b.Batch.Abort()
errs[1] = b.subBatch.Abort()
errs[1] = b.tagsBatch.Abort()
for _, err := range errs {
if err != nil {
return err
Expand All @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error {
return nil
}

func (b *batchV2) Send() error {
func (b *traceWithTagsBatch) Send() error {
var errs [2]error
errs[0] = b.Batch.Send()
errs[1] = b.subBatch.Send()
errs[1] = b.tagsBatch.Send()
for _, err := range errs {
if err != nil {
return err
Expand Down
41 changes: 22 additions & 19 deletions exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

type contextKey string

const (
spanLinkDataFormat = "%s|%s|%s|%s|%d"
spanLinkDataFormat = "%s|%s|%s|%s|%d"
clusterKey contextKey = "cluster"
)

var delegate = &protojson.MarshalOptions{
Expand All @@ -48,9 +51,9 @@ type tracesExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn
cluster bool
v2 bool
db clickhouse.Conn
cluster bool
clientSide bool
}

// newTracesExporter returns a SpanWriter for the database
Expand All @@ -64,11 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings)
return nil, err
}
exp := &tracesExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2,
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
clientSide: cfg.ClientSideTraceProcessing,
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
Expand Down Expand Up @@ -165,11 +168,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) {
}

func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error {
isCluster := ctx.Value("cluster").(bool)
isCluster := ctx.Value(clusterKey).(bool)
var batch driver.Batch
var err error
if e.v2 {
batch, err = e.prepareBatchV2(ctx)
if e.clientSide {
batch, err = e.prepareBatchClientSide(ctx)
} else {
batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
}
Expand All @@ -196,7 +199,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans
return nil
}

func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) {
func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) {
batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster))
if err != nil {
return nil, err
Expand All @@ -206,15 +209,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro
batch.Abort()
return nil, err
}
return &batchV2{
Batch: batch,
subBatch: subBatch,
return &traceWithTagsBatch{
Batch: batch,
tagsBatch: subBatch,
}, nil
}

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
_ctx := context.WithValue(ctx, "cluster", e.cluster)
_ctx := context.WithValue(ctx, clusterKey, e.cluster)
start := time.Now()
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces)))
Expand Down Expand Up @@ -387,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte,
return delegate.Marshal(otlpSpan)
}

func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) {
func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) {
durationNano := uint64(span.EndTimestamp() - span.StartTimestamp())
tags = aggregateSpanTags(span, tags)
tags["service.name"] = serviceName
Expand All @@ -404,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName
return nil, fmt.Errorf("failed to marshal span: %w", err)
}

trace := &Trace{
trace := &TraceInput{
TraceID: span.TraceID().String(),
SpanID: span.SpanID().String(),
ParentID: span.ParentSpanID().String(),
Expand Down

0 comments on commit e35202d

Please sign in to comment.