diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index dcdb7c5..5243c09 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -71,146 +71,148 @@ func valueToStringArray(v pcommon.Value) ([]string, error) { } // Inserts a profile batch into the clickhouse server using columnar native protocol -func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { +func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) { b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input") if err != nil { - return fmt.Errorf("failed to prepare batch: %w", err) - } + return 0, fmt.Errorf("failed to prepare batch: %w", err) + } + + // this implementation is tightly coupled to how pyroscope-java and pyroscopereceiver work + timestamp_ns := make([]uint64, 0) + typ := make([]string, 0) + service_name := make([]string, 0) + values_agg := make([][]tuple, 0) + sample_types_units := make([][]tuple, 0) + period_type := make([]string, 0) + period_unit := make([]string, 0) + tags := make([][]tuple, 0) + duration_ns := make([]uint64, 0) + payload_type := make([]string, 0) + payload := make([][]byte, 0) - // this implementation is tightly coupled to how pyroscope-java and pyroscopereciver work, - // specifically receiving a single profile at a time from the agent, - // and thus each batched resource logs slice contains a single log record rl := ls.ResourceLogs() - sz := rl.Len() - - timestamp_ns := make([]uint64, sz) - typ := make([]string, sz) - service_name := make([]string, sz) - values_agg := make([][]tuple, sz) - sample_types_units := make([][]tuple, sz) - period_type := make([]string, sz) - period_unit := make([]string, sz) - tags := make([][]tuple, sz) - duration_ns := make([]uint64, sz) - payload_type := make([]string, sz) - payload := make([][]byte, sz) - var ( - r plog.LogRecord - m pcommon.Map - tmp pcommon.Value - tm map[string]any + lr plog.LogRecordSlice + r plog.LogRecord + m pcommon.Map + tmp pcommon.Value + tm map[string]any + offset int + s int + idx int ) - for i := 0; i < sz; i++ { - r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0) - m = r.Attributes() - timestamp_ns[i] = uint64(r.Timestamp()) - - tmp, _ = m.Get(columnType) - typ[i] = tmp.AsString() - - tmp, _ = m.Get(columnServiceName) - service_name[i] = tmp.AsString() - - sample_types, _ := m.Get("sample_types") - - sample_units, _ := m.Get("sample_units") - - sample_types_array, err := valueToStringArray(sample_types) - if err != nil { - return err - } - - sample_units_array, err := valueToStringArray(sample_units) - if err != nil { - return err - } - - values_agg_raw, ok := m.Get(columnValuesAgg) - if ok { - values_agg_tuple, err := valueAggToTuple(&values_agg_raw) + for i := 0; i < rl.Len(); i++ { + lr = rl.At(i).ScopeLogs().At(0).LogRecords() + for s = 0; s < lr.Len(); s++ { + r = lr.At(s) + m = r.Attributes() + timestamp_ns = append(timestamp_ns, uint64(r.Timestamp())) + + tmp, _ = m.Get(columnType) + typ = append(typ, tmp.AsString()) + + tmp, _ = m.Get(columnServiceName) + service_name = append(service_name, tmp.AsString()) + + sample_types, _ := m.Get("sample_types") + sample_units, _ := m.Get("sample_units") + sample_types_array, err := valueToStringArray(sample_types) if err != nil { - return err + return 0, err } - values_agg[i] = append(values_agg[i], values_agg_tuple...) - } - - sample_types_units_item := make([]tuple, len(sample_types_array)) - for i, v := range sample_types_array { - - sample_types_units_item[i] = tuple{v, sample_units_array[i]} - } - sample_types_units[i] = sample_types_units_item - tmp, _ = m.Get(columnPeriodType) - period_type[i] = tmp.AsString() - - tmp, _ = m.Get(columnPeriodUnit) - period_unit[i] = tmp.AsString() - - tmp, _ = m.Get(columnTags) - tm = tmp.Map().AsRaw() - tag, j := make([]tuple, len(tm)), 0 - for k, v := range tm { - tag[j] = tuple{k, v.(string)} - j++ - } - tags[i] = tag - - tmp, _ = m.Get(columnDurationNs) - duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64) + sample_units_array, err := valueToStringArray(sample_units) + if err != nil { + return 0, err + } + values_agg_raw, ok := m.Get(columnValuesAgg) + if ok { + values_agg_tuple, err := valueAggToTuple(&values_agg_raw) + if err != nil { + return 0, err + } + values_agg = append(values_agg, values_agg_tuple) + } + sample_types_units_item := make([]tuple, len(sample_types_array)) + for i, v := range sample_types_array { + sample_types_units_item[i] = tuple{v, sample_units_array[i]} + } + sample_types_units = append(sample_types_units, sample_types_units_item) - tmp, _ = m.Get(columnPayloadType) - payload_type[i] = tmp.AsString() + tmp, _ = m.Get(columnPeriodType) + period_type = append(period_type, tmp.AsString()) - payload[i] = r.Body().Bytes().AsRaw() + tmp, _ = m.Get(columnPeriodUnit) + period_unit = append(period_unit, tmp.AsString()) - ch.logger.Debug( - fmt.Sprintf("batch insert prepared row %d", i), - zap.Uint64(columnTimestampNs, timestamp_ns[i]), - zap.String(columnType, typ[i]), - zap.String(columnServiceName, service_name[i]), - zap.String(columnPeriodType, period_type[i]), - zap.String(columnPeriodUnit, period_unit[i]), - zap.String(columnPayloadType, payload_type[i]), - ) + tmp, _ = m.Get(columnTags) + tm = tmp.Map().AsRaw() + tag, j := make([]tuple, len(tm)), 0 + for k, v := range tm { + tag[j] = tuple{k, v.(string)} + j++ + } + tags = append(tags, tag) + + tmp, _ = m.Get(columnDurationNs) + dur, _ := strconv.ParseUint(tmp.Str(), 10, 64) + duration_ns = append(duration_ns, dur) + + tmp, _ = m.Get(columnPayloadType) + payload_type = append(payload_type, tmp.AsString()) + + payload = append(payload, r.Body().Bytes().AsRaw()) + + idx = offset + s + ch.logger.Debug( + fmt.Sprintf("batch insert prepared row %d", idx), + zap.Uint64(columnTimestampNs, timestamp_ns[idx]), + zap.String(columnType, typ[idx]), + zap.String(columnServiceName, service_name[idx]), + zap.String(columnPeriodType, period_type[idx]), + zap.String(columnPeriodUnit, period_unit[idx]), + zap.Any(columnSampleTypesUnits, sample_types_units[idx]), + zap.String(columnPayloadType, payload_type[idx]), + ) + } + offset += s } // column order here should match table column order if err := b.Column(0).Append(timestamp_ns); err != nil { - return err + return 0, err } if err := b.Column(1).Append(typ); err != nil { - return err + return 0, err } if err := b.Column(2).Append(service_name); err != nil { - return err + return 0, err } if err := b.Column(3).Append(sample_types_units); err != nil { - return err + return 0, err } if err := b.Column(4).Append(period_type); err != nil { - return err + return 0, err } if err := b.Column(5).Append(period_unit); err != nil { - return err + return 0, err } if err := b.Column(6).Append(tags); err != nil { - return err + return 0, err } if err := b.Column(7).Append(duration_ns); err != nil { - return err + return 0, err } if err := b.Column(8).Append(payload_type); err != nil { - return err + return 0, err } if err := b.Column(9).Append(payload); err != nil { - return err + return 0, err } if err := b.Column(10).Append(values_agg); err != nil { - return err + return 0, err } - return b.Send() + return offset, b.Send() } // Closes the clickhouse connection pool diff --git a/exporter/clickhouseprofileexporter/exporter.go b/exporter/clickhouseprofileexporter/exporter.go index deae555..2a94430 100644 --- a/exporter/clickhouseprofileexporter/exporter.go +++ b/exporter/clickhouseprofileexporter/exporter.go @@ -30,7 +30,7 @@ type clickhouseProfileExporter struct { type clickhouseAccess interface { // Inserts a profile batch into the clickhouse server - InsertBatch(profiles plog.Logs) error + InsertBatch(profiles plog.Logs) (int, error) // Shuts down the clickhouse connection Shutdown() error @@ -63,13 +63,14 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti // Sends the profiles to clickhouse server using the configured connection func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error { start := time.Now().UnixMilli() - if err := exp.ch.InsertBatch(logs); err != nil { + sz, err := exp.ch.InsertBatch(logs) + if err != nil { otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) - exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len())) + exp.logger.Info("inserted batch", zap.Int("size", sz)) return nil } diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index c0d2eb4..3bf3c90 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -81,7 +81,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([ case pa.jfrParser.TypeMap.T_EXECUTION_SAMPLE: values[0] = 1 * int64(period) ts := pa.jfrParser.GetThreadState(pa.jfrParser.ExecutionSample.State) - if ts != nil && ts.Name == "STATE_RUNNABLE" { + if ts != nil && ts.Name != "STATE_SLEEPING" { pa.addStacktrace(sampleTypeCpu, pa.jfrParser.ExecutionSample.StackTrace, values[:1]) } // TODO: this code is from github/grafana/pyroscope, need to validate that the qryn.js query simulator handles this branch as expected for wall diff --git a/receiver/pyroscopereceiver/pprofparser/parser.go b/receiver/pyroscopereceiver/pprofparser/parser.go new file mode 100644 index 0000000..fcec082 --- /dev/null +++ b/receiver/pyroscopereceiver/pprofparser/parser.go @@ -0,0 +1,103 @@ +package pprofparser + +import ( + "bytes" + "fmt" + + pprof_proto "github.com/google/pprof/profile" + profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" +) + +type sampleType uint8 + +const ( + sampleTypeCpu sampleType = iota + sampleTypeCount +) + +type profileWrapper struct { + pprof *pprof_proto.Profile + prof profile_types.ProfileIR +} + +type pProfParser struct { + proftab [sampleTypeCount]*profileWrapper // + samptab [sampleTypeCount]map[uint32]uint32 // + loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // +} + +// Creates a pprof parser that parse the accepted jfr buffer +func NewPprofParser() *pProfParser { + return &pProfParser{} +} + +func (pa *pProfParser) Parse(data *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) { + // Parse pprof data + pProfData, err := pprof_proto.Parse(data) + if err != nil { + return nil, err + } + + // Process pprof data and create SampleType slice + var sampleTypes []string + var sampleUnits []string + var valueAggregates []profile_types.SampleType + + for i, st := range pProfData.SampleType { + sampleTypes = append(sampleTypes, pProfData.SampleType[i].Type) + sampleUnits = append(sampleUnits, pProfData.SampleType[i].Unit) + sum, count := calculateSumAndCount(pProfData, i) + valueAggregates = append(valueAggregates, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count}) + } + + var profiles []profile_types.ProfileIR + var profileType string + + switch pProfData.PeriodType.Type { + case "cpu": + profileType = "process_cpu" + case "wall": + profileType = "wall" + case "mutex", "contentions": + profileType = "mutex" + case "goroutine": + profileType = "goroutines" + case "objects", "space", "alloc", "inuse": + profileType = "memory" + case "block": + profileType = "block" + } + + profileTypeInfo := profile_types.ProfileType{ + PeriodType: pProfData.PeriodType.Type, + PeriodUnit: pProfData.PeriodType.Unit, + SampleType: sampleTypes, + SampleUnit: sampleUnits, + Type: profileType, + } + + // Create a new ProfileIR instance + profile := profile_types.ProfileIR{ + ValueAggregation: valueAggregates, + Type: profileTypeInfo, + } + profile.Payload = new(bytes.Buffer) + pProfData.WriteUncompressed(profile.Payload) + // Append the profile to the result + profiles = append(profiles, profile) + return profiles, nil +} + +func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) { + var sum int64 + count := int32(len(samples.Sample)) + for _, sample := range samples.Sample { + // Check if the sample has a value for the specified sample type + if sampleTypeIndex < len(sample.Value) { + // Accumulate the value for the specified sample type + sum += sample.Value[sampleTypeIndex] + } + } + + return sum, count +} diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 37931c1..34ad2b3 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -13,8 +13,10 @@ import ( "strings" "sync" - "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser" + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/pprofparser" + + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" "github.com/prometheus/prometheus/model/labels" "go.opentelemetry.io/collector/component" @@ -31,7 +33,8 @@ const ( ingestPath = "/ingest" formatJfr = "jfr" - formatPprof = "pprof" + formatPprof = "profile" + filePprof = "profile.pprof" errorCodeError = "1" errorCodeSuccess = "" @@ -240,16 +243,15 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque logs := plog.NewLogs() recv.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery)) - qs := req.URL.Query() - if tmp, ok = qs["format"]; ok && tmp[0] == "jfr" { + if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { pa = jfrparser.NewJfrPprofParser() } else { - return logs, fmt.Errorf("unsupported format, supported: [jfr]") - } + pa = pprofparser.NewPprofParser() + } // support only multipart/form-data - f, err := recv.openMultipartJfr(req) + f, err := recv.openMultipart(req) if err != nil { return logs, err } @@ -286,8 +288,13 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque sz := 0 rs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() for i, pr := range ps { + var timestampNs uint64 r := rs.AppendEmpty() - timestampNs := ns(pm.start) + if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { + timestampNs = ns(pm.start) + } else { + timestampNs = pm.start + } r.SetTimestamp(pcommon.Timestamp(timestampNs)) m := r.Attributes() m.PutStr("duration_ns", fmt.Sprint(ns(pm.end-pm.start))) @@ -328,7 +335,7 @@ func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding stri return &s } -func (recv *pyroscopeReceiver) openMultipartJfr(req *http.Request) (multipart.File, error) { +func (recv *pyroscopeReceiver) openMultipart(req *http.Request) (multipart.File, error) { if err := req.ParseMultipartForm(recv.cfg.Protocols.Http.MaxRequestBodySize); err != nil { return nil, fmt.Errorf("failed to parse multipart request: %w", err) } @@ -336,18 +343,24 @@ func (recv *pyroscopeReceiver) openMultipartJfr(req *http.Request) (multipart.Fi defer func() { _ = mf.RemoveAll() }() - - part, ok := mf.File[formatJfr] - if !ok { - return nil, fmt.Errorf("required jfr part is missing") + formats := []string{formatJfr, formatPprof} + var part []*multipart.FileHeader // Replace YourPartType with the actual type of your 'part' variable + for _, f := range formats { + if p, ok := mf.File[f]; ok { + part = p + break + } + } + if part == nil { + return nil, fmt.Errorf("required jfr/pprof part is missing") } fh := part[0] - if fh.Filename != formatJfr { - return nil, fmt.Errorf("jfr filename is not '%s'", formatJfr) + if fh.Filename != formatJfr && fh.Filename != filePprof { + return nil, fmt.Errorf("filename is not '%s or %s'", formatJfr, formatPprof) } f, err := fh.Open() if err != nil { - return nil, fmt.Errorf("failed to open jfr file") + return nil, fmt.Errorf("failed to open file") } return f, nil } @@ -379,6 +392,7 @@ func entitiesToStrings(entities []profile_types.SampleType) []any { } func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { + m.PutStr("type", prof.Type.Type) s := m.PutEmptySlice("sample_types") diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index aac68fb..77f58e5 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -21,10 +21,10 @@ import ( "go.uber.org/zap" ) -type jfrtest struct { +type datatest struct { name string urlParams map[string]string - jfr string + filename string expected plog.Logs } @@ -40,10 +40,10 @@ func loadTestData(t *testing.T, filename string) []byte { return b } -func run(t *testing.T, tests []jfrtest, collectorAddr string, sink *consumertest.LogsSink) { +func run(t *testing.T, tests []datatest, collectorAddr string, sink *consumertest.LogsSink) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed") + assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.filename), "send shouldn't have been failed") actual := sink.AllLogs() assert.NoError(t, plogtest.CompareLogs(tt.expected, actual[0])) sink.Reset() @@ -75,9 +75,9 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { } func TestPyroscopeIngestJfrCpu(t *testing.T) { - tests := make([]jfrtest, 1) + tests := make([]datatest, 1) pb := loadTestData(t, "cortex-dev-01__kafka-0__cpu__0.pb") - tests[0] = jfrtest{ + tests[0] = datatest{ name: "send labeled multipart form data gzipped cpu jfr to http ingest endpoint", urlParams: map[string]string{ "name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}", @@ -86,7 +86,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { "format": "jfr", "sampleRate": "100", }, - jfr: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), + filename: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), expected: gen([]profileLog{{ timestamp: 1700332322000000000, attrs: map[string]any{ @@ -113,10 +113,10 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { } func TestPyroscopeIngestJfrMemory(t *testing.T) { - tests := make([]jfrtest, 1) + tests := make([]datatest, 1) pbAllocInNewTlab := loadTestData(t, "memory_example_alloc_in_new_tlab.pb") pbLiveObject := loadTestData(t, "memory_example_live_object.pb") - tests[0] = jfrtest{ + tests[0] = datatest{ name: "send labeled multipart form data gzipped memory jfr to http ingest endpoint", urlParams: map[string]string{ "name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}", @@ -124,7 +124,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "until": "1700332329", "format": "jfr", }, - jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"), + filename: filepath.Join("testdata", "memory_alloc_live_example.jfr"), expected: gen([]profileLog{{ timestamp: 1700332322000000000, attrs: map[string]any{ diff --git a/receiver/pyroscopereceiver/testclient/ingest.go b/receiver/pyroscopereceiver/testclient/ingest.go index 096e0e5..6de08d4 100644 --- a/receiver/pyroscopereceiver/testclient/ingest.go +++ b/receiver/pyroscopereceiver/testclient/ingest.go @@ -7,6 +7,7 @@ import ( "mime/multipart" "net/http" "os" + "strings" ) func Ingest(addr string, urlParams map[string]string, jfr string) error { @@ -17,8 +18,16 @@ func Ingest(addr string, urlParams map[string]string, jfr string) error { body := new(bytes.Buffer) + var fieldName, filename string + if strings.Contains(jfr, "profile") { + fieldName = "profile" + filename = "profile.pprof" + } else { + fieldName = "jfr" + filename = "jfr" + } mw := multipart.NewWriter(body) - part, err := mw.CreateFormFile("jfr", "jfr") + part, err := mw.CreateFormFile(fieldName, filename) if err != nil { return fmt.Errorf("failed to create form file: %w", err) } diff --git a/receiver/pyroscopereceiver/testdata/profile.pprof b/receiver/pyroscopereceiver/testdata/profile.pprof new file mode 100644 index 0000000..d6a7646 Binary files /dev/null and b/receiver/pyroscopereceiver/testdata/profile.pprof differ