Skip to content

Commit

Permalink
Process dl (#11)
Browse files Browse the repository at this point in the history
* Add skeleton for DLBlob function.

Fix typo

Randomize DLBlob timer trigger

Stringify blob json

List blobs with prefix

Refactor. Add DLBlob function implementation.

Delete processed DL blob

Fixes. Add DLBlob test skeleton

Add DLBlob readme section.

Fixes. Unit tests unfinished.

* Change the way processed and skipped records are calculated.

* Append unprocessed records.

* Stringify DL. Add common ehub collector skeleton.

* Add more DLBlob function tests

* Address comments

* Dedup tests.
  • Loading branch information
kkuzmin authored Jan 29, 2019
1 parent 630d478 commit dc7a482
Show file tree
Hide file tree
Showing 22 changed files with 630 additions and 146 deletions.
11 changes: 11 additions & 0 deletions DLBlob/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"bindings": [
{
"name": "AlertlogicDLBlobTimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */15 * * * *"
}
],
"disabled": false
}
89 changes: 89 additions & 0 deletions DLBlob/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/* ----------------------------------------------------------------------------
* @copyright (C) 2018, Alert Logic, Inc
* @doc
*
* The purpose of this function is to process dead letter blobs generated by
* EHubActivityLogs and EhubGeneral functions.
* Dead letter blobs are located in 'alertlogic-dl' container located in the
* web application storage account.
*
* @end
* ----------------------------------------------------------------------------
*/

const async = require('async');
const azure = require('azure');

const ehubCollector = require('../common/ehub_collector');
const ehubActivityLogsFormat = require('../EHubActivityLogs/format').logRecord;
const ehubGeneralFormat = require('../EHubGeneral/format').logRecord;

const CONCURRENT_BLOB_PROCESS_NUM = 20;

function getCollectorFunName(blobName) {
return blobName.split('/')[1];
}

var collectorProcessError = function(context, err, messages) {
context.log.error('Error processing batch:', err);
var skipped = messages.records ? messages.records.length : messages.length;
return skipped;
};

function processDLBlob(blobService, context, blob, callback) {
context.log.verbose('Processing blob: ', blob.name);
var collectorFormatFun;

switch(getCollectorFunName(blob.name)) {
case 'ehubactivitylogs':
collectorFormatFun = ehubActivityLogsFormat;
break;
default:
collectorFormatFun = ehubGeneralFormat;
break;
}

async.waterfall([
function(callback) {
return blobService.getBlobToText(process.env.APP_DL_CONTAINER_NAME, blob.name, callback);
},
function(blobData, blobReq, blobResp, callback) {
try {
return ehubCollector(context, JSON.parse(blobData), collectorFormatFun, collectorProcessError, callback);
} catch (ex) {
return callback(ex);
}
},
function(result, callback) {
if (result.skipped === 0) {
return blobService.deleteBlob(process.env.APP_DL_CONTAINER_NAME, blob.name, callback);
} else {
return callback(null, result);
}
}
], callback);
}

module.exports = function (context, AlertlogicDLBlobTimer) {
const blobService = azure.createBlobService(process.env.AzureWebJobsStorage);
const options = {
maxResults: parseInt(process.env.DL_BLOB_PAGE_SIZE)
};
blobService.listBlobsSegmentedWithPrefix(
process.env.APP_DL_CONTAINER_NAME,
process.env.WEBSITE_SITE_NAME,
null, options,
function(listErr, data) {
if (listErr) {
context.done(listErr);
} else {
context.log.verbose('Listed blobs: ', data.entries.length);
async.eachLimit(data.entries, CONCURRENT_BLOB_PROCESS_NUM, function(blob, callback) {
return processDLBlob(blobService, context, blob, callback);
}, function(processErr) {
context.done(processErr);
});
}
});
};

29 changes: 29 additions & 0 deletions EHubActivityLogs/format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* -----------------------------------------------------------------------------
* @copyright (C) 2018, Alert Logic, Inc
* @doc
*
* The function to format records from 'insights-operational-logs'
* @end
* -----------------------------------------------------------------------------
*/

const parse = require('../common/parse');

const logRecord = function(msg) {
const ts = parse.getMsgTs(msg);
const typeId = parse.getMsgTypeId(msg);
return {
messageTs: ts.sec,
priority: 11,
progName: 'EHubActivityLogs',
pid: undefined,
message: JSON.stringify(msg),
messageType: 'json/azure.ehub',
messageTypeId: typeId,
messageTsUs: ts.usec
};
};

