Skip to content

Commit

Permalink
chunking on pprof merge request
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Aug 26, 2024
1 parent fd82464 commit d13503f
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 27 deletions.
7 changes: 6 additions & 1 deletion pyroscope/pprof-bin/pkg/pprof_bin.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ export function diff_tree(id1: number, id2: number, sample_type: string): Uint8A
*/
export function export_tree(id: number, sample_type: string): Uint8Array;
/**
* @param {number} id
* @param {Uint8Array} payload
*/
export function merge_trees_pprof(id: number, payload: Uint8Array): void;
/**
* @param {number} id
* @returns {Uint8Array}
*/
export function export_trees_pprof(payload: Uint8Array): Uint8Array;
export function export_trees_pprof(id: number): Uint8Array;
/**
* @param {number} id
*/
Expand Down
20 changes: 14 additions & 6 deletions pyroscope/pprof-bin/pkg/pprof_bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,28 @@ module.exports.export_tree = function(id, sample_type) {
};

/**
* @param {number} id
* @param {Uint8Array} payload
*/
module.exports.merge_trees_pprof = function(id, payload) {
const ptr0 = passArray8ToWasm0(payload, wasm.__wbindgen_malloc);
const len0 = WASM_VECTOR_LEN;
wasm.merge_trees_pprof(id, ptr0, len0);
};

