diff --git a/receiver/pyroscopereceiver/README.md b/receiver/pyroscopereceiver/README.md index 7a20998..e0aa639 100644 --- a/receiver/pyroscopereceiver/README.md +++ b/receiver/pyroscopereceiver/README.md @@ -11,8 +11,6 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op - `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6. - `timeout`: sets the server reponse timeout. Default is 10 seconds. -- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0. -- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0. ## Example diff --git a/receiver/pyroscopereceiver/buf/prepare.go b/receiver/pyroscopereceiver/buf/prepare.go deleted file mode 100644 index ad5f829..0000000 --- a/receiver/pyroscopereceiver/buf/prepare.go +++ /dev/null @@ -1,12 +0,0 @@ -package buf - -import "bytes" - -// Pre-allocates a buffer based on heuristics to minimize resize -func PrepareBuffer(uncompressedSizeBytes int64) *bytes.Buffer { - var buf bytes.Buffer - // extra space to try avoid realloc where expected size fits enough - // TODO: try internal simple statistical model to pre-allocate a buffer - buf.Grow(int(uncompressedSizeBytes) + bytes.MinRead) - return &buf -} diff --git a/receiver/pyroscopereceiver/compress/compress.go b/receiver/pyroscopereceiver/compress/decompress.go similarity index 57% rename from receiver/pyroscopereceiver/compress/compress.go rename to receiver/pyroscopereceiver/compress/decompress.go index 9106559..0381c56 100644 --- a/receiver/pyroscopereceiver/compress/compress.go +++ b/receiver/pyroscopereceiver/compress/decompress.go @@ -5,8 +5,6 @@ import ( "compress/gzip" "fmt" "io" - - "github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf" ) type codec uint8 @@ -17,15 +15,13 @@ const ( // Decodes compressed streams type Decompressor struct { - uncompressedSizeBytes int64 maxUncompressedSizeBytes int64 decoders map[codec]func(body io.Reader) (io.Reader, error) } // Creates a new decompressor -func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor { +func NewDecompressor(maxUncompressedSizeBytes int64) *Decompressor { return &Decompressor{ - uncompressedSizeBytes: uncompressedSizeBytes, maxUncompressedSizeBytes: maxUncompressedSizeBytes, decoders: map[codec]func(r io.Reader) (io.Reader, error){ Gzip: func(r io.Reader) (io.Reader, error) { @@ -39,36 +35,34 @@ func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64 } } -func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) { - buf := buf.PrepareBuffer(d.uncompressedSizeBytes) - +func (d *Decompressor) readBytes(r io.Reader, out *bytes.Buffer) error { // read max+1 to validate size via a single Read() lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1) - n, err := buf.ReadFrom(lr) + n, err := out.ReadFrom(lr) if err != nil { - return nil, err + return err } if n < 1 { - return nil, fmt.Errorf("empty profile") + return fmt.Errorf("empty profile") } if n > d.maxUncompressedSizeBytes { - return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes) + return fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes) } - return buf, nil + return nil } // Decodes the accepted reader, applying the configured size limit to avoid oom by compression bomb -func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) { +func (d *Decompressor) Decompress(r io.Reader, c codec, out *bytes.Buffer) error { decoder, ok := d.decoders[c] if !ok { - return nil, fmt.Errorf("unsupported encoding") + return fmt.Errorf("unsupported encoding") } dr, err := decoder(r) if err != nil { - return nil, err + return err } - return d.readBytes(dr) + return d.readBytes(dr, out) } diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index e54081a..d7e0b2f 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -20,12 +20,6 @@ type Config struct { // Cofigures timeout for synchronous request handling by the receiver server Timeout time.Duration `mapstructure:"timeout"` - // Configures the expected value for uncompressed request body size in bytes to size pipeline buffers - // and optimize allocations based on exported metrics - RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"` - // Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers - // and optimize allocations based on exported metrics - ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"` } var _ component.Config = (*Config)(nil) @@ -38,14 +32,5 @@ func (cfg *Config) Validate() error { if cfg.Protocols.Http.MaxRequestBodySize < 1 { return fmt.Errorf("max_request_body_size must be positive") } - if cfg.RequestBodyUncompressedSizeBytes < 0 { - return fmt.Errorf("request_body_uncompressed_size_bytes must be positive") - } - if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize { - return fmt.Errorf("expected value cannot be greater than max") - } - if cfg.ParsedBodyUncompressedSizeBytes < 0 { - return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive") - } return nil } diff --git a/receiver/pyroscopereceiver/factory.go b/receiver/pyroscopereceiver/factory.go index d698fdf..4839e3e 100644 --- a/receiver/pyroscopereceiver/factory.go +++ b/receiver/pyroscopereceiver/factory.go @@ -13,11 +13,9 @@ import ( const ( typeStr = "pyroscopereceiver" - defaultHttpAddr = "0.0.0.0:8062" - defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata - defaultTimeout = 10 * time.Second - defaultRequestBodyUncompressedSizeBytesExpectedValue = 0 - defaultParsedBodyUncompressedSizeBytesExpectedValue = 0 + defaultHttpAddr = "0.0.0.0:8062" + defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata + defaultTimeout = 10 * time.Second ) func createDefaultConfig() component.Config { @@ -28,9 +26,7 @@ func createDefaultConfig() component.Config { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, - RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue, - ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue, + Timeout: defaultTimeout, } } diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index 0db1ed7..308b3fb 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -8,7 +8,6 @@ import ( pprof_proto "github.com/google/pprof/profile" jfr_parser "github.com/grafana/jfr-parser/parser" jfr_types "github.com/grafana/jfr-parser/parser/types" - "github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf" profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" ) @@ -55,7 +54,7 @@ func NewJfrPprofParser() *jfrPprofParser { } // Parses the jfr buffer into pprof. The returned slice may be empty without an error. -func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) { +func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) { var ( period int64 event string @@ -115,7 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, pa for _, pr := range pa.proftab { if nil != pr { // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof - pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes) + pr.prof.Payload = &bytes.Buffer{} pr.pprof.WriteUncompressed(pr.prof.Payload) ps = append(ps, pr.prof) } diff --git a/receiver/pyroscopereceiver/pool_alloc_test.go b/receiver/pyroscopereceiver/pool_alloc_test.go new file mode 100644 index 0000000..820e9e7 --- /dev/null +++ b/receiver/pyroscopereceiver/pool_alloc_test.go @@ -0,0 +1,49 @@ +package pyroscopereceiver + +import ( + "bytes" + "compress/gzip" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" + "github.com/stretchr/testify/assert" +) + +func TestAllocDecompress(t *testing.T) { + dist := []string{ + filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), + filepath.Join("testdata", "memory_alloc_live_example.jfr"), + } + compressed := []*bytes.Buffer{ + loadCompressed(t, dist[0]), + loadCompressed(t, dist[1]), + } + d := compress.NewDecompressor(1024 * 1024 * 1024) + j := 0 + p := &sync.Pool{} + + n := testing.AllocsPerRun(100, func() { + buf := acquireBuf(p) + d.Decompress(compressed[j], compress.Gzip, buf) + releaseBuf(p, buf) + j = (j + 1) % len(dist) + }) + t.Logf("\naverage alloc count: %f", n) +} + +func loadCompressed(t *testing.T, jfr string) *bytes.Buffer { + uncompressed, err := os.ReadFile(jfr) + if err != nil { + assert.NoError(t, err, "failed to load jfr") + } + compressed := new(bytes.Buffer) + gw := gzip.NewWriter(compressed) + if _, err := gw.Write(uncompressed); err != nil { + assert.NoError(t, err, "failed to compress jfr") + } + gw.Close() + return compressed +} diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 912c0aa..cd4f348 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -56,11 +56,13 @@ type pyroscopeReceiver struct { decompressor *compress.Decompressor httpServer *http.Server shutdownWg sync.WaitGroup + + uncompressedBufPool *sync.Pool } type parser interface { // Parses the given input buffer into the collector's profile IR - Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) + Parse(buf *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) } type params struct { @@ -72,13 +74,14 @@ type params struct { func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.CreateSettings) (*pyroscopeReceiver, error) { recv := &pyroscopeReceiver{ - cfg: cfg, - set: set, - logger: set.Logger, - meter: set.MeterProvider.Meter(typeStr), - next: consumer, + cfg: cfg, + set: set, + logger: set.Logger, + meter: set.MeterProvider.Meter(typeStr), + next: consumer, + uncompressedBufPool: &sync.Pool{}, } - recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize) + recv.decompressor = compress.NewDecompressor(recv.cfg.Protocols.Http.MaxRequestBodySize) recv.httpMux = http.NewServeMux() recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { recv.httpHandlerIngest(resp, req) @@ -151,7 +154,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri } otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) - otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) + otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) writeResponseNoContent(resp) }() return c @@ -221,6 +224,20 @@ func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set { return &s } +func acquireBuf(p *sync.Pool) *bytes.Buffer { + v := p.Get() + if v == nil { + v = new(bytes.Buffer) + } + buf := v.(*bytes.Buffer) + return buf +} + +func releaseBuf(p *sync.Pool, buf *bytes.Buffer) { + buf.Reset() + p.Put(buf) +} + func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) { var ( tmp []string @@ -243,7 +260,12 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque } defer f.Close() - buf, err := recv.decompressor.Decompress(f, compress.Gzip) + buf := acquireBuf(recv.uncompressedBufPool) + defer func() { + releaseBuf(recv.uncompressedBufPool, buf) + }() + + err = recv.decompressor.Decompress(f, compress.Gzip, buf) if err != nil { return logs, fmt.Errorf("failed to decompress body: %w", err) } @@ -261,7 +283,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque md.SampleRateHertz = hz } - ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes) + ps, err := pa.Parse(buf, md) if err != nil { return logs, fmt.Errorf("failed to parse pprof: %w", err) }