diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js index 218202ef..522503bd 100644 --- a/pyroscope/pyroscope.js +++ b/pyroscope/pyroscope.js @@ -120,15 +120,20 @@ const selectSeries = async (req, res) => { return selectSeriesImpl(fromTimeSec, toTimeSec, req.body) } +let mergeRequestsCounter = 0 +const mergeRequestsLimit = 10 + const selectMergeProfile = async (req, res) => { const ctx = newCtxIdx() try { const _req = req.body - const fromTimeSec = Math.floor(req.getStart && req.getStart() - ? parseInt(req.getStart()) / 1000 - : Date.now() / 1000 - HISTORY_TIMESPAN) - const toTimeSec = Math.floor(req.getEnd && req.getEnd() - ? parseInt(req.getEnd()) / 1000 + const fromTimeSec = + Math.floor(_req && _req.getStart + ? parseInt(_req.getStart()) / 1000 + : (Date.now() - HISTORY_TIMESPAN) / 1000) + const toTimeSec = + Math.floor(_req && _req.getEnd + ? parseInt(_req.getEnd()) / 1000 : Date.now() / 1000) let typeID = _req.getProfileTypeid && _req.getProfileTypeid() if (!typeID) { @@ -166,7 +171,7 @@ const selectMergeProfile = async (req, res) => { ) labelSelectorQuery(idxReq, labelSelector) const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName)) - const mainReq = (new Sql.Select()) + let mainReq = (new Sql.Select()) .with(withIdxReq) .select([new Sql.Raw('payload'), 'payload']) .from([`${DATABASE_NAME()}.profiles${dist}`, 'p']) @@ -174,7 +179,10 @@ const selectMergeProfile = async (req, res) => { new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)), Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)), Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`)))) - .orderBy(new Sql.Raw('timestamp_ns')) + .orderBy([new Sql.Raw('timestamp_ns'), 'DESC'], [new Sql.Raw('p.fingerprint'), 'ASC']) + if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) { + mainReq = mainReq.limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT)) + } const approxReq = (new Sql.Select()) .select( [new Sql.Raw('sum(length(payload))'), 'size'], @@ -196,13 +204,28 @@ const selectMergeProfile = async (req, res) => { for (let i = 0; i < chunksCount; i++) { promises.push((async (i) => { + // eslint-disable-next-line no-unmodified-loop-condition + while (mergeRequestsCounter >= mergeRequestsLimit) { + await (new Promise((resolve) => setTimeout(resolve, 50))) + } logger.debug(`Processing chunk ${i}`) - const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`, - null, - DATABASE_NAME(), - { - responseType: 'arraybuffer' - }) + mergeRequestsCounter++ + let profiles = null + try { + let end = i * chunkSize + chunkSize + if (process.env.ADVANCED_PROFILES_MERGE_LIMIT && end > process.env.ADVANCED_PROFILES_MERGE_LIMIT) { + end = process.env.ADVANCED_PROFILES_MERGE_LIMIT + } + mainReq.limit(end - i * chunkSize, i * chunkSize) + profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary', + null, + DATABASE_NAME(), + { + responseType: 'arraybuffer' + }) + } finally { + mergeRequestsCounter-- + } const binData = Uint8Array.from(profiles.data) logger.debug(`Chunk ${i} - ${binData.length} bytes`) const start = process.hrtime.bigint()