Skip to content

Commit

Permalink
Merge pull request #51 from metrico/pre-release
Browse files Browse the repository at this point in the history
Pre release
  • Loading branch information
akvlad authored Jan 23, 2024
2 parents c521620 + 082ad95 commit befe075
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 22 deletions.
34 changes: 32 additions & 2 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
timestamp_ns := make([]uint64, sz)
typ := make([]string, sz)
service_name := make([]string, sz)
values_agg := make([][]tuple, sz)
sample_types_units := make([][]tuple, sz)
period_type := make([]string, sz)
period_unit := make([]string, sz)
Expand All @@ -89,7 +90,6 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
for i := 0; i < sz; i++ {
r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0)
m = r.Attributes()

timestamp_ns[i] = uint64(r.Timestamp())

tmp, _ = m.Get("type")
Expand All @@ -99,6 +99,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
service_name[i] = tmp.AsString()

sample_types, _ := m.Get("sample_types")

sample_units, _ := m.Get("sample_units")

sample_types_array, err := valueToStringArray(sample_types)
Expand All @@ -111,12 +112,21 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
return err
}

values_agg_raw, ok := m.Get("values_agg")
if ok {
values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
if err != nil {
return err
}
values_agg[i] = append(values_agg[i], values_agg_tuple...)
}

sample_types_units_item := make([]tuple, len(sample_types_array))
for i, v := range sample_types_array {

sample_types_units_item[i] = tuple{v, sample_units_array[i]}
}
sample_types_units[i] = sample_types_units_item

tmp, _ = m.Get("period_type")
period_type[i] = tmp.AsString()

Expand Down Expand Up @@ -169,13 +179,33 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
if err := b.Column(8).Append(payload_type); err != nil {
return err
}

if err := b.Column(9).Append(payload); err != nil {
return err
}
if err := b.Column(10).Append(values_agg); err != nil {
return err
}
return b.Send()
}

// Shutdown closes the clickhouse connection pool held by this accessor.
// Called once when the exporter component is stopped; any error from the
// underlying driver Close is propagated to the collector shutdown path.
func (ch *clickhouseAccessNativeColumnar) Shutdown() error {
	return ch.conn.Close()
}

// valueAggToTuple converts the raw "values_agg" log attribute — expected to
// be a slice of [key, sum, count] triples — into clickhouse tuples.
// All type assertions are checked so a malformed attribute payload surfaces
// as an error to the caller instead of panicking the exporter.
func valueAggToTuple(value *pcommon.Value) ([]tuple, error) {
	raw, ok := value.AsRaw().([]any)
	if !ok {
		return nil, fmt.Errorf("failed to convert value_agg to tuples: attribute is not an array")
	}
	res := make([]tuple, 0, len(raw))
	for _, elem := range raw {
		triple, ok := elem.([]any)
		if !ok || len(triple) != 3 {
			return nil, fmt.Errorf("failed to convert value_agg to tuples")
		}
		// count arrives as int64 from pcommon.Value.AsRaw; narrow it to the
		// int32 column type only after a checked assertion.
		count, ok := triple[2].(int64)
		if !ok {
			return nil, fmt.Errorf("failed to convert value_agg to tuples: count is not int64")
		}
		res = append(res, tuple{triple[0], triple[1], int32(count)})
	}
	return res, nil
}
4 changes: 2 additions & 2 deletions exporter/clickhouseprofileexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti
// send inserts the logs batch into clickhouse and records the insert latency
// in the batch_insert_time_millis histogram, tagged with success or error.
// NOTE: the diff residue recorded the same latency twice — once under the
// removed FlushTimeMillis histogram and once under its replacement; only the
// renamed BatchInsertTimeMillis metric is kept.
func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error {
	start := time.Now().UnixMilli()
	if err := exp.ch.InsertBatch(logs); err != nil {
		otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError)))
		exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
		return err
	}
	otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess)))
	exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len()))
	return nil
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/clickhouseprofileexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
const prefix = "exporter_clickhouse_profile_"

