Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix config for prealloc and refactor buf logic #42

Merged
merged 3 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion receiver/pyroscopereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op

- `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6.
- `timeout`: sets the server response timeout. Default is 10 seconds.
- `request_body_size_expected_value`: sets the expected decompressed request body size in bytes to size pipeline buffers and optimize allocations. Default is 0.
- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.
- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.

## Example

Expand Down
12 changes: 12 additions & 0 deletions receiver/pyroscopereceiver/buf/prepare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package buf

import "bytes"

// PrepareBuffer returns a new, empty buffer whose capacity is pre-allocated
// from the expected uncompressed size in bytes, to minimize resizing while
// the buffer is being filled.
//
// A negative uncompressedSizeBytes is treated as 0: bytes.Buffer.Grow panics
// on a negative count, and a bad size hint should not crash the caller.
func PrepareBuffer(uncompressedSizeBytes int64) *bytes.Buffer {
	var buf bytes.Buffer
	if uncompressedSizeBytes < 0 {
		uncompressedSizeBytes = 0
	}
	// extra bytes.MinRead of headroom to try to avoid a realloc when the data fits the expected size
	// TODO: try internal simple statistical model to pre-allocate a buffer
	buf.Grow(int(uncompressedSizeBytes) + bytes.MinRead)
	return &buf
}
31 changes: 12 additions & 19 deletions receiver/pyroscopereceiver/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"compress/gzip"
"fmt"
"io"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
)

type codec uint8
Expand All @@ -15,16 +17,16 @@ const (

// Decodes compressed streams
type Decompressor struct {
decompressedSizeBytesExpectedValue int64
maxDecompressedSizeBytes int64
decoders map[codec]func(body io.Reader) (io.Reader, error)
uncompressedSizeBytes int64
maxUncompressedSizeBytes int64
decoders map[codec]func(body io.Reader) (io.Reader, error)
}

// Creates a new decompressor
func NewDecompressor(decompressedSizeBytesExpectedValue int64, maxDecompressedSizeBytes int64) *Decompressor {
func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor {
return &Decompressor{
decompressedSizeBytesExpectedValue: decompressedSizeBytesExpectedValue,
maxDecompressedSizeBytes: maxDecompressedSizeBytes,
uncompressedSizeBytes: uncompressedSizeBytes,
maxUncompressedSizeBytes: maxUncompressedSizeBytes,
decoders: map[codec]func(r io.Reader) (io.Reader, error){
Gzip: func(r io.Reader) (io.Reader, error) {
gr, err := gzip.NewReader(r)
Expand All @@ -38,10 +40,10 @@ func NewDecompressor(decompressedSizeBytesExpectedValue int64, maxDecompressedSi
}

func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) {
buf := d.prepareBuffer()
buf := buf.PrepareBuffer(d.uncompressedSizeBytes)

// read max+1 to validate size via a single Read()
lr := io.LimitReader(r, d.maxDecompressedSizeBytes+1)
lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1)

n, err := buf.ReadFrom(lr)
if err != nil {
Expand All @@ -50,8 +52,8 @@ func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) {
if n < 1 {
return nil, fmt.Errorf("empty profile")
}
if n > d.maxDecompressedSizeBytes {
return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxDecompressedSizeBytes)
if n > d.maxUncompressedSizeBytes {
return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes)
}
return buf, nil
}
Expand All @@ -70,12 +72,3 @@ func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) {

return d.readBytes(dr)
}

// prepareBuffer returns a new, empty buffer grown by the decompressor's
// expected decompressed size plus bytes.MinRead, to minimize resizing.
func (d *Decompressor) prepareBuffer() *bytes.Buffer {
var buf bytes.Buffer
// extra bytes.MinRead of headroom to try to avoid a realloc when the data fits the expected size
// TODO: try simple statistical model to pre-allocate a buffer
buf.Grow(int(d.decompressedSizeBytesExpectedValue) + bytes.MinRead)
return &buf
}
19 changes: 13 additions & 6 deletions receiver/pyroscopereceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Configures supported protocols
type Protocols struct {
// Http.MaxRequestBodySize configures max decompressed body size in bytes
// Http.MaxRequestBodySize configures max uncompressed body size in bytes
Http *confighttp.HTTPServerSettings `mapstructure:"http"`
}

Expand All @@ -20,8 +20,12 @@ type Config struct {

// Configures timeout for synchronous request handling by the receiver server
Timeout time.Duration `mapstructure:"timeout"`
// Configures expected decompressed request body size in bytes to size pipeline buffers
DecompressedRequestBodySizeBytesExpectedValue int64 `mapstructure:"request_body_size_expected_value"`
// Configures the expected value for uncompressed request body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"`
// Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -34,11 +38,14 @@ func (cfg *Config) Validate() error {
if cfg.Protocols.Http.MaxRequestBodySize < 1 {
return fmt.Errorf("max_request_body_size must be positive")
}
if cfg.DecompressedRequestBodySizeBytesExpectedValue < 0 {
return fmt.Errorf("request_body_size_expected_value must be positive")
if cfg.RequestBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("request_body_uncompressed_size_bytes must be positive")
}
if cfg.DecompressedRequestBodySizeBytesExpectedValue > cfg.Protocols.Http.MaxRequestBodySize {
if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize {
return fmt.Errorf("expected value cannot be greater than max")
}
if cfg.ParsedBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive")
}
return nil
}
8 changes: 5 additions & 3 deletions receiver/pyroscopereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const (
defaultHttpAddr = "0.0.0.0:8062"
defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata
defaultTimeout = 10 * time.Second
defaultDecompressedRequestBodySizeBytesExpectedValue = 0
defaultRequestBodyUncompressedSizeBytesExpectedValue = 0
defaultParsedBodyUncompressedSizeBytesExpectedValue = 0
)

