Feature/grafana profiles plugin #546

Merged 8 commits on Aug 21, 2024

Changes from 4 commits
45 changes: 45 additions & 0 deletions patterns/patterns_bin/src/pattern_reg.rs
@@ -0,0 +1,45 @@
use crate::pattern::Pattern;
use uuid::Uuid;

pub struct PatternRegistry {
    patterns: Vec<Pattern>,
}

impl PatternRegistry {
    pub const fn new() -> PatternRegistry {
        PatternRegistry { patterns: Vec::new() }
    }

    /// Returns the first registered pattern that matches `i_text` within its
    /// fluctuation budget; registers the line as a new pattern when none does.
    pub fn find_pattern(&mut self, str_text: &Vec<String>, i_text: &Vec<u64>, sample: String) -> &Pattern {
        let mut idx: i32 = -1;
        let mut mtc = 0;
        for i in 0..self.patterns.len() {
            mtc = self.patterns[i].match_text(&i_text);
            if mtc == -1 || mtc > self.patterns[i].fluct {
                continue;
            }
            idx = i as i32;
            break;
        }

        if idx == -1 {
            // No match: register the tokenized line as a new pattern.
            let pattern = Pattern::new(Uuid::new_v4().to_string(), &i_text, &str_text, sample);
            self.patterns.push(pattern);
            idx = (self.patterns.len() - 1) as i32;
        } else if mtc != 0 {
            // Partial match: widen the stored pattern to cover this line.
            self.patterns[idx as usize].adjust_pattern(&i_text);
        }
        return &self.patterns[idx as usize];
    }

    pub fn to_string(&self) -> String {
        let mut s = String::new();
        for i in 0..self.patterns.len() {
            s += self.patterns[i].to_string().as_str();
            s += "\n";
        }
        return s;
    }
}

pub static mut REGISTRY: PatternRegistry = PatternRegistry::new();
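
A note on the control flow above: find_pattern scans the registered patterns, skips any whose mismatch count exceeds their fluct budget, widens a partial match in place, and registers a brand-new pattern when nothing fits. A minimal TypeScript sketch of that find-or-create loop, with matchText, fluct, and adjustPattern as assumed stand-ins for the Rust methods:

// Hypothetical re-expression of PatternRegistry::find_pattern.
interface Pattern {
  fluct: number                          // tolerated mismatch budget
  matchText (tokens: number[]): number   // -1 = no match, 0 = exact, >0 = mismatches
  adjustPattern (tokens: number[]): void
}

function findPattern (patterns: Pattern[], tokens: number[], makePattern: () => Pattern): Pattern {
  for (const p of patterns) {
    const mtc = p.matchText(tokens)
    if (mtc === -1 || mtc > p.fluct) continue // no match, or too far off
    if (mtc !== 0) p.adjustPattern(tokens)    // partial match: widen the pattern
    return p
  }
  const fresh = makePattern()                 // nothing matched: register a new one
  patterns.push(fresh)
  return fresh
}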
45 changes: 45 additions & 0 deletions patterns/patterns_bin/src/tokens.rs
@@ -0,0 +1,45 @@
use regex::{Regex, CaptureMatches, Match};

/*pub fn tokenize(re: &Regex, text: &str) -> CaptureMatches {
    return re.captures_iter(text);
}*/

pub struct Tokenizer<'a> {
    text: String,
    pos: usize,
    re: Regex,
    iter: Option<CaptureMatches<'a, 'a>>
}

impl Tokenizer<'_> {
    pub fn new<'a>(text: &'a str) -> Tokenizer<'a> {
        Tokenizer {
            text: text.to_string(),
            pos: 0,
            // Tokens are runs of letters/underscores, runs of digits/dots,
            // or runs of anything else, each with trailing whitespace consumed.
            re: Regex::new(r"([\p{L}_]+|[\d.]+|[^\p{L}_\d.]+)\s*").unwrap(),
            iter: None
        }
    }
}

impl Iterator for Tokenizer<'_> {
    type Item = String;

    // Stub: the capture-driven implementation is still commented out,
    // so the iterator yields nothing for now.
    fn next(&mut self) -> Option<Self::Item> {
        None
        /*let cap: Option<Match> = None;
        if let Some(c) = cap {
            self.pos += c.get(0).unwrap().end();
            Some(c.get(0).unwrap().as_str().to_string())
        } else {
            None
        }*/
    }
}

#[test]
fn test_tokenizer() {
    let text = "Hello, world! 123";
    let mut tokenizer = Tokenizer::new(text);
    // The stub iterator currently yields nothing.
    assert_eq!(tokenizer.next(), None);
}
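
The capture regex above splits input into word, number, and punctuation runs. As a rough illustration of the token stream the finished iterator should produce, the same pattern can be exercised from TypeScript (the u flag enables \p{L}); this harness is illustrative only and not part of the PR.

// Same token classes as the Rust regex: letter/underscore runs,
// digit/dot runs, and runs of everything else; trailing whitespace is eaten.
const re = /([\p{L}_]+|[\d.]+|[^\p{L}_\d.]+)\s*/gu

function tokenize (text: string): string[] {
  return [...text.matchAll(re)].map((m) => m[1])
}

console.log(tokenize('Hello, world! 123'))
// -> [ 'Hello', ', ', 'world', '! ', '123' ]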
67 changes: 67 additions & 0 deletions pyroscope/flamebearer.d.ts
@@ -0,0 +1,67 @@

type int64 = string;
type uint64 = string;
type units = string;

export interface Flamebearer {
  version: number,
  flamebearerProfileV1: flamebearerProfileV1,
  telemetry?: {[key: string]: any}
}

export interface flamebearerProfileV1 {
  flamebearer: flamebearerV1,
  metadata: flamebearerMetadataV1,
  timeline: flamebearerTimelineV1,
  groups: {[key: string]: flamebearerTimelineV1},
  heatmap: heatmap,
  leftTicks: string,
  rightTicks: string
}

export interface flamebearerV1 {
  names: string[],
  levels: number[][],
  numTicks: number,
  maxSelf: number
}

export interface flamebearerMetadataV1 {
  format: string,
  spyName: string,
  sampleRate: number,
  units: units,
  name: string
}

export interface flamebearerTimelineV1 {
  startTime: int64,
  samples: uint64[],
  durationDelta: int64,
  watermarks: {[key: number]: int64}
}

export interface heatmap {
  values: uint64[][],
  timeBuckets: int64,
  valueBuckets: int64,
  startTime: int64,
  endTime: int64,
  minValue: uint64,
  maxValue: uint64,
  minDepth: uint64,
  maxDepth: uint64
}

export interface level {
  values: number[]
}

export interface flamegraphDiff {
  name: string[],
  levels: level[],
  total: int64,
  maxSelf: int64,
  leftTicks: int64,
  rightTicks: int64
}
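
To make the levels encoding concrete, here is a small invented flamebearerV1 value for a two-frame stack. Each level row reads as flat quads of (offset, total, self, index into names); the real wire format may delta-encode offsets, so treat this as a sketch.

import { flamebearerV1 } from './flamebearer' // path assumed

// Invented profile: main() spends 20 of 100 ticks itself and
// delegates 80 ticks to worker().
const fb: flamebearerV1 = {
  names: ['total', 'main', 'worker'],
  levels: [
    [0, 100, 0, 0],  // root "total": 100 ticks, none self
    [0, 100, 20, 1], // main: 100 total, 20 self
    [20, 80, 80, 2]  // worker: starts at tick 20, all 80 self
  ],
  numTicks: 100,
  maxSelf: 80
}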
57 changes: 57 additions & 0 deletions pyroscope/json_parsers.js
@@ -0,0 +1,57 @@
const { bufferize } = require('./shared')

/**
 * Parses a JSON `series` request body into the getter-style
 * interface the handlers expect.
 * @param req
 * @param payload
 */
const series = async (req, payload) => {
  let body = await bufferize(payload)
  body = JSON.parse(body.toString())
  req.type = 'json'
  return {
    getStart: () => body.start,
    getEnd: () => body.end,
    getMatchersList: () => body.matchers,
    getLabelNamesList: () => body.labelNames
  }
}

const getProfileStats = async (req, payload) => {
  req.type = 'json'
  return null
}

const settingsGet = async (req, payload) => {
  req.type = 'json'
  return {}
}

const labelNames = async (req, payload) => {
  req.type = 'json'
  let body = await bufferize(payload)
  body = JSON.parse(body.toString())
  return {
    getStart: () => body.start,
    getEnd: () => body.end,
    getName: () => body.name
  }
}

const analyzeQuery = async (req, payload) => {
  req.type = 'json'
  let body = await bufferize(payload)
  body = JSON.parse(body.toString())
  return {
    getStart: () => body.start,
    getEnd: () => body.end,
    getQuery: () => body.query
  }
}

module.exports = {
  series,
  getProfileStats,
  labelNames,
  settingsGet,
  analyzeQuery
}
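
For context, a route handler would exercise one of these parsers roughly as below. The harness is hypothetical: it assumes bufferize() collects a readable stream of the request body, and all field values are invented.

// Hypothetical harness: feed the `series` parser a canned JSON body.
const { Readable } = require('stream')
const { series } = require('./json_parsers')

async function demo () {
  const req: { type?: string } = {}
  const payload = Readable.from([Buffer.from(JSON.stringify({
    start: 1700000000000,
    end: 1700000600000,
    matchers: ['{service_name="app"}'],
    labelNames: ['pod']
  }))])
  const parsed = await series(req, payload)
  console.log(req.type, parsed.getMatchersList()) // 'json' [ '{service_name="app"}' ]
}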
173 changes: 173 additions & 0 deletions pyroscope/merge_stack_traces.js
@@ -0,0 +1,173 @@
const { checkVersion, DATABASE_NAME } = require('../lib/utils')
const Sql = require('@cloki/clickhouse-sql')
const { clusterName } = require('../common')
const clickhouse = require('../lib/db/clickhouse')
const { readULeb32 } = require('./pprof')
const pprofBin = require('./pprof-bin/pkg')
const {
  serviceNameSelectorQuery,
  labelSelectorQuery
} = require('./shared')

// Renders a WITH reference as an inline subquery when the CTE is marked
// inline, otherwise by its alias.
const sqlWithReference = (ref) => {
  const res = new Sql.WithReference(ref)
  res.toString = function () {
    if (this.ref.inline) {
      return `(${this.ref.query.toString()}) as ${this.ref.alias}`
    }
    return this.ref.alias
  }
  return res
}

let ctxIdx = 0

// Each merge gets its own context id inside the pprof-bin WASM module.
const newCtxIdx = () => ++ctxIdx

const importStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log, _ctxIdx, save) => {
  const dist = clusterName ? '_dist' : ''
  const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000)
  const serviceNameSelector = serviceNameSelectorQuery(sel)
  const typeIdSelector = Sql.Eq(
    'type_id',
    Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)
  )
  // Fingerprints of series matching the profile type, time range and label selector.
  const idxSelect = (new Sql.Select())
    .select('fingerprint')
    .from(`${DATABASE_NAME()}.profiles_series_gin`)
    .where(
      Sql.And(
        Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRegex.sampleType)},${Sql.quoteVal(typeRegex.sampleUnit)}))`), 1),
        typeIdSelector,
        Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
        serviceNameSelector
      )
    ).groupBy('fingerprint')
  labelSelectorQuery(idxSelect, sel)
  const withIdxSelect = new Sql.With('idx', idxSelect, !!clusterName)
  const rawReq = (new Sql.Select()).with(withIdxSelect)
    .select([
      new Sql.Raw(`arrayMap(x -> (x.1, x.2, x.3, (arrayFirst(y -> y.1 == ${Sql.quoteVal(`${typeRegex.sampleType}:${typeRegex.sampleUnit}`)}, x.4) as af).2, af.3), tree)`),
      'tree'
    ], 'functions')
    .from(`${DATABASE_NAME()}.profiles${dist}`)
    .where(
      Sql.And(
        Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
        Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
        new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
        typeIdSelector,
        serviceNameSelector
      ))
  if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
    rawReq.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
  }
  const withRawReq = new Sql.With('raw', rawReq, !!clusterName)
  // Aggregate tree nodes across profiles: sum the self/total values per node.
  const joinedReq = (new Sql.Select()).with(withRawReq).select([
    new Sql.Raw('(raw.tree.1, raw.tree.2, raw.tree.3, sum(raw.tree.4), sum(raw.tree.5))'),
    'tree2'
  ]).from(sqlWithReference(withRawReq))
    .join('raw.tree', 'array')
    .groupBy(new Sql.Raw('raw.tree.1'), new Sql.Raw('raw.tree.2'), new Sql.Raw('raw.tree.3'))
    .orderBy(new Sql.Raw('raw.tree.1')).limit(2000000)
  const withJoinedReq = new Sql.With('joined', joinedReq, !!clusterName)
  const joinedAggregatedReq = (new Sql.Select()).select(
    [new Sql.Raw('groupArray(tree2)'), 'tree']).from(sqlWithReference(withJoinedReq))
  const functionsReq = (new Sql.Select()).select(
    [new Sql.Raw('groupUniqArray(raw.functions)'), 'functions2']
  ).from(sqlWithReference(withRawReq)).join('raw.functions', 'array')

  // Pre-v2 profiles keep the raw pprof payload instead of a parsed tree.
  let brackLegacy = (new Sql.Select()).select(
    [new Sql.Raw('[]::Array(String)'), 'legacy']
  )
  let withLegacy = null
  if (!v2) {
    const legacy = (new Sql.Select()).with(withIdxSelect)
      .select('payload')
      .from(`${DATABASE_NAME()}.profiles${dist}`)
      .where(
        Sql.And(
          Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
          Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
          new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
          Sql.Eq(new Sql.Raw('empty(tree)'), 1),
          typeIdSelector,
          serviceNameSelector
        ))
    if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
      legacy.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
    }
    withLegacy = new Sql.With('legacy', legacy, !!clusterName)
    brackLegacy = (new Sql.Select())
      .select([new Sql.Raw('groupArray(payload)'), 'payloads'])
      .from(sqlWithReference(withLegacy))
  }
  brackLegacy = new Sql.Raw(`(${brackLegacy.toString()})`)
  const brack1 = new Sql.Raw(`(${joinedAggregatedReq.toString()})`)
  const brack2 = new Sql.Raw(`(${functionsReq.toString()})`)

  const sqlReq = (new Sql.Select())
    .select(
      [brackLegacy, 'legacy'],
      [brack2, 'functions'],
      [brack1, 'tree']
    )
  if (v2) {
    sqlReq.with(withJoinedReq, withRawReq)
  } else {
    sqlReq.with(withJoinedReq, withRawReq, withLegacy)
  }

  let start = Date.now()
  const profiles = await clickhouse.rawRequest(sqlReq.toString() + ' FORMAT RowBinary',
    null,
    DATABASE_NAME(),
    {
      responseType: 'arraybuffer'
    })
  const binData = Uint8Array.from(profiles.data)
  log.debug(`selectMergeStacktraces: profiles downloaded: ${binData.length / 1024}kB in ${Date.now() - start}ms`)
  require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
  // The response starts with a ULEB128-encoded count of legacy pprof payloads.
  const [legacyLen, shift] = readULeb32(binData, 0)
  let ofs = shift
  let mergePprofLat = BigInt(0)
  for (let i = 0; i < legacyLen; i++) {
    const [profLen, shift] = readULeb32(binData, ofs)
    ofs += shift
    start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
    pprofBin.merge_prof(_ctxIdx,
      Uint8Array.from(profiles.data.slice(ofs, ofs + profLen)),
      `${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
    mergePprofLat += (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
    ofs += profLen
  }
  start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
  // Debug hook: dump the raw tree bytes for offline inspection.
  save && require('fs').writeFileSync(`/home/hromozeka/QXIP/qryn/data.${Date.now()}.bin`,
    Buffer.from(Uint8Array.from(profiles.data.slice(ofs))))
  pprofBin.merge_tree(_ctxIdx, Uint8Array.from(profiles.data.slice(ofs)),
    typeRegex.sampleType + ':' + typeRegex.sampleUnit)
  const mergeTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
  log.debug(`merge_pprof: ${mergePprofLat / BigInt(1000000)}ms`)
  log.debug(`merge_tree: ${mergeTreeLat / BigInt(1000000)}ms`)
}

const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => {
  const _ctxIdx = newCtxIdx()
  try {
    await importStackTraces(typeRegex, sel, fromTimeSec, toTimeSec, log, _ctxIdx)
    const start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
    const resp = pprofBin.export_tree(_ctxIdx, typeRegex.sampleType + ':' + typeRegex.sampleUnit)
    const exportTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
    log.debug(`export_tree: ${exportTreeLat / BigInt(1000000)}ms`)
    return Buffer.from(resp)
  } finally {
    try { pprofBin.drop_tree(_ctxIdx) } catch (e) {}
  }
}

module.exports = {
  mergeStackTraces,
  importStackTraces,
  newCtxIdx
}
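
Downstream code consumes this module roughly as follows. The typeRegex field names mirror how importStackTraces reads them, while the selector string and logger are invented for the sketch.

// Hypothetical caller: merge the last hour of CPU samples into one
// flame-graph tree and return the serialized bytes.
const { mergeStackTraces } = require('./merge_stack_traces')

async function cpuFlamegraph (log: { debug: (msg: string) => void }): Promise<Buffer> {
  const typeRegex = {
    type: 'process_cpu',       // assumed values; together they form the
    periodType: 'cpu',         // `${type}:${periodType}:${periodUnit}`
    periodUnit: 'nanoseconds', // type_id used in the ClickHouse filters
    sampleType: 'cpu',
    sampleUnit: 'nanoseconds'
  }
  const toTimeSec = Math.floor(Date.now() / 1000)
  const fromTimeSec = toTimeSec - 3600
  // sel is a label selector, e.g. '{service_name="my-app"}'
  return await mergeStackTraces(typeRegex, '{service_name="my-app"}', fromTimeSec, toTimeSec, log)
}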