From cefc6befc0a41a075e488739ddbdc635e0c93114 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 11 Feb 2024 13:19:32 +0200 Subject: [PATCH 1/3] fix thread state parser for async-profiler 3.0 --- exporter/clickhouseprofileexporter/ch/access_native_columnar.go | 1 + receiver/pyroscopereceiver/jfrparser/parser.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index dcdb7c5..e600241 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -171,6 +171,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { zap.String(columnServiceName, service_name[i]), zap.String(columnPeriodType, period_type[i]), zap.String(columnPeriodUnit, period_unit[i]), + zap.Any(columnSampleTypesUnits, sample_types_units[i]), zap.String(columnPayloadType, payload_type[i]), ) } diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index c0d2eb4..3bf3c90 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -81,7 +81,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([ case pa.jfrParser.TypeMap.T_EXECUTION_SAMPLE: values[0] = 1 * int64(period) ts := pa.jfrParser.GetThreadState(pa.jfrParser.ExecutionSample.State) - if ts != nil && ts.Name == "STATE_RUNNABLE" { + if ts != nil && ts.Name != "STATE_SLEEPING" { pa.addStacktrace(sampleTypeCpu, pa.jfrParser.ExecutionSample.StackTrace, values[:1]) } // TODO: this code is from github/grafana/pyroscope, need to validate that the qryn.js query simulator handles this branch as expected for wall From 23220f23973346db1d3e67cb44911ddaa1c754f8 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 11 Feb 2024 17:39:15 +0200 Subject: [PATCH 2/3] export all logs --- .../ch/access_native_columnar.go | 207 +++++++++--------- .../clickhouseprofileexporter/exporter.go | 7 +- 2 files changed, 108 insertions(+), 106 deletions(-) diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index dcdb7c5..d3d4ef4 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -71,146 +71,147 @@ func valueToStringArray(v pcommon.Value) ([]string, error) { } // Inserts a profile batch into the clickhouse server using columnar native protocol -func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { +func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) { b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input") if err != nil { - return fmt.Errorf("failed to prepare batch: %w", err) - } + return 0, fmt.Errorf("failed to prepare batch: %w", err) + } + + // this implementation is tightly coupled to how pyroscope-java and pyroscopereceiver work + timestamp_ns := make([]uint64, 0) + typ := make([]string, 0) + service_name := make([]string, 0) + values_agg := make([][]tuple, 0) + sample_types_units := make([][]tuple, 0) + period_type := make([]string, 0) + period_unit := make([]string, 0) + tags := make([][]tuple, 0) + duration_ns := make([]uint64, 0) + payload_type := make([]string, 0) + payload := make([][]byte, 0) - // this implementation is tightly coupled to how pyroscope-java and pyroscopereciver work, - // specifically receiving a single profile at a time from the agent, - // and thus each batched resource logs slice contains a single log record rl := ls.ResourceLogs() - sz := rl.Len() - - timestamp_ns := make([]uint64, sz) - typ := make([]string, sz) - service_name := make([]string, sz) - values_agg := make([][]tuple, sz) - sample_types_units := make([][]tuple, sz) - period_type := make([]string, sz) - period_unit := make([]string, sz) - tags := make([][]tuple, sz) - duration_ns := make([]uint64, sz) - payload_type := make([]string, sz) - payload := make([][]byte, sz) - var ( - r plog.LogRecord - m pcommon.Map - tmp pcommon.Value - tm map[string]any + lr plog.LogRecordSlice + r plog.LogRecord + m pcommon.Map + tmp pcommon.Value + tm map[string]any + offset int + s int + idx int ) - for i := 0; i < sz; i++ { - r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0) - m = r.Attributes() - timestamp_ns[i] = uint64(r.Timestamp()) - - tmp, _ = m.Get(columnType) - typ[i] = tmp.AsString() - - tmp, _ = m.Get(columnServiceName) - service_name[i] = tmp.AsString() - - sample_types, _ := m.Get("sample_types") - - sample_units, _ := m.Get("sample_units") - - sample_types_array, err := valueToStringArray(sample_types) - if err != nil { - return err - } - - sample_units_array, err := valueToStringArray(sample_units) - if err != nil { - return err - } - - values_agg_raw, ok := m.Get(columnValuesAgg) - if ok { - values_agg_tuple, err := valueAggToTuple(&values_agg_raw) + for i := 0; i < rl.Len(); i++ { + lr = rl.At(i).ScopeLogs().At(0).LogRecords() + for s = 0; s < lr.Len(); s++ { + r = lr.At(s) + m = r.Attributes() + timestamp_ns = append(timestamp_ns, uint64(r.Timestamp())) + + tmp, _ = m.Get(columnType) + typ = append(typ, tmp.AsString()) + + tmp, _ = m.Get(columnServiceName) + service_name = append(service_name, tmp.AsString()) + + sample_types, _ := m.Get("sample_types") + sample_units, _ := m.Get("sample_units") + sample_types_array, err := valueToStringArray(sample_types) if err != nil { - return err + return 0, err } - values_agg[i] = append(values_agg[i], values_agg_tuple...) - } - - sample_types_units_item := make([]tuple, len(sample_types_array)) - for i, v := range sample_types_array { - - sample_types_units_item[i] = tuple{v, sample_units_array[i]} - } - sample_types_units[i] = sample_types_units_item - tmp, _ = m.Get(columnPeriodType) - period_type[i] = tmp.AsString() - - tmp, _ = m.Get(columnPeriodUnit) - period_unit[i] = tmp.AsString() - - tmp, _ = m.Get(columnTags) - tm = tmp.Map().AsRaw() - tag, j := make([]tuple, len(tm)), 0 - for k, v := range tm { - tag[j] = tuple{k, v.(string)} - j++ - } - tags[i] = tag - - tmp, _ = m.Get(columnDurationNs) - duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64) + sample_units_array, err := valueToStringArray(sample_units) + if err != nil { + return 0, err + } + values_agg_raw, ok := m.Get(columnValuesAgg) + if ok { + values_agg_tuple, err := valueAggToTuple(&values_agg_raw) + if err != nil { + return 0, err + } + values_agg = append(values_agg, values_agg_tuple) + } + sample_types_units_item := make([]tuple, len(sample_types_array)) + for i, v := range sample_types_array { + sample_types_units_item[i] = tuple{v, sample_units_array[i]} + } + sample_types_units = append(sample_types_units, sample_types_units_item) - tmp, _ = m.Get(columnPayloadType) - payload_type[i] = tmp.AsString() + tmp, _ = m.Get(columnPeriodType) + period_type = append(period_type, tmp.AsString()) - payload[i] = r.Body().Bytes().AsRaw() + tmp, _ = m.Get(columnPeriodUnit) + period_unit = append(period_unit, tmp.AsString()) - ch.logger.Debug( - fmt.Sprintf("batch insert prepared row %d", i), - zap.Uint64(columnTimestampNs, timestamp_ns[i]), - zap.String(columnType, typ[i]), - zap.String(columnServiceName, service_name[i]), - zap.String(columnPeriodType, period_type[i]), - zap.String(columnPeriodUnit, period_unit[i]), - zap.String(columnPayloadType, payload_type[i]), - ) + tmp, _ = m.Get(columnTags) + tm = tmp.Map().AsRaw() + tag, j := make([]tuple, len(tm)), 0 + for k, v := range tm { + tag[j] = tuple{k, v.(string)} + j++ + } + tags = append(tags, tag) + + tmp, _ = m.Get(columnDurationNs) + dur, _ := strconv.ParseUint(tmp.Str(), 10, 64) + duration_ns = append(duration_ns, dur) + + tmp, _ = m.Get(columnPayloadType) + payload_type = append(payload_type, tmp.AsString()) + + payload = append(payload, r.Body().Bytes().AsRaw()) + + idx = offset + s + ch.logger.Debug( + fmt.Sprintf("batch insert prepared row %d", idx), + zap.Uint64(columnTimestampNs, timestamp_ns[idx]), + zap.String(columnType, typ[idx]), + zap.String(columnServiceName, service_name[idx]), + zap.String(columnPeriodType, period_type[idx]), + zap.String(columnPeriodUnit, period_unit[idx]), + zap.String(columnPayloadType, payload_type[idx]), + ) + } + offset += s } // column order here should match table column order if err := b.Column(0).Append(timestamp_ns); err != nil { - return err + return 0, err } if err := b.Column(1).Append(typ); err != nil { - return err + return 0, err } if err := b.Column(2).Append(service_name); err != nil { - return err + return 0, err } if err := b.Column(3).Append(sample_types_units); err != nil { - return err + return 0, err } if err := b.Column(4).Append(period_type); err != nil { - return err + return 0, err } if err := b.Column(5).Append(period_unit); err != nil { - return err + return 0, err } if err := b.Column(6).Append(tags); err != nil { - return err + return 0, err } if err := b.Column(7).Append(duration_ns); err != nil { - return err + return 0, err } if err := b.Column(8).Append(payload_type); err != nil { - return err + return 0, err } if err := b.Column(9).Append(payload); err != nil { - return err + return 0, err } if err := b.Column(10).Append(values_agg); err != nil { - return err + return 0, err } - return b.Send() + return offset, b.Send() } // Closes the clickhouse connection pool diff --git a/exporter/clickhouseprofileexporter/exporter.go b/exporter/clickhouseprofileexporter/exporter.go index deae555..2a94430 100644 --- a/exporter/clickhouseprofileexporter/exporter.go +++ b/exporter/clickhouseprofileexporter/exporter.go @@ -30,7 +30,7 @@ type clickhouseProfileExporter struct { type clickhouseAccess interface { // Inserts a profile batch into the clickhouse server - InsertBatch(profiles plog.Logs) error + InsertBatch(profiles plog.Logs) (int, error) // Shuts down the clickhouse connection Shutdown() error @@ -63,13 +63,14 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti // Sends the profiles to clickhouse server using the configured connection func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error { start := time.Now().UnixMilli() - if err := exp.ch.InsertBatch(logs); err != nil { + sz, err := exp.ch.InsertBatch(logs) + if err != nil { otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) - exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len())) + exp.logger.Info("inserted batch", zap.Int("size", sz)) return nil } From 9c823a1afca71ca07bea009ef616f9490b694373 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 11 Feb 2024 18:02:07 +0200 Subject: [PATCH 3/3] debug samples --- exporter/clickhouseprofileexporter/ch/access_native_columnar.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index d3d4ef4..5243c09 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -170,6 +170,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) zap.String(columnServiceName, service_name[idx]), zap.String(columnPeriodType, period_type[idx]), zap.String(columnPeriodUnit, period_unit[idx]), + zap.Any(columnSampleTypesUnits, sample_types_units[idx]), zap.String(columnPayloadType, payload_type[idx]), ) }