var (
otelcolExporterClickhouseProfileFlushTimeMillis metric.Int64Histogram
otelcolExporterClickhouseProfileBatchInsertTimeMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
var err error
if otelcolExporterClickhouseProfileFlushTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "flush_time_millis"),
metric.WithDescription("Clickhouse profile exporter flush time in millis"),
if otelcolExporterClickhouseProfileBatchInsertTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "batch_insert_time_millis"),
metric.WithDescription("Clickhouse profile exporter batch insert time in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2247,4 +2247,4 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 h1:TY79I5Y7xRB8q5LQ+MJn7NYsYi0VL5nj1QDrUHwK7cQ=
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw=
skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw=
52 changes: 42 additions & 10 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ type profileWrapper struct {

type jfrPprofParser struct {
jfrParser *jfr_parser.Parser

proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

var typetab = []profile_types.ProfileType{
Expand Down Expand Up @@ -112,12 +111,18 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([

ps := make([]profile_types.ProfileIR, 0)
for _, pr := range pa.proftab {
if nil != pr {
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)
ps = append(ps, pr.prof)
if nil == pr {
continue
}
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)

// Calculate values_agg based on the requirements
valuesAgg := calculateValuesAgg(pr.pprof)
pr.prof.ValueAggregation = valuesAgg

ps = append(ps, pr.prof)
}
return ps, nil
}
Expand Down Expand Up @@ -204,12 +209,38 @@ func (pa *jfrPprofParser) addProfile(sampleType sampleType) *profileWrapper {
pprof: &pprof_proto.Profile{},
}
pa.proftab[sampleType] = pw

// add sample types and units to keep the pprof valid for libraries
for i, t := range pw.prof.Type.SampleType {
pa.appendSampleType(pw.pprof, t, pw.prof.Type.SampleUnit[i])
}

return pw

}
// calculateValuesAgg builds one aggregate entry per pprof sample type,
// keyed "type:unit", carrying the sum of that type's values across all
// samples plus the total sample count of the profile.
func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType {
	var valuesAgg []profile_types.SampleType
	for j, st := range samples.SampleType {
		sum, count := calculateSumAndCount(samples, j)
		// use keyed fields: an unkeyed composite literal is flagged by go vet
		// and silently breaks if SampleType fields are ever reordered
		valuesAgg = append(valuesAgg, profile_types.SampleType{
			Key:   fmt.Sprintf("%s:%s", st.Type, st.Unit),
			Sum:   sum,
			Count: count,
		})
	}
	return valuesAgg
}

// calculateSumAndCount returns the total of all sample values recorded at
// the given sample-type index, together with the number of samples in the
// profile. Samples that carry no value at that index contribute nothing to
// the sum but are still included in the count.
func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
	total := int64(0)
	for _, s := range samples.Sample {
		if sampleTypeIndex >= len(s.Value) {
			// this sample has no value for the requested type; skip it
			continue
		}
		total += s.Value[sampleTypeIndex]
	}
	return total, int32(len(samples.Sample))
}

func (pa *jfrPprofParser) appendSampleType(prof *pprof_proto.Profile, typ, unit string) {
Expand All @@ -229,6 +260,7 @@ func (pa *jfrPprofParser) getSample(sampleType sampleType, prof *pprof_proto.Pro
return nil
}
return prof.Sample[i]

}

func (pa *jfrPprofParser) appendSample(sampleType sampleType, prof *pprof_proto.Profile, locations []*pprof_proto.Location, values []int64, externStacktraceRef uint32) {
Expand Down
21 changes: 21 additions & 0 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,22 +361,43 @@ func stringToAnyArray(s []string) []any {
}
return res
}
// entitiesToStrings flattens aggregated sample-type entries into the raw
// []any shape accepted by pcommon.Slice.FromRaw: one [key, sum, count]
// triple per entry. (Despite the name it returns nested []any values, not
// strings; the name is kept for caller compatibility.)
func entitiesToStrings(entities []profile_types.SampleType) []any {
	// pre-size: one output element per input entity
	result := make([]any, 0, len(entities))
	for _, entity := range entities {
		result = append(result, []any{entity.Key, entity.Sum, entity.Count})
	}
	return result
}

// setAttrsFromProfile copies the parsed profile metadata into the log
// record attribute map consumed by the clickhouse exporter: type, sample
// types/units, aggregated values, period type/unit and payload type.
func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
	m.PutStr("type", prof.Type.Type)

	s := m.PutEmptySlice("sample_types")
	if err := s.FromRaw(stringToAnyArray(prof.Type.SampleType)); err != nil {
		return err
	}

	s = m.PutEmptySlice("sample_units")
	if err := s.FromRaw(stringToAnyArray(prof.Type.SampleUnit)); err != nil {
		return err
	}

	// ValueAggregation is declared as interface{} and may be unset for some
	// parsers; a checked assertion avoids panicking the receiver. The
	// exporter already treats "values_agg" as an optional attribute.
	if agg, ok := prof.ValueAggregation.([]profile_types.SampleType); ok {
		s = m.PutEmptySlice("values_agg")
		if err := s.FromRaw(entitiesToStrings(agg)); err != nil {
			return err
		}
	}

	m.PutStr("period_type", prof.Type.PeriodType)
	m.PutStr("period_unit", prof.Type.PeriodUnit)
	m.PutStr("payload_type", fmt.Sprint(prof.PayloadType))

	return nil
}

Expand Down
6 changes: 6 additions & 0 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
"payload_type": "0",
"sample_types": []any{"cpu"},
"sample_units": []any{"nanoseconds"},
"values_agg": []any{[]any{"cpu:nanoseconds", 4780000000, 370}},
},
body: pb,
}}),
Expand Down Expand Up @@ -139,6 +140,10 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
"payload_type": "0",
"sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"},
"sample_units": []any{"count", "bytes"},
"values_agg": []any{
[]any{"alloc_in_new_tlab_objects:count", 977, 471},
[]any{"alloc_in_new_tlab_bytes:bytes", 512229376, 471},
},
},
body: pbAllocInNewTlab,
},
Expand All @@ -157,6 +162,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
"payload_type": "0",
"sample_types": []any{"live"},
"sample_units": []any{"count"},
"values_agg": []any{[]any{"live:count", 976, 471}},
},
body: pbLiveObject,
},
Expand Down
13 changes: 10 additions & 3 deletions receiver/pyroscopereceiver/types/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ type ProfileType struct {

// ProfileIR is the parser intermediate representation of one decoded
// profile as it flows from the pyroscope receiver to the exporter.
// NOTE: the rendered diff duplicated the pre-change field list next to the
// post-change one, which would not compile; resolved to the final layout.
type ProfileIR struct {
	Type        ProfileType
	Payload     *bytes.Buffer
	PayloadType PayloadType
	// ValueAggregation carries []SampleType aggregates; typed as interface{}
	// because not every parser populates it (consumers must type-assert).
	ValueAggregation interface{}
}

// SampleType is one aggregated entry for a profile sample type:
// Key is "type:unit" (e.g. "cpu:nanoseconds"), Sum is the total of that
// type's values across all samples, Count is the profile's sample count.
type SampleType struct {
	Key   string
	Sum   int64
	Count int32
}

0 comments on commit befe075

Please sign in to comment.