From 4a65580e86581e07c0c9468a15a1e179e0f79ab7 Mon Sep 17 00:00:00 2001
From: akvlad
Date: Mon, 26 Aug 2024 19:04:32 +0300
Subject: [PATCH] Add ADVANCED_PROFILES_MERGE_LIMIT to cap how many profiles
 get merged; limit simultaneous chunk requests

---
 pyroscope/pyroscope.js | 37 +++++++++++++++++++++++++++++--------
 1 file changed, 29 insertions(+), 8 deletions(-)

diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js
index 218202ef..57e82543 100644
--- a/pyroscope/pyroscope.js
+++ b/pyroscope/pyroscope.js
@@ -120,6 +120,9 @@ const selectSeries = async (req, res) => {
   return selectSeriesImpl(fromTimeSec, toTimeSec, req.body)
 }
 
+let mergeRequestsCounter = 0
+const mergeRequestsLimit = 10
+
 const selectMergeProfile = async (req, res) => {
   const ctx = newCtxIdx()
   try {
@@ -166,15 +169,18 @@ const selectMergeProfile = async (req, res) => {
     )
     labelSelectorQuery(idxReq, labelSelector)
     const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
-    const mainReq = (new Sql.Select())
+    let mainReq = (new Sql.Select())
       .with(withIdxReq)
       .select([new Sql.Raw('payload'), 'payload'])
       .from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
       .where(Sql.And(
         new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
         Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
         Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
-      .orderBy(new Sql.Raw('timestamp_ns'))
+      .orderBy([new Sql.Raw('timestamp_ns'), 'DESC'], [new Sql.Raw('p.fingerprint'), 'ASC'])
+    if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
+      mainReq = mainReq.limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT, 10))
+    }
     const approxReq = (new Sql.Select())
       .select(
         [new Sql.Raw('sum(length(payload))'), 'size'],
@@ -196,13 +202,28 @@
 
     for (let i = 0; i < chunksCount; i++) {
       promises.push((async (i) => {
+        // eslint-disable-next-line no-unmodified-loop-condition
+        while (mergeRequestsCounter >= mergeRequestsLimit) {
+          await (new Promise((resolve) => setTimeout(resolve, 50)))
+        }
         logger.debug(`Processing chunk ${i}`)
-        const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
-          null,
-          DATABASE_NAME(),
-          {
-            responseType: 'arraybuffer'
-          })
+        mergeRequestsCounter++
+        let profiles = null
+        try {
+          let end = i * chunkSize + chunkSize
+          if (process.env.ADVANCED_PROFILES_MERGE_LIMIT && end > parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT, 10)) {
+            end = parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT, 10)
+          }
+          mainReq.limit(end - i * chunkSize, i * chunkSize)
+          profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary',
+            null,
+            DATABASE_NAME(),
+            {
+              responseType: 'arraybuffer'
+            })
+        } finally {
+          mergeRequestsCounter--
+        }
         const binData = Uint8Array.from(profiles.data)
         logger.debug(`Chunk ${i} - ${binData.length} bytes`)
         const start = process.hrtime.bigint()
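
Reviewer notes (editorial, not part of the commit):

The chunk loop gates concurrency with a plain module-level counter that
is polled every 50 ms, rather than with a semaphore primitive. A minimal
standalone sketch of that pattern follows; mergeRequestsCounter and
mergeRequestsLimit mirror the diff, while sleep() and withMergeSlot()
are illustrative names introduced here, not code from the patch.

  let mergeRequestsCounter = 0
  const mergeRequestsLimit = 10

  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

  // Wait for a free slot, take it, run fn, and always release the slot,
  // mirroring the while / counter++ / finally sequence in the chunk loop.
  async function withMergeSlot (fn) {
    while (mergeRequestsCounter >= mergeRequestsLimit) {
      await sleep(50)
    }
    mergeRequestsCounter++
    try {
      return await fn()
    } finally {
      mergeRequestsCounter--
    }
  }

  // Demo: 40 simulated chunk fetches, at most 10 in flight at a time.
  Promise.all(Array.from({ length: 40 }, (_, i) =>
    withMergeSlot(async () => {
      await sleep(100) // stand-in for the ClickHouse round trip
      console.log(`chunk ${i} done`)
    })
  )).then(() => console.log('all chunks fetched'))

Polling is simple and dependency-free; a promise-queue semaphore would
wake waiters immediately instead of on a 50 ms tick, but at a limit of
10 concurrent requests the difference is negligible.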
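
How ADVANCED_PROFILES_MERGE_LIMIT interacts with the LIMIT/OFFSET
pagination: chunkWindow() below is a hypothetical helper, not in the
diff, that reproduces the arithmetic from the try block. The cap is
read from the process environment, e.g. ADVANCED_PROFILES_MERGE_LIMIT=25000.

  // Derive one chunk's LIMIT/OFFSET under an optional total-row cap.
  function chunkWindow (i, chunkSize, mergeLimit) {
    const offset = i * chunkSize
    let end = offset + chunkSize
    if (mergeLimit && end > mergeLimit) {
      end = mergeLimit // never read past the configured cap
    }
    return { limit: end - offset, offset }
  }

  // With chunkSize = 1000 and ADVANCED_PROFILES_MERGE_LIMIT = 2500:
  console.log(chunkWindow(0, 1000, 2500)) // { limit: 1000, offset: 0 }
  console.log(chunkWindow(1, 1000, 2500)) // { limit: 1000, offset: 1000 }
  console.log(chunkWindow(2, 1000, 2500)) // { limit: 500, offset: 2000 }

Two details worth flagging in review: each chunk task mutates the shared
mainReq builder, which is safe only because limit() and toString() run
synchronously with no await between them; and a chunk whose offset
already lies past the cap would get a non-positive LIMIT, so the code
assumes chunksCount keeps every offset below the cap.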