Skip to content

Commit

Permalink
Merge pull request #43 from metrico/samples_types_units
Browse files Browse the repository at this point in the history
support samples_types_units column
  • Loading branch information
akvlad authored Jan 9, 2024
2 parents 4bfc421 + 6ba022a commit c3c5b20
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 8 deletions.
54 changes: 48 additions & 6 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ func NewClickhouseAccessNativeColumnar(opts *clickhouse.Options, logger *zap.Log
return nc, nil
}

func valueToStringArray(v pcommon.Value) ([]string, error) {
raw := v.AsRaw()
var (
rawArray []any
ok bool
)

if rawArray, ok = raw.([]any); !ok {
return nil, fmt.Errorf("failed to convert value to []any")
}
res := make([]string, len(rawArray))
for i, v := range rawArray {
if res[i], ok = v.(string); !ok {
return nil, fmt.Errorf("failed to convert value [%d] to string", i)
}
}
return res, nil
}

// Inserts a profile batch into the clickhouse server using columnar native protocol
func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input")
Expand All @@ -53,6 +72,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
timestamp_ns := make([]uint64, sz)
typ := make([]string, sz)
service_name := make([]string, sz)
sample_types_units := make([][]tuple, sz)
period_type := make([]string, sz)
period_unit := make([]string, sz)
tags := make([][]tuple, sz)
Expand All @@ -78,6 +98,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
tmp, _ = m.Get("service_name")
service_name[i] = tmp.AsString()

sample_types, _ := m.Get("sample_types")
sample_units, _ := m.Get("sample_units")

sample_types_array, err := valueToStringArray(sample_types)
if err != nil {
return err
}

sample_units_array, err := valueToStringArray(sample_units)
if err != nil {
return err
}

sample_types_units_item := make([]tuple, len(sample_types_array))
for i, v := range sample_types_array {
sample_types_units_item[i] = tuple{v, sample_units_array[i]}
}
sample_types_units[i] = sample_types_units_item

tmp, _ = m.Get("period_type")
period_type[i] = tmp.AsString()

Expand Down Expand Up @@ -112,22 +151,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
if err := b.Column(2).Append(service_name); err != nil {
return err
}
if err := b.Column(3).Append(period_type); err != nil {
if err := b.Column(3).Append(sample_types_units); err != nil {
return err
}
if err := b.Column(4).Append(period_type); err != nil {
return err
}
if err := b.Column(4).Append(period_unit); err != nil {
if err := b.Column(5).Append(period_unit); err != nil {
return err
}
if err := b.Column(5).Append(tags); err != nil {
if err := b.Column(6).Append(tags); err != nil {
return err
}
if err := b.Column(6).Append(duration_ns); err != nil {
if err := b.Column(7).Append(duration_ns); err != nil {
return err
}
if err := b.Column(7).Append(payload_type); err != nil {
if err := b.Column(8).Append(payload_type); err != nil {
return err
}
if err := b.Column(8).Append(payload); err != nil {
if err := b.Column(9).Append(payload); err != nil {
return err
}
return b.Send()
Expand Down
26 changes: 24 additions & 2 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
for _, l := range pm.labels {
tm.PutStr(l.Name, l.Value)
}
setAttrsFromProfile(pr, m)
err = setAttrsFromProfile(pr, m)
if err != nil {
return logs, fmt.Errorf("failed to parse sample types: %v", err)
}
r.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes())
sz += pr.Payload.Len()
}
Expand Down Expand Up @@ -329,11 +332,30 @@ func resetHeaders(req *http.Request) {
req.ContentLength = -1
}

func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) {
func stringToAnyArray(s []string) []any {
res := make([]any, len(s))
for i, v := range s {
res[i] = v
}
return res
}

func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
m.PutStr("type", prof.Type.Type)
s := m.PutEmptySlice("sample_types")
err := s.FromRaw(stringToAnyArray(prof.Type.SampleType))
if err != nil {
return err
}
s = m.PutEmptySlice("sample_units")
err = s.FromRaw(stringToAnyArray(prof.Type.SampleUnit))
if err != nil {
return err
}
m.PutStr("period_type", prof.Type.PeriodType)
m.PutStr("period_unit", prof.Type.PeriodUnit)
m.PutStr("payload_type", fmt.Sprint(prof.PayloadType))
return nil
}

// Starts a http server that receives profiles of supported protocols
Expand Down
6 changes: 6 additions & 0 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
"period_type": "cpu",
"period_unit": "nanoseconds",
"payload_type": "0",
"sample_types": []any{"cpu"},
"sample_units": []any{"nanoseconds"},
},
body: pb,
}}),
Expand Down Expand Up @@ -184,6 +186,8 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
"period_type": "space",
"period_unit": "bytes",
"payload_type": "0",
"sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"},
"sample_units": []any{"count", "bytes"},
},
body: pbAllocInNewTlab,
},
Expand All @@ -200,6 +204,8 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
"period_type": "objects",
"period_unit": "count",
"payload_type": "0",
"sample_types": []any{"live"},
"sample_units": []any{"count"},
},
body: pbLiveObject,
},
Expand Down

0 comments on commit c3c5b20

Please sign in to comment.