module.exports = {
logRecord: logRecord
};
2 changes: 1 addition & 1 deletion EHubActivityLogs/function.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
{
"name": "dlBlob",
"type": "blob",
"path": "alertlogic-dl/%WEBSITE_SITE_NAME%/ehubactivitylogs/{DateTime}",
"path": "%APP_DL_CONTAINER_NAME%/%WEBSITE_SITE_NAME%/ehubactivitylogs/{DateTime}",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
Expand Down
20 changes: 4 additions & 16 deletions EHubActivityLogs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,12 @@
*/

const ehubCollector = require('../common/ehub_collector');
const parse = require('../common/parse');
const formatLogs = require('./format').logRecord;

var formatActivityLogRecord = function(msg) {
const ts = parse.getMsgTs(msg);
const typeId = parse.getMsgTypeId(msg);
return {
messageTs: ts.sec,
priority: 11,
progName: 'EHubActivityLogs',
pid: undefined,
message: JSON.stringify(msg),
messageType: 'json/azure.ehub',
messageTypeId: typeId,
messageTsUs: ts.usec
};
};

module.exports = function (context, eventHubMessages) {
return ehubCollector(context, eventHubMessages, formatActivityLogRecord);
return ehubCollector(context, eventHubMessages, formatLogs, null, function(err) {
context.done(err);
});
};

34 changes: 34 additions & 0 deletions EHubGeneral/format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* -----------------------------------------------------------------------------
* @copyright (C) 2018, Alert Logic, Inc
* @doc
*
* The function to format 'alertlogic-log' records.
*
* @end
* -----------------------------------------------------------------------------
*/

const parse = require('../common/parse');

const logRecord = function(msg) {
const ts = parse.getMsgTs(msg);
const typeId = parse.getMsgTypeId(msg);
return {
messageTs: ts.sec,
priority: 11,
progName: 'EHubGeneral',
pid: undefined,
message: JSON.stringify(msg),
messageType: 'json/azure.ehub',
messageTypeId: typeId,
messageTsUs: ts.usec
};
};

module.exports = {
logRecord: logRecord
};




2 changes: 1 addition & 1 deletion EHubGeneral/function.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
{
"name": "dlBlob",
"type": "blob",
"path": "alertlogic-dl/%WEBSITE_SITE_NAME%/ehubgeneral/{DateTime}",
"path": "%APP_DL_CONTAINER_NAME%/%WEBSITE_SITE_NAME%/ehubgeneral/{DateTime}",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
Expand Down
22 changes: 4 additions & 18 deletions EHubGeneral/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,11 @@


const ehubCollector = require('../common/ehub_collector');
const parse = require('../common/parse');

var formatGeneralLogRecord = function(msg) {
const ts = parse.getMsgTs(msg);
const typeId = parse.getMsgTypeId(msg);
return {
messageTs: ts.sec,
priority: 11,
progName: 'EHubGeneral',
pid: undefined,
message: JSON.stringify(msg),
messageType: 'json/azure.ehub',
messageTypeId: typeId,
messageTsUs: ts.usec
};
};
const formatLogs = require('./format').logRecord;

module.exports = function (context, eventHubMessages) {
return ehubCollector(context, eventHubMessages, formatGeneralLogRecord);
return ehubCollector(context, eventHubMessages, formatLogs, null, function(err) {
context.done(err);
});
};


2 changes: 2 additions & 0 deletions Master/healthcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* Various Event hub collector health checks.
* The last error code is EHUB000006
*
* TODO: check 'alertlogic-dl' container exists.
*
* @end
* ----------------------------------------------------------------------------
*/
Expand Down
2 changes: 1 addition & 1 deletion Master/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const pkg = require('../package.json');
const AlAzureMaster = require('al-azure-collector-js').AlAzureMaster;
const healthcheck = require('./healthcheck');

const APP_FUNCTIONS = ['Master', 'Updater', 'EHubGeneral', 'EHubActivityLogs'];
const APP_FUNCTIONS = ['Master', 'Updater', 'EHubGeneral', 'EHubActivityLogs', 'DLBlob'];

module.exports = function (context, AlertlogicMasterTimer) {
const healthFuns = [
Expand Down
8 changes: 8 additions & 0 deletions PostDeploymentActions/updateDLBlobTimer.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
$date = Get-Date
$min = ($date.Minute + 1) % 15
$sec = $date.Second
$new_schedule = "$sec $min-59/15 * * * *"
Write-Output "Updating DLBlob timer trigger with ($new_schedule)."
$dlblob_function = Get-Content '..\\wwwroot\\DLBlob\\function.json' -raw | ConvertFrom-Json
$dlblob_function.bindings | % {if($_.name -eq 'AlertlogicDLBlobTimer'){$_.schedule=$new_schedule}}
$dlblob_function | ConvertTo-Json | set-content '..\\wwwroot\\DLBlob\\function.json'
6 changes: 3 additions & 3 deletions PostDeploymentActions/updateUpdaterTimer.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ $randS = Get-Random -minimum 0 -maximum 59
$randH12 = $randH + 12
$new_schedule = "$randS $randM $randH,$randH12 * * *"
Write-Output "Updating Updater timer trigger with ($new_schedule)".
$master_function = Get-Content '..\\wwwroot\\Updater\\function.json' -raw | ConvertFrom-Json
$master_function.bindings | % {if($_.name -eq 'AlertlogicUpdaterTimer'){$_.schedule=$new_schedule}}
$master_function | ConvertTo-Json | set-content '..\\wwwroot\\Updater\\function.json'
$updater_function = Get-Content '..\\wwwroot\\Updater\\function.json' -raw | ConvertFrom-Json
$updater_function.bindings | % {if($_.name -eq 'AlertlogicUpdaterTimer'){$_.schedule=$new_schedule}}
$updater_function | ConvertTo-Json | set-content '..\\wwwroot\\Updater\\function.json'
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ Collected JSON objects are wrapped into the protobuf [structure](https://github.
The `EHubGeneral` function listens to `alertlogic-log` which is created during [collector setup](#deploy-with-the-custom-arm-template-in-an-azure-subscription). The `alertlogicloghub` event hub can be used for integration with, for example, [diagnostic logs](https://docs.microsoft.com/en-us/azure/azure-monitor/platform/diagnostic-logs-stream-event-hubs) or [Azure AD logs](https://docs.microsoft.com/en-us/azure/active-directory/reports-monitoring/tutorial-azure-monitor-stream-logs-to-event-hub).
Collected JSON objects are wrapped into the protobuf [structure](https://github.com/alertlogic/al-collector-js/blob/master/proto/common_proto.piqi.proto) and then are forwarded to the Alert Logic Ingestion service.
## DLBlob Function
Both `EHubActivityLogs` and `EHubGeneral` may not be able to process incoming event hub records. If that happens then unprocessed messages are saved as blobs to the `alertlogic-dl` container so that collection can be retried at a later time. The `alertlogic-dl` container is located in the collector web application storage account which is created durign collector setup.
The `DLBlob` function processes dead letter blobs very 15 minutes. The `DLBlob` function lists all blobs located in `alertlogic-dl` container and processes them according to the function which dead letter blob belongs to. Once a blob is processed it gets removed from the container.
# Local Development
1. Clone the repo `git clone [email protected]:alertlogic/ehub-collector.git`.
Expand Down
48 changes: 32 additions & 16 deletions common/ehub_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,53 @@ const async = require('async');
const pkg = require('../package.json');
const AlAzureCollector = require('al-azure-collector-js').AlAzureCollector;

var processError = function(context, err, messages) {
const defaultProcessError = function(context, err, messages) {
context.log.error('Error processing batch:', err);
var skipped = messages.records ? messages.records.length : messages.length;
context.log.error('Records skipped:', skipped);
context.bindings.dlBlob = JSON.stringify(messages);
return context;
const skipped = messages.records ? messages.records.length : messages.length;
if (context.bindings.dlBlob && context.bindings.dlBlob instanceof Array) {
context.bindings.dlBlob.append([messages]);
} else {
context.bindings.dlBlob = [messages];
}
return skipped;
};

module.exports = function (context, eventHubMessages, parseFun) {
module.exports = function (context, eventHubMessages, parseFun, processErrorFun, callback) {
var processError = processErrorFun ? processErrorFun : defaultProcessError;
var collector = new AlAzureCollector(context, 'ehub', pkg.version);
async.filter(eventHubMessages,
function(msgArray, callback) {
async.reduce(eventHubMessages, {processed: 0, skipped: 0},
function(acc, msgArray, callback) {
try {
collector.processLog(msgArray.records, parseFun, null,
function(err) {
if (err) {
processError(context, err, msgArray);
acc.skipped += processError(context, err, msgArray);
} else {
acc.processed += msgArray.records.length;
}
return callback(null, !err);
return callback(null, acc);
});
} catch (exception) {
processError(context, exception, msgArray);
return callback(null, false);
acc.skipped += processError(context, exception, msgArray);
return callback(null, acc);
}
},
function(err, mapResult) {
function(err, redResult) {
if (err) {
processError(context, err, eventHubMessages);
const skipped = processError(context, err, eventHubMessages);
context.log.error('Records skipped:', skipped);
return callback(err);
} else {
context.log.info('Processed:', mapResult.reduce((a, b) => a + b.records.length, 0));
context.log.info('Processed:', redResult.processed);
if (redResult.skipped) {
context.log.info('Records skipped:', redResult.skipped);
}

}
context.done();

if (context.bindings.dlBlob) {
context.bindings.dlBlob = JSON.stringify(context.bindings.dlBlob);
}
return callback(null, redResult);
});
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
"nyc": "^11.3.0",
"pre-commit": "^1.2.2",
"rewire": "^2.5.2",
"sinon": "^3.3.0"
"sinon": "^7.2.3"
}
}
Loading

0 comments on commit dc7a482

Please sign in to comment.