
drop debug logs; fix memory leaks #550

Merged · 2 commits · Aug 26, 2024
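This PR makes two changes to selectMergeProfile in pyroscope/pyroscope.js: it drops leftover debug output (a console.log('!!!!!' + ...) dump of the size-approximation query, and a per-chunk logger.debug that serialized the entire main query), and it fixes a memory leak in the pprof merge path. The merge context returned by newCtxIdx() refers to state inside the pprof-bin WASM module, which the JS garbage collector cannot reclaim; previously it was allocated mid-handler and never released, so every request (and every thrown error) leaked its merged tree. The fix allocates the context at the top of the handler and releases it in a finally block. A minimal sketch of the pattern, assuming the pprof_bin bindings used in this file (merge_trees_pprof, export_trees_pprof, drop_tree); mergeChunks and the newCtxIdx stand-in are hypothetical:

const pprofBin = require('./pprof-bin/pkg/pprof_bin')

// Stand-in for the module-local helper in pyroscope.js that hands out ids.
let ctxIdx = 0
const newCtxIdx = () => ctxIdx++

async function mergeChunks (chunks) {
  // The context index points at WASM-side state, invisible to the JS GC,
  // so it must be freed explicitly on every code path.
  const ctx = newCtxIdx()
  try {
    for (const binData of chunks) {
      pprofBin.merge_trees_pprof(ctx, binData) // accumulate into ctx
    }
    return pprofBin.export_trees_pprof(ctx) // serialize the merged profile
  } finally {
    pprofBin.drop_tree(ctx) // runs on success and on throw: no leak
  }
}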
pyroscope/pyroscope.js: 183 changed lines (94 additions, 89 deletions)
@@ -121,101 +121,106 @@ const selectSeries = async (req, res) => {
 }
 
 const selectMergeProfile = async (req, res) => {
-  const _req = req.body
-  const fromTimeSec = Math.floor(req.getStart && req.getStart()
-    ? parseInt(req.getStart()) / 1000
-    : Date.now() / 1000 - HISTORY_TIMESPAN)
-  const toTimeSec = Math.floor(req.getEnd && req.getEnd()
-    ? parseInt(req.getEnd()) / 1000
-    : Date.now() / 1000)
-  let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
-  if (!typeID) {
-    throw new QrynBadRequest('No type provided')
-  }
-  typeID = parseTypeId(typeID)
-  if (!typeID) {
-    throw new QrynBadRequest('Invalid type provided')
-  }
-  const dist = clusterName ? '_dist' : ''
-  // const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
-  const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
-
-  const typeIdSelector = Sql.Eq(
-    'type_id',
-    Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
-  const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
-
-  const idxReq = (new Sql.Select())
-    .select(new Sql.Raw('fingerprint'))
-    .from(`${DATABASE_NAME()}.profiles_series_gin`)
-    .where(
-      Sql.And(
-        typeIdSelector,
-        serviceNameSelector,
-        Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
-        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
-        Sql.Eq(
-          new Sql.Raw(
-            `has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`
-          ),
-          1
-        )
-      )
-    )
-  labelSelectorQuery(idxReq, labelSelector)
-  const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
-  const mainReq = (new Sql.Select())
-    .with(withIdxReq)
-    .select([new Sql.Raw('payload'), 'payload'])
-    .from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
-    .where(Sql.And(
-      new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
-      Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
-      Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
-    .orderBy(new Sql.Raw('timestamp_ns'))
-  const approxReq = (new Sql.Select())
-    .select(
-      [new Sql.Raw('sum(length(payload))'), 'size'],
-      [new Sql.Raw('count()'), 'count']
-    )
-    .from([new Sql.Raw('(' + mainReq.toString() + ')'), 'main'])
-  console.log('!!!!!' + approxReq.toString() + ' FORMAT JSON')
-  const approx = await clickhouse.rawRequest(
-    approxReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()
-  )
-  const approxData = approx.data.data[0]
-  logger.debug(`Approximate size: ${approxData.size} bytes, profiles count: ${approxData.count}`)
-  const chunksCount = Math.max(Math.ceil(approxData.size / (50 * 1024 * 1024)), 1)
-  logger.debug(`Request is processed in: ${chunksCount} chunks`)
-  const chunkSize = Math.ceil(approxData.count / chunksCount)
-  const promises = []
-  require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
-  let processNs = BigInt(0)
-  const start = process.hrtime.bigint()
-  const ctx = newCtxIdx()
-  for (let i = 0; i < chunksCount; i++) {
-    promises.push((async (i) => {
-      logger.debug(`Chunk ${i}: ${mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`}`)
-      const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
-        null,
-        DATABASE_NAME(),
-        {
-          responseType: 'arraybuffer'
-        })
-      const binData = Uint8Array.from(profiles.data)
-      const start = process.hrtime.bigint()
-      pprofBin.merge_trees_pprof(ctx, binData)
-      const end = process.hrtime.bigint()
-      processNs += end - start
-    })(i))
-  }
-  await Promise.all(promises)
-  const response = pprofBin.export_trees_pprof(ctx)
-  const end = process.hrtime.bigint()
-
-  logger.debug(`Pprof merge took ${processNs} nanoseconds`)
-  logger.debug(`Pprof load + merge took ${end - start} nanoseconds`)
-  return res.code(200).send(Buffer.from(response))
+  const ctx = newCtxIdx()
+  try {
+    const _req = req.body
+    const fromTimeSec = Math.floor(req.getStart && req.getStart()
+      ? parseInt(req.getStart()) / 1000
+      : Date.now() / 1000 - HISTORY_TIMESPAN)
+    const toTimeSec = Math.floor(req.getEnd && req.getEnd()
+      ? parseInt(req.getEnd()) / 1000
+      : Date.now() / 1000)
+    let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
+    if (!typeID) {
+      throw new QrynBadRequest('No type provided')
+    }
+    typeID = parseTypeId(typeID)
+    if (!typeID) {
+      throw new QrynBadRequest('Invalid type provided')
+    }
+    const dist = clusterName ? '_dist' : ''
+    // const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
+    const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
+
+    const typeIdSelector = Sql.Eq(
+      'type_id',
+      Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
+    const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
+
+    const idxReq = (new Sql.Select())
+      .select(new Sql.Raw('fingerprint'))
+      .from(`${DATABASE_NAME()}.profiles_series_gin`)
+      .where(
+        Sql.And(
+          typeIdSelector,
+          serviceNameSelector,
+          Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
+          Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
+          Sql.Eq(
+            new Sql.Raw(
+              `has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`
+            ),
+            1
+          )
+        )
+      )
+    labelSelectorQuery(idxReq, labelSelector)
+    const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
+    const mainReq = (new Sql.Select())
+      .with(withIdxReq)
+      .select([new Sql.Raw('payload'), 'payload'])
+      .from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
+      .where(Sql.And(
+        new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
+        Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
+        Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
+      .orderBy(new Sql.Raw('timestamp_ns'))
+    const approxReq = (new Sql.Select())
+      .select(
+        [new Sql.Raw('sum(length(payload))'), 'size'],
+        [new Sql.Raw('count()'), 'count']
+      )
+      .from([new Sql.Raw('(' + mainReq.toString() + ')'), 'main'])
+    const approx = await clickhouse.rawRequest(
+      approxReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()
+    )
+    const approxData = approx.data.data[0]
+    logger.debug(`Approximate size: ${approxData.size} bytes, profiles count: ${approxData.count}`)
+    const chunksCount = Math.max(Math.ceil(approxData.size / (50 * 1024 * 1024)), 1)
+    logger.debug(`Request is processed in: ${chunksCount} chunks`)
+    const chunkSize = Math.ceil(approxData.count / chunksCount)
+    const promises = []
+    require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
+    let processNs = BigInt(0)
+    const start = process.hrtime.bigint()
+
+    for (let i = 0; i < chunksCount; i++) {
+      promises.push((async (i) => {
+        logger.debug(`Processing chunk ${i}`)
+        const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
+          null,
+          DATABASE_NAME(),
+          {
+            responseType: 'arraybuffer'
+          })
+        const binData = Uint8Array.from(profiles.data)
+        logger.debug(`Chunk ${i} - ${binData.length} bytes`)
+        const start = process.hrtime.bigint()
+        pprofBin.merge_trees_pprof(ctx, binData)
+        const end = process.hrtime.bigint()
+        processNs += end - start
+      })(i))
+    }
+    await Promise.all(promises)
+    const response = pprofBin.export_trees_pprof(ctx)
+    const end = process.hrtime.bigint()
+
+    logger.debug(`Pprof merge took ${processNs} nanoseconds`)
+    logger.debug(`Pprof load + merge took ${end - start} nanoseconds`)
+    return res.code(200).send(Buffer.from(response))
+  } finally {
+    pprofBin.drop_tree(ctx)
+  }
 }
 
 const series = async (req, res) => {
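Unchanged by this PR, but useful when reading the diff: the handler first runs approxReq to learn the total payload size and row count, targets roughly 50 MiB of payload per chunk, and fetches each chunk as a LIMIT/OFFSET page of mainReq that is merged concurrently. A standalone sketch of that arithmetic (planChunks is a hypothetical name):

// Hypothetical distillation of the chunk planning in selectMergeProfile.
function planChunks (totalBytes, totalRows, targetBytes = 50 * 1024 * 1024) {
  // At least one chunk, each holding roughly targetBytes of payload.
  const chunksCount = Math.max(Math.ceil(totalBytes / targetBytes), 1)
  // Rows per LIMIT/OFFSET page; the last page may come back short.
  const chunkSize = Math.ceil(totalRows / chunksCount)
  const pages = []
  for (let i = 0; i < chunksCount; i++) {
    pages.push({ limit: chunkSize, offset: i * chunkSize })
  }
  return pages
}

// Example: ~120 MiB across 900 profiles -> 3 pages of 300 rows each.
console.log(planChunks(120 * 1024 * 1024, 900))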