func createDefaultConfig() component.Config {
Expand All @@ -27,8 +28,9 @@ func createDefaultConfig() component.Config {
MaxRequestBodySize: defaultMaxRequestBodySize,
},
},
Timeout: defaultTimeout,
DecompressedRequestBodySizeBytesExpectedValue: defaultDecompressedRequestBodySizeBytesExpectedValue,
Timeout: defaultTimeout,
RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue,
ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue,
}
}

Expand Down
5 changes: 3 additions & 2 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
pprof_proto "github.com/google/pprof/profile"
jfr_parser "github.com/grafana/jfr-parser/parser"
jfr_types "github.com/grafana/jfr-parser/parser/types"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func NewJfrPprofParser() *jfrPprofParser {
}

// Parses the jfr buffer into pprof. The returned slice may be empty without an error.
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, maxDecompressedSizeBytes int64) ([]profile_types.ProfileIR, error) {
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) {
var (
period int64
event string
Expand Down Expand Up @@ -114,7 +115,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, ma
for _, pr := range pa.proftab {
if nil != pr {
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = &bytes.Buffer{} // TODO: try simple statistical model to pre-allocate a buffer
pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes)
pr.pprof.WriteUncompressed(pr.prof.Payload)
ps = append(ps, pr.prof)
}
Expand Down
20 changes: 10 additions & 10 deletions receiver/pyroscopereceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
const prefix = "receiver_pyroscope_"

var (
otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter
otelcolReceiverPyroscopeReceivedPayloadSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeParsedPayloadSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram
otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram
otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
Expand All @@ -23,15 +23,15 @@ func initMetrics(meter metric.Meter) error {
); err != nil {
return err
}
if otelcolReceiverPyroscopeReceivedPayloadSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "received_payload_size_bytes"),
metric.WithDescription("Pyroscope receiver received payload size in bytes"),
if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"),
); err != nil {
return err
}
if otelcolReceiverPyroscopeParsedPayloadSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_payload_size_bytes"),
metric.WithDescription("Pyroscope receiver parsed payload size in bytes"),
if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"),
); err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type pyroscopeReceiver struct {

type parser interface {
// Parses the given input buffer into the collector's profile IR
Parse(buf *bytes.Buffer, md profile_types.Metadata, maxDecompressedSizeBytes int64) ([]profile_types.ProfileIR, error)
Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error)
}

type params struct {
Expand All @@ -78,7 +78,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Cre
meter: set.MeterProvider.Meter(typeStr),
next: consumer,
}
recv.decompressor = compress.NewDecompressor(recv.cfg.DecompressedRequestBodySizeBytesExpectedValue, recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.httpMux = http.NewServeMux()
recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) {
recv.httpHandlerIngest(resp, req)
Expand Down Expand Up @@ -248,7 +248,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
return logs, fmt.Errorf("failed to decompress body: %w", err)
}
// TODO: try measure compressed size
otelcolReceiverPyroscopeReceivedPayloadSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, "")))
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, "")))
resetHeaders(req)

md := profile_types.Metadata{SampleRateHertz: 0}
Expand All @@ -261,7 +261,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
md.SampleRateHertz = hz
}

ps, err := pa.Parse(buf, md, recv.cfg.Protocols.Http.MaxRequestBodySize)
ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes)
if err != nil {
return logs, fmt.Errorf("failed to parse pprof: %w", err)
}
Expand All @@ -283,7 +283,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
sz += pr.Payload.Len()
}
// sz may be 0 and it will be recorded
otelcolReceiverPyroscopeParsedPayloadSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, "")))
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, "")))
return logs, nil
}

Expand Down Expand Up @@ -324,7 +324,7 @@ func resetHeaders(req *http.Request) {
// reset content-type for the new binary jfr body
req.Header.Set("Content-Type", "application/octet-stream")
// multipart content-types cannot have encodings so no need to Del() Content-Encoding
// reset "Content-Length" to -1 as the size of the decompressed body is unknown
// reset "Content-Length" to -1 as the size of the uncompressed body is unknown
req.Header.Del("Content-Length")
req.ContentLength = -1
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) {
MaxRequestBodySize: defaultMaxRequestBodySize,
},
},
Timeout: defaultTimeout,
DecompressedRequestBodySizeBytesExpectedValue: defaultDecompressedRequestBodySizeBytesExpectedValue,
Timeout: defaultTimeout,
RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue,
ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue,
}
sink := new(consumertest.LogsSink)
set := receivertest.NewNopCreateSettings()
Expand Down
Loading