Skip to content

Commit

Permalink
fix: send binary data instead of stringifying it; test fix; golint fix
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Jan 23, 2024
1 parent a92c58f commit 797d27a
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 54 deletions.
51 changes: 22 additions & 29 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package ch
import (
"context"
"fmt"
"strconv"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"strconv"
)

// schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
Expand Down Expand Up @@ -115,19 +116,15 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
return err
}

values_agg_raw, _ := m.Get("values_agg")

test, err := valueToStringArray(values_agg_raw)
if err != nil {
return err
values_agg_raw, ok := m.Get("values_agg")
if ok {
values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
if err != nil {
return err
}
values_agg[i] = append(values_agg[i], values_agg_tuple...)
}

values_agg_array, err := valueToTuples(test)
if err != nil {
return err
}
values_agg[i] = values_agg_array

sample_types_units_item := make([]tuple, len(sample_types_array))
for i, v := range sample_types_array {

Expand Down Expand Up @@ -201,22 +198,18 @@ func (ch *clickhouseAccessNativeColumnar) Shutdown() error {
return ch.conn.Close()
}

func valueToTuples(input []string) ([]tuple, error) {
var tuples []tuple
// Type assert each item to the expected types
field1 := input[0]
field2 := input[1]
field3 := input[2]

count, _ := strconv.Atoi(field2)
sum, _ := strconv.Atoi(field3)
tuple := tuple{
field1,
int64(count),
int32(sum),
// valueAggToTuple converts the "values_agg" attribute value — expected to be a
// list of [key, sum, count] triples (as produced by the pyroscope receiver) —
// into a slice of clickhouse tuples, narrowing the count to int32 for the
// column type.
//
// All type assertions use the comma-ok form so malformed payloads surface as
// an error instead of a panic (the original panicked on a non-[]any value or a
// non-int64 count, while already having an error path for other bad shapes).
func valueAggToTuple(value *pcommon.Value) ([]tuple, error) {
	raw, ok := value.AsRaw().([]any)
	if !ok {
		return nil, fmt.Errorf("failed to convert value_agg to tuples")
	}
	var res []tuple
	for _, item := range raw {
		triple, ok := item.([]any)
		if !ok || len(triple) != 3 {
			return nil, fmt.Errorf("failed to convert value_agg to tuples")
		}
		// pcommon.Value.AsRaw() represents integers as int64; verify before narrowing.
		count, ok := triple[2].(int64)
		if !ok {
			return nil, fmt.Errorf("failed to convert value_agg to tuples")
		}
		res = append(res, tuple{triple[0], triple[1], int32(count)})
	}
	return res, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -593,4 +593,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 // indirect
)
)
33 changes: 16 additions & 17 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package jfrparser
import (
"bytes"
"fmt"
"io"

pprof_proto "github.com/google/pprof/profile"
jfr_parser "github.com/grafana/jfr-parser/parser"
jfr_types "github.com/grafana/jfr-parser/parser/types"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
"go.uber.org/zap"
"io"
)

type sampleType uint8
Expand All @@ -32,7 +32,6 @@ type profileWrapper struct {

type jfrPprofParser struct {
jfrParser *jfr_parser.Parser
logger *zap.Logger
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
Expand Down Expand Up @@ -112,12 +111,18 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([

ps := make([]profile_types.ProfileIR, 0)
for _, pr := range pa.proftab {
if nil != pr {
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)
ps = append(ps, pr.prof)
if nil == pr {
continue
}
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)

// Calculate values_agg based on the requirements
valuesAgg := calculateValuesAgg(pr.pprof)
pr.prof.ValueAggregation = valuesAgg

ps = append(ps, pr.prof)
}
return ps, nil
}
Expand Down Expand Up @@ -208,19 +213,16 @@ func (pa *jfrPprofParser) addProfile(sampleType sampleType) *profileWrapper {
for i, t := range pw.prof.Type.SampleType {
pa.appendSampleType(pw.pprof, t, pw.prof.Type.SampleUnit[i])
}
// Calculate values_agg based on the requirements
valuesAgg := calculateValuesAgg(pw.pprof)
pw.prof.ValueAggregation = valuesAgg

return pw

}
func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.Tuple {
var valuesAgg []profile_types.Tuple
func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType {
var valuesAgg []profile_types.SampleType
// Loop through each sample type
for j, st := range samples.SampleType {
sum, count := calculateSumAndCount(samples, j)
valuesAgg = append(valuesAgg, profile_types.Tuple{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
valuesAgg = append(valuesAgg, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
}

return valuesAgg
Expand Down Expand Up @@ -257,9 +259,6 @@ func (pa *jfrPprofParser) getSample(sampleType sampleType, prof *pprof_proto.Pro
if !ok {
return nil
}
//if len(prof.Sample) <= int(i) {
// return nil
//}
return prof.Sample[i]

}
Expand Down
10 changes: 4 additions & 6 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,11 @@ func stringToAnyArray(s []string) []any {
}
return res
}
func entitiesToStrings(entities []profile_types.Tuple) []any {
func entitiesToStrings(entities []profile_types.SampleType) []any {
var result []any
for _, entity := range entities {
result = append(result,
entity.Key,
fmt.Sprintf("%v", entity.Sum),
fmt.Sprintf("%v", entity.Count),
[]any{entity.Key, entity.Sum, entity.Count},
)
}
return result
Expand All @@ -387,8 +385,8 @@ func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
if err != nil {
return err
}
// Correct type assertion for []profile.Tuple
result := prof.ValueAggregation.([]profile_types.Tuple)
// Correct type assertion for []profile.SampleType
result := prof.ValueAggregation.([]profile_types.SampleType)
s = m.PutEmptySlice("values_agg")

err = s.FromRaw(entitiesToStrings(result))
Expand Down
6 changes: 6 additions & 0 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
"payload_type": "0",
"sample_types": []any{"cpu"},
"sample_units": []any{"nanoseconds"},
"values_agg": []any{[]any{"cpu:nanoseconds", 4780000000, 370}},
},
body: pb,
}}),
Expand Down Expand Up @@ -139,6 +140,10 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
"payload_type": "0",
"sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"},
"sample_units": []any{"count", "bytes"},
"values_agg": []any{
[]any{"alloc_in_new_tlab_objects:count", 977, 471},
[]any{"alloc_in_new_tlab_bytes:bytes", 512229376, 471},
},
},
body: pbAllocInNewTlab,
},
Expand All @@ -157,6 +162,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
"payload_type": "0",
"sample_types": []any{"live"},
"sample_units": []any{"count"},
"values_agg": []any{[]any{"live:count", 976, 471}},
},
body: pbLiveObject,
},
Expand Down
2 changes: 1 addition & 1 deletion receiver/pyroscopereceiver/types/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ProfileIR struct {
ValueAggregation interface{}
}

type Tuple struct {
type SampleType struct {
Key string
Sum int64
Count int32
Expand Down

0 comments on commit 797d27a

Please sign in to comment.