Skip to content

Commit

Permalink
#feat: bun support
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 6, 2023
1 parent dd9eaa2 commit b2e906e
Show file tree
Hide file tree
Showing 37 changed files with 664 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/node-clickhouse.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ jobs:
CLICKHOUSE_TSDB: loki
INTEGRATION_E2E: 1
CLOKI_EXT_URL: 127.0.0.1:3100
run: node qryn.js >/dev/stdout & npm run test --forceExit
run: node qryn.mjs >/dev/stdout & npm run test --forceExit
2 changes: 2 additions & 0 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,5 @@ module.exports.isCustomSamplesOrderingRule = () => {
module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*'

module.exports.clusterName = process.env.CLUSTER_NAME

module.exports.readonly = process.env.READONLY || false
2 changes: 1 addition & 1 deletion docker/docker-compose-centos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ services:
container_name: centos
volumes:
- ../:/opt/qryn
entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.js'
entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.mjs'
167 changes: 167 additions & 0 deletions lib/bun_wrapper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
const { Transform } = require('stream')
const log = require('./logger')
const { EventEmitter } = require('events')

class BodyStream extends Transform {
  /**
   * Identity pass-through transform used to expose a Bun request body
   * as a Node-style readable stream for the existing fastify-era parsers.
   */
  _transform (chunk, encoding, callback) {
    callback(null, chunk)
  }

  /**
   * One-shot listener registration.
   * NOTE(review): this re-implements EventEmitter#once — presumably a
   * Bun-compat workaround; confirm it is still required.
   * The wrapper is removed BEFORE the listener runs so that a reentrant
   * emit of the same event cannot fire the listener a second time
   * (matches EventEmitter#once semantics). All event arguments are
   * forwarded, not just the first.
   * @param event {string}
   * @param listener {function}
   * @returns {BodyStream} this, for chaining
   */
  once (event, listener) {
    const self = this
    const _listener = (...args) => {
      // Deregister first: guards against reentrant emits and ensures the
      // wrapper is gone even if the listener throws.
      self.removeListener(event, _listener)
      listener(...args)
    }
    this.on(event, _listener)
    return this
  }
}

/**
 * Adapts a fastify-style route handler to a Bun.serve fetch handler.
 * @param handler {function} fastify-style (req, res) handler; may return a
 *   plain value, a promise, or a readable stream.
 * @param parsers {Object|undefined} map of content-type substring -> body
 *   parser; the '*' key is the fallback parser. When omitted, the raw
 *   stream is exposed as req.body.
 * @returns {function} async (Request, server) => Response
 */
const wrapper = (handler, parsers) => {
  const res = async (ctx, server) => {
    let response = ''
    let status = 200
    let reqBody = ''
    let headers = {}

    // Pump the Bun request body into a Node-style stream so the existing
    // parsers and handlers can consume it. Runs on the next tick so the
    // consumer below is wired up first.
    const stream = new BodyStream()
    setTimeout(async () => {
      try {
        if (!ctx.body) {
          stream.end()
          return
        }
        for await (const chunk of ctx.body) {
          stream.write(chunk)
        }
        stream.end()
      } catch (err) {
        // Surface pump failures on the stream instead of leaving an
        // unhandled promise rejection.
        stream.destroy(err)
      }
    })

    // Fastify-compatible request facade.
    const req = {
      headers: Object.fromEntries(ctx.headers.entries()),
      raw: stream,
      log: log,
      params: ctx.params || {},
      query: {}
    }
    // Collect query parameters; repeated keys accumulate into an array.
    for (const [key, value] of (new URL(ctx.url)).searchParams) {
      if (!(key in req.query)) {
        req.query[key] = value
        continue
      }
      req.query[key] = Array.isArray(req.query[key])
        ? [...req.query[key], value]
        : [req.query[key], value]
    }

    // Fastify-compatible reply facade; chainable like fastify's reply.
    const reply = {
      send: (msg) => {
        response = msg
      },
      code: (code) => {
        status = code
        return reply
      },
      header: (key, value) => {
        headers[key] = value
        return reply
      },
      headers: (hdrs) => {
        headers = { ...headers, ...hdrs }
        return reply
      }
    }

    if (parsers) {
      const contentType = (ctx.headers.get('Content-Type') || '')
      let ok = false
      for (const [type, parser] of Object.entries(parsers)) {
        if (type !== '*' && contentType.indexOf(type) > -1) {
          log.debug(`parsing ${type}`)
          reqBody = await parser(req, stream)
          ok = true
          log.debug(`parsing ${type} ok`)
          // The stream is consumed; running a second matching parser
          // would read an already-drained body.
          break
        }
      }
      if (!ok && parsers['*']) {
        log.debug('parsing *')
        reqBody = await parsers['*'](req, stream)
        ok = true
        log.debug('parsing * ok')
      }
      if (!ok) {
        throw new Error('undefined content type ' + contentType)
      }
    }

    req.body = reqBody || stream

    let result = handler(req, reply)
    if (result && result.then) {
      result = await result
    }
    // A returned stream is drained into the response body.
    if (result && result.on) {
      response = ''
      result.on('data', (d) => {
        response += d
      })
      await new Promise((resolve, reject) => {
        result.on('end', resolve)
        result.on('error', reject)
        result.on('close', resolve)
      })
      result = null
    }
    if (result) {
      response = result
    }
    // Non-string objects are serialized as JSON.
    if (response instanceof Object && typeof response !== 'string') {
      response = JSON.stringify(response)
    }
    return new Response(response, { status: status, headers: headers })
  }
  return res
}

/**
 * Adapts a fastify-style websocket handler to Bun's websocket hooks.
 * @param handler {function} fastify-style (connection, request) handler.
 * @returns {Object} { open, close } hooks for Bun.serve({ websocket }).
 */
const wsWrapper = (handler) => {
  const res = {
    open: async (ctx, server) => {
      // Rebuild a minimal fastify-style request from the upgrade Request
      // stashed on ctx.data by the upgrade call.
      const req = {
        headers: Object.fromEntries(ctx.data.ctx.headers.entries()),
        log: log,
        query: {}
      }
      // Collect query parameters; repeated keys accumulate into an array.
      for (const [key, value] of (new URL(ctx.data.ctx.url)).searchParams) {
        if (!(key in req.query)) {
          req.query[key] = value
          continue
        }
        req.query[key] = Array.isArray(req.query[key])
          ? [...req.query[key], value]
          : [req.query[key], value]
      }

      // The emitter doubles as the "socket": handlers subscribe to its
      // 'close' event and call send() on it.
      ctx.closeEmitter = new EventEmitter()
      ctx.closeEmitter.send = ctx.send.bind(ctx)

      const ws = {
        socket: ctx.closeEmitter
      }

      const result = handler(ws, { query: req.query })
      if (result && result.then) {
        await result
      }
    },
    // closeEmitter may be absent if the socket closed before open()
    // finished (or open() threw) — guard the dereference.
    close: (ctx) => { ctx.closeEmitter?.emit('close') }
  }
  return res
}

// Public API: HTTP and websocket adapters for running the fastify-style
// handlers under Bun.serve.
module.exports = {
  wrapper,
  wsWrapper
}
1 change: 1 addition & 0 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ const queryTempoScanV2 = async function (query) {
}
const limit = query.limit ? `LIMIT ${parseInt(query.limit)}` : ''
const sql = `${select} ${from} WHERE ${where.join(' AND ')} ORDER BY timestamp_ns DESC ${limit} FORMAT JSON`
console.log(sql)
const resp = await rawRequest(sql, null, process.env.CLICKHOUSE_DB || 'cloki')
return resp.data.data ? resp.data.data : JSON.parse(resp.data).data
}
Expand Down
8 changes: 7 additions & 1 deletion lib/db/zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ module.exports = class {
* @returns {string}
*/
toJson () {
return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val)
const res = {
...this,
timestamp_ns: this.timestamp_ns.toString(),
duration_ns: this.duration_ns.toString()
}
return JSON.stringify(res)
//return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/handlers/404.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
 * Catch-all handler for unsupported routes.
 * Logs the request and replies with HTTP 404.
 * @param req fastify-style request (provides log and url)
 * @param res fastify-style reply (chainable code()/send())
 * @returns the reply
 */
function handler (req, res) {
  req.log.debug('unsupported', req.url)
  return res.code(404).send('404 Not Supported')
}

// Expose the catch-all 404 handler for route registration.
module.exports = handler
18 changes: 11 additions & 7 deletions lib/handlers/datadog_log_push.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
*/

const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
const { fingerPrint } = require('../utils')
const { readonly } = require('../../common')

const tagsToObject = (data, delimiter = ',') =>
Object.fromEntries(data.split(',').map(v => {
const fields = v.split(':')
return [fields[0], fields[1]]
}))

async function handler (req, res) {
const self = this
req.log.debug('Datadog Log Index Request')
if (!req.body) {
req.log.error('No Request Body or Target!')
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
}
if (this.readonly) {
if (readonly) {
req.log.error('Readonly! No push support.')
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}
Expand Down Expand Up @@ -69,18 +73,18 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
finger = self.fingerPrint(strJson)
finger = fingerPrint(strJson)
// Store Fingerprint
promises.push(self.bulk_labels.add([[
promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
self.labels.add('_LABELS_', key)
self.labels.add(key, JSONLabels[key])
labels.add('_LABELS_', key)
labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err }, 'failed ingesting datadog log')
Expand All @@ -94,7 +98,7 @@ async function handler (req, res) {
stream.message
]
req.log.debug({ finger, values }, 'store')
promises.push(self.bulk.add([values]))
promises.push(bulk.add([values]))
})
}
await Promise.all(promises)
Expand Down
19 changes: 11 additions & 8 deletions lib/handlers/datadog_series_push.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
*/
const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
const { fingerPrint } = require('../utils')
const { readonly } = require('../../common')

async function handler (req, res) {
const self = this
req.log.debug('Datadog Series Index Request')
if (!req.body) {
req.log.error('No Request Body!')
res.code(500).send()
return
}
if (this.readonly) {
if (readonly) {
req.log.error('Readonly! No push support.')
res.code(500).send()
return
Expand Down Expand Up @@ -63,18 +66,18 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
finger = self.fingerPrint(strJson)
self.labels.add(finger.toString(), stream.labels)
finger = fingerPrint(strJson)
labels.add(finger.toString(), stream.labels)
// Store Fingerprint
promises.push(self.bulk_labels.add([[
promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.__name__ || 'undefined'
]]))
for (const key in JSONLabels) {
self.labels.add('_LABELS_', key)
self.labels.add(key, JSONLabels[key])
labels.add('_LABELS_', key)
labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err })
Expand All @@ -97,7 +100,7 @@ async function handler (req, res) {
entry.value,
JSONLabels.__name__ || 'undefined'
]
promises.push(self.bulk.add([values]))
promises.push(bulk.add([values]))
})
}
})
Expand Down
20 changes: 13 additions & 7 deletions lib/handlers/elastic_bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@