/**
* @param {number} id
* @returns {Uint8Array}
*/
module.exports.export_trees_pprof = function(payload) {
module.exports.export_trees_pprof = function(id) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passArray8ToWasm0(payload, wasm.__wbindgen_malloc);
const len0 = WASM_VECTOR_LEN;
wasm.export_trees_pprof(retptr, ptr0, len0);
wasm.export_trees_pprof(retptr, id);
var r0 = getInt32Memory0()[retptr / 4 + 0];
var r1 = getInt32Memory0()[retptr / 4 + 1];
var v2 = getArrayU8FromWasm0(r0, r1).slice();
var v1 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1, 1);
return v2;
return v1;
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
}
Expand Down
Binary file modified pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
Binary file not shown.
3 changes: 2 additions & 1 deletion pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ export function merge_prof(a: number, b: number, c: number, d: number, e: number
export function merge_tree(a: number, b: number, c: number, d: number, e: number): void;
export function diff_tree(a: number, b: number, c: number, d: number, e: number): void;
export function export_tree(a: number, b: number, c: number, d: number): void;
export function export_trees_pprof(a: number, b: number, c: number): void;
export function merge_trees_pprof(a: number, b: number, c: number): void;
export function export_trees_pprof(a: number, b: number): void;
export function drop_tree(a: number): void;
export function init_panic_hook(): void;
export function __wbindgen_malloc(a: number, b: number): number;
Expand Down
42 changes: 37 additions & 5 deletions pyroscope/pprof-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct Tree {
sample_types: Vec<String>,
max_self: Vec<i64>,
nodes_num: i32,
pprof: Profile,
}

impl Tree {
Expand Down Expand Up @@ -357,6 +358,7 @@ fn upsert_tree(ctx: &mut HashMap<u32, Mutex<Tree>>, id: u32, sample_types: Vec<S
sample_types,
max_self: vec![0; _len],
nodes_num: 1,
pprof: Profile::default(),
}),
);
}
Expand Down Expand Up @@ -384,7 +386,7 @@ impl TrieReader {
fn read_size(&mut self) -> usize {
let res = read_uleb128(&self.bytes[self.offs..]);
self.offs += res.1;
res.0
res.0.clone()
}

fn read_string(&mut self) -> String {
Expand Down Expand Up @@ -423,6 +425,24 @@ impl TrieReader {
}
res
}
fn read_blob(&mut self) -> &[u8] {
let size = self.read_size();
let string = &self.bytes[self.offs..self.offs + size];
self.offs += size;
string
}
fn read_blob_list(&mut self) -> Vec<&[u8]> {
let mut res = Vec::new();
while self.offs < self.bytes.len() {
let uleb = read_uleb128(&self.bytes[self.offs..]);
self.offs += uleb.1;
let _size = uleb.0;
let string = &self.bytes[self.offs..self.offs + _size];
self.offs += _size;
res.push(string);
}
res
}
/*fn end(&self) -> bool {
self.offs >= self.bytes.len()
}*/
Expand Down Expand Up @@ -917,11 +937,15 @@ pub fn export_tree(id: u32, sample_type: String) -> Vec<u8> {
}

#[wasm_bindgen]
pub fn export_trees_pprof(payload: &[u8]) -> Vec<u8> {
pub fn merge_trees_pprof(id: u32, payload: &[u8]) {
let p = panic::catch_unwind(|| {
let mut ctx = CTX.lock().unwrap();
upsert_tree(&mut ctx, id, vec![]);
let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap();
let mut reader = TrieReader::new(payload);
let bin_profs = reader.read_blob_vec();
let bin_profs = reader.read_blob_list();
let mut merger = merge::ProfileMerge::new();
merger.merge(&mut tree.pprof);
for bin_prof in bin_profs {
if bin_prof.len() >= 2 && bin_prof[0] == 0x1f && bin_prof[1] == 0x8b {
let mut decompressed = Vec::new();
Expand All @@ -936,14 +960,22 @@ pub fn export_trees_pprof(payload: &[u8]) -> Vec<u8> {

}
let res = merger.profile();
res.encode_to_vec()
tree.pprof = res;
});
match p {
Ok(res) => return res,
Ok(_) => {}
Err(err) => panic!("{:?}", err),
}
}

#[wasm_bindgen]
pub fn export_trees_pprof(id: u32) -> Vec<u8> {
let mut ctx = CTX.lock().unwrap();
upsert_tree(&mut ctx, id, vec![]);
let tree = ctx.get_mut(&id).unwrap().lock().unwrap();
tree.pprof.encode_to_vec()
}

#[wasm_bindgen]
pub fn drop_tree(id: u32) {
let mut ctx = CTX.lock().unwrap();
Expand Down
56 changes: 43 additions & 13 deletions pyroscope/pyroscope.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const {
HISTORY_TIMESPAN
} = require('./shared')
const settings = require('./settings')
const { mergeStackTraces } = require('./merge_stack_traces')
const { mergeStackTraces, newCtxIdx } = require('./merge_stack_traces')
const { selectSeriesImpl } = require('./select_series')
const render = require('./render')

Expand Down Expand Up @@ -166,25 +166,55 @@ const selectMergeProfile = async (req, res) => {
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const mainReq = (new Sql.Select())
.with(withIdxReq)
.select([new Sql.Raw('groupArray(payload)'), 'payload'])
.select([new Sql.Raw('payload'), 'payload'])
.from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
.where(Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))

const profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary',
null,
DATABASE_NAME(),
{
responseType: 'arraybuffer'
})
const binData = Uint8Array.from(profiles.data)

.orderBy(new Sql.Raw('timestamp_ns'))
const approxReq = (new Sql.Select())
.select(
[new Sql.Raw('sum(length(payload))'), 'size'],
[new Sql.Raw('count()'), 'count']
)
.from([new Sql.Raw('(' + mainReq.toString() + ')'), 'main'])
console.log('!!!!!' + approxReq.toString() + ' FORMAT JSON')
const approx = await clickhouse.rawRequest(
approxReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()
)
const approxData = approx.data.data[0]
logger.debug(`Approximate size: ${approxData.size} bytes, profiles count: ${approxData.count}`)
const chunksCount = Math.max(Math.ceil(approxData.size / (50 * 1024)), 1)
logger.debug(`Request is processed in: ${chunksCount} chunks`)
const chunkSize = Math.ceil(approxData.count / chunksCount)
const promises = []
require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
let processNs = BigInt(0)
const start = process.hrtime.bigint()
const response = pprofBin.export_trees_pprof(binData)
logger.debug(`Pprof export took ${process.hrtime.bigint() - start} nanoseconds`)
const ctx = newCtxIdx()
for (let i = 0; i < chunksCount; i++) {
promises.push((async (i) => {
logger.debug(`Chunk ${i}: ${mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`}`)
const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
null,
DATABASE_NAME(),
{
responseType: 'arraybuffer'
})
const binData = Uint8Array.from(profiles.data)
const start = process.hrtime.bigint()
pprofBin.merge_trees_pprof(ctx, binData)
const end = process.hrtime.bigint()
processNs += end - start
})(i))
}
await Promise.all(promises)
const response = pprofBin.export_trees_pprof(ctx)
const end = process.hrtime.bigint()

logger.debug(`Pprof merge took ${processNs} nanoseconds`)
logger.debug(`Pprof load + merge took ${end - start} nanoseconds`)
return res.code(200).send(Buffer.from(response))
}

Expand Down
2 changes: 1 addition & 1 deletion pyroscope/render_diff.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const renderDiff = async (req, res) => {
parseParams(req.query.leftQuery, req.query.leftFrom, req.query.leftUntil);
const [rightQuery, rightFromTimeSec, rightToTimeSec] =
parseParams(req.query.rightQuery, req.query.rightFrom, req.query.rightUntil);
if (leftQuery.typeId != rightQuery.typeId) {
if (leftQuery.typeId !== rightQuery.typeId) {
res.code(400).send('Different type IDs')
return
}
Expand Down

0 comments on commit d13503f

Please sign in to comment.