Skip to content

Commit

Permalink
Merge pull request #70 from metrico/pre-release
Browse files Browse the repository at this point in the history
Pre release
  • Loading branch information
akvlad authored Feb 19, 2024
2 parents b214ae3 + 666cb6b commit a7399e2
Show file tree
Hide file tree
Showing 13 changed files with 1,031 additions and 548 deletions.
176 changes: 175 additions & 1 deletion exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package ch

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strconv"
"sync"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -89,6 +92,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
duration_ns := make([]uint64, 0)
payload_type := make([]string, 0)
payload := make([][]byte, 0)
tree := make([][]tuple, 0)
functions := make([][]tuple, 0)

rl := ls.ResourceLogs()
var (
Expand Down Expand Up @@ -162,6 +167,20 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)

payload = append(payload, r.Body().Bytes().AsRaw())

_functions, err := readFunctionsFromMap(m)
if err != nil {
return 0, err
}

functions = append(functions, _functions)

_tree, err := readTreeFromMap(m)
if err != nil {
return 0, err
}

tree = append(tree, _tree)

idx = offset + s
ch.logger.Debug(
fmt.Sprintf("batch insert prepared row %d", idx),
Expand Down Expand Up @@ -212,7 +231,23 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
if err := b.Column(10).Append(values_agg); err != nil {
return 0, err
}
return offset, b.Send()

if err := b.Column(11).Append(tree); err != nil {
return 0, err
}
if err := b.Column(12).Append(functions); err != nil {
return 0, err
}
err = b.Send()
for _, tpls := range tree {
for _, t := range tpls {
for _, v := range t[3].([]tuple) {
triples.put(v)
}
quadruples.put(t)
}
}
return offset, err
}

// Closes the clickhouse connection pool
Expand All @@ -235,3 +270,142 @@ func valueAggToTuple(value *pcommon.Value) ([]tuple, error) {
}
return res, nil
}

func readFunctionsFromMap(m pcommon.Map) ([]tuple, error) {
raw, _ := m.Get("functions")
bRaw := bytes.NewReader(raw.Bytes().AsRaw())
size, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

res := make([]tuple, size)

for i := int64(0); i < size; i++ {
id, err := binary.ReadUvarint(bRaw)
if err != nil {
return nil, err
}
size, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

name := make([]byte, size)
_, err = bRaw.Read(name)
if err != nil {
return nil, err
}
res[i] = tuple{id, string(name)}
}
return res, nil
}

type LimitedPool struct {
m sync.RWMutex
pool *sync.Pool
size int
}

func (l *LimitedPool) get() tuple {
l.m.Lock()
defer l.m.Unlock()
l.size--
if l.size < 0 {
l.size = 0
}
return l.pool.Get().(tuple)
}

func (l *LimitedPool) put(t tuple) {
l.m.Lock()
defer l.m.Unlock()
if l.size >= 100000 {
return
}
l.size++
l.pool.Put(t)
}

var triples = LimitedPool{
pool: &sync.Pool{
New: func() interface{} {
return make(tuple, 3)
},
},
}

var quadruples = LimitedPool{
pool: &sync.Pool{
New: func() interface{} {
return make(tuple, 4)
},
},
}

func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
raw, _ := m.Get("tree")
bRaw := bytes.NewReader(raw.Bytes().AsRaw())
size, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

res := make([]tuple, size)

for i := int64(0); i < size; i++ {
parentId, err := binary.ReadUvarint(bRaw)
if err != nil {
return nil, err
}

fnId, err := binary.ReadUvarint(bRaw)
if err != nil {
return nil, err
}

nodeId, err := binary.ReadUvarint(bRaw)
if err != nil {
return nil, err
}

size, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

values := make([]tuple, size)
for i := range values {
size, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}
name := make([]byte, size)
_, err = bRaw.Read(name)
if err != nil {
return nil, err
}

self, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

total, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

values[i] = triples.get() // tuple{name, self, total}
values[i][0] = name
values[i][1] = self
values[i][2] = total
}
res[i] = quadruples.get() // tuple{parentId, fnId, nodeId, values}
res[i][0] = parentId
res[i][1] = fnId
res[i][2] = nodeId
res[i][3] = values
}
return res, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ require (
)

