From 797d27aafb4505b1314301ebae7187bc1f3b84f3 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 23 Jan 2024 16:44:59 +0200 Subject: [PATCH] fix: binary data send instead of stringification; test fix; golint fix --- .../ch/access_native_columnar.go | 51 ++++++++----------- go.mod | 2 +- .../pyroscopereceiver/jfrparser/parser.go | 33 ++++++------ receiver/pyroscopereceiver/receiver.go | 10 ++-- receiver/pyroscopereceiver/receiver_test.go | 6 +++ receiver/pyroscopereceiver/types/profile.go | 2 +- 6 files changed, 50 insertions(+), 54 deletions(-) diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index 03a65e0..de4a986 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -3,12 +3,13 @@ package ch import ( "context" "fmt" + "strconv" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" - "strconv" ) // schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js @@ -115,19 +116,15 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { return err } - values_agg_raw, _ := m.Get("values_agg") - - test, err := valueToStringArray(values_agg_raw) - if err != nil { - return err + values_agg_raw, ok := m.Get("values_agg") + if ok { + values_agg_tuple, err := valueAggToTuple(&values_agg_raw) + if err != nil { + return err + } + values_agg[i] = append(values_agg[i], values_agg_tuple...) } - values_agg_array, err := valueToTuples(test) - if err != nil { - return err - } - values_agg[i] = values_agg_array - sample_types_units_item := make([]tuple, len(sample_types_array)) for i, v := range sample_types_array { @@ -201,22 +198,18 @@ func (ch *clickhouseAccessNativeColumnar) Shutdown() error { return ch.conn.Close() } -func valueToTuples(input []string) ([]tuple, error) { - var tuples []tuple - // Type assert each item to the expected types - field1 := input[0] - field2 := input[1] - field3 := input[2] - - count, _ := strconv.Atoi(field2) - sum, _ := strconv.Atoi(field3) - tuple := tuple{ - field1, - int64(count), - int32(sum), +func valueAggToTuple(value *pcommon.Value) ([]tuple, error) { + var res []tuple + for _, value_agg_any := range value.AsRaw().([]any) { + value_agg_any_array, ok := value_agg_any.([]any) + if !ok || len(value_agg_any_array) != 3 { + return nil, fmt.Errorf("failed to convert value_agg to tuples") + } + res = append(res, tuple{ + value_agg_any_array[0], + value_agg_any_array[1], + int32(value_agg_any_array[2].(int64)), + }) } - - tuples = append(tuples, tuple) - - return tuples, nil + return res, nil } diff --git a/go.mod b/go.mod index ab2b099..452ff3c 100644 --- a/go.mod +++ b/go.mod @@ -593,4 +593,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 // indirect -) \ No newline at end of file +) diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index 0d6a655..c956c45 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -3,12 +3,12 @@ package jfrparser import ( "bytes" "fmt" + "io" + pprof_proto "github.com/google/pprof/profile" jfr_parser "github.com/grafana/jfr-parser/parser" jfr_types "github.com/grafana/jfr-parser/parser/types" profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" - "go.uber.org/zap" - "io" ) type sampleType uint8 @@ -32,7 +32,6 @@ type profileWrapper struct { type jfrPprofParser struct { jfrParser *jfr_parser.Parser - logger *zap.Logger proftab [sampleTypeCount]*profileWrapper // samptab [sampleTypeCount]map[uint32]uint32 // loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // @@ -112,12 +111,18 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([ ps := make([]profile_types.ProfileIR, 0) for _, pr := range pa.proftab { - if nil != pr { - // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof - pr.prof.Payload = new(bytes.Buffer) - pr.pprof.WriteUncompressed(pr.prof.Payload) - ps = append(ps, pr.prof) + if nil == pr { + continue } + // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof + pr.prof.Payload = new(bytes.Buffer) + pr.pprof.WriteUncompressed(pr.prof.Payload) + + // Calculate values_agg based on the requirements + valuesAgg := calculateValuesAgg(pr.pprof) + pr.prof.ValueAggregation = valuesAgg + + ps = append(ps, pr.prof) } return ps, nil } @@ -208,19 +213,16 @@ func (pa *jfrPprofParser) addProfile(sampleType sampleType) *profileWrapper { for i, t := range pw.prof.Type.SampleType { pa.appendSampleType(pw.pprof, t, pw.prof.Type.SampleUnit[i]) } - // Calculate values_agg based on the requirements - valuesAgg := calculateValuesAgg(pw.pprof) - pw.prof.ValueAggregation = valuesAgg return pw } -func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.Tuple { - var valuesAgg []profile_types.Tuple +func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType { + var valuesAgg []profile_types.SampleType // Loop through each sample type for j, st := range samples.SampleType { sum, count := calculateSumAndCount(samples, j) - valuesAgg = append(valuesAgg, profile_types.Tuple{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count}) + valuesAgg = append(valuesAgg, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count}) } return valuesAgg @@ -257,9 +259,6 @@ func (pa *jfrPprofParser) getSample(sampleType sampleType, prof *pprof_proto.Pro if !ok { return nil } - //if len(prof.Sample) <= int(i) { - // return nil - //} return prof.Sample[i] } diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 6b868a0..5065025 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -361,13 +361,11 @@ func stringToAnyArray(s []string) []any { } return res } -func entitiesToStrings(entities []profile_types.Tuple) []any { +func entitiesToStrings(entities []profile_types.SampleType) []any { var result []any for _, entity := range entities { result = append(result, - entity.Key, - fmt.Sprintf("%v", entity.Sum), - fmt.Sprintf("%v", entity.Count), + []any{entity.Key, entity.Sum, entity.Count}, ) } return result @@ -387,8 +385,8 @@ func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { if err != nil { return err } - // Correct type assertion for []profile.Tuple - result := prof.ValueAggregation.([]profile_types.Tuple) + // Correct type assertion for []profile.SampleType + result := prof.ValueAggregation.([]profile_types.SampleType) s = m.PutEmptySlice("values_agg") err = s.FromRaw(entitiesToStrings(result)) diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index 578cecc..aac68fb 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -102,6 +102,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { "payload_type": "0", "sample_types": []any{"cpu"}, "sample_units": []any{"nanoseconds"}, + "values_agg": []any{[]any{"cpu:nanoseconds", 4780000000, 370}}, }, body: pb, }}), @@ -139,6 +140,10 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "payload_type": "0", "sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"}, "sample_units": []any{"count", "bytes"}, + "values_agg": []any{ + []any{"alloc_in_new_tlab_objects:count", 977, 471}, + []any{"alloc_in_new_tlab_bytes:bytes", 512229376, 471}, + }, }, body: pbAllocInNewTlab, }, @@ -157,6 +162,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "payload_type": "0", "sample_types": []any{"live"}, "sample_units": []any{"count"}, + "values_agg": []any{[]any{"live:count", 976, 471}}, }, body: pbLiveObject, }, diff --git a/receiver/pyroscopereceiver/types/profile.go b/receiver/pyroscopereceiver/types/profile.go index 052f910..05f88e4 100644 --- a/receiver/pyroscopereceiver/types/profile.go +++ b/receiver/pyroscopereceiver/types/profile.go @@ -33,7 +33,7 @@ type ProfileIR struct { ValueAggregation interface{} } -type Tuple struct { +type SampleType struct { Key string Sum int64 Count int32