Skip to content

Commit

Permalink
Merge pull request #534 from metrico/feature/otlp_push
Browse files Browse the repository at this point in the history
otlp logs push
  • Loading branch information
akvlad authored Jul 15, 2024
2 parents bae94b6 + 2b1fff0 commit 37097df
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 5 deletions.
106 changes: 106 additions & 0 deletions lib/handlers/otlp_log_push.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
const DATABASE = require('../db/clickhouse')
const { asyncLogError, logType, metricType, bothType, readonly } = require('../../common')
const UTILS = require('../utils')
const stringify = UTILS.stringify
const fingerPrint = UTILS.fingerPrint
const { bulk_labels, bulk, labels } = DATABASE.cache

async function handle (req, res) {
if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(500).send()
}
try {
const promises = []
const fingerprints = {}
for (const resourceLogsEntry of req.body.resourceLogs) {
const resAttrs = resource2Attrs(resourceLogsEntry.resource)
for (const scopeLogsEntry of resourceLogsEntry.scopeLogs) {
const scopeAttrs = {
...resAttrs,
...resource2Attrs(scopeLogsEntry.scope)
}
for (const logRecord of scopeLogsEntry.logRecords) {
const logAttrs = {
...scopeAttrs,
...resource2Attrs(logRecord)
}
if (logRecord.severityText) {
logAttrs.level = logRecord.severityText
}
const labels = stringify(logAttrs)
const fingerprint = fingerPrint(labels)
const ts = BigInt(logRecord.timeUnixNano)
promises.push(bulk.add([[
fingerprint,
ts,
null,
anyValueToString(logRecord.body),
logType
]]))
const date = new Date(Number(ts / BigInt(1000000))).toISOString().split('T')[0]
!fingerprints[fingerprint] && promises.push(bulk_labels.add([[
date,
fingerprint,
labels,
labels.name || '',
logType
]]))
fingerprints[fingerprint] = true
}
}
}
await Promise.all(promises)
} catch (error) {
await asyncLogError(error)
res.status(500).send({ error: 'Internal Server Error' })
}
}

function resource2Attrs (resource) {
if (!resource || !resource.attributes) {
return {}
}
const attrs = {}
for (const attribute of resource.attributes) {
attrs[normalizeAttrName(attribute.key)] = anyValueToString(attribute.value)
}
return attrs
}

function normalizeAttrName (name) {
return name.replaceAll(/[^a-zA-Z0-9_]/g, '_')
}

function anyValueToString (value) {
if (!value) {
return ''
}
if (value.stringValue) {
return value.stringValue
}
if (value.boolValue) {
return value.boolValue ? 'true' : 'false'
}
if (value.intValue) {
return value.intValue.toString()
}
if (value.doubleValue) {
return value.doubleValue.toString()
}
if (value.bytesValue) {
return Buffer.from(value.bytesValue).toString('base64')
}
if (value.arrayValue) {
return JSON.stringify(value.arrayValue.values.map(anyValueToString))
}
if (value.kvlistValue) {
return JSON.stringify(value.kvlistValue.values.reduce((agg, pair) => ({
...agg,
[pair.key]: anyValueToString(pair.value)
})))
}
return ''
}

module.exports = handle
177 changes: 177 additions & 0 deletions lib/otlp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,180 @@ message Status {
// The status code.
StatusCode code = 3;
}

// Recource logs definition

message LogsData {
// An array of ResourceLogs.
// For data coming from a single resource this array will typically contain
// one element. Intermediary nodes that receive data from multiple origins
// typically batch the data before forwarding further and in that case this
// array will contain multiple elements.
repeated ResourceLogs resource_logs = 1;
}

// A collection of ScopeLogs from a Resource.
message ResourceLogs {
reserved 1000;

// The resource for the logs in this message.
// If this field is not set then resource info is unknown.
Resource resource = 1;

// A list of ScopeLogs that originate from a resource.
repeated ScopeLogs scope_logs = 2;

// The Schema URL, if known. This is the identifier of the Schema that the resource data
// is recorded in. To learn more about Schema URL see
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
// This schema_url applies to the data in the "resource" field. It does not apply
// to the data in the "scope_logs" field which have their own schema_url field.
string schema_url = 3;
}

