diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index fbff9ca..03a65e0 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -3,13 +3,12 @@ package ch import ( "context" "fmt" - "strconv" - "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" + "strconv" ) // schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js @@ -18,7 +17,11 @@ type clickhouseAccessNativeColumnar struct { logger *zap.Logger } - +type AggsTuple struct { + Key string + Sum int64 + Count int32 +} type tuple []any // Connects to clickhouse and checks the connection's health, returning a new native client @@ -72,6 +75,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { timestamp_ns := make([]uint64, sz) typ := make([]string, sz) service_name := make([]string, sz) + values_agg := make([][]tuple, sz) sample_types_units := make([][]tuple, sz) period_type := make([]string, sz) period_unit := make([]string, sz) @@ -89,7 +93,6 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { for i := 0; i < sz; i++ { r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0) m = r.Attributes() - timestamp_ns[i] = uint64(r.Timestamp()) tmp, _ = m.Get("type") @@ -99,6 +102,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { service_name[i] = tmp.AsString() sample_types, _ := m.Get("sample_types") + sample_units, _ := m.Get("sample_units") sample_types_array, err := valueToStringArray(sample_types) @@ -111,12 +115,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { return err } + values_agg_raw, _ := m.Get("values_agg") + + test, err := valueToStringArray(values_agg_raw) + if err != nil { + return err + } + + values_agg_array, err := valueToTuples(test) + if err != nil { + return err + } + values_agg[i] = values_agg_array + sample_types_units_item := make([]tuple, len(sample_types_array)) for i, v := range sample_types_array { + sample_types_units_item[i] = tuple{v, sample_units_array[i]} } sample_types_units[i] = sample_types_units_item - tmp, _ = m.Get("period_type") period_type[i] = tmp.AsString() @@ -169,9 +186,13 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { if err := b.Column(8).Append(payload_type); err != nil { return err } + if err := b.Column(9).Append(payload); err != nil { return err } + if err := b.Column(10).Append(values_agg); err != nil { + return err + } return b.Send() } @@ -179,3 +200,23 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { func (ch *clickhouseAccessNativeColumnar) Shutdown() error { return ch.conn.Close() } + +func valueToTuples(input []string) ([]tuple, error) { + var tuples []tuple + // Type assert each item to the expected types + field1 := input[0] + field2 := input[1] + field3 := input[2] + + count, _ := strconv.Atoi(field2) + sum, _ := strconv.Atoi(field3) + tuple := tuple{ + field1, + int64(count), + int32(sum), + } + + tuples = append(tuples, tuple) + + return tuples, nil +} diff --git a/go.mod b/go.mod index 452ff3c..ab2b099 100644 --- a/go.mod +++ b/go.mod @@ -593,4 +593,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 // indirect -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index f436bc9..43bb863 100644 --- a/go.sum +++ b/go.sum @@ -2247,4 +2247,4 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 h1:TY79I5Y7xRB8q5LQ+MJn7NYsYi0VL5nj1QDrUHwK7cQ= -skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw= +skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw= \ No newline at end of file diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index 396b8f8..0d6a655 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -3,12 +3,12 @@ package jfrparser import ( "bytes" "fmt" - "io" - pprof_proto "github.com/google/pprof/profile" jfr_parser "github.com/grafana/jfr-parser/parser" jfr_types "github.com/grafana/jfr-parser/parser/types" profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" + "go.uber.org/zap" + "io" ) type sampleType uint8 @@ -32,10 +32,10 @@ type profileWrapper struct { type jfrPprofParser struct { jfrParser *jfr_parser.Parser - - proftab [sampleTypeCount]*profileWrapper // - samptab [sampleTypeCount]map[uint32]uint32 // - loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // + logger *zap.Logger + proftab [sampleTypeCount]*profileWrapper // + samptab [sampleTypeCount]map[uint32]uint32 // + loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // } var typetab = []profile_types.ProfileType{ @@ -204,12 +204,41 @@ func (pa *jfrPprofParser) addProfile(sampleType sampleType) *profileWrapper { pprof: &pprof_proto.Profile{}, } pa.proftab[sampleType] = pw - // add sample types and units to keep the pprof valid for libraries for i, t := range pw.prof.Type.SampleType { pa.appendSampleType(pw.pprof, t, pw.prof.Type.SampleUnit[i]) } + // Calculate values_agg based on the requirements + valuesAgg := calculateValuesAgg(pw.pprof) + pw.prof.ValueAggregation = valuesAgg + return pw + +} +func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.Tuple { + var valuesAgg []profile_types.Tuple + // Loop through each sample type + for j, st := range samples.SampleType { + sum, count := calculateSumAndCount(samples, j) + valuesAgg = append(valuesAgg, profile_types.Tuple{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count}) + } + + return valuesAgg +} + +func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) { + var sum int64 + count := int32(len(samples.Sample)) + + for _, sample := range samples.Sample { + // Check if the sample has a value for the specified sample type + if sampleTypeIndex < len(sample.Value) { + // Accumulate the value for the specified sample type + sum += sample.Value[sampleTypeIndex] + } + } + + return sum, count } func (pa *jfrPprofParser) appendSampleType(prof *pprof_proto.Profile, typ, unit string) { @@ -228,7 +257,11 @@ func (pa *jfrPprofParser) getSample(sampleType sampleType, prof *pprof_proto.Pro if !ok { return nil } + //if len(prof.Sample) <= int(i) { + // return nil + //} return prof.Sample[i] + } func (pa *jfrPprofParser) appendSample(sampleType sampleType, prof *pprof_proto.Profile, locations []*pprof_proto.Location, values []int64, externStacktraceRef uint32) { diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index cd4f348..6b868a0 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -361,22 +361,45 @@ func stringToAnyArray(s []string) []any { } return res } +func entitiesToStrings(entities []profile_types.Tuple) []any { + var result []any + for _, entity := range entities { + result = append(result, + entity.Key, + fmt.Sprintf("%v", entity.Sum), + fmt.Sprintf("%v", entity.Count), + ) + } + return result +} func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { m.PutStr("type", prof.Type.Type) s := m.PutEmptySlice("sample_types") + err := s.FromRaw(stringToAnyArray(prof.Type.SampleType)) if err != nil { return err } + s = m.PutEmptySlice("sample_units") err = s.FromRaw(stringToAnyArray(prof.Type.SampleUnit)) + if err != nil { + return err + } + // Correct type assertion for []profile.Tuple + result := prof.ValueAggregation.([]profile_types.Tuple) + s = m.PutEmptySlice("values_agg") + + err = s.FromRaw(entitiesToStrings(result)) + if err != nil { return err } m.PutStr("period_type", prof.Type.PeriodType) m.PutStr("period_unit", prof.Type.PeriodUnit) m.PutStr("payload_type", fmt.Sprint(prof.PayloadType)) + return nil } diff --git a/receiver/pyroscopereceiver/types/profile.go b/receiver/pyroscopereceiver/types/profile.go index 0e38ce3..052f910 100644 --- a/receiver/pyroscopereceiver/types/profile.go +++ b/receiver/pyroscopereceiver/types/profile.go @@ -27,7 +27,14 @@ type ProfileType struct { // Parser IR for profile processing type ProfileIR struct { - Type ProfileType - Payload *bytes.Buffer - PayloadType PayloadType + Type ProfileType + Payload *bytes.Buffer + PayloadType PayloadType + ValueAggregation interface{} +} + +type Tuple struct { + Key string + Sum int64 + Count int32 }