Skip to content

Commit

Permalink
Merge pull request #42 from tomershafir/fix/pyroscope-buf
Browse files Browse the repository at this point in the history
Fix config for prealloc and refactor buf logic
  • Loading branch information
akvlad authored Jan 9, 2024
2 parents bc20882 + 086dd4a commit 4bfc421
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 49 deletions.
3 changes: 2 additions & 1 deletion receiver/pyroscopereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op

- `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6.
- `timeout`: sets the server reponse timeout. Default is 10 seconds.
- `request_body_size_expected_value`: sets the expected decompressed request body size in bytes to size pipeline buffers and optimize allocations. Default is 0.
- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.
- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.

## Example

Expand Down
12 changes: 12 additions & 0 deletions receiver/pyroscopereceiver/buf/prepare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package buf

import "bytes"

// Pre-allocates a buffer based on heuristics to minimize resize
func PrepareBuffer(uncompressedSizeBytes int64) *bytes.Buffer {
var buf bytes.Buffer
// extra space to try avoid realloc where expected size fits enough
// TODO: try internal simple statistical model to pre-allocate a buffer
buf.Grow(int(uncompressedSizeBytes) + bytes.MinRead)
return &buf
}
31 changes: 12 additions & 19 deletions receiver/pyroscopereceiver/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"compress/gzip"
"fmt"
"io"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
)

type codec uint8
Expand All @@ -15,16 +17,16 @@ const (

// Decodes compressed streams
type Decompressor struct {
decompressedSizeBytesExpectedValue int64
maxDecompressedSizeBytes int64
decoders map[codec]func(body io.Reader) (io.Reader, error)
uncompressedSizeBytes int64
maxUncompressedSizeBytes int64
decoders map[codec]func(body io.Reader) (io.Reader, error)
}

// Creates a new decompressor
func NewDecompressor(decompressedSizeBytesExpectedValue int64, maxDecompressedSizeBytes int64) *Decompressor {
func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor {
return &Decompressor{
decompressedSizeBytesExpectedValue: decompressedSizeBytesExpectedValue,
maxDecompressedSizeBytes: maxDecompressedSizeBytes,
uncompressedSizeBytes: uncompressedSizeBytes,
maxUncompressedSizeBytes: maxUncompressedSizeBytes,
decoders: map[codec]func(r io.Reader) (io.Reader, error){
Gzip: func(r io.Reader) (io.Reader, error) {
gr, err := gzip.NewReader(r)
Expand All @@ -38,10 +40,10 @@ func NewDecompressor(decompressedSizeBytesExpectedValue int64, maxDecompressedSi
}

func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) {
buf := d.prepareBuffer()
buf := buf.PrepareBuffer(d.uncompressedSizeBytes)

// read max+1 to validate size via a single Read()
lr := io.LimitReader(r, d.maxDecompressedSizeBytes+1)
lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1)

n, err := buf.ReadFrom(lr)
if err != nil {
Expand All @@ -50,8 +52,8 @@ func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) {
if n < 1 {
return nil, fmt.Errorf("empty profile")
}
if n > d.maxDecompressedSizeBytes {
return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxDecompressedSizeBytes)
if n > d.maxUncompressedSizeBytes {
return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes)
}
return buf, nil
}
Expand All @@ -70,12 +72,3 @@ func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) {

return d.readBytes(dr)
}

// Pre-allocates a buffer based on heuristics to minimize resize
func (d *Decompressor) prepareBuffer() *bytes.Buffer {
var buf bytes.Buffer
// extra space to try avoid realloc where expected size fits enough
// TODO: try simple statistical model to pre-allocate a buffer
buf.Grow(int(d.decompressedSizeBytesExpectedValue) + bytes.MinRead)
return &buf
}
19 changes: 13 additions & 6 deletions receiver/pyroscopereceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Configures supported protocols
type Protocols struct {
// Http.MaxRequestBodySize configures max decompressed body size in bytes
// Http.MaxRequestBodySize configures max uncompressed body size in bytes
Http *confighttp.HTTPServerSettings `mapstructure:"http"`
}

Expand All @@ -20,8 +20,12 @@ type Config struct {

// Cofigures timeout for synchronous request handling by the receiver server
Timeout time.Duration `mapstructure:"timeout"`
// Configures expected decompressed request body size in bytes to size pipeline buffers
DecompressedRequestBodySizeBytesExpectedValue int64 `mapstructure:"request_body_size_expected_value"`
// Configures the expected value for uncompressed request body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"`
// Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -34,11 +38,14 @@ func (cfg *Config) Validate() error {
if cfg.Protocols.Http.MaxRequestBodySize < 1 {
return fmt.Errorf("max_request_body_size must be positive")
}
if cfg.DecompressedRequestBodySizeBytesExpectedValue < 0 {
return fmt.Errorf("request_body_size_expected_value must be positive")
if cfg.RequestBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("request_body_uncompressed_size_bytes must be positive")
}
if cfg.DecompressedRequestBodySizeBytesExpectedValue > cfg.Protocols.Http.MaxRequestBodySize {
if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize {
return fmt.Errorf("expected value cannot be greater than max")
}
if cfg.ParsedBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive")
}
return nil
}
8 changes: 5 additions & 3 deletions receiver/pyroscopereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const (
defaultHttpAddr = "0.0.0.0:8062"
defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata
defaultTimeout = 10 * time.Second
defaultDecompressedRequestBodySizeBytesExpectedValue = 0
defaultRequestBodyUncompressedSizeBytesExpectedValue = 0
defaultParsedBodyUncompressedSizeBytesExpectedValue = 0
)

