Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAWS][ENG-55567] Replace GCP collector npm library google-cloud-logging to googleapis to convert protopayload to json #372

Merged
2 changes: 1 addition & 1 deletion collectors/ciscomeraki/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class CiscomerakiCollector extends PawsCollector {
await merakiClient.uploadNetworksListInS3Bucket(keyValue, networks);
});
}
} else if (networksFromS3 && collector._isFileMissingError(networksFromS3.code)) {
} else if (networksFromS3 && collector._isFileMissingError(networksFromS3.Code)) {
AlLogger.debug(`CMRI0000026 networks ${JSON.stringify(params)} ${JSON.stringify(networks)}`);
await merakiClient.uploadNetworksListInS3Bucket(keyValue, networks);
}
Expand Down
2 changes: 1 addition & 1 deletion collectors/ciscomeraki/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ciscomeraki-collector",
"version": "1.0.1",
"version": "1.0.2",
"description": "Alert Logic AWS based Cisco Meraki Log Collector",
"repository": {},
"private": true,
Expand Down
106 changes: 49 additions & 57 deletions collectors/googlestackdriver/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,36 @@ const PawsCollector = require('@alertlogic/paws-collector').PawsCollector;
const calcNextCollectionInterval = require('@alertlogic/paws-collector').calcNextCollectionInterval;
const parse = require('@alertlogic/al-collector-js').Parse;
const AlLogger = require('@alertlogic/al-aws-collector-js').Logger;
const logging = require('@google-cloud/logging');
const packageJson = require('./package.json');
const protoFiles = require('google-proto-files');
const { auth } = require("google-auth-library");
const { google } = require("googleapis");

const API_THROTTLING_ERROR = 8;
const API_THROTTLING_STATUS_CODE = 429;
const MAX_POLL_INTERVAL = 900;
const MAX_PAGE_SIZE = 1000;
const AUDIT_PAYLOAD_TYPE_URL = 'type.googleapis.com/google.cloud.audit.AuditLog';
const SCOPES = [
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/cloud-platform.read-only',
'https://www.googleapis.com/auth/logging.admin',
'https://www.googleapis.com/auth/logging.read',
'https://www.googleapis.com/auth/logging.write',
];

const typeIdPaths = [
{path: ['jsonPayload', 'fields', 'event_type', 'stringValue']},
{path: ['protoPayload', 'type_url']},
{path: ['jsonPayload']},
{path: ['protoPayload', '@type']},
{path: ['payload']}
];

const tsPaths = [{ path: ["timestamp"] }];

