Skip to content

Commit

Permalink
Add log data type support (#11)
Browse files Browse the repository at this point in the history
* Add PBs for logs.

Fix compile warnings.

Add compiled protos.

Switch to precompiled protobufjs. Update readme.

Add protobuf build helpers. Add request retry support.

Fix tests.

Readme typo fixes

* Change default retry timeouts.
  • Loading branch information
kkuzmin authored Nov 26, 2018
1 parent ea068ab commit fc85bc5
Show file tree
Hide file tree
Showing 25 changed files with 7,820 additions and 60 deletions.
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
.PHONY: test
.PHONY: test deps all compile clean pb pb-clean

PROTO_DIR=./proto


all: test

deps: node_modules
deps: node_modules pb

node_modules:
npm install
Expand All @@ -18,3 +21,10 @@ clean:
rm -f package-lock.json
rm -f test/report.xml
rm -rf ./coverage/

pb: $(PROTO_DIR)
cd $(PROTO_DIR) && make compile

pb-clean: $(PROTO_DIR)
cd $(PROTO_DIR) && make clean

11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ For example:
}
```

# Compiling proto

You will need `pbjs` tool in order to compile protobuf definitions located in [proto](proto):

```
npm install -g protobufjs
make pb-clean
make pb
```

# Debugging

To get a debug trace, set an Node.js environment variable called DEBUG and
Expand Down Expand Up @@ -77,3 +87,4 @@ See [debug](https://www.npmjs.com/package/debug) for further details.
- [Node.js static code analysis tool](http://jshint.com/install/)
- [Node.js rewire testing tool](https://github.com/jhnns/rewire)
- [Node.js sinon testing tool](http://sinonjs.org/)
- [pbjs manual](http://dcode.io/protobuf.js/#pbjs-for-javascript)
247 changes: 247 additions & 0 deletions al_log.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/* -----------------------------------------------------------------------------
* @copyright (C) 2018, Alert Logic, Inc
* @doc
*
* Helper utilities for Alert Logic log collector.
*
* @end
* -----------------------------------------------------------------------------
*/
const crypto = require('crypto');
const async = require('async');
const zlib = require('zlib');

const alcHealthPb = require('./proto/alc_health.piqi_pb').alc_health;
const commonProtoPb = require('./proto/common_proto.piqi_pb').common_proto;
const dictPb = require('./proto/dict.piqi_pb').alc_dict;
const hostMetadataPb = require('./proto/host_metadata.piqi_pb').host_metadata;

const PAYLOAD_BATCH_SIZE = 700000;

/**
* @function builds incoming log messages into protobuf and compresses it. The payload returned is
* ready to be passed to Ingest client for transport.
*
* @param hostId - host uuid obtained at collector registration
* @param sourceId - source/collector id obtained at collector registration
* @param hostmetaElems - a list of hostmeta JSON objects. For example,
* var hostTypeElem = {
* key: 'host_type',
* value: {str: 'azure_fun'}
* };
* var localHostnameElem = {
* key: 'local_hostname',
* value: {str: process.env.WEBSITE_HOSTNAME}
* };
* var hostmetaElems = [hostTypeElem, localHostnameElem];
* Consult 'metadata' definition in ./proto/host_metadata.piqi.proto
* @param content - a list of log messages to be ingested. Content should be batched on the caller level.
* @param parseCallback(message) - a function to parse a log message into a JSON object which is converted into protobuf.
* The parse callback is expected to construct the following object out of each log message:
* var parsedMessage = {
* messageTs: 1542138053,
* priority: 11,
* progName: 'o365webhook',
* pid: undefined,
* message: 'some message string',
* messageType: 'json/azure.o365',
* messageTypeId: 'AzureActiveDirectory',
* messageTsUs: undefined
* };
* Consult 'collected_message' definition in proto/common_proto.piqi.proto
* @param callback
*
* @return callback - (error, builtPayload, payloadSize)
* @NOTE: Batch size should be tweaked on a caller level in order to avoid "Maximum payload size exceeded" errors.
* For an Azure function consult eventHub.maxBatchSize property in host.json.
* For an AWS Lambda via kinesis trigger batch size configuration.
*/

var buildPayload = function (hostId, sourceId, hostmetaElems, content, parseCallback, callback) {
async.waterfall([
function(callback) {
buildMessages(content, parseCallback, function(err, msg) {
return callback(err, msg);
});
},
function(msg, callback) {
buildHostmeta(hostId, hostmetaElems, function(err, meta) {
return callback(err, meta, msg);
});
},
function(meta, msg, callback) {
buildBatch(sourceId, meta, msg, function(err, batch) {
return callback(err, batch);
});
},
function(batchBuf, callback) {
buildBatchList(batchBuf, function(err, batchList) {
return callback(err, batchList);
});
},
function(batchList, callback) {
var batchListType = commonProtoPb.collected_batch_list;
var buf = batchListType.encode(batchList).finish();
return callback(null, buf);
}],
function(err, result) {
if (err) {
return callback(err);
} else {
zlib.deflate(result, function(err, compressed) {
if (err) {
return callback(err);
} else {
var payloadSize = compressed.byteLength;
if (payloadSize > PAYLOAD_BATCH_SIZE) {
return callback(`Maximum payload size exceeded: ${payloadSize}`, compressed);
} else {
return callback(null, compressed);
}
}
});
}
});
};

/**
*
* Private functions
*
*/

function buildType(type, payload, callback) {
var verify = type.verify(payload);
if (verify)
return callback(verify);

var payloadCreated = type.create(payload);

return callback(null, payloadCreated);
}


function buildTypeSync(type, payload) {
var verify = type.verify(payload);
if (verify)
throw(verify);

return type.create(payload);
}

/**
* @function build hostmeta protobuf out of a list of {key, value} metadata pairs.
*
* @param hostId - a host uuid obtain at collector registration.
* @param hostmetaElems - a list of metadata JSON objects. For example,
* var hostTypeElem = {
* key: 'host_type',
* value: {str: 'azure_fun'}
* };
* var localHostnameElem = {
* key: 'local_hostname',
* value: {str: process.env.WEBSITE_HOSTNAME}
* };
* var hostmetaElems = [hostTypeElem, localHostnameElem];
*
* @param callback
* @returns callback
*/

function buildHostmeta(hostId, hostmetaElems, callback) {
var hostmetaType = hostMetadataPb.metadata;
var hostmeta = {
elem : hostmetaElems
};
buildType(dictPb.dict, hostmeta, function(err, hostmetaData){
if (err) {
return callback(err);
} else {
var meta = {
hostUuid : hostId,
data : hostmetaData,
dataChecksum : new Buffer('')
};
var sha = crypto.createHash('sha1');
var hashPayload = hostmetaType.encode(meta).finish();
hashValue = sha.update(hashPayload).digest();

var metadataPayload = {
hostUuid : hostId,
dataChecksum : hashValue,
timestamp : Math.floor(Date.now() / 1000),
data : hostmetaData
};

return buildType(hostmetaType, metadataPayload, callback);
}
});
}

/**
* @function builds protobuf out of JSON definition of a log message
* @param content - raw log messages retrieved from a source.
* @param parseContentFun - function to parse a single message into a json structure. For example,
* var parsedMessage = {
* messageTs: 1542138053,
* priority: 11,
* progName: 'o365webhook',
* pid: undefined,
* message: 'some message string',
* messageType: 'json/azure.o365',
* messageTypeId: 'AzureActiveDirectory',
* messageTsUs: undefined
* };
* Consult 'collected_message' definition in proto/common_proto.piqi.proto
* @param callback
* @returns
*/

function buildMessages(content, parseContentFun, callback) {
async.reduce(content, [], function(memo, item, callback) {
var messageType = commonProtoPb.collected_message;
var messagePayload = parseContentFun(item);
buildType(messageType, messagePayload, function(err, buf) {
if (err) {
return callback(err);
} else {
memo.push(buf);
return callback(err, memo);
}
});
},
callback);
}

function buildBatch(sId, metadata, messages, callback) {
var batchType = commonProtoPb.collected_batch;

var batchPayload = {
sourceId: sId,
metadata: metadata,
message: messages
};

buildType(batchType, batchPayload, callback);
}

function buildBatchList(batches, callback) {
var batchListType = commonProtoPb.collected_batch_list;

var batchListPayload = {
elem: [batches]
};

buildType(batchListType, batchListPayload, callback);
}


module.exports = {
AlcHealthPb : alcHealthPb,
CommonProto : commonProtoPb,
DictPb : dictPb,
HostMetadataPb : hostMetadataPb,
buildPayload : buildPayload,
PAYLOAD_BATCH_SIZE : PAYLOAD_BATCH_SIZE
};

Loading

0 comments on commit fc85bc5

Please sign in to comment.