From 4848077b0c5d1bcab17847a18d3d9c9b0d35bb9b Mon Sep 17 00:00:00 2001 From: Cluas Date: Thu, 30 May 2024 14:50:03 +0800 Subject: [PATCH] feat: support pyroscope Push --- go.mod | 4 +- go.sum | 5 + receiver/pyroscopereceiver/config.go | 6 +- receiver/pyroscopereceiver/factory.go | 2 +- receiver/pyroscopereceiver/receiver.go | 289 +++++++++++++++----- receiver/pyroscopereceiver/receiver_test.go | 47 ++-- 6 files changed, 259 insertions(+), 94 deletions(-) diff --git a/go.mod b/go.mod index 514189e..d2b8ed7 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,15 @@ module github.com/metrico/otel-collector go 1.21 require ( + connectrpc.com/connect v1.16.2 github.com/ClickHouse/clickhouse-go/v2 v2.17.1 github.com/KimMachineGun/automemlimit v0.5.0 github.com/go-faster/city v1.0.1 github.com/go-logfmt/logfmt v0.6.0 github.com/google/pprof v0.0.0-20240320155624-b11c3daa6f07 + github.com/gorilla/mux v1.8.1 github.com/grafana/jfr-parser v0.8.0 + github.com/grafana/pyroscope/api v0.4.0 github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.98.0 github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector v0.98.0 github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector v0.98.0 @@ -368,7 +371,6 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.3 // indirect github.com/gophercloud/gophercloud v1.8.0 // indirect - github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/gosnmp/gosnmp v1.37.0 // indirect github.com/grafana/loki/pkg/push v0.0.0-20231127162423-bd505f8e2d37 // indirect diff --git a/go.sum b/go.sum index 5e7bc7c..1d6a7a0 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,8 @@ code.cloudfoundry.org/go-loggregator v7.4.0+incompatible h1:KqZYloMQWM5Zg/BQKunO code.cloudfoundry.org/go-loggregator v7.4.0+incompatible/go.mod h1:KPBTRqj+y738Nhf1+g4JHFaBU8j7dedirR5ETNHvMXU= code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 h1:mrZQaZmuDIPhSp6b96b+CRKC2uH44ifa5cjDV2epKis= code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4= +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -716,6 +718,9 @@ github.com/grafana/jfr-parser v0.8.0 h1:/uo2wZNXrxw7tKLFwP2omJ3EQGMkD9wzhPsRogVo github.com/grafana/jfr-parser v0.8.0/go.mod h1:M5u1ux34Qo47ZBWksbMYVk40s7dvU3WMVYpxweEu4R0= github.com/grafana/loki/pkg/push v0.0.0-20231127162423-bd505f8e2d37 h1:w59bmBeLOk4enGtyX4kTBNY3FCw/nwDTYUqcjC4vKhg= github.com/grafana/loki/pkg/push v0.0.0-20231127162423-bd505f8e2d37/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0= +github.com/grafana/pyroscope v1.5.0/go.mod h1:rg53VGcqOf3FawAcAUpkcNNF7+gV1VZFbZY9Gfxry+c= +github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo= +github.com/grafana/pyroscope/api v0.4.0/go.mod h1:MFnZNeUM4RDsDOnbgKW3GWoLSBpLzMMT9nkvhHHo81o= github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd h1:PpuIBO5P3e9hpqBD0O/HjhShYuM6XE0i/lbE6J94kww= github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index d7184b8..988a755 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -10,8 +10,8 @@ import ( // Configures supported protocols type Protocols struct { - // Http.MaxRequestBodySize configures max uncompressed body size in bytes - Http *confighttp.ServerConfig `mapstructure:"http"` + // HTTP.MaxRequestBodySize configures max uncompressed body size in bytes + HTTP *confighttp.ServerConfig `mapstructure:"http"` } // Represents the receiver config within the collector's config.yaml @@ -29,7 +29,7 @@ func (cfg *Config) Validate() error { if cfg.Timeout <= 0 { return fmt.Errorf("timeout must be positive") } - if cfg.Protocols.Http.MaxRequestBodySize < 1 { + if cfg.Protocols.HTTP.MaxRequestBodySize < 1 { return fmt.Errorf("max_request_body_size must be positive") } return nil diff --git a/receiver/pyroscopereceiver/factory.go b/receiver/pyroscopereceiver/factory.go index b7f1b63..f2b887c 100644 --- a/receiver/pyroscopereceiver/factory.go +++ b/receiver/pyroscopereceiver/factory.go @@ -22,7 +22,7 @@ const ( func createDefaultConfig() component.Config { return &Config{ Protocols: Protocols{ - Http: &confighttp.ServerConfig{ + HTTP: &confighttp.ServerConfig{ Endpoint: defaultHttpAddr, MaxRequestBodySize: defaultMaxRequestBodySize, }, diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 75ec3eb..56ad23c 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -2,9 +2,11 @@ package pyroscopereceiver import ( "bytes" + "compress/gzip" "context" "errors" "fmt" + "io" "log" "mime/multipart" "net" @@ -14,6 +16,10 @@ import ( "strings" "sync" + "connectrpc.com/connect" + mux "github.com/gorilla/mux" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + pushv1connect "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/nodeparser" @@ -44,23 +50,45 @@ const ( keyStart ctxkey = "start_time" ) +var ( + gzipReaderPool = sync.Pool{ + New: func() any { + return &gzipReader{ + reader: bytes.NewReader(nil), + } + }, + } + gzipWriterPool = sync.Pool{ + New: func() any { + return gzip.NewWriter(io.Discard) + }, + } + bufPool = sync.Pool{ + New: func() any { + return bytes.NewBuffer(nil) + }, + } +) + // avoids context key collision, need public getter/setter because should be propagated to other packages type ctxkey string type pyroscopeReceiver struct { - cfg *Config - set *receiver.CreateSettings - logger *zap.Logger - meter metric.Meter - next consumer.Logs - host component.Host - - httpMux *http.ServeMux + cfg *Config + setting *receiver.CreateSettings + logger *zap.Logger + meter metric.Meter + next consumer.Logs + host component.Host + + mux *mux.Router decompressor *compress.Decompressor httpServer *http.Server shutdownWg sync.WaitGroup uncompressedBufPool *sync.Pool + + pushv1connect.UnimplementedPusherServiceHandler } type parser interface { @@ -76,24 +104,24 @@ type params struct { } func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.CreateSettings) (*pyroscopeReceiver, error) { - recv := &pyroscopeReceiver{ + r := &pyroscopeReceiver{ cfg: cfg, - set: set, + setting: set, logger: set.Logger, meter: set.MeterProvider.Meter(typeStr), next: consumer, uncompressedBufPool: &sync.Pool{}, } - recv.decompressor = compress.NewDecompressor(recv.cfg.Protocols.Http.MaxRequestBodySize) - recv.httpMux = http.NewServeMux() - recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { - recv.httpHandlerIngest(resp, req) + r.decompressor = compress.NewDecompressor(r.cfg.Protocols.HTTP.MaxRequestBodySize) + r.mux = mux.NewRouter() + r.mux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { + r.httpHandlerIngest(resp, req) }) - if err := initMetrics(recv.meter); err != nil { - recv.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) - return recv, err + if err := initMetrics(r.meter); err != nil { + r.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) + return r, err } - return recv, nil + return r, nil } // TODO: rate limit clients @@ -111,7 +139,7 @@ func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req * } } -func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter, req *http.Request) <-chan struct{} { +func (r *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter, req *http.Request) <-chan struct{} { c := make(chan struct{}) go func() { // signal completion event @@ -120,18 +148,18 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri qs := req.URL.Query() pm, err := readParams(&qs) if err != nil { - recv.handleError(ctx, resp, "text/plain", http.StatusBadRequest, "bad url query", "", errorCodeError) + r.handleError(ctx, resp, "text/plain", http.StatusBadRequest, "bad url query", "", errorCodeError) return } if req.Method != http.MethodPost { - recv.handleError(ctx, resp, "text/plain", http.StatusMethodNotAllowed, fmt.Sprintf("method not allowed, supported: [%s]", http.MethodPost), pm.name, errorCodeError) + r.handleError(ctx, resp, "text/plain", http.StatusMethodNotAllowed, fmt.Sprintf("method not allowed, supported: [%s]", http.MethodPost), pm.name, errorCodeError) return } - pl, err := recv.readProfiles(ctx, req, pm) + pl, err := r.readProfiles(ctx, req, pm) if err != nil { - recv.handleError(ctx, resp, "text/plain", http.StatusBadRequest, err.Error(), pm.name, errorCodeError) + r.handleError(ctx, resp, "text/plain", http.StatusBadRequest, err.Error(), pm.name, errorCodeError) return } @@ -143,9 +171,9 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri // delegate to next consumer in the pipeline // TODO: support memorylimiter processor, apply retry policy on "oom" event, depends on https://github.com/open-telemetry/opentelemetry-collector/issues/9196 - err = recv.next.ConsumeLogs(ctx, pl) + err = r.next.ConsumeLogs(ctx, pl) if err != nil { - recv.handleError(ctx, resp, "text/plain", http.StatusInternalServerError, err.Error(), pm.name, errorCodeError) + r.handleError(ctx, resp, "text/plain", http.StatusInternalServerError, err.Error(), pm.name, errorCodeError) return } @@ -234,37 +262,98 @@ func releaseBuf(p *sync.Pool, buf *bytes.Buffer) { p.Put(buf) } -func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) { +func (r *pyroscopeReceiver) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) { + logs := plog.NewLogs() + sz := 0 + rs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for _, serie := range req.Msg.Series { + serviceName := "" + // find serviceName from label + for _, label := range serie.Labels { + if label.Name == "service_name" { + serviceName = label.Value + break + } + } + for i, sample := range serie.Samples { + err := fromBytes(sample.RawProfile, func(p *profile_types.ProfileIR) error { + record := rs.AppendEmpty() + record.SetTimestamp(pcommon.Timestamp(p.Profile.TimeNanos)) + m := record.Attributes() + m.PutStr("duration_ns", fmt.Sprint(p.DurationNano)) + m.PutStr("service_name", serviceName) + tm := m.PutEmptyMap("tags") + for _, l := range serie.Labels { + tm.PutStr(l.Name, l.Value) + } + err := setAttrsFromProfile(*p, m) + if err != nil { + return fmt.Errorf("failed to parse sample types: %v", err) + } + postProcessProf(p.Profile, &m) + record.Body().SetEmptyBytes().FromRaw(sample.RawProfile) + sz += p.Payload.Len() + r.logger.Debug( + fmt.Sprintf("parsed profile %d", i), + zap.Int64("timestamp_ns", p.Profile.DurationNanos), + zap.String("type", p.Type.Type), + zap.String("service_name", serviceName), + zap.String("period_type", p.Type.PeriodType), + zap.String("period_unit", p.Type.PeriodUnit), + zap.String("sample_types", strings.Join(p.Type.SampleType, ",")), + zap.String("sample_units", strings.Join(p.Type.SampleUnit, ",")), + zap.Uint8("payload_type", uint8(p.PayloadType)), + ) + return nil + }) + if err != nil { + return nil, err + } + } + } + + // if no profiles have been parsed, dont error but return + if logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() == 0 { + return nil, errors.New("no profiles have been parsed") + } + err := r.next.ConsumeLogs(ctx, logs) + if err != nil { + return nil, err + } + + return connect.NewResponse(&pushv1.PushResponse{}), nil +} + +func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) { var ( tmp []string ok bool - pa parser + p parser ) logs := plog.NewLogs() - recv.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery)) + r.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery)) qs := req.URL.Query() if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { - pa = jfrparser.NewJfrPprofParser() + p = jfrparser.NewJfrPprofParser() } else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") { - pa = nodeparser.NewNodePprofParser() + p = nodeparser.NewNodePprofParser() } else { - pa = pprofparser.NewPprofParser() - + p = pprofparser.NewPprofParser() } // support only multipart/form-data - f, err := recv.openMultipart(req) + f, err := r.openMultipart(req) if err != nil { return logs, err } defer f.Close() - buf := acquireBuf(recv.uncompressedBufPool) + buf := acquireBuf(r.uncompressedBufPool) defer func() { - releaseBuf(recv.uncompressedBufPool, buf) + releaseBuf(r.uncompressedBufPool, buf) }() - err = recv.decompressor.Decompress(f, compress.Gzip, buf) + err = r.decompressor.Decompress(f, compress.Gzip, buf) if err != nil { return logs, fmt.Errorf("failed to decompress body: %w", err) } @@ -282,7 +371,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque md.SampleRateHertz = hz } - ps, err := pa.Parse(buf, md) + ps, err := p.Parse(buf, md) if err != nil { return logs, fmt.Errorf("failed to parse pprof: %w", err) } @@ -292,7 +381,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque for i, pr := range ps { var timestampNs uint64 var durationNs uint64 - r := rs.AppendEmpty() + record := rs.AppendEmpty() if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { timestampNs = ns(pm.start) durationNs = pm.end - pm.start @@ -305,8 +394,8 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque durationNs = pm.end - pm.start durationNs = ns(durationNs) } - r.SetTimestamp(pcommon.Timestamp(timestampNs)) - m := r.Attributes() + record.SetTimestamp(pcommon.Timestamp(timestampNs)) + m := record.Attributes() m.PutStr("duration_ns", fmt.Sprint(durationNs)) m.PutStr("service_name", pm.name) tm := m.PutEmptyMap("tags") @@ -318,9 +407,9 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque return logs, fmt.Errorf("failed to parse sample types: %v", err) } postProcessProf(pr.Profile, &m) - r.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes()) + record.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes()) sz += pr.Payload.Len() - recv.logger.Debug( + r.logger.Debug( fmt.Sprintf("parsed profile %d", i), zap.Uint64("timestamp_ns", timestampNs), zap.String("type", pr.Type.Type), @@ -346,8 +435,8 @@ func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding stri return &s } -func (recv *pyroscopeReceiver) openMultipart(req *http.Request) (multipart.File, error) { - if err := req.ParseMultipartForm(recv.cfg.Protocols.Http.MaxRequestBodySize); err != nil { +func (r *pyroscopeReceiver) openMultipart(req *http.Request) (multipart.File, error) { + if err := req.ParseMultipartForm(r.cfg.Protocols.HTTP.MaxRequestBodySize); err != nil { return nil, fmt.Errorf("failed to parse multipart request: %w", err) } mf := req.MultipartForm @@ -392,6 +481,7 @@ func stringToAnyArray(s []string) []any { } return res } + func entitiesToStrings(entities []profile_types.SampleType) []any { var result []any for _, entity := range entities { @@ -403,7 +493,6 @@ func entitiesToStrings(entities []profile_types.SampleType) []any { } func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { - m.PutStr("type", prof.Type.Type) s := m.PutEmptySlice("sample_types") @@ -433,37 +522,42 @@ func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { } // Starts a http server that receives profiles of supported protocols -func (recv *pyroscopeReceiver) Start(_ context.Context, host component.Host) error { - recv.host = host +func (r *pyroscopeReceiver) Start(ctx context.Context, host component.Host) error { + r.host = host var err error - // applies an interceptor that enforces the configured request body limit - if recv.httpServer, err = recv.cfg.Protocols.Http.ToServer(host, recv.set.TelemetrySettings, recv.httpMux); err != nil { - return fmt.Errorf("failed to create http server: %w", err) - } + if r.cfg.Protocols.HTTP.Endpoint != "" { + // applies an interceptor that enforces the configured request body limit + if r.httpServer, err = r.cfg.Protocols.HTTP.ToServerContext(ctx, host, r.setting.TelemetrySettings, r.mux); err != nil { + return fmt.Errorf("failed to create http server: %w", err) + } - recv.logger.Info("server listening on", zap.String("endpoint", recv.cfg.Protocols.Http.Endpoint)) - var l net.Listener - if l, err = recv.cfg.Protocols.Http.ToListener(); err != nil { - return fmt.Errorf("failed to create tcp listener: %w", err) + var l net.Listener + if l, err = r.cfg.Protocols.HTTP.ToListenerContext(ctx); err != nil { + return fmt.Errorf("failed to create tcp listener: %w", err) + } + pushv1connect.RegisterPusherServiceHandler(r.mux, r) + r.shutdownWg.Add(1) + r.setting.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.Protocols.HTTP.Endpoint)) + go func() { + defer r.shutdownWg.Done() + if err := r.httpServer.Serve(l); !errors.Is(err, http.ErrServerClosed) && err != nil { + log.Fatalf("HTTP server failed: %v", err) + } + }() } - recv.shutdownWg.Add(1) - go func() { - defer recv.shutdownWg.Done() - if err := recv.httpServer.Serve(l); !errors.Is(err, http.ErrServerClosed) && err != nil { - log.Fatalf("HTTP server failed: %v", err) - } - }() return nil } // Shuts down the receiver, by shutting down the server -func (recv *pyroscopeReceiver) Shutdown(ctx context.Context) error { - if err := recv.httpServer.Shutdown(ctx); err != nil { - return fmt.Errorf("failed to shutdown: %w", err) +func (r *pyroscopeReceiver) Shutdown(ctx context.Context) error { + if r.httpServer != nil { + if err := r.httpServer.Shutdown(ctx); err != nil { + return fmt.Errorf("failed to shutdown: %w", err) + } } - recv.shutdownWg.Wait() + r.shutdownWg.Wait() return nil } @@ -480,3 +574,66 @@ func writeResponse(w http.ResponseWriter, contentType string, statusCode int, pa _, _ = w.Write(payload) } } + +type gzipReader struct { + gzip *gzip.Reader + reader *bytes.Reader +} + +// open gzip, create reader if required +func (r *gzipReader) gzipOpen() error { + var err error + if r.gzip == nil { + r.gzip, err = gzip.NewReader(r.reader) + } else { + err = r.gzip.Reset(r.reader) + } + return err +} + +func (r *gzipReader) openBytes(input []byte) (io.Reader, error) { + r.reader.Reset(input) + + // handle if data is not gzipped at all + if err := r.gzipOpen(); err == gzip.ErrHeader { + r.reader.Reset(input) + return r.reader, nil + } else if err != nil { + return nil, err + } + + return r.gzip, nil +} + +func fromBytes(input []byte, fn func(*profile_types.ProfileIR) error) error { + p, err := rawFromBytes(input) + if err != nil { + return err + } + return fn(p) +} + +func rawFromBytes(input []byte) (*profile_types.ProfileIR, error) { + gzipReader := gzipReaderPool.Get().(*gzipReader) + buf := bufPool.Get().(*bytes.Buffer) + defer func() { + gzipReaderPool.Put(gzipReader) + buf.Reset() + bufPool.Put(buf) + }() + + r, err := gzipReader.openBytes(input) + if err != nil { + return nil, err + } + + if _, err = io.Copy(buf, r); err != nil { + return nil, fmt.Errorf("copy to buffer: %w", err) + } + pbp, err := pprofparser.NewPprofParser().Parse(buf, profile_types.Metadata{}) + if err != nil { + return nil, fmt.Errorf("failed to parse profile: %w", err) + } + + return &pbp[0], nil +} diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index e69c708..d9223e8 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -65,7 +65,7 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { addr := getAvailableLocalTcpPort(t) cfg := &Config{ Protocols: Protocols{ - Http: &confighttp.ServerConfig{ + HTTP: &confighttp.ServerConfig{ Endpoint: addr, MaxRequestBodySize: defaultMaxRequestBodySize, }, @@ -137,30 +137,31 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "format": "jfr", }, filename: filepath.Join("testdata", "memory_alloc_live_example.jfr"), - expected: gen([]profileLog{{ - timestamp: 1700332322000000000, - attrs: map[string]any{ - "service_name": "com.example.App", - "tags": map[string]any{ - "dc": "us-east-1", - "kubernetes_pod_name": "app-abcd1234", - }, - "duration_ns": "7000000000", - "type": "memory", - "period_type": "space", - "period_unit": "bytes", - "payload_type": "0", - "sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"}, - "sample_units": []any{"count", "bytes"}, - "values_agg": []any{ - []any{"alloc_in_new_tlab_objects:count", 977, 471}, - []any{"alloc_in_new_tlab_bytes:bytes", 512229376, 471}, + expected: gen([]profileLog{ + { + timestamp: 1700332322000000000, + attrs: map[string]any{ + "service_name": "com.example.App", + "tags": map[string]any{ + "dc": "us-east-1", + "kubernetes_pod_name": "app-abcd1234", + }, + "duration_ns": "7000000000", + "type": "memory", + "period_type": "space", + "period_unit": "bytes", + "payload_type": "0", + "sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"}, + "sample_units": []any{"count", "bytes"}, + "values_agg": []any{ + []any{"alloc_in_new_tlab_objects:count", 977, 471}, + []any{"alloc_in_new_tlab_bytes:bytes", 512229376, 471}, + }, + "tree": "16287638610851960464", + "functions": "14254943256614951927", }, - "tree": "16287638610851960464", - "functions": "14254943256614951927", + body: pbAllocInNewTlab, }, - body: pbAllocInNewTlab, - }, { timestamp: 1700332322000000000, attrs: map[string]any{