const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
const DATABASE = require('../db/clickhouse')
const { bulk_labels, bulk, labels } = DATABASE.cache
const { fingerPrint } = require('../utils')
const { readonly } = require('../../common')

async function handler (req, res) {
const self = this
req.log.debug('ELASTIC Bulk Request')
if (!req.body) {
asyncLogError('No Request Body or Target!' + req.body, req.log)
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
}
if (this.readonly) {
if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}
Expand All @@ -38,6 +41,9 @@ async function handler (req, res) {
const promises = []
if (streams) {
streams.forEach(function (stream) {
if (!stream) {
return
}
try {
stream = JSON.parse(stream)
} catch (err) { asyncLogError(err, req.log); return };
Expand Down Expand Up @@ -67,19 +73,19 @@ async function handler (req, res) {
}
// Calculate Fingerprint
const strJson = stringify(JSONLabels)
finger = self.fingerPrint(strJson)
finger = fingerPrint(strJson)
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
// Store Fingerprint
promises.push(self.bulk_labels.add([[
promises.push(bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
self.labels.add('_LABELS_', key)
self.labels.add(key, JSONLabels[key])
labels.add('_LABELS_', key)
labels.add(key, JSONLabels[key])
}
} catch (err) {
asyncLogError(err, req.log)
Expand All @@ -93,7 +99,7 @@ async function handler (req, res) {
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
promises.push(self.bulk.add([values]))
promises.push(bulk.add([values]))

// Reset State, Expect Command
lastTags = false
Expand Down
Loading

0 comments on commit b2e906e

Please sign in to comment.