require (
github.com/go-faster/city v1.0.1
github.com/open-telemetry/opentelemetry-collector-contrib/extension/asapauthextension v0.93.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.93.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.93.0
Expand Down Expand Up @@ -339,7 +340,6 @@ require (
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
5 changes: 2 additions & 3 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([
values = [2]int64{1, 0}
)

pa.jfrParser = jfr_parser.NewParser(jfr.Bytes(), jfr_parser.Options{SymbolProcessor: nopSymbolProcessor})
pa.jfrParser = jfr_parser.NewParser(jfr.Bytes(), jfr_parser.Options{SymbolProcessor: processSyms})

if md.SampleRateHertz == 0 {
period = 1
Expand Down Expand Up @@ -118,6 +118,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)
pr.prof.Profile = pr.pprof

// Calculate values_agg based on the requirements
valuesAgg := calculateValuesAgg(pr.pprof)
Expand All @@ -128,8 +129,6 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([
return ps, nil
}

func nopSymbolProcessor(ref *jfr_types.SymbolList) {}

// TODO: hash location lists, merge-sort similar samples and free unused pprof objects
func (pa *jfrPprofParser) addStacktrace(sampleType sampleType, ref jfr_types.StackTraceRef, values []int64) {
pr := pa.getProfile(sampleType)
Expand Down
45 changes: 45 additions & 0 deletions receiver/pyroscopereceiver/jfrparser/syms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package jfrparser

import (
"regexp"

"github.com/grafana/jfr-parser/parser/types"
)

// reference: https://github.com/grafana/jfr-parser/blob/main/pprof/symbols.go

var (
// jdk/internal/reflect/GeneratedMethodAccessor31
generatedMethodAccessor = regexp.MustCompile(`^(jdk/internal/reflect/GeneratedMethodAccessor)(\d+)$`)

// org/example/rideshare/OrderService$$Lambda$669.0x0000000800fd7318.run
// Fib$$Lambda.0x00007ffa600c4da0.run
lambda = regexp.MustCompile(`^(.+\$\$Lambda)(\$?\d*[./](0x)?[\da-f]+|\d+)$`)

// libzstd-jni-1.5.1-16931311898282279136.so.Java_com_github_luben_zstd_ZstdInputStreamNoFinalizer_decompressStream
libzstd = regexp.MustCompile(`^(\.?/tmp/)?(libzstd-jni-\d+\.\d+\.\d+-)(\d+)(\.so)( \(deleted\))?$`)

// ./tmp/libamazonCorrettoCryptoProvider109b39cf33c563eb.so
// ./tmp/amazonCorrettoCryptoProviderNativeLibraries.7382c2f79097f415/libcrypto.so (deleted)
libcrypto = regexp.MustCompile(`^(\.?/tmp/)?(lib)?(amazonCorrettoCryptoProvider)(NativeLibraries\.)?([0-9a-f]{16})(/libcrypto|/libamazonCorrettoCryptoProvider)?(\.so)( \(deleted\))?$`)

// libasyncProfiler-linux-arm64-17b9a1d8156277a98ccc871afa9a8f69215f92.so
libasyncProfiler = regexp.MustCompile(`^(\.?/tmp/)?(libasyncProfiler)-(linux-arm64|linux-musl-x64|linux-x64|macos)-(17b9a1d8156277a98ccc871afa9a8f69215f92)(\.so)( \(deleted\))?$`)

// TODO: ./tmp/snappy-1.1.8-6fb9393a-3093-4706-a7e4-837efe01d078-libsnappyjava.so
)

func cleanse(frame string) string {
frame = generatedMethodAccessor.ReplaceAllString(frame, "${1}_")
frame = lambda.ReplaceAllString(frame, "${1}_")
frame = libzstd.ReplaceAllString(frame, "libzstd-jni-_.so")
frame = libcrypto.ReplaceAllString(frame, "libamazonCorrettoCryptoProvider_.so")
frame = libasyncProfiler.ReplaceAllString(frame, "libasyncProfiler-_.so")
return frame
}

func processSyms(ref *types.SymbolList) {
for i := range ref.Symbol {
ref.Symbol[i].String = cleanse(ref.Symbol[i].String)
}
}
Loading

0 comments on commit a7399e2

Please sign in to comment.