Skip to content

Commit

Permalink
#423 distinguish logs and metrics (#440)
Browse files Browse the repository at this point in the history
* alter tables;

* debug

* ingestion impl

* reading impl; reading test

* reading impl; reading test

* test if qryn works

* workflow fixes

* test fixes

* debug

* fix tests
  • Loading branch information
akvlad authored Jan 29, 2024
1 parent 94f049f commit 6c4e5e2
Show file tree
Hide file tree
Showing 22 changed files with 819 additions and 190 deletions.
9 changes: 2 additions & 7 deletions .github/workflows/node-clickhouse-cluster.js.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This workflow will validate qryn using nodejs + clickhouse

name: QRYN CI
name: QRYN CI CLUSTER

on:
push:
Expand Down Expand Up @@ -44,14 +44,9 @@ jobs:
- name: Workflow Telemetry
uses: runforesight/workflow-telemetry-action@v1
if: github.event_name != 'pull_request'
- env:
CLUSTER_NAME: test_cluster_two_shards
CLICKHOUSE_DB: qryn
CLICKHOUSE_TSDB: qryn
run: node qryn.mjs >/dev/stdout &
- env:
CLICKHOUSE_DB: qryn
CLICKHOUSE_TSDB: qryn
INTEGRATION_E2E: 1
CLOKI_EXT_URL: 127.0.0.1:3100
run: npm run test --forceExit
run: CLUSTER_NAME=test_cluster_two_shards node qryn.mjs >/dev/stdout & npm run test --forceExit
6 changes: 6 additions & 0 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,9 @@ module.exports.bun = () => {
return false
}
}

module.exports.logType = process.env.DISTINGUISH_LOGS_METRICS ? 1 : 0

module.exports.metricType = process.env.DISTINGUISH_LOGS_METRICS ? 2 : 0

