Pre release #105

Open · wants to merge 15 commits into base: main
127 changes: 78 additions & 49 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
@@ -7,6 +7,7 @@ import (
    "fmt"
    "strconv"
    "sync"
+   "time"

    "github.com/ClickHouse/clickhouse-go/v2"
    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -93,6 +94,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
    payload_type := make([]string, 0)
    payload := make([][]byte, 0)
    tree := make([][]tuple, 0)
+   var pooledTrees []*PooledTree
    functions := make([][]tuple, 0)

    rl := ls.ResourceLogs()
@@ -179,7 +181,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
        return 0, err
    }

-   tree = append(tree, _tree)
+   pooledTrees = append(pooledTrees, _tree)
+   tree = append(tree, _tree.data)

    idx = offset + s
    ch.logger.Debug(
@@ -239,13 +242,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
        return 0, err
    }
    err = b.Send()
-   for _, tpls := range tree {
-       for _, t := range tpls {
-           for _, v := range t[3].([]tuple) {
-               triples.put(v)
-           }
-           quadruples.put(t)
-       }
+   for _, tpls := range pooledTrees {
+       trees.put(tpls)
    }
    return offset, err
}
@@ -302,58 +300,90 @@ func readFunctionsFromMap(m pcommon.Map) ([]tuple, error) {
}

type LimitedPool struct {
-   m    sync.RWMutex
-   pool *sync.Pool
-   size int
+   m          sync.RWMutex
+   pool       [20]*sync.Pool
+   createPool func() *sync.Pool
}

+type PooledTree struct {
+   time         time.Time
+   triplesCount int
+   data         []tuple
+   triples      []tuple
+}
+
-func (l *LimitedPool) get() tuple {
+func (l *LimitedPool) get(quadruples int, triples int) *PooledTree {
    l.m.Lock()
    defer l.m.Unlock()
-   l.size--
-   if l.size < 0 {
-       l.size = 0
-   }
-   return l.pool.Get().(tuple)
+   var pool *sync.Pool
+   if triples >= 20 {
+       pool = l.createPool()
+   } else if l.pool[triples] == nil {
+       l.pool[triples] = l.createPool()
+       pool = l.pool[triples]
+   } else {
+       pool = l.pool[triples]
+   }
+   tree := pool.Get().(*PooledTree)
+   var redo bool
+   if cap(tree.triples) < quadruples*triples {
+       tree.triples = make([]tuple, quadruples*triples)
+       for i := range tree.triples {
+           tree.triples[i] = tuple{nil, nil, nil}
+       }
+       redo = true
+   }
+   tree.triples = tree.triples[:quadruples*triples]
+   if cap(tree.data) < quadruples {
+       tree.data = make([]tuple, quadruples)
+       redo = true
+   }
+   tree.data = tree.data[:quadruples]
+   if redo || tree.triplesCount != triples {
+       j := 0
+       for i := range tree.data {
+           _triples := tree.triples[j : j+triples]
+           j += triples
+           tree.data[i] = tuple{nil, nil, nil, _triples}
+       }
+   }
+   tree.triplesCount = triples
+   return tree
}

-func (l *LimitedPool) put(t tuple) {
+func (l *LimitedPool) put(t *PooledTree) {
    l.m.Lock()
    defer l.m.Unlock()
-   if l.size >= 100000 {
+   if t.triplesCount >= 20 {
        return
    }
-   l.size++
-   l.pool.Put(t)
+   pool := l.pool[t.triplesCount]
+   if time.Now().Sub(t.time) < time.Minute {
+       pool.Put(t)
+   }
}

-var triples = LimitedPool{
-   pool: &sync.Pool{
-       New: func() interface{} {
-           return make(tuple, 3)
-       },
-   },
-}
-
-var quadruples = LimitedPool{
-   pool: &sync.Pool{
-       New: func() interface{} {
-           return make(tuple, 4)
-       },
-   },
-}
+var trees = LimitedPool{
+   createPool: func() *sync.Pool {
+       return &sync.Pool{
+           New: func() interface{} {
+               return &PooledTree{time: time.Now()}
+           },
+       }
+   },
+}

-func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
+func readTreeFromMap(m pcommon.Map) (*PooledTree, error) {
    raw, _ := m.Get("tree")
    bRaw := bytes.NewReader(raw.Bytes().AsRaw())
-   size, err := binary.ReadVarint(bRaw)
+   treeSize, err := binary.ReadVarint(bRaw)
    if err != nil {
        return nil, err
    }

-   res := make([]tuple, size)
+   var res *PooledTree

-   for i := int64(0); i < size; i++ {
+   for i := int64(0); i < treeSize; i++ {
        parentId, err := binary.ReadUvarint(bRaw)
        if err != nil {
            return nil, err
@@ -374,8 +404,11 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
            return nil, err
        }

-       values := make([]tuple, size)
-       for i := range values {
+       if res == nil {
+           res = trees.get(int(treeSize), int(size))
+       }
+
+       for j := int64(0); j < size; j++ {
            size, err := binary.ReadVarint(bRaw)
            if err != nil {
                return nil, err
@@ -395,17 +428,13 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
            if err != nil {
                return nil, err
            }
-
-           values[i] = triples.get() // tuple{name, self, total}
-           values[i][0] = name
-           values[i][1] = self
-           values[i][2] = total
+           res.data[i][3].([]tuple)[j][0] = name
+           res.data[i][3].([]tuple)[j][1] = self
+           res.data[i][3].([]tuple)[j][2] = total
        }
-       res[i] = quadruples.get() // tuple{parentId, fnId, nodeId, values}
-       res[i][0] = parentId
-       res[i][1] = fnId
-       res[i][2] = nodeId
-       res[i][3] = values
+       res.data[i][0] = parentId
+       res.data[i][1] = fnId
+       res.data[i][2] = nodeId
    }
    return res, nil
}
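For context on the pooling change above: `get` hands out a `PooledTree` from one of twenty `sync.Pool` buckets keyed by the per-node triple count, so a recycled tree always carries compatibly sized backing slices, while `put` refuses oversized or stale trees instead of pooling them. Below is a minimal standalone sketch of that size-bucketed pattern, with simplified types that are not the exporter's own:

```go
package main

import (
	"fmt"
	"sync"
)

const maxBucket = 20

type node struct{ children []int }

// bucketedPool keeps one sync.Pool per size class, so a recycled
// node always has a backing slice of the expected length.
type bucketedPool struct {
	mu      sync.Mutex
	buckets [maxBucket]*sync.Pool
}

func (p *bucketedPool) get(size int) *node {
	if size >= maxBucket {
		// Oversized requests bypass the pool entirely.
		return &node{children: make([]int, size)}
	}
	p.mu.Lock()
	if p.buckets[size] == nil {
		s := size // capture this bucket's size class
		p.buckets[size] = &sync.Pool{New: func() interface{} {
			return &node{children: make([]int, s)}
		}}
	}
	pool := p.buckets[size]
	p.mu.Unlock()
	return pool.Get().(*node)
}

func (p *bucketedPool) put(size int, n *node) {
	if size >= maxBucket {
		return // mirrors get: oversized nodes are never pooled
	}
	p.mu.Lock()
	pool := p.buckets[size]
	p.mu.Unlock()
	if pool != nil {
		pool.Put(n)
	}
}

func main() {
	var p bucketedPool
	n := p.get(3)
	p.put(3, n)
	fmt.Println(len(p.get(3).children)) // 3, likely the recycled node
}
```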
13 changes: 11 additions & 2 deletions exporter/qrynexporter/README.md
@@ -8,9 +8,18 @@


# Configuration options:
+- `dsn` (required): Data Source Name for ClickHouse.
+  - Example: `tcp://localhost:9000/qryn`
+
+- `clustered_clickhouse` (required):
+  - Type: boolean
+  - Description: Set to `true` if using a ClickHouse cluster; otherwise, set to `false`.
+
+- `client_side_trace_processing` (optional):
+  - Type: boolean
+  - Default: `true`
+  - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. See the combined example below.
+
-- `dsn` (required): Clickhouse's dsn.
-- `clustered_clickhouse` (required): true if clickhouse cluster is used
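Taken together, the options above might be wired into a collector config like this minimal sketch (assuming the exporter is registered under the `qryn` key; the endpoint and values are illustrative):

```yaml
exporters:
  qryn:
    dsn: tcp://localhost:9000/qryn
    clustered_clickhouse: false
    client_side_trace_processing: true
```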

# Example:
## Simple Trace Data
3 changes: 3 additions & 0 deletions exporter/qrynexporter/config.go
@@ -30,6 +30,9 @@ type Config struct {
    configretry.BackOffConfig    `mapstructure:"retry_on_failure"`
    exporterhelper.QueueSettings `mapstructure:"sending_queue"`

+   // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side.
+   ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"`
+
    ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

    // DSN is the ClickHouse server Data Source Name.
2 changes: 1 addition & 1 deletion exporter/qrynexporter/logs.go
@@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
        }
    }

-   if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
+   if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
        otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
        e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
        return err
2 changes: 1 addition & 1 deletion exporter/qrynexporter/metrics.go
@@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
        }
    }

-   if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
+   if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
        otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
        e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
        return err
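Both hunks above swap the raw string key "cluster" for a package-level `clusterKey` when stashing the cluster flag on the context. An unexported typed key avoids collisions with values set by other packages (string keys are also flagged by linters such as staticcheck's SA1029). The key's definition is not shown in this diff; a minimal sketch of the likely pattern:

```go
// contextKey is an unexported type, so context values set by this
// package cannot collide with string keys used by other packages.
type contextKey string

// clusterKey is assumed to be defined roughly like this elsewhere in the PR.
const clusterKey contextKey = "cluster"
```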
64 changes: 61 additions & 3 deletions exporter/qrynexporter/schema.go
@@ -20,7 +20,7 @@ import (
)

var (
-   tracesInputSQL = func(clustered bool) string {
+   tracesInputSQL = func(_ bool) string {
        return `INSERT INTO traces_input (
            trace_id,
            span_id,
@@ -57,6 +57,40 @@ var (
    }
)

+func TracesV2InputSQL(clustered bool) string {
+   dist := ""
+   if clustered {
+       dist = "_dist"
+   }
+   return fmt.Sprintf(`INSERT INTO tempo_traces%s (
+       oid,
+       trace_id,
+       span_id,
+       parent_id,
+       name,
+       timestamp_ns,
+       duration_ns,
+       service_name,
+       payload_type,
+       payload)`, dist)
+}
+
+func TracesTagsV2InputSQL(clustered bool) string {
+   dist := ""
+   if clustered {
+       dist = "_dist"
+   }
+   return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s (
+       oid,
+       date,
+       key,
+       val,
+       trace_id,
+       span_id,
+       timestamp_ns,
+       duration)`, dist)
+}

// Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
// We need to align with the schema here.
//
@@ -76,8 +110,8 @@
//
// ) Engine=Null

-// Trace represent trace model
-type Trace struct {
+// TraceInput represents the trace model
+type TraceInput struct {
TraceID string `ch:"trace_id"`
SpanID string `ch:"span_id"`
ParentID string `ch:"parent_id"`
@@ -90,6 +124,30 @@ type Trace struct {
    Tags        [][]string `ch:"tags"`
}

+type TempoTrace struct {
+   OID         string `ch:"oid"`
+   TraceID     []byte `ch:"trace_id"`
+   SpanID      []byte `ch:"span_id"`
+   ParentID    []byte `ch:"parent_id"`
+   Name        string `ch:"name"`
+   TimestampNs int64  `ch:"timestamp_ns"`
+   DurationNs  int64  `ch:"duration_ns"`
+   ServiceName string `ch:"service_name"`
+   PayloadType int8   `ch:"payload_type"`
+   Payload     string `ch:"payload"`
+}
+
+type TempoTraceTag struct {
+   OID         string    `ch:"oid"`
+   Date        time.Time `ch:"date"`
+   Key         string    `ch:"key"`
+   Val         string    `ch:"val"`
+   TraceID     []byte    `ch:"trace_id"`
+   SpanID      []byte    `ch:"span_id"`
+   TimestampNs int64     `ch:"timestamp_ns"`
+   DurationNs  int64     `ch:"duration"`
+}

// Sample represent sample data
// `CREATE TABLE IF NOT EXISTS samples_v3
// (
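The new `TempoTrace` and `TempoTraceTag` structs mirror, column for column, the insert statements produced by `TracesV2InputSQL` and `TracesTagsV2InputSQL`. As a hedged illustration (not code from this PR), one could append rows with the clickhouse-go v2 native driver, whose `AppendStruct` resolves columns through the `ch` struct tags; the helper name and row values below are assumptions:

```go
package qrynexporter

import (
	"context"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// insertOneTempoTrace is an illustrative helper, not part of the PR.
func insertOneTempoTrace(ctx context.Context, conn driver.Conn, clustered bool) error {
	batch, err := conn.PrepareBatch(ctx, TracesV2InputSQL(clustered))
	if err != nil {
		return err
	}
	row := TempoTrace{
		OID:         "0",          // org id; "0" is an assumed default
		TraceID:     []byte{0x01}, // illustrative binary ids
		SpanID:      []byte{0x02},
		ParentID:    nil,
		Name:        "GET /",
		TimestampNs: time.Now().UnixNano(),
		DurationNs:  int64(250 * time.Microsecond),
		ServiceName: "demo",
		PayloadType: 2, // payload encoding discriminator (assumed)
		Payload:     "{}",
	}
	// AppendStruct maps struct fields to columns via their `ch:"..."` tags.
	if err := batch.AppendStruct(&row); err != nil {
		return err
	}
	return batch.Send()
}
```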