Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/node-pprof #67

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions receiver/pyroscopereceiver/nodeparser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package nodeparser

import (
"bytes"
"fmt"

pprof_proto "github.com/google/pprof/profile"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

type sampleType uint8

const (
sampleTypeCpu sampleType = iota
sampleTypeCount
)

type profileWrapper struct {
pprof *pprof_proto.Profile
prof profile_types.ProfileIR
}

type nodePprofParser struct {
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

// Creates a pprof parser that parse the accepted node buffer
func NewNodePprofParser() *nodePprofParser {
return &nodePprofParser{}
}

func (pa *nodePprofParser) Parse(data *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
// Parse pprof data
pProfData, err := pprof_proto.Parse(data)
if err != nil {
return nil, err
}

// Process pprof data and create SampleType slice
var sampleTypes []string
var sampleUnits []string
var valueAggregates []profile_types.SampleType

for i, st := range pProfData.SampleType {
sampleTypes = append(sampleTypes, pProfData.SampleType[i].Type)
sampleUnits = append(sampleUnits, pProfData.SampleType[i].Unit)
sum, count := calculateSumAndCount(pProfData, i)
valueAggregates = append(valueAggregates, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
}

var profiles []profile_types.ProfileIR
var profileType string
switch pProfData.PeriodType.Type {
case "cpu":
profileType = "process_cpu"
case "wall":
profileType = "wall"
case "mutex", "contentions":
profileType = "mutex"
case "goroutine":
profileType = "goroutines"
case "objects", "space", "alloc_space", "alloc_objects", "inuse", "inuse_space", "inuse_objects":
profileType = "memory"
case "block":
profileType = "block"
}

profileTypeInfo := profile_types.ProfileType{
PeriodType: pProfData.PeriodType.Type,
PeriodUnit: pProfData.PeriodType.Unit,
SampleType: sampleTypes,
SampleUnit: sampleUnits,
Type: profileType,
}

// Create a new ProfileIR instance
profile := profile_types.ProfileIR{
ValueAggregation: valueAggregates,
Type: profileTypeInfo,
TimeStampNao: pProfData.TimeNanos,
DurationNano: pProfData.DurationNanos,
}
profile.Payload = new(bytes.Buffer)
pProfData.WriteUncompressed(profile.Payload)
// Append the profile to the result
profiles = append(profiles, profile)

return profiles, nil
}

func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
var sum int64
count := int32(len(samples.Sample))
for _, sample := range samples.Sample {
// Check if the sample has a value for the specified sample type
if sampleTypeIndex < len(sample.Value) {
// Accumulate the value for the specified sample type
sum += sample.Value[sampleTypeIndex]
}
}

return sum, count
}
51 changes: 30 additions & 21 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"strings"
"sync"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/nodeparser"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/pprofparser"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -168,15 +168,14 @@ func readParams(qs *url.Values) (params, error) {
p params = params{}
)

if tmp, ok = qsv["from"]; !ok {
return p, fmt.Errorf("required start time is missing")
}
start, err := strconv.ParseUint(tmp[0], 10, 64)
if err != nil {
return p, fmt.Errorf("failed to parse start time: %w", err)
tmp, ok = qsv["from"]
if ok {
start, err := strconv.ParseUint(tmp[0], 10, 64)
if err != nil {
return p, fmt.Errorf("failed to parse start time: %w", err)
}
p.start = start
}
p.start = start

if tmp, ok = qsv["name"]; !ok {
return p, fmt.Errorf("required labels are missing")
}
Expand All @@ -200,15 +199,15 @@ func readParams(qs *url.Values) (params, error) {
}
// required app name
p.name = tmp[0][:i]

if tmp, ok = qsv["until"]; !ok {
return p, fmt.Errorf("required end time is missing")
}
end, err := strconv.ParseUint(tmp[0], 10, 64)
if err != nil {
return p, fmt.Errorf("failed to parse end time: %w", err)
tmp, ok = qsv["until"]
if ok {
end, err := strconv.ParseUint(tmp[0], 10, 64)
if err != nil {
return p, fmt.Errorf("failed to parse end time: %w", err)
}
p.end = end
}
p.end = end

return p, nil
}

Expand Down Expand Up @@ -246,6 +245,8 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
qs := req.URL.Query()
if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") {
pa = jfrparser.NewJfrPprofParser()
} else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") {
pa = nodeparser.NewNodePprofParser()
} else {
pa = pprofparser.NewPprofParser()

Expand Down Expand Up @@ -289,15 +290,23 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
rs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
for i, pr := range ps {
var timestampNs uint64
var durationNs uint64
r := rs.AppendEmpty()
if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") {
timestampNs = ns(pm.start)
durationNs = pm.end - pm.start
durationNs = ns(durationNs)
} else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") {
timestampNs = uint64(pr.TimeStampNao)
durationNs = uint64(pr.DurationNano)
} else {
timestampNs = pm.start
durationNs = pm.end - pm.start
durationNs = ns(durationNs)
}
r.SetTimestamp(pcommon.Timestamp(timestampNs))
m := r.Attributes()
m.PutStr("duration_ns", fmt.Sprint(ns(pm.end-pm.start)))
m.PutStr("duration_ns", fmt.Sprint(durationNs))
m.PutStr("service_name", pm.name)
tm := m.PutEmptyMap("tags")
for _, l := range pm.labels {
Expand Down Expand Up @@ -352,10 +361,10 @@ func (recv *pyroscopeReceiver) openMultipart(req *http.Request) (multipart.File,
}
}
if part == nil {
return nil, fmt.Errorf("required jfr/pprof part is missing")
return nil, fmt.Errorf("required jfr/pprof/node part is missing")
}
fh := part[0]
if fh.Filename != formatJfr && fh.Filename != filePprof {
if fh.Filename != formatJfr && fh.Filename != filePprof && fh.Filename != formatPprof {
return nil, fmt.Errorf("filename is not '%s or %s'", formatJfr, formatPprof)
}
f, err := fh.Open()
Expand Down
2 changes: 2 additions & 0 deletions receiver/pyroscopereceiver/types/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ProfileType struct {
// Parser IR for profile processing
type ProfileIR struct {
Type ProfileType
DurationNano int64
TimeStampNao int64
Payload *bytes.Buffer
PayloadType PayloadType
ValueAggregation interface{}
Expand Down
Loading