// A collection of Logs produced by a Scope.
message ScopeLogs {
// The instrumentation scope information for the logs in this message.
// Semantically when InstrumentationScope isn't set, it is equivalent with
// an empty instrumentation scope name (unknown).
InstrumentationScope scope = 1;

// A list of log records.
repeated LogRecord log_records = 2;

// The Schema URL, if known. This is the identifier of the Schema that the log data
// is recorded in. To learn more about Schema URL see
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
// This schema_url applies to all logs in the "logs" field.
string schema_url = 3;
}

// Possible values for LogRecord.SeverityNumber.
enum SeverityNumber {
// UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
SEVERITY_NUMBER_UNSPECIFIED = 0;
SEVERITY_NUMBER_TRACE = 1;
SEVERITY_NUMBER_TRACE2 = 2;
SEVERITY_NUMBER_TRACE3 = 3;
SEVERITY_NUMBER_TRACE4 = 4;
SEVERITY_NUMBER_DEBUG = 5;
SEVERITY_NUMBER_DEBUG2 = 6;
SEVERITY_NUMBER_DEBUG3 = 7;
SEVERITY_NUMBER_DEBUG4 = 8;
SEVERITY_NUMBER_INFO = 9;
SEVERITY_NUMBER_INFO2 = 10;
SEVERITY_NUMBER_INFO3 = 11;
SEVERITY_NUMBER_INFO4 = 12;
SEVERITY_NUMBER_WARN = 13;
SEVERITY_NUMBER_WARN2 = 14;
SEVERITY_NUMBER_WARN3 = 15;
SEVERITY_NUMBER_WARN4 = 16;
SEVERITY_NUMBER_ERROR = 17;
SEVERITY_NUMBER_ERROR2 = 18;
SEVERITY_NUMBER_ERROR3 = 19;
SEVERITY_NUMBER_ERROR4 = 20;
SEVERITY_NUMBER_FATAL = 21;
SEVERITY_NUMBER_FATAL2 = 22;
SEVERITY_NUMBER_FATAL3 = 23;
SEVERITY_NUMBER_FATAL4 = 24;
}

// LogRecordFlags represents constants used to interpret the
// LogRecord.flags field, which is protobuf 'fixed32' type and is to
// be used as bit-fields. Each non-zero value defined in this enum is
// a bit-mask. To extract the bit-field, for example, use an
// expression like:
//
// (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
//
enum LogRecordFlags {
// The zero value for the enum. Should not be used for comparisons.
// Instead use bitwise "and" with the appropriate mask as shown above.
LOG_RECORD_FLAGS_DO_NOT_USE = 0;

// Bits 0-7 are used for trace flags.
LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF;

// Bits 8-31 are reserved for future use.
}

// A log record according to OpenTelemetry Log Data Model:
// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
message LogRecord {
reserved 4;

// time_unix_nano is the time when the event occurred.
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
// Value of 0 indicates unknown or missing timestamp.
fixed64 time_unix_nano = 1;

// Time when the event was observed by the collection system.
// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
// this timestamp is typically set at the generation time and is equal to Timestamp.
// For events originating externally and collected by OpenTelemetry (e.g. using
// Collector) this is the time when OpenTelemetry's code observed the event measured
// by the clock of the OpenTelemetry code. This field MUST be set once the event is
// observed by OpenTelemetry.
//
// For converting OpenTelemetry log data to formats that support only one timestamp or
// when receiving OpenTelemetry log data by recipients that support only one timestamp
// internally the following logic is recommended:
// - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
//
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
// Value of 0 indicates unknown or missing timestamp.
fixed64 observed_time_unix_nano = 11;

// Numerical value of the severity, normalized to values described in Log Data Model.
// [Optional].
SeverityNumber severity_number = 2;

// The severity text (also known as log level). The original string representation as
// it is known at the source. [Optional].
string severity_text = 3;

// A value containing the body of the log record. Can be for example a human-readable
// string message (including multi-line) describing the event in a free form or it can
// be a structured data composed of arrays and maps of other values. [Optional].
AnyValue body = 5;

// Additional attributes that describe the specific event occurrence. [Optional].
// Attribute keys MUST be unique (it is not allowed to have more than one
// attribute with the same key).
repeated KeyValue attributes = 6;
uint32 dropped_attributes_count = 7;

// Flags, a bit field. 8 least significant bits are the trace flags as
// defined in W3C Trace Context specification. 24 most significant bits are reserved
// and must be set to 0. Readers must not assume that 24 most significant bits
// will be zero and must correctly mask the bits when reading 8-bit trace flag (use
// flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional].
fixed32 flags = 8;

// A unique identifier for a trace. All logs from the same trace share
// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
// is zero-length and thus is also invalid).
//
// This field is optional.
//
// The receivers SHOULD assume that the log record is not associated with a
// trace if any of the following is true:
// - the field is not present,
// - the field contains an invalid value.
bytes trace_id = 9;

// A unique identifier for a span within a trace, assigned when the span
// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
// other than 8 bytes is considered invalid (empty string in OTLP/JSON
// is zero-length and thus is also invalid).
//
// This field is optional. If the sender specifies a valid span_id then it SHOULD also
// specify a valid trace_id.
//
// The receivers SHOULD assume that the log record is not associated with a
// span if any of the following is true:
// - the field is not present,
// - the field contains an invalid value.
bytes span_id = 10;
}
27 changes: 26 additions & 1 deletion parsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const path = require('path')
const WriteRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'prompb.proto')).lookupType('WriteRequest')
const PushRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'loki.proto')).lookupType('PushRequest')
const OTLPTraceData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('TracesData')
const OTLPLogsData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('LogsData')
const { parse: queryParser } = require('fast-querystring')