module.exports.bothType = 0
6 changes: 4 additions & 2 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ const bulk = {
fingerprint: r[0],
timestamp_ns: r[1],
value: r[2],
string: r[3]
string: r[3],
type: r[4]
}, jsonSerializer)).join('\n'),
id: id
})
Expand All @@ -151,7 +152,8 @@ const bulkLabels = {
date: r[0],
fingerprint: r[1],
labels: r[2],
name: r[3]
name: r[3],
type: r[4]
}, jsonSerializer)).join('\n'),
id: id
})
Expand Down
50 changes: 49 additions & 1 deletion lib/db/maintain/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,47 @@ FROM samples_v3 as samples
GROUP BY fingerprint, timestamp_ns;`,

"INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update', " +
"'v3_2', toString(toUnixTimestamp(NOW())), NOW())"
"'v3_2', toString(toUnixTimestamp(NOW())), NOW())",

`ALTER TABLE {{DB}}.time_series {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS type UInt8,
MODIFY ORDER BY (fingerprint, type)`,

`ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS type UInt8`,

`ALTER TABLE {{DB}}.time_series_gin {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS type UInt8,
MODIFY ORDER BY (key, val, fingerprint, type)`,

`ALTER TABLE {{DB}}.metrics_15s {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS type UInt8,
MODIFY ORDER BY (fingerprint, timestamp_ns, type)`,

'RENAME TABLE {{DB}}.time_series_gin_view TO time_series_gin_view_bak {{{OnCluster}}}',

`CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.time_series_gin_view {{{OnCluster}}} TO time_series_gin
AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint, type
FROM time_series ARRAY JOIN JSONExtractKeysAndValues(time_series.labels, 'String') as pairs`,

'DROP TABLE IF EXISTS {{DB}}.time_series_gin_view_bak {{{OnCluster}}}',

'RENAME TABLE {{DB}}.metrics_15s_mv TO metrics_15s_mv_bak {{{OnCluster}}}',

`CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.metrics_15s_mv {{{OnCluster}}} TO metrics_15s AS
SELECT fingerprint,
intDiv(samples.timestamp_ns, 15000000000) * 15000000000 as timestamp_ns,
argMaxState(value, samples.timestamp_ns) as last,
maxSimpleState(value) as max,
minSimpleState(value) as min,
countState() as count,
sumSimpleState(value) as sum,
sumSimpleState(length(string)) as bytes,
type
FROM samples_v3 as samples
GROUP BY fingerprint, timestamp_ns, type;`,

'DROP TABLE IF EXISTS {{DB}}.metrics_15s_mv_bak {{{OnCluster}}}'
]

module.exports.traces = [
Expand Down Expand Up @@ -195,6 +235,14 @@ module.exports.overall_dist = [
val String,
fingerprint UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`,

'ALTER TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8;',

'ALTER TABLE {{DB}}.samples_v3_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8',

'ALTER TABLE {{DB}}.time_series_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8;',

'ALTER TABLE {{DB}}.time_series_gin_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8;'
]

module.exports.traces_dist = [
Expand Down
4 changes: 2 additions & 2 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ class TimeoutThrottler {
}

const samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string) FORMAT JSONEachRow`)
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
const timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name) FORMAT JSONEachRow`)
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
const tracesThottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
Expand Down
8 changes: 5 additions & 3 deletions lib/handlers/datadog_log_push.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
const { fingerPrint } = require('../utils')
const { readonly } = require('../../common')
const { readonly, logType } = require('../../common')

const tagsToObject = (data, delimiter = ',') =>
Object.fromEntries(data.split(',').map(v => {
Expand Down Expand Up @@ -79,7 +79,8 @@ async function handler (req, res) {
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
JSONLabels.target || '',
logType
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
Expand All @@ -95,7 +96,8 @@ async function handler (req, res) {
finger,
BigInt((new Date().getTime() * 1000) + '000'),
null,
stream.message
stream.message,
logType
]
req.log.debug({ finger, values }, 'store')
promises.push(bulk.add([values]))
Expand Down
8 changes: 5 additions & 3 deletions lib/handlers/datadog_series_push.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
const { fingerPrint } = require('../utils')
const { readonly } = require('../../common')
const { readonly, metricType } = require('../../common')

async function handler (req, res) {
req.log.debug('Datadog Series Index Request')
Expand Down Expand Up @@ -73,7 +73,8 @@ async function handler (req, res) {
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.__name__ || 'undefined'
JSONLabels.__name__ || 'undefined',
metricType
]]))
for (const key in JSONLabels) {
labels.add('_LABELS_', key)
Expand All @@ -98,7 +99,8 @@ async function handler (req, res) {
finger,
BigInt(pad('0000000000000000000', entry.timestamp)),
entry.value,
JSONLabels.__name__ || 'undefined'
JSONLabels.__name__ || 'undefined',
metricType
]
promises.push(bulk.add([values]))
})
Expand Down
8 changes: 5 additions & 3 deletions lib/handlers/elastic_bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

const { asyncLogError } = require('../../common')
const { asyncLogError, logType } = require('../../common')
const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
Expand Down Expand Up @@ -80,7 +80,8 @@ async function handler (req, res) {
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
JSONLabels.target || '',
logType
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
Expand All @@ -96,7 +97,8 @@ async function handler (req, res) {
finger,
BigInt((new Date().getTime() * 1000) + '000'),
null,
JSON.stringify(stream) || stream
JSON.stringify(stream) || stream,
logType
]
req.log.debug({ finger, values }, 'store')
promises.push(bulk.add([values]))
Expand Down
8 changes: 5 additions & 3 deletions lib/handlers/elastic_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/

const { asyncLogError } = require('../../common')
const { asyncLogError, logType } = require('../../common')
const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
Expand Down Expand Up @@ -67,7 +67,8 @@ async function handler (req, res) {
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
JSONLabels.target || '',
logType
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
Expand All @@ -88,7 +89,8 @@ async function handler (req, res) {
finger,
BigInt((new Date().getTime() * 1000) + '000'),
null,
JSON.stringify(stream) || stream
JSON.stringify(stream) || stream,
logType
]
req.log.debug({ finger, values }, 'store')
promises.push(bulk.add([values]))
Expand Down
34 changes: 22 additions & 12 deletions lib/handlers/influx_write.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

const stringify = require('../utils').stringify
const influxParser = require('../influx')
const { asyncLogError, errors } = require('../../common')
const { asyncLogError, errors, bothType, logType, metricType } = require('../../common')
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
const { fingerPrint } = require('../utils')
Expand Down Expand Up @@ -69,6 +69,7 @@ async function handler (req, res) {
let JSONLabels = {}
let JSONFields = {}
let finger = null
let strLabels = ''
try {
if (stream.tags) {
JSONLabels = stream.tags
Expand All @@ -80,16 +81,10 @@ async function handler (req, res) {
JSONLabels.__name__ = stream.measurement || 'null'
}
// Calculate Fingerprint
const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort()))
strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort()))
finger = fingerPrint(strLabels)
labels.add(finger.toString(), stream.labels)
// Store Fingerprint
bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strLabels,
stream.measurement || ''
]])
for (const key in JSONLabels) {
// req.log.debug({ key, data: JSONLabels[key] }, 'Storing label');
labels.add('_LABELS_', key)
Expand All @@ -98,6 +93,7 @@ async function handler (req, res) {
} catch (err) {
asyncLogError(err, req.log)
}
let type = bothType
const timestamp = stream.timestamp || JSONFields.timestamp
/* metrics */
if (stream.fields && stream.measurement !== 'syslog' && !JSONFields.message) {
Expand All @@ -123,6 +119,7 @@ async function handler (req, res) {
]
bulk.add([values])
}
type = metricType
/* logs or syslog */
} else if (stream.measurement === 'syslog' || JSONFields.message) {
// Send fields as a JSON object for qryn to parse
Expand All @@ -134,7 +131,16 @@ async function handler (req, res) {
JSONFields.message
]
bulk.add([values])
type = logType
}

bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strLabels,
stream.measurement || '',
type
]])
})
}
await Promise.all(promises)
Expand All @@ -156,13 +162,15 @@ function telegrafPrometheusV1 (stream) {
new Date().toISOString().split('T')[0],
fp,
strLabels,
entry.measurement || ''
entry.measurement || '',
logType
]]))
const values = [
fp,
timestamp,
0,
entry.fields.message || ''
entry.fields.message || '',
logType
]
promises.push(bulk.add([values]))
}
Expand All @@ -182,13 +190,15 @@ function telegrafPrometheusV1 (stream) {
new Date().toISOString().split('T')[0],
fp,
strLabels,
entry.measurement || ''
entry.measurement || '',
metricType
]]))
const values = [
fp,
timestamp,
iValue || 0,
key || ''
key || '',
metricType
]
promises.push(bulk.add([values]))
}
Expand Down
6 changes: 4 additions & 2 deletions lib/handlers/label.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@

const clickhouse = require('../db/clickhouse')
const utils = require('../utils')
const { clusterName } = require('../../common')
const { clusterName, bothType, logType } = require('../../common')
const dist = clusterName ? '_dist' : ''

async function handler (req, res) {
req.log.debug('GET /loki/api/v1/label')
const types = req.types || [bothType, logType]
let where = [
req.query.start && !isNaN(parseInt(req.query.start)) ? `date >= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.start)}, 1000000000)))` : null,
req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null
req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null,
`type IN (${types.map(t => `${t}`).join(',')})`
].filter(w => w)
where = where.length ? `WHERE ${where.join(' AND ')}` : ''
const q = `SELECT DISTINCT key FROM time_series_gin${dist} ${where} FORMAT JSON`
Expand Down
6 changes: 4 additions & 2 deletions lib/handlers/label_values.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
const clickhouse = require('../db/clickhouse')
const Sql = require('@cloki/clickhouse-sql')
const utils = require('../utils')
const { clusterName } = require('../../common')
const { clusterName, bothType, logType } = require('../../common')
const dist = clusterName ? '_dist' : ''

async function handler (req, res) {
req.log.debug(`GET /api/prom/label/${req.params.name}/values`)
const types = req.types || [bothType, logType]
let where = [
`key = ${Sql.val(req.params.name)}`,
req.query.start && !isNaN(parseInt(req.query.start)) ? `date >= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.start)}, 1000000000)))` : null,
req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null
req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null,
`type IN (${types.map(t => `${t}`).join(',')})`
].filter(w => w)
where = `WHERE ${where.join(' AND ')}`
const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON`
Expand Down
Loading

0 comments on commit 6c4e5e2

Please sign in to comment.