Skip to content

Commit

Permalink
ADVANCED_PROFILES_MERGE_LIMIT to limit the merge; limit for simultane…
Browse files Browse the repository at this point in the history
…ous chunking
  • Loading branch information
akvlad committed Aug 26, 2024
1 parent 28cf4c1 commit 4a65580
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions pyroscope/pyroscope.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ const selectSeries = async (req, res) => {
return selectSeriesImpl(fromTimeSec, toTimeSec, req.body)
}

let mergeRequestsCounter = 0
const mergeRequestsLimit = 10

const selectMergeProfile = async (req, res) => {
const ctx = newCtxIdx()
try {
Expand Down Expand Up @@ -166,15 +169,18 @@ const selectMergeProfile = async (req, res) => {
)
labelSelectorQuery(idxReq, labelSelector)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const mainReq = (new Sql.Select())
let mainReq = (new Sql.Select())
.with(withIdxReq)
.select([new Sql.Raw('payload'), 'payload'])
.from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
.where(Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
.orderBy(new Sql.Raw('timestamp_ns'))
.orderBy([new Sql.Raw('timestamp_ns'), 'DESC'], [new Sql.Raw('p.fingerprint'), 'ASC'])
if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
mainReq = mainReq.limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
}
const approxReq = (new Sql.Select())
.select(
[new Sql.Raw('sum(length(payload))'), 'size'],
Expand All @@ -196,13 +202,28 @@ const selectMergeProfile = async (req, res) => {

for (let i = 0; i < chunksCount; i++) {
promises.push((async (i) => {
// eslint-disable-next-line no-unmodified-loop-condition
while (mergeRequestsCounter >= mergeRequestsLimit) {
await (new Promise((resolve) => setTimeout(resolve, 50)))
}
logger.debug(`Processing chunk ${i}`)
const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
null,
DATABASE_NAME(),
{
responseType: 'arraybuffer'
})
mergeRequestsCounter++
let profiles = null
try {
let end = i * chunkSize + chunkSize
if (process.env.ADVANCED_PROFILES_MERGE_LIMIT && end > process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
end = process.env.ADVANCED_PROFILES_MERGE_LIMIT
}
mainReq.limit(end - i * chunkSize, i * chunkSize)
profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary',
null,
DATABASE_NAME(),
{
responseType: 'arraybuffer'
})
} finally {
mergeRequestsCounter--
}
const binData = Uint8Array.from(profiles.data)
logger.debug(`Chunk ${i} - ${binData.length} bytes`)
const start = process.hrtime.bigint()
Expand Down

0 comments on commit 4a65580

Please sign in to comment.