From dd4b63b3f0ba595b0f4d2d3be1bb403726ac4ddf Mon Sep 17 00:00:00 2001 From: akvlad Date: Wed, 11 Dec 2024 19:42:43 +0200 Subject: [PATCH 1/2] support octet/binary-stream --- receiver/pyroscopereceiver/receiver.go | 76 +++++++++++++++++++++----- 1 file changed, 61 insertions(+), 15 deletions(-) diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index ce08cfc..301f4fb 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/hex" "errors" "fmt" "io" @@ -324,6 +325,51 @@ func (r *pyroscopeReceiver) Push(ctx context.Context, req *connect.Request[pushv return connect.NewResponse(&pushv1.PushResponse{}), nil } +func (r *pyroscopeReceiver) getProfilesBuff(req *http.Request) (*bytes.Buffer, error) { + var err error + var buf *bytes.Buffer + defer func() { + if err != nil && buf != nil { + releaseBuf(r.uncompressedBufPool, buf) + } + }() + contentType := "" + if len(req.Header["Content-Type"]) > 0 { + contentType = req.Header["Content-Type"][0] + } + if strings.HasPrefix(contentType, "multipart/form-data") { + var f multipart.File + f, err = r.openMultipart(req) + if err != nil { + fmt.Println(req.URL.String()) + for k, v := range req.Header { + fmt.Printf("Header: %s: %v", k, v) + } + b, _ := io.ReadAll(req.Body) + //TODO: encode b to hex + fmt.Printf("Body: %s\n", hex.EncodeToString(b)) + return nil, err + } + defer f.Close() + + buf = acquireBuf(r.uncompressedBufPool) + err = r.decompressor.Decompress(f, compress.Gzip, buf) + if err != nil { + return nil, fmt.Errorf("failed to decompress body: %w", err) + } + return buf, nil + } + if strings.HasPrefix(contentType, "binary/octet-stream") { + buf = acquireBuf(r.uncompressedBufPool) + _, err = io.Copy(buf, req.Body) + if err != nil { + return buf, fmt.Errorf("failed to read body: %w", err) + } + return buf, nil + } + return nil, fmt.Errorf("unsupported content type: %s", contentType) +} + func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) { var ( tmp []string @@ -342,21 +388,14 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, p = pprofparser.NewPprofParser() } // support only multipart/form-data - f, err := r.openMultipart(req) + buf, err := r.getProfilesBuff(req) if err != nil { return logs, err } - defer f.Close() - - buf := acquireBuf(r.uncompressedBufPool) defer func() { releaseBuf(r.uncompressedBufPool, buf) }() - err = r.decompressor.Decompress(f, compress.Gzip, buf) - if err != nil { - return logs, fmt.Errorf("failed to decompress body: %w", err) - } // TODO: try measure compressed size otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, ""))) resetHeaders(req) @@ -384,15 +423,13 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, record := rs.AppendEmpty() if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { timestampNs = ns(pm.start) - durationNs = pm.end - pm.start - durationNs = ns(durationNs) + durationNs = ns(pm.end) - ns(pm.start) } else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") { timestampNs = uint64(pr.TimeStampNao) durationNs = uint64(pr.DurationNano) } else { - timestampNs = pm.start - durationNs = pm.end - pm.start - durationNs = ns(durationNs) + timestampNs = ns(pm.start) + durationNs = ns(pm.end) - ns(pm.start) } record.SetTimestamp(pcommon.Timestamp(timestampNs)) m := record.Attributes() @@ -409,7 +446,7 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, postProcessProf(pr.Profile, &m) record.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes()) sz += pr.Payload.Len() - r.logger.Debug( + r.logger.Info( fmt.Sprintf("parsed profile %d", i), zap.Uint64("timestamp_ns", timestampNs), zap.String("type", pr.Type.Type), @@ -427,7 +464,16 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, } func ns(sec uint64) uint64 { - return sec * 1e9 + if sec < 10000000000000 { + return sec * 1e9 + } + if sec < 10000000000000000 { + return sec * 1e6 + } + if sec < 10000000000000000000 { + return sec * 1e3 + } + return sec } func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding string) *attribute.Set { From f7a88796b5f05b68864212b664a09525016b5cfa Mon Sep 17 00:00:00 2001 From: akvlad Date: Wed, 11 Dec 2024 19:43:48 +0200 Subject: [PATCH 2/2] drop logging --- receiver/pyroscopereceiver/receiver.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 301f4fb..5dca8fa 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "context" - "encoding/hex" "errors" "fmt" "io" @@ -341,13 +340,6 @@ func (r *pyroscopeReceiver) getProfilesBuff(req *http.Request) (*bytes.Buffer, e var f multipart.File f, err = r.openMultipart(req) if err != nil { - fmt.Println(req.URL.String()) - for k, v := range req.Header { - fmt.Printf("Header: %s: %v", k, v) - } - b, _ := io.ReadAll(req.Body) - //TODO: encode b to hex - fmt.Printf("Body: %s\n", hex.EncodeToString(b)) return nil, err } defer f.Close()