func createDefaultConfig() component.Config {
Expand All @@ -27,8 +28,9 @@ func createDefaultConfig() component.Config {
MaxRequestBodySize: defaultMaxRequestBodySize,
},
},
Timeout: defaultTimeout,
DecompressedRequestBodySizeBytesExpectedValue: defaultDecompressedRequestBodySizeBytesExpectedValue,
Timeout: defaultTimeout,
RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue,
ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue,
}
}

Expand Down
5 changes: 3 additions & 2 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
pprof_proto "github.com/google/pprof/profile"
jfr_parser "github.com/grafana/jfr-parser/parser"
jfr_types "github.com/grafana/jfr-parser/parser/types"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func NewJfrPprofParser() *jfrPprofParser {
}

// Parses the jfr buffer into pprof. The returned slice may be empty without an error.
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, maxDecompressedSizeBytes int64) ([]profile_types.ProfileIR, error) {
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) {
var (
period int64
event string
Expand Down Expand Up @@ -114,7 +115,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, ma
for _, pr := range pa.proftab {
if nil != pr {
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = &bytes.Buffer{} // TODO: try simple statistical model to pre-allocate a buffer
pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes)
pr.pprof.WriteUncompressed(pr.prof.Payload)
ps = append(ps, pr.prof)
}
Expand Down
20 changes: 10 additions & 10 deletions receiver/pyroscopereceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
const prefix = "receiver_pyroscope_"

var (
otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter
otelcolReceiverPyroscopeReceivedPayloadSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeParsedPayloadSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram
otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
Expand All @@ -23,15 +23,15 @@ func initMetrics(meter metric.Meter) error {
); err != nil {
return err
}
if otelcolReceiverPyroscopeReceivedPayloadSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "received_payload_size_bytes"),
metric.WithDescription("Pyroscope receiver received payload size in bytes"),
if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"),
); err != nil {
return err
}
if otelcolReceiverPyroscopeParsedPayloadSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_payload_size_bytes"),
metric.WithDescription("Pyroscope receiver parsed payload size in bytes"),
if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"),
); err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type pyroscopeReceiver struct {

type parser interface {
// Parses the given input buffer into the collector's profile IR
Parse(buf *bytes.Buffer, md profile_types.Metadata, maxDecompressedSizeBytes int64) ([]profile_types.ProfileIR, error)
Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error)
}

type params struct {
Expand All @@ -78,7 +78,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Cre
meter: set.MeterProvider.Meter(typeStr),
next: consumer,
}
recv.decompressor = compress.NewDecompressor(recv.cfg.DecompressedRequestBodySizeBytesExpectedValue, recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.httpMux = http.NewServeMux()
recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) {
recv.httpHandlerIngest(resp, req)
Expand Down Expand Up @@ -248,7 +248,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
return logs, fmt.Errorf("failed to decompress body: %w", err)
}
// TODO: try measure compressed size
otelcolReceiverPyroscopeReceivedPayloadSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, "")))
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, "")))
resetHeaders(req)

md := profile_types.Metadata{SampleRateHertz: 0}
Expand All @@ -261,7 +261,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
md.SampleRateHertz = hz
}

ps, err := pa.Parse(buf, md, recv.cfg.Protocols.Http.MaxRequestBodySize)
ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes)
if err != nil {
return logs, fmt.Errorf("failed to parse pprof: %w", err)
}
Expand All @@ -283,7 +283,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
sz += pr.Payload.Len()
}
// sz may be 0 and it will be recorded
otelcolReceiverPyroscopeParsedPayloadSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, "")))
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, "")))
return logs, nil
}

Expand Down Expand Up @@ -324,7 +324,7 @@ func resetHeaders(req *http.Request) {
// reset content-type for the new binary jfr body
req.Header.Set("Content-Type", "application/octet-stream")
// multipart content-types cannot have encodings so no need to Del() Content-Encoding
// reset "Content-Length" to -1 as the size of the decompressed body is unknown
// reset "Content-Length" to -1 as the size of the uncompressed body is unknown
req.Header.Del("Content-Length")
req.ContentLength = -1
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) {
MaxRequestBodySize: defaultMaxRequestBodySize,
},
},
Timeout: defaultTimeout,
DecompressedRequestBodySizeBytesExpectedValue: defaultDecompressedRequestBodySizeBytesExpectedValue,
Timeout: defaultTimeout,
RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue,
ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue,
}
sink := new(consumertest.LogsSink)
set := receivertest.NewNopCreateSettings()
Expand Down

0 comments on commit 4bfc421

Please sign in to comment.