/**
Expand Down Expand Up @@ -202,6 +203,29 @@ function tempoNDJsonParser (req, payload) {
return parser
}

/**
*
* @param req {FastifyRequest}
* @param payload {Stream}
* @returns {*}
*/
async function otlpLogsDataParser (req, payload) {
const length = getContentLength(req, 5e6)
await shaper.register(length)
let body = []
const otelStream = stream.Readable.from(payload)
otelStream.on('data', data => {
body.push(data)
})
await new Promise(resolve => otelStream.once('end', resolve))
body = Buffer.concat(body)
body = OTLPLogsData.toObject(OTLPLogsData.decode(body), {
longs: String,
bytes: String
})
return body
}

/**
*
* @param subparsers {function(FastifyRequest): Promise<*|undefined>}
Expand Down Expand Up @@ -363,5 +387,6 @@ module.exports = {
tempoNDJsonParser,
otlpPushProtoParser,
wwwFormParser,
parsers
parsers,
otlpLogsDataParser
}
7 changes: 6 additions & 1 deletion qryn_bun.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
combinedParser,
jsonParser,
lokiPushJSONParser,
lokiPushProtoParser, otlpPushProtoParser, prometheusPushProtoParser,
lokiPushProtoParser, otlpLogsDataParser, otlpPushProtoParser, prometheusPushProtoParser,
rawStringParser,
tempoPushNDJSONParser,
tempoPushParser, wwwFormParser, yamlParser
Expand Down Expand Up @@ -56,6 +56,7 @@ import handlerPromGetRules from './lib/handlers/alerts/prom_get_rules.js'
import handlerTail from './lib/handlers/tail.js'
import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js'
import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js'
import handlerOtlpLogsPush from './lib/handlers/otlp_log_push.js'
import {init as pyroscopeInit } from './pyroscope/pyroscope.js'

import { boolEnv, readonly, readerMode, writerMode } from './common.js'
Expand Down Expand Up @@ -332,6 +333,10 @@ export default async() => {

readerMode && pyroscopeInit(fastify)

writerMode && fastify.post('/v1/logs', handlerOtlpLogsPush, {
'*': otlpLogsDataParser
})

const serveView = fs.existsSync(path.join(__dirname, 'view/index.html'))
if (serveView) {
app.plug(group(path.join(__dirname, 'view')));
Expand Down
9 changes: 6 additions & 3 deletions qryn_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ const {
shaper,
parsers,
lokiPushJSONParser, lokiPushProtoParser, jsonParser, rawStringParser, tempoPushParser, tempoPushNDJSONParser,
yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser
yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser, otlpLogsDataParser
} = require('./parsers')

const fastifyPlugin = require('fastify-plugin')



let fastify = require('fastify')({
logger,
bodyLimit: parseInt(process.env.FASTIFY_BODYLIMIT) || 5242880,
Expand Down Expand Up @@ -460,6 +458,11 @@ let fastify = require('fastify')({

readerMode && require('./pyroscope/pyroscope').init(fastify)

const handleOTLPLogs = require('./lib/handlers/otlp_log_push').bind(this)
writerMode && fastify.post('/v1/logs', handleOTLPLogs, {
'*': otlpLogsDataParser
})

// Run API Service
fastify.listen(
{
Expand Down

0 comments on commit 37097df

Please sign in to comment.