Skip to content

Commit

Permalink
Merge pull request #50 from afzalabbasi/bugfix/panic-issue
Browse files Browse the repository at this point in the history
bugfix: fix panic issue in insert batch function
  • Loading branch information
akvlad authored Jan 23, 2024
2 parents f841751 + 02e4fbb commit a92c58f
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 17 deletions.
51 changes: 46 additions & 5 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package ch
import (
"context"
"fmt"
"strconv"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"strconv"
)

// schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
Expand All @@ -18,7 +17,11 @@ type clickhouseAccessNativeColumnar struct {

logger *zap.Logger
}

type AggsTuple struct {
Key string
Sum int64
Count int32
}
type tuple []any

// Connects to clickhouse and checks the connection's health, returning a new native client
Expand Down Expand Up @@ -72,6 +75,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
timestamp_ns := make([]uint64, sz)
typ := make([]string, sz)
service_name := make([]string, sz)
values_agg := make([][]tuple, sz)
sample_types_units := make([][]tuple, sz)
period_type := make([]string, sz)
period_unit := make([]string, sz)
Expand All @@ -89,7 +93,6 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
for i := 0; i < sz; i++ {
r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0)
m = r.Attributes()

timestamp_ns[i] = uint64(r.Timestamp())

tmp, _ = m.Get("type")
Expand All @@ -99,6 +102,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
service_name[i] = tmp.AsString()

sample_types, _ := m.Get("sample_types")

sample_units, _ := m.Get("sample_units")

sample_types_array, err := valueToStringArray(sample_types)
Expand All @@ -111,12 +115,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
return err
}

values_agg_raw, _ := m.Get("values_agg")

test, err := valueToStringArray(values_agg_raw)
if err != nil {
return err
}

values_agg_array, err := valueToTuples(test)
if err != nil {
return err
}
values_agg[i] = values_agg_array

sample_types_units_item := make([]tuple, len(sample_types_array))
for i, v := range sample_types_array {

sample_types_units_item[i] = tuple{v, sample_units_array[i]}
}
sample_types_units[i] = sample_types_units_item

tmp, _ = m.Get("period_type")
period_type[i] = tmp.AsString()

Expand Down Expand Up @@ -169,13 +186,37 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
if err := b.Column(8).Append(payload_type); err != nil {
return err
}

if err := b.Column(9).Append(payload); err != nil {
return err
}
if err := b.Column(10).Append(values_agg); err != nil {
return err
}
return b.Send()
}

// Closes the clickhouse connection pool
func (ch *clickhouseAccessNativeColumnar) Shutdown() error {
return ch.conn.Close()
}

func valueToTuples(input []string) ([]tuple, error) {
var tuples []tuple
// Type assert each item to the expected types
field1 := input[0]
field2 := input[1]
field3 := input[2]

count, _ := strconv.Atoi(field2)
sum, _ := strconv.Atoi(field3)
tuple := tuple{
field1,
int64(count),
int32(sum),
}

tuples = append(tuples, tuple)

return tuples, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -593,4 +593,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 // indirect
)
)
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2247,4 +2247,4 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 h1:TY79I5Y7xRB8q5LQ+MJn7NYsYi0VL5nj1QDrUHwK7cQ=
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw=
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw=
47 changes: 40 additions & 7 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package jfrparser
import (
"bytes"
"fmt"
"io"

pprof_proto "github.com/google/pprof/profile"
jfr_parser "github.com/grafana/jfr-parser/parser"
jfr_types "github.com/grafana/jfr-parser/parser/types"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
"go.uber.org/zap"
"io"
)

type sampleType uint8
Expand All @@ -32,10 +32,10 @@ type profileWrapper struct {

type jfrPprofParser struct {
jfrParser *jfr_parser.Parser

proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
logger *zap.Logger
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

var typetab = []profile_types.ProfileType{
Expand Down Expand Up @@ -204,12 +204,41 @@ func (pa *jfrPprofParser) addProfile(sampleType sampleType) *profileWrapper {
pprof: &pprof_proto.Profile{},
}
pa.proftab[sampleType] = pw

// add sample types and units to keep the pprof valid for libraries
for i, t := range pw.prof.Type.SampleType {
pa.appendSampleType(pw.pprof, t, pw.prof.Type.SampleUnit[i])
}
// Calculate values_agg based on the requirements
valuesAgg := calculateValuesAgg(pw.pprof)
pw.prof.ValueAggregation = valuesAgg

return pw

}
func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.Tuple {
var valuesAgg []profile_types.Tuple
// Loop through each sample type
for j, st := range samples.SampleType {
sum, count := calculateSumAndCount(samples, j)
valuesAgg = append(valuesAgg, profile_types.Tuple{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
}

return valuesAgg
}

func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
var sum int64
count := int32(len(samples.Sample))

for _, sample := range samples.Sample {
// Check if the sample has a value for the specified sample type
if sampleTypeIndex < len(sample.Value) {
// Accumulate the value for the specified sample type
sum += sample.Value[sampleTypeIndex]
}
}

return sum, count
}

func (pa *jfrPprofParser) appendSampleType(prof *pprof_proto.Profile, typ, unit string) {
Expand All @@ -228,7 +257,11 @@ func (pa *jfrPprofParser) getSample(sampleType sampleType, prof *pprof_proto.Pro
if !ok {
return nil
}
//if len(prof.Sample) <= int(i) {
// return nil
//}
return prof.Sample[i]

}

func (pa *jfrPprofParser) appendSample(sampleType sampleType, prof *pprof_proto.Profile, locations []*pprof_proto.Location, values []int64, externStacktraceRef uint32) {
Expand Down
23 changes: 23 additions & 0 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,22 +361,45 @@ func stringToAnyArray(s []string) []any {
}
return res
}
func entitiesToStrings(entities []profile_types.Tuple) []any {
var result []any
for _, entity := range entities {
result = append(result,
entity.Key,
fmt.Sprintf("%v", entity.Sum),
fmt.Sprintf("%v", entity.Count),
)
}
return result
}

func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
m.PutStr("type", prof.Type.Type)
s := m.PutEmptySlice("sample_types")

err := s.FromRaw(stringToAnyArray(prof.Type.SampleType))
if err != nil {
return err
}

s = m.PutEmptySlice("sample_units")
err = s.FromRaw(stringToAnyArray(prof.Type.SampleUnit))
if err != nil {
return err
}
// Correct type assertion for []profile.Tuple
result := prof.ValueAggregation.([]profile_types.Tuple)
s = m.PutEmptySlice("values_agg")

err = s.FromRaw(entitiesToStrings(result))

if err != nil {
return err
}
m.PutStr("period_type", prof.Type.PeriodType)
m.PutStr("period_unit", prof.Type.PeriodUnit)
m.PutStr("payload_type", fmt.Sprint(prof.PayloadType))

return nil
}

Expand Down
13 changes: 10 additions & 3 deletions receiver/pyroscopereceiver/types/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ type ProfileType struct {

// Parser IR for profile processing
type ProfileIR struct {
Type ProfileType
Payload *bytes.Buffer
PayloadType PayloadType
Type ProfileType
Payload *bytes.Buffer
PayloadType PayloadType
ValueAggregation interface{}
}

type Tuple struct {
Key string
Sum int64
Count int32
}

0 comments on commit a92c58f

Please sign in to comment.