diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 8c37c0a..7daa99c 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -10,6 +10,7 @@ # Configuration options: - `dsn` (required): Clickhouse's dsn. +- `clustered_clickhouse` (required): true if clickhouse cluster is used # Example: ## Simple Trace Data @@ -21,6 +22,7 @@ receivers: exporters: qryn: dsn: tcp://localhost:9000/?database=cloki + clustered_clickhouse: false service: pipelines: diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 05ebddb..d151780 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -31,6 +31,8 @@ type Config struct { // because only QueueSize is user-settable. QueueSettings QueueSettings `mapstructure:"sending_queue"` + ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` + // DSN is the ClickHouse server Data Source Name. // For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn). // For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn). diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index ccc636c..b0a1810 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -36,6 +36,7 @@ type logsExporter struct { attritubeLabels string resourceLabels string format string + cluster bool } func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { @@ -53,6 +54,7 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { format: cfg.Logs.Format, attritubeLabels: cfg.Logs.AttritubeLabels, resourceLabels: cfg.Logs.ResourceLabels, + cluster: cfg.ClusteredClickhouse, }, nil } @@ -428,11 +430,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - return batchSamplesAndTimeSeries(ctx, e.db, samples, timeSeries) + return batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries) } func batchSamplesAndTimeSeries(ctx context.Context, db clickhouse.Conn, samples []Sample, timeSeries []TimeSerie) error { - samplesBatch, err := db.PrepareBatch(ctx, samplesSQL) + isCluster := ctx.Value("cluster").(bool) + samplesBatch, err := db.PrepareBatch(ctx, samplesSQL(isCluster)) if err != nil { return err } @@ -445,7 +448,7 @@ func batchSamplesAndTimeSeries(ctx context.Context, db clickhouse.Conn, samples return err } - timeSeriesBatch, err := db.PrepareBatch(ctx, TimeSerieSQL) + timeSeriesBatch, err := db.PrepareBatch(ctx, TimeSerieSQL(isCluster)) if err != nil { return err } diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index 0d80794..dd127ef 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -31,6 +31,7 @@ type metricsExporter struct { db clickhouse.Conn namespace string + cluster bool } func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) { @@ -46,6 +47,7 @@ func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, erro logger: logger, db: db, namespace: cfg.Metrics.Namespace, + cluster: cfg.ClusteredClickhouse, }, nil } @@ -477,7 +479,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - return batchSamplesAndTimeSeries(ctx, e.db, samples, timeSeries) + return batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries) } // isValidAggregationTemporality checks whether an OTel metric has a valid diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index b9fb5db..fed1546 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -15,11 +15,13 @@ package qrynexporter import ( + "fmt" "time" ) -const ( - tracesInputSQL = `INSERT INTO traces_input ( +var ( + tracesInputSQL = func(clustered bool) string { + return `INSERT INTO traces_input ( trace_id, span_id, parent_id, @@ -30,16 +32,29 @@ const ( payload_type, payload, tags)` - samplesSQL = `INSERT INTO samples_v3 ( + } + samplesSQL = func(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO samples_v3%s ( fingerprint, timestamp_ns, value, - string)` - TimeSerieSQL = `INSERT INTO time_series ( + string)`, dist) + } + TimeSerieSQL = func(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO time_series%s ( date, fingerprint, labels, - name)` + name)`, dist) + } ) // Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index a7d4484..3ef0111 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -45,7 +45,8 @@ var delegate = &protojson.MarshalOptions{ type tracesExporter struct { logger *zap.Logger - db clickhouse.Conn + db clickhouse.Conn + cluster bool } // newTracesExporter returns a SpanWriter for the database @@ -59,8 +60,9 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) return nil, err } return &tracesExporter{ - logger: logger, - db: db, + logger: logger, + db: db, + cluster: cfg.ClusteredClickhouse, }, nil } @@ -152,7 +154,8 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - batch, err := e.db.PrepareBatch(ctx, tracesInputSQL) + isCluster := ctx.Value("cluster").(bool) + batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) if err != nil { return err } @@ -177,8 +180,9 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { + _ctx := context.WithValue(ctx, "cluster", e.cluster) start := time.Now() - if err := e.exportResourceSapns(ctx, td.ResourceSpans()); err != nil { + if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { return err } e.logger.Info("pushTraceData", zap.Int("spanCount", td.SpanCount()), zap.String("cost", time.Since(start).String()))