diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index ab0f149..fbff9ca 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -37,6 +37,25 @@ func NewClickhouseAccessNativeColumnar(opts *clickhouse.Options, logger *zap.Log return nc, nil } +func valueToStringArray(v pcommon.Value) ([]string, error) { + raw := v.AsRaw() + var ( + rawArray []any + ok bool + ) + + if rawArray, ok = raw.([]any); !ok { + return nil, fmt.Errorf("failed to convert value to []any") + } + res := make([]string, len(rawArray)) + for i, v := range rawArray { + if res[i], ok = v.(string); !ok { + return nil, fmt.Errorf("failed to convert value [%d] to string", i) + } + } + return res, nil +} + // Inserts a profile batch into the clickhouse server using columnar native protocol func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input") @@ -53,6 +72,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { timestamp_ns := make([]uint64, sz) typ := make([]string, sz) service_name := make([]string, sz) + sample_types_units := make([][]tuple, sz) period_type := make([]string, sz) period_unit := make([]string, sz) tags := make([][]tuple, sz) @@ -78,6 +98,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { tmp, _ = m.Get("service_name") service_name[i] = tmp.AsString() + sample_types, _ := m.Get("sample_types") + sample_units, _ := m.Get("sample_units") + + sample_types_array, err := valueToStringArray(sample_types) + if err != nil { + return err + } + + sample_units_array, err := valueToStringArray(sample_units) + if err != nil { + return err + } + + sample_types_units_item := make([]tuple, len(sample_types_array)) + for i, v := range sample_types_array { + sample_types_units_item[i] = tuple{v, sample_units_array[i]} + } + sample_types_units[i] = sample_types_units_item + tmp, _ = m.Get("period_type") period_type[i] = tmp.AsString() @@ -112,22 +151,25 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { if err := b.Column(2).Append(service_name); err != nil { return err } - if err := b.Column(3).Append(period_type); err != nil { + if err := b.Column(3).Append(sample_types_units); err != nil { + return err + } + if err := b.Column(4).Append(period_type); err != nil { return err } - if err := b.Column(4).Append(period_unit); err != nil { + if err := b.Column(5).Append(period_unit); err != nil { return err } - if err := b.Column(5).Append(tags); err != nil { + if err := b.Column(6).Append(tags); err != nil { return err } - if err := b.Column(6).Append(duration_ns); err != nil { + if err := b.Column(7).Append(duration_ns); err != nil { return err } - if err := b.Column(7).Append(payload_type); err != nil { + if err := b.Column(8).Append(payload_type); err != nil { return err } - if err := b.Column(8).Append(payload); err != nil { + if err := b.Column(9).Append(payload); err != nil { return err } return b.Send() diff --git a/receiver/pyroscopereceiver/README.md b/receiver/pyroscopereceiver/README.md index 6a5b6ab..7a20998 100644 --- a/receiver/pyroscopereceiver/README.md +++ b/receiver/pyroscopereceiver/README.md @@ -11,7 +11,8 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op - `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6. - `timeout`: sets the server reponse timeout. Default is 10 seconds. -- `request_body_size_expected_value`: sets the expected decompressed request body size in bytes to size pipeline buffers and optimize allocations. Default is 0. +- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0. +- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0. ## Example diff --git a/receiver/pyroscopereceiver/buf/prepare.go b/receiver/pyroscopereceiver/buf/prepare.go new file mode 100644 index 0000000..ad5f829 --- /dev/null +++ b/receiver/pyroscopereceiver/buf/prepare.go @@ -0,0 +1,12 @@ +package buf + +import "bytes" + +// Pre-allocates a buffer based on heuristics to minimize resize +func PrepareBuffer(uncompressedSizeBytes int64) *bytes.Buffer { + var buf bytes.Buffer + // extra space to try avoid realloc where expected size fits enough + // TODO: try internal simple statistical model to pre-allocate a buffer + buf.Grow(int(uncompressedSizeBytes) + bytes.MinRead) + return &buf +} diff --git a/receiver/pyroscopereceiver/compress/compress.go b/receiver/pyroscopereceiver/compress/compress.go index 1f9594c..9106559 100644 --- a/receiver/pyroscopereceiver/compress/compress.go +++ b/receiver/pyroscopereceiver/compress/compress.go @@ -5,6 +5,8 @@ import ( "compress/gzip" "fmt" "io" + + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf" ) type codec uint8 @@ -15,16 +17,16 @@ const ( // Decodes compressed streams type Decompressor struct { - decompressedSizeBytesExpectedValue int64 - maxDecompressedSizeBytes int64 - decoders map[codec]func(body io.Reader) (io.Reader, error) + uncompressedSizeBytes int64 + maxUncompressedSizeBytes int64 + decoders map[codec]func(body io.Reader) (io.Reader, error) } // Creates a new decompressor -func NewDecompressor(decompressedSizeBytesExpectedValue int64, maxDecompressedSizeBytes int64) *Decompressor { +func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor { return &Decompressor{ - decompressedSizeBytesExpectedValue: decompressedSizeBytesExpectedValue, - maxDecompressedSizeBytes: maxDecompressedSizeBytes, + uncompressedSizeBytes: uncompressedSizeBytes, + maxUncompressedSizeBytes: maxUncompressedSizeBytes, decoders: map[codec]func(r io.Reader) (io.Reader, error){ Gzip: func(r io.Reader) (io.Reader, error) { gr, err := gzip.NewReader(r) @@ -38,10 +40,10 @@ func NewDecompressor(decompressedSizeBytesExpectedValue int64, maxDecompressedSi } func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) { - buf := d.prepareBuffer() + buf := buf.PrepareBuffer(d.uncompressedSizeBytes) // read max+1 to validate size via a single Read() - lr := io.LimitReader(r, d.maxDecompressedSizeBytes+1) + lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1) n, err := buf.ReadFrom(lr) if err != nil { @@ -50,8 +52,8 @@ func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) { if n < 1 { return nil, fmt.Errorf("empty profile") } - if n > d.maxDecompressedSizeBytes { - return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxDecompressedSizeBytes) + if n > d.maxUncompressedSizeBytes { + return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes) } return buf, nil } @@ -70,12 +72,3 @@ func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) { return d.readBytes(dr) } - -// Pre-allocates a buffer based on heuristics to minimize resize -func (d *Decompressor) prepareBuffer() *bytes.Buffer { - var buf bytes.Buffer - // extra space to try avoid realloc where expected size fits enough - // TODO: try simple statistical model to pre-allocate a buffer - buf.Grow(int(d.decompressedSizeBytesExpectedValue) + bytes.MinRead) - return &buf -} diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index 66f051b..e54081a 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -10,7 +10,7 @@ import ( // Configures supported protocols type Protocols struct { - // Http.MaxRequestBodySize configures max decompressed body size in bytes + // Http.MaxRequestBodySize configures max uncompressed body size in bytes Http *confighttp.HTTPServerSettings `mapstructure:"http"` } @@ -20,8 +20,12 @@ type Config struct { // Cofigures timeout for synchronous request handling by the receiver server Timeout time.Duration `mapstructure:"timeout"` - // Configures expected decompressed request body size in bytes to size pipeline buffers - DecompressedRequestBodySizeBytesExpectedValue int64 `mapstructure:"request_body_size_expected_value"` + // Configures the expected value for uncompressed request body size in bytes to size pipeline buffers + // and optimize allocations based on exported metrics + RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"` + // Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers + // and optimize allocations based on exported metrics + ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"` } var _ component.Config = (*Config)(nil) @@ -34,11 +38,14 @@ func (cfg *Config) Validate() error { if cfg.Protocols.Http.MaxRequestBodySize < 1 { return fmt.Errorf("max_request_body_size must be positive") } - if cfg.DecompressedRequestBodySizeBytesExpectedValue < 0 { - return fmt.Errorf("request_body_size_expected_value must be positive") + if cfg.RequestBodyUncompressedSizeBytes < 0 { + return fmt.Errorf("request_body_uncompressed_size_bytes must be positive") } - if cfg.DecompressedRequestBodySizeBytesExpectedValue > cfg.Protocols.Http.MaxRequestBodySize { + if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize { return fmt.Errorf("expected value cannot be greater than max") } + if cfg.ParsedBodyUncompressedSizeBytes < 0 { + return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive") + } return nil } diff --git a/receiver/pyroscopereceiver/factory.go b/receiver/pyroscopereceiver/factory.go index e6813a2..d698fdf 100644 --- a/receiver/pyroscopereceiver/factory.go +++ b/receiver/pyroscopereceiver/factory.go @@ -16,7 +16,8 @@ const ( defaultHttpAddr = "0.0.0.0:8062" defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata defaultTimeout = 10 * time.Second - defaultDecompressedRequestBodySizeBytesExpectedValue = 0 + defaultRequestBodyUncompressedSizeBytesExpectedValue = 0 + defaultParsedBodyUncompressedSizeBytesExpectedValue = 0 ) func createDefaultConfig() component.Config { @@ -27,8 +28,9 @@ func createDefaultConfig() component.Config { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, - DecompressedRequestBodySizeBytesExpectedValue: defaultDecompressedRequestBodySizeBytesExpectedValue, + Timeout: defaultTimeout, + RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue, + ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue, } } diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index e663ab9..0db1ed7 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -8,6 +8,7 @@ import ( pprof_proto "github.com/google/pprof/profile" jfr_parser "github.com/grafana/jfr-parser/parser" jfr_types "github.com/grafana/jfr-parser/parser/types" + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf" profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" ) @@ -54,7 +55,7 @@ func NewJfrPprofParser() *jfrPprofParser { } // Parses the jfr buffer into pprof. The returned slice may be empty without an error. -func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, maxDecompressedSizeBytes int64) ([]profile_types.ProfileIR, error) { +func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) { var ( period int64 event string @@ -114,7 +115,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, ma for _, pr := range pa.proftab { if nil != pr { // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof - pr.prof.Payload = &bytes.Buffer{} // TODO: try simple statistical model to pre-allocate a buffer + pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes) pr.pprof.WriteUncompressed(pr.prof.Payload) ps = append(ps, pr.prof) } diff --git a/receiver/pyroscopereceiver/metrics.go b/receiver/pyroscopereceiver/metrics.go index 2637dfd..d68f41a 100644 --- a/receiver/pyroscopereceiver/metrics.go +++ b/receiver/pyroscopereceiver/metrics.go @@ -9,10 +9,10 @@ import ( const prefix = "receiver_pyroscope_" var ( - otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter - otelcolReceiverPyroscopeReceivedPayloadSizeBytes metric.Int64Histogram - otelcolReceiverPyroscopeParsedPayloadSizeBytes metric.Int64Histogram - otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram + otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes metric.Int64Histogram + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram + otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram ) func initMetrics(meter metric.Meter) error { @@ -23,15 +23,15 @@ func initMetrics(meter metric.Meter) error { ); err != nil { return err } - if otelcolReceiverPyroscopeReceivedPayloadSizeBytes, err = meter.Int64Histogram( - fmt.Sprint(prefix, "received_payload_size_bytes"), - metric.WithDescription("Pyroscope receiver received payload size in bytes"), + if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram( + fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"), + metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"), ); err != nil { return err } - if otelcolReceiverPyroscopeParsedPayloadSizeBytes, err = meter.Int64Histogram( - fmt.Sprint(prefix, "parsed_payload_size_bytes"), - metric.WithDescription("Pyroscope receiver parsed payload size in bytes"), + if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram( + fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"), + metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"), ); err != nil { return err } diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 95ffdb4..912c0aa 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -60,7 +60,7 @@ type pyroscopeReceiver struct { type parser interface { // Parses the given input buffer into the collector's profile IR - Parse(buf *bytes.Buffer, md profile_types.Metadata, maxDecompressedSizeBytes int64) ([]profile_types.ProfileIR, error) + Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) } type params struct { @@ -78,7 +78,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Cre meter: set.MeterProvider.Meter(typeStr), next: consumer, } - recv.decompressor = compress.NewDecompressor(recv.cfg.DecompressedRequestBodySizeBytesExpectedValue, recv.cfg.Protocols.Http.MaxRequestBodySize) + recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize) recv.httpMux = http.NewServeMux() recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { recv.httpHandlerIngest(resp, req) @@ -248,7 +248,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque return logs, fmt.Errorf("failed to decompress body: %w", err) } // TODO: try measure compressed size - otelcolReceiverPyroscopeReceivedPayloadSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, ""))) + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, ""))) resetHeaders(req) md := profile_types.Metadata{SampleRateHertz: 0} @@ -261,7 +261,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque md.SampleRateHertz = hz } - ps, err := pa.Parse(buf, md, recv.cfg.Protocols.Http.MaxRequestBodySize) + ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes) if err != nil { return logs, fmt.Errorf("failed to parse pprof: %w", err) } @@ -278,12 +278,15 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque for _, l := range pm.labels { tm.PutStr(l.Name, l.Value) } - setAttrsFromProfile(pr, m) + err = setAttrsFromProfile(pr, m) + if err != nil { + return logs, fmt.Errorf("failed to parse sample types: %v", err) + } r.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes()) sz += pr.Payload.Len() } // sz may be 0 and it will be recorded - otelcolReceiverPyroscopeParsedPayloadSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, ""))) + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, ""))) return logs, nil } @@ -324,16 +327,35 @@ func resetHeaders(req *http.Request) { // reset content-type for the new binary jfr body req.Header.Set("Content-Type", "application/octet-stream") // multipart content-types cannot have encodings so no need to Del() Content-Encoding - // reset "Content-Length" to -1 as the size of the decompressed body is unknown + // reset "Content-Length" to -1 as the size of the uncompressed body is unknown req.Header.Del("Content-Length") req.ContentLength = -1 } -func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) { +func stringToAnyArray(s []string) []any { + res := make([]any, len(s)) + for i, v := range s { + res[i] = v + } + return res +} + +func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { m.PutStr("type", prof.Type.Type) + s := m.PutEmptySlice("sample_types") + err := s.FromRaw(stringToAnyArray(prof.Type.SampleType)) + if err != nil { + return err + } + s = m.PutEmptySlice("sample_units") + err = s.FromRaw(stringToAnyArray(prof.Type.SampleUnit)) + if err != nil { + return err + } m.PutStr("period_type", prof.Type.PeriodType) m.PutStr("period_unit", prof.Type.PeriodUnit) m.PutStr("payload_type", fmt.Sprint(prof.PayloadType)) + return nil } // Starts a http server that receives profiles of supported protocols diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index 284fcb5..fa8697b 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -63,8 +63,9 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, - DecompressedRequestBodySizeBytesExpectedValue: defaultDecompressedRequestBodySizeBytesExpectedValue, + Timeout: defaultTimeout, + RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue, + ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue, } sink := new(consumertest.LogsSink) set := receivertest.NewNopCreateSettings() @@ -148,6 +149,8 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { "period_type": "cpu", "period_unit": "nanoseconds", "payload_type": "0", + "sample_types": []any{"cpu"}, + "sample_units": []any{"nanoseconds"}, }, body: pb, }}), @@ -183,6 +186,8 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "period_type": "space", "period_unit": "bytes", "payload_type": "0", + "sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"}, + "sample_units": []any{"count", "bytes"}, }, body: pbAllocInNewTlab, }, @@ -199,6 +204,8 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "period_type": "objects", "period_unit": "count", "payload_type": "0", + "sample_types": []any{"live"}, + "sample_units": []any{"count"}, }, body: pbLiveObject, },