diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index ab0f149..fbff9ca 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -37,6 +37,25 @@ func NewClickhouseAccessNativeColumnar(opts *clickhouse.Options, logger *zap.Log return nc, nil } +func valueToStringArray(v pcommon.Value) ([]string, error) { + raw := v.AsRaw() + var ( + rawArray []any + ok bool + ) + + if rawArray, ok = raw.([]any); !ok { + return nil, fmt.Errorf("failed to convert value to []any") + } + res := make([]string, len(rawArray)) + for i, v := range rawArray { + if res[i], ok = v.(string); !ok { + return nil, fmt.Errorf("failed to convert value [%d] to string", i) + } + } + return res, nil +} + // Inserts a profile batch into the clickhouse server using columnar native protocol func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input") @@ -53,6 +72,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { timestamp_ns := make([]uint64, sz) typ := make([]string, sz) service_name := make([]string, sz) + sample_types_units := make([][]tuple, sz) period_type := make([]string, sz) period_unit := make([]string, sz) tags := make([][]tuple, sz) @@ -78,6 +98,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { tmp, _ = m.Get("service_name") service_name[i] = tmp.AsString() + sample_types, _ := m.Get("sample_types") + sample_units, _ := m.Get("sample_units") + + sample_types_array, err := valueToStringArray(sample_types) + if err != nil { + return err + } + + sample_units_array, err := valueToStringArray(sample_units) + if err != nil { + return err + } + + sample_types_units_item := make([]tuple, len(sample_types_array)) + for i, v := range sample_types_array { + sample_types_units_item[i] = tuple{v, sample_units_array[i]} + } + sample_types_units[i] = sample_types_units_item + tmp, _ = m.Get("period_type") period_type[i] = tmp.AsString() @@ -112,22 +151,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { if err := b.Column(2).Append(service_name); err != nil { return err } - if err := b.Column(3).Append(period_type); err != nil { + if err := b.Column(3).Append(sample_types_units); err != nil { + return err + } + if err := b.Column(4).Append(period_type); err != nil { return err } - if err := b.Column(4).Append(period_unit); err != nil { + if err := b.Column(5).Append(period_unit); err != nil { return err } - if err := b.Column(5).Append(tags); err != nil { + if err := b.Column(6).Append(tags); err != nil { return err } - if err := b.Column(6).Append(duration_ns); err != nil { + if err := b.Column(7).Append(duration_ns); err != nil { return err } - if err := b.Column(7).Append(payload_type); err != nil { + if err := b.Column(8).Append(payload_type); err != nil { return err } - if err := b.Column(8).Append(payload); err != nil { + if err := b.Column(9).Append(payload); err != nil { return err } return b.Send() diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 95ffdb4..91ea28a 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -278,7 +278,10 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque for _, l := range pm.labels { tm.PutStr(l.Name, l.Value) } - setAttrsFromProfile(pr, m) + err = setAttrsFromProfile(pr, m) + if err != nil { + return logs, fmt.Errorf("failed to parse sample types: %v", err) + } r.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes()) sz += pr.Payload.Len() } @@ -329,11 +332,30 @@ func resetHeaders(req *http.Request) { req.ContentLength = -1 } -func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) { +func stringToAnyArray(s []string) []any { + res := make([]any, len(s)) + for i, v := range s { + res[i] = v + } + return res +} + +func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { m.PutStr("type", prof.Type.Type) + s := m.PutEmptySlice("sample_types") + err := s.FromRaw(stringToAnyArray(prof.Type.SampleType)) + if err != nil { + return err + } + s = m.PutEmptySlice("sample_units") + err = s.FromRaw(stringToAnyArray(prof.Type.SampleUnit)) + if err != nil { + return err + } m.PutStr("period_type", prof.Type.PeriodType) m.PutStr("period_unit", prof.Type.PeriodUnit) m.PutStr("payload_type", fmt.Sprint(prof.PayloadType)) + return nil } // Starts a http server that receives profiles of supported protocols diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index 284fcb5..b08c943 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -148,6 +148,8 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { "period_type": "cpu", "period_unit": "nanoseconds", "payload_type": "0", + "sample_types": []any{"cpu"}, + "sample_units": []any{"nanoseconds"}, }, body: pb, }}), @@ -183,6 +185,8 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "period_type": "space", "period_unit": "bytes", "payload_type": "0", + "sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"}, + "sample_units": []any{"count", "bytes"}, }, body: pbAllocInNewTlab, }, @@ -199,6 +203,8 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "period_type": "objects", "period_unit": "count", "payload_type": "0", + "sample_types": []any{"live"}, + "sample_units": []any{"count"}, }, body: pbLiveObject, },