Skip to content

Commit

Permalink
Merge pull request #114 from metrico/feature/support-binary-profiles
Browse files Browse the repository at this point in the history
Support binary octet/binary content type for profile ingestor
  • Loading branch information
akvlad authored Dec 11, 2024
2 parents 283b4e8 + f7a8879 commit 7afc304
Showing 1 changed file with 53 additions and 15 deletions.
68 changes: 53 additions & 15 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,44 @@ func (r *pyroscopeReceiver) Push(ctx context.Context, req *connect.Request[pushv
return connect.NewResponse(&pushv1.PushResponse{}), nil
}

func (r *pyroscopeReceiver) getProfilesBuff(req *http.Request) (*bytes.Buffer, error) {
var err error
var buf *bytes.Buffer
defer func() {
if err != nil && buf != nil {
releaseBuf(r.uncompressedBufPool, buf)
}
}()
contentType := ""
if len(req.Header["Content-Type"]) > 0 {
contentType = req.Header["Content-Type"][0]
}
if strings.HasPrefix(contentType, "multipart/form-data") {
var f multipart.File
f, err = r.openMultipart(req)
if err != nil {
return nil, err
}
defer f.Close()

buf = acquireBuf(r.uncompressedBufPool)
err = r.decompressor.Decompress(f, compress.Gzip, buf)
if err != nil {
return nil, fmt.Errorf("failed to decompress body: %w", err)
}
return buf, nil
}
if strings.HasPrefix(contentType, "binary/octet-stream") {
buf = acquireBuf(r.uncompressedBufPool)
_, err = io.Copy(buf, req.Body)
if err != nil {
return buf, fmt.Errorf("failed to read body: %w", err)
}
return buf, nil
}
return nil, fmt.Errorf("unsupported content type: %s", contentType)
}

func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) {
var (
tmp []string
Expand All @@ -342,21 +380,14 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
p = pprofparser.NewPprofParser()
}
// support only multipart/form-data
f, err := r.openMultipart(req)
buf, err := r.getProfilesBuff(req)
if err != nil {
return logs, err
}
defer f.Close()

buf := acquireBuf(r.uncompressedBufPool)
defer func() {
releaseBuf(r.uncompressedBufPool, buf)
}()

err = r.decompressor.Decompress(f, compress.Gzip, buf)
if err != nil {
return logs, fmt.Errorf("failed to decompress body: %w", err)
}
// TODO: try measure compressed size
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, "")))
resetHeaders(req)
Expand Down Expand Up @@ -384,15 +415,13 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
record := rs.AppendEmpty()
if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") {
timestampNs = ns(pm.start)
durationNs = pm.end - pm.start
durationNs = ns(durationNs)
durationNs = ns(pm.end) - ns(pm.start)
} else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") {
timestampNs = uint64(pr.TimeStampNao)
durationNs = uint64(pr.DurationNano)
} else {
timestampNs = pm.start
durationNs = pm.end - pm.start
durationNs = ns(durationNs)
timestampNs = ns(pm.start)
durationNs = ns(pm.end) - ns(pm.start)
}
record.SetTimestamp(pcommon.Timestamp(timestampNs))
m := record.Attributes()
Expand All @@ -409,7 +438,7 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
postProcessProf(pr.Profile, &m)
record.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes())
sz += pr.Payload.Len()
r.logger.Debug(
r.logger.Info(
fmt.Sprintf("parsed profile %d", i),
zap.Uint64("timestamp_ns", timestampNs),
zap.String("type", pr.Type.Type),
Expand All @@ -427,7 +456,16 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
}

func ns(sec uint64) uint64 {
return sec * 1e9
if sec < 10000000000000 {
return sec * 1e9
}
if sec < 10000000000000000 {
return sec * 1e6
}
if sec < 10000000000000000000 {
return sec * 1e3
}
return sec
}

func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding string) *attribute.Set {
Expand Down

0 comments on commit 7afc304

Please sign in to comment.