class GooglestackdriverCollector extends PawsCollector {

/**
 * Builds the collector: forwards `context`, `creds` and this package's
 * version (read from package.json) to the PawsCollector constructor, then
 * sets up the AuditLog decoder via `_initAuditLogDecoder()`.
 */
constructor(context, creds){
super(context, creds, packageJson.version);
this._initAuditLogDecoder();
}

_initAuditLogDecoder() {
const protoPath = protoFiles.getProtoPath('cloud', 'audit', 'audit_log.proto');
const root = protoFiles.loadSync(protoPath);
const auditLogDecoder = root.lookupType('google.cloud.audit.AuditLog');
this._auditLogDecoder = auditLogDecoder;
}

pawsInitCollectionState(event, callback) {
const startTs = process.env.paws_collection_start_ts ?
Expand Down Expand Up @@ -79,9 +80,18 @@ class GooglestackdriverCollector extends PawsCollector {
if (!state.stream) {
state = collector.setStreamToCollectionState(state);
}
const keysEnvVar = collector.secret;
if (!keysEnvVar) {
throw new Error("The $CREDS environment variable was not found!");
}
// Start API client
const client = new logging.v2.LoggingServiceV2Client({
credentials: JSON.parse(collector.secret)
const keys = JSON.parse(keysEnvVar);
const client = auth.fromJSON(keys);
client.subject = collector.clientId;
client.scopes = SCOPES;
const logging = google.logging({
version: 'v2',
auth: client,
});


Expand All @@ -92,49 +102,46 @@ class GooglestackdriverCollector extends PawsCollector {
timestamp < "${state.until}"`;

let pagesRetireved = 0;
const options = {autoPaginate: false};


const paginationCallback = (result, acc = []) => {
AlLogger.info(`Getting page: ${pagesRetireved + 1} Logs retrieved: ${result[0].length}`);
let logs = result.data.entries || [];
AlLogger.info(`Getting page: ${pagesRetireved + 1} Logs retrieved: ${logs.length}`);
pagesRetireved++;
//decode the protoPayload if it's an AuditLog message
let logs = result[0].map(function (logEntry) {
return collector.decodeProtoPayload(logEntry);
});

const nextPage = result[1];
const nextPage = { ...params, pageToken: result.data.nextPageToken };
const newAcc = [...acc, ...logs];
AlLogger.info(`Total Logs ${newAcc.length}`);

if(nextPage && pagesRetireved < process.env.paws_max_pages_per_invocation){
if (nextPage.pageToken && pagesRetireved < process.env.paws_max_pages_per_invocation) {

return client.listLogEntries(nextPage, options)
.then(res => paginationCallback(res, newAcc));
} else{
return {logs: newAcc, nextPage};
return logging.entries.list(params)
.then(res => {
return paginationCallback(res, newAcc)
});
} else {
return { logs: newAcc, nextPage };
}
};

const pageSize = state.pageSize > 0 ? state.pageSize : MAX_PAGE_SIZE;
let params = state.nextPage ?
state.nextPage:
state.nextPage :
{
filter,
pageSize: pageSize,
resourceNames:[state.stream]
resourceNames: [state.stream]
};

client.listLogEntries(params, options)
logging.entries.list(params)
.then(paginationCallback)
.then(({logs, nextPage}) => {
.then(({ logs, nextPage }) => {

const newState = collector._getNextCollectionState(state, nextPage);
AlLogger.info(`GSTA000002 Next collection in ${newState.poll_interval_sec} seconds`);

return callback(null, logs, newState, newState.poll_interval_sec);
})
.catch(err => {
AlLogger.error(`GSTA000003 err in collection ${JSON.stringify(err.details)}`);

AlLogger.error(`GSTA000003 err in collection ${JSON.stringify(err)}`);
// Stackdriver Logging api has some rate limits that we might run into.
// If we run into a rate limit error, instead of returning the error,
// we return the state back to the queue with an additional second added, up to 15 min
Expand All @@ -147,54 +154,40 @@ timestamp < "${state.until}"`;
const interval = state.poll_interval_sec < 60 ? 60 : state.poll_interval_sec;
const nextPollInterval = state.poll_interval_sec < MAX_POLL_INTERVAL ?
interval + 60 : MAX_POLL_INTERVAL;

if (state.nextPage && state.nextPage.pageSize) {
if (state.nextPage && state.nextPage.pageToken && state.nextPage.pageSize) {
state.nextPage.pageSize = Math.ceil(state.nextPage.pageSize / 2);
AlLogger.debug(`Throttling error with nextPage: ${err.message}. Retrying with smaller pageSize.`);
} else {
if (currentInterval <= 15 && err.details.includes('Received message larger than max')) {
if (currentInterval <= 15 && err.message && err.message.indexOf('Received message larger than max') >= 0) {
state.pageSize = state.pageSize ? Math.ceil(state.pageSize / 2) : Math.ceil(params.pageSize / 2);
AlLogger.debug(`Throttling error with no nextPage and large message: ${err.message}. Reducing pageSize.`);
} else {
state.until = moment(state.since).add(Math.ceil(currentInterval / 2), 'seconds').toISOString();
AlLogger.debug(`Throttling error with no nextPage: ${err.message}. Reducing time range.`);
}
}
const backOffState = Object.assign({}, state, {poll_interval_sec:nextPollInterval});
const backOffState = Object.assign({}, state, { poll_interval_sec: nextPollInterval });
collector.reportApiThrottling(function () {
return callback(null, [], backOffState, nextPollInterval);
});
} else {
// set errorCode if not available in error object to showcase client error on DDMetrics
if (err.code) {
// set errorCode if not available in error object to showcase client error on DDMetrics
if (err.code) {
err.errorCode = err.code;
}
return callback(err);
}
});
}

decodeProtoPayload(logEntry) {
let collector = this;
if (logEntry.protoPayload && (logEntry.protoPayload.type_url === AUDIT_PAYLOAD_TYPE_URL)) {
try {
const buffer = Buffer.from(logEntry.protoPayload.value);
let decodedData = collector._auditLogDecoder.decode(buffer);
logEntry.protoPayload.value = decodedData.toJSON();
} catch(error) {
AlLogger.error(`Error decoding data ${error}`);
}
}
return logEntry;
}

_getNextCollectionState(curState, nextPage) {
// Reset the page size for the next collection if it's less than the maximum
const pageSize = Math.max(MAX_PAGE_SIZE, nextPage?.pageSize || curState.pageSize || MAX_PAGE_SIZE);

const { stream, since, until } = curState;

if (nextPage) {
if (nextPage && nextPage.pageToken) {
// Case: Continue with the next page
return {
since,
Expand Down Expand Up @@ -222,15 +215,14 @@ timestamp < "${state.until}"`;
// TODO: probably need to actually decode the protobuf payload on these logs
pawsFormatLog(msg) {
let collector = this;

const ts = msg.timestamp ? msg.timestamp : {seconds: Date.now() / 1000};
const ts = parse.getMsgTs(msg, tsPaths);

const typeId = parse.getMsgTypeId(msg, typeIdPaths);

let formattedMsg = {
// TODO: figure out if this TS is always a string or if the API is goofy...
hostname: collector.collector_id,
messageTs: parseInt(ts.seconds),
messageTs: ts.sec,
priority: 11,
progName: 'GooglestackdriverCollector',
message: JSON.stringify(msg),
Expand Down
36 changes: 18 additions & 18 deletions collectors/googlestackdriver/local/events/event_poll.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
{
"Records": [
{
"messageId": "f67a45fc-ab43-4df1-9f2f-585bb2043122",
"receiptHandle": "AQEB78LmKaaZ+uj8DVnc/O2zQcAe+qKSi3ZGTSBFssIRSmpwwzUEi2vhPTaIBnhOoh0aoRxVjWdoXO3ZloINfMxmcjycP3KC0WXwcWokoOc3iMCdqhYg0NcOhQW1X0ixc79C9/5/XF1xGd79vLhFL7KvRjjiT4sOaSxlAv6v2fJ5eDETnp7CRa5pocCF4EO2su0M4/TnlLreGfsY+C+/tH+r19AM+d3Jt5dbNMrKMWRRZ7/PTjczkIM7U38AHPuusuBz9uzA5yMQGOMI8FPXfqgcafEN17JqKuNSd/l54v1+s9rBQSzL/MH9wZ0XpZditcpe5pTc+dGuRHOXbFK2A0YUQCvRq1Ed8tfr68uELTQK5jWHH/zPnEMWsTyco9VuNCKr",
"body": "{\n \"priv_collector_state\": {\n \"since\": \"2019-01-01T10:00:00Z\",\n \"until\": \"2020-01-01T10:30:00Z\"\n }\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1574159909907",
"SenderId": "some-id",
"ApproximateFirstReceiveTimestamp": "1574159909968"
},
"messageAttributes": {},
"md5OfBody": "1845296051d4cfe2e6175358a065cc38",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:352283894008:paws-state-queue",
"awsRegion": "us-east-1"
}
]
"Records": [
{
"messageId": "1b77ecca-5c1d-4c88-a6b5-f5be6d6e3947",
"receiptHandle": "AQEB78LmKaaZ+uj8DVnc/O2zQcAe+qKSi3ZGTSBFssIRSmpwwzUEi2vhPTaIBnhOoh0aoRxVjWdoXO3ZloINfMxmcjycP3KC0WXwcWokoOc3iMCdqhYg0NcOhQW1X0ixc79C9/5/XF1xGd79vLhFL7KvRjjiT4sOaSxlAv6v2fJ5eDETnp7CRa5pocCF4EO2su0M4/TnlLreGfsY+C+/tH+r19AM+d3Jt5dbNMrKMWRRZ7/PTjczkIM7U38AHPuusuBz9uzA5yMQGOMI8FPXfqgcafEN17JqKuNSd/l54v1+s9rBQSzL/MH9wZ0XpZditcpe5pTc+dGuRHOXbFK2A0YUQCvRq1Ed8tfr68uELTQK5jWHH/zPnEMWsTyco9VuNCKr",
"body": "{\n \"priv_collector_state\": {\n \"since\": \"2024-04-13T00:00:00Z\",\n \n \"stream\": \"projects/rcs-test-project-422212\",\n \"until\": \"2024-06-13T23:00:00Z\"\n }\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1574159909907",
"SenderId": "some-id",
"ApproximateFirstReceiveTimestamp": "1574159909968"
},
"messageAttributes": {},
"md5OfBody": "1845296051d4cfe2e6175358a065cc38",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:352283894008:paws-state-queue",
"awsRegion": "us-east-1"
}
]
}
14 changes: 14 additions & 0 deletions collectors/googlestackdriver/local/run-sam.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,20 @@ SRC_SAM_TEMPLATE="${SCRIPT_DIR}/sam-template.yaml"
SRC_ENV_FILE="${SCRIPT_DIR}/${ENV_FILE_NAME}"
SRC_EVENT_FILE="${SCRIPT_DIR}/events/${EVENT_FILE_NAME}"
RUN_DIR=${SCRIPT_DIR}/../
PROFILE_NAME=""

# Succeeds (exit status 0) when "$1" resolves to a command known to the
# shell. stdout and stderr of the lookup are discarded, so callers use only
# the exit status, e.g. `if exists jq; then ...`.
exists(){
command -v "$1" >/dev/null 2>&1
}

# Give the sample event a fresh, lower-cased messageId on each run.
# Both jq and uuidgen are required; if either is missing the event file is
# left untouched rather than being rewritten with an empty messageId.
if ! exists jq; then
    echo "jq does not exist please install jq to run command"
elif ! exists uuidgen; then
    echo "uuidgen does not exist please install uuidgen to run command"
else
    uid=$(uuidgen)
    LOWERUUID=$(echo "$uid" | tr '[:upper:]' '[:lower:]')
    echo "generating messageId in event.json: ${LOWERUUID}"
    # Write next to the event file (not a bare "tmp" in the current directory)
    # and only replace the original when jq succeeded.
    TMP_EVENT_FILE="${SRC_EVENT_FILE}.tmp"
    if jq --arg newRandomvalue "$LOWERUUID" '(.Records[].messageId) |= $newRandomvalue' "${SRC_EVENT_FILE}" > "${TMP_EVENT_FILE}"; then
        mv "${TMP_EVENT_FILE}" "${SRC_EVENT_FILE}"
    else
        rm -f "${TMP_EVENT_FILE}"
        echo "failed to update messageId in ${SRC_EVENT_FILE}"
    fi
fi

command -v sam > /dev/null
if [ $? -ne 0 ]; then
Expand All @@ -26,6 +39,7 @@ ln -sf ${SRC_ENV_FILE} ${RUN_DIR}/${ENV_FILE_NAME}
ln -sf ${SRC_EVENT_FILE} ${RUN_DIR}/${EVENT_FILE_NAME}
cd ${RUN_DIR} && \
sam local invoke \
--profile ${PROFILE_NAME} \
--env-vars ${ENV_FILE_NAME} \
-t ${SAM_TEMPLATE_NAME} \
-e ${EVENT_FILE_NAME} \
Expand Down
6 changes: 4 additions & 2 deletions collectors/googlestackdriver/local/sam-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Resources:
LocalLambda:
Type: AWS::Serverless::Function
Properties:
KmsKeyArn: arn:aws:kms:us-east-1:352283894008:key/cdda86d5-615b-4dcc-9319-77ab34510473
KmsKeyArn:
Environment:
Variables:
AWS_LAMBDA_FUNCTION_NAME:
Expand All @@ -25,6 +25,7 @@ Resources:
paws_api_client_id:
paws_max_pages_per_invocation:
paws_collector_param_string_2:
paws_collector_param_string_1:
paws_endpoint:
paws_extension:
collector_id:
Expand All @@ -33,8 +34,9 @@ Resources:
customer_id:
paws_type_name:
ssm_direct:
collector_status_api: api.product.dev.alertlogic.com
CodeUri:
Runtime: nodejs20.x
Handler: index.handler
Timeout: 60
Timeout: 600
MemorySize: 1024
6 changes: 3 additions & 3 deletions collectors/googlestackdriver/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "googlestackdriver-collector",
"version": "1.2.11",
"version": "1.2.12",
"description": "Alert Logic AWS based Googlestackdriver Log Collector",
"repository": {},
"private": true,
Expand All @@ -27,10 +27,10 @@
"dependencies": {
"@alertlogic/al-collector-js": "3.0.11",
"@alertlogic/paws-collector": "2.2.3",
"@google-cloud/logging": "^11.0.0",
"google-proto-files": "^4.2.0",
"async": "^3.2.5",
"debug": "^4.3.5",
"google-auth-library": "^9.11.0",
"googleapis": "^126.0.0",
"moment": "2.30.1"
},
"author": "Alert Logic Inc."
Expand Down
Loading
Loading