diff --git a/bin/akamai-etp b/bin/akamai-etp
index 753b158..45a276b 100755
--- a/bin/akamai-etp
+++ b/bin/akamai-etp
@@ -23,7 +23,7 @@ import json.decoder
 import ipaddress
 import time
 import signal
-from threading import Event
+from threading import Event, Lock
 from enum import Enum
 import logging
 from urllib.parse import parse_qs
@@ -31,6 +31,7 @@ import csv
 import math
 from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
 from datetime import timedelta
+import io
 
 
 # 3rd party modules
@@ -40,7 +41,7 @@ from requests.compat import urljoin
 from akamai.edgegrid import EdgeGridAuth, EdgeRc
 from config import EdgeGridConfig
 
-__version__ = "0.4.6"
+__version__ = "0.4.7"
 
 #: Window span in ad-hoc mode, default is 3
 span_duration_min = 3
@@ -105,10 +106,10 @@ class ETPEventFetchStats(object):
     #: Timedelta spent in network operation (Request/Response) for the poll interval
     poll_elapsed = timedelta()
 
-    def inc_event(self):
-        "Increment the number of event by 1."
-        self.events += 1
-        self.poll_events += 1
+    def inc_event(self, increment: int=1):
+        "Increment the number of events by increment."
+        self.events += increment
+        self.poll_events += increment
 
     def inc_api_call(self):
         self.api_calls += 1
@@ -125,6 +126,44 @@ class ETPEventFetchStats(object):
         self.poll_elapsed += d
 
 
+class SortedPageOutput(object):
+    """
+    Accumulates security events (each one a dict), indexed per page number.
+    This is meant to be used by concurrent threads.
+    If some pages are missing, we hold off flushing this to the output
+    """
+
+    def __init__(self, output: io.TextIOWrapper) -> None:
+        self.eventByPage = dict()
+        self.output = output
+        self.last_flushed_page = 0
+        self.lock = Lock()
+
+    def append(self, page_number: int, events: list[dict]) -> None:
+        LOG.debug(f"Adding {len(events)} events from page #{page_number}")
+        self.eventByPage[page_number] = events
+        self.flush()
+
+    def flush(self) -> None:
+        """
+        Flush available events from the pages that have arrived.
+        If there is a gap (e.g. pages 2 and 4 arrived but not 3), we don't flush anything
+        """
+        LOG.debug(f"flush() called last_flushed_page={self.last_flushed_page} {sorted(self.eventByPage.keys())}...")
+        with self.lock:  # Critical section concurrent thread flushing will have to wait
+            for page_num in sorted(self.eventByPage.keys()):
+                if page_num == self.last_flushed_page + 1:
+                    for event in self.eventByPage[page_num]:
+                        self.output.write("%s\n" % json.dumps(event))
+                    self.output.flush()
+                    self.last_flushed_page = page_num
+                    del self.eventByPage[page_num]
+                    LOG.debug(f"Flushed till page {self.last_flushed_page}")
+                else:
+                    LOG.debug(f"Current page {page_num} too ahead, waiting for page {self.last_flushed_page + 1}")
+                    break
+
+
 def exit_fromresponse(response):
     """
     Convert an HTTP Code into an CLI return code.
@@ -248,7 +287,7 @@ def exit_gracefully(signum, frame):
     stop_event.set()
 
 
-def fetch_event_page(start, end, page_number, thread_pool, pool_futures, stats, output):
+def fetch_event_page(start, end, page_number, thread_pool, pool_futures, stats, output: SortedPageOutput):
    """
    Fetch a single page of result from ETP API
    It runs the API call and write the output
@@ -288,17 +327,20 @@ def fetch_event_page(start, end, page_number, thread_pool, pool_futures, stats,
 
     if r.status_code != 200:
         stats.api_calls_fail += 1
-        LOG.error(f"API call failed with HTTP/{r.status_code}: {r.content}")
+        LOG.error(f"API call failed with HTTP/{r.status_code}: {r.content}. URL was {r.url}")
+        # Handle more gracefully 401 (auth error) on page 1 and
+        # abort the cli right away with return code 4
+        if page_number == 1 and r.status_code == 401:
+            exit(4)  # 4xx translates into RC 4
 
     stats.inc_poll_elapsed(r.elapsed)
     LOG.info("{OPEN} API call took %.2f seconds, page #%s" % (r.elapsed.total_seconds(), page_number))
     response_data = r.json()
     stats.bytes += len(r.content)
 
-    for e in response_data.get('dataRows', []):
-        output.write("%s\n" % json.dumps(e))
-        stats.inc_event()
-    output.flush()
+    rows = response_data.get('dataRows', [])
+    output.append(page_number, rows)
+    stats.inc_event(len(rows))
 
     if page_number == 1:
         page_info = response_data.get("pageInfo", {})
@@ -347,11 +389,11 @@ def events_summary(start, end, config):
         LOG.exception(f"Error fetching {config.event_type} summary...")
 
 
-def fetch_events_concurrent(config, output):
+def fetch_events_concurrent(config, output: io.TextIOWrapper):
     """
     Fetch ETP security events
     Unlike the old fetch_events (prior to cli-etp 0.4.1) this version is optimized
-    to fetch the pages with concurrency leveraging a pool of thread.
+    to fetch the pages with concurrency leveraging a pool of threads.
     """
     stats = ETPEventFetchStats()
@@ -368,6 +410,7 @@ def fetch_events_concurrent(config, output):
     if config.end:
         end = config.end
 
+    output_pages = SortedPageOutput(output)
     pool_futures = []
     concurrent_fetch = ThreadPoolExecutor(
         max_workers=min(config.concurrent, MAX_FETCH_CONCURRENCY),
@@ -378,10 +421,11 @@ def fetch_events_concurrent(config, output):
         expected_event_count = events_summary(start, end, config)
         # 1st page fetch is executed in the main cli thread, optional subsequent
         # ones are executed within the thread pool
-        fetch_event_page(start, end, 1, concurrent_fetch, pool_futures, stats, output)
+        fetch_event_page(start, end, 1, concurrent_fetch, pool_futures, stats, output_pages)
         # Wait until all the API calls fetching pages are processed
         try:
             wait(pool_futures, return_when=ALL_COMPLETED)
+            output_pages.flush()
             if not config.tail:
                 break
             else:
diff --git a/cli.json b/cli.json
index a4a18f6..69fe424 100755
--- a/cli.json
+++ b/cli.json
@@ -5,7 +5,7 @@
   "commands": [
     {
       "name": "etp",
-      "version": "0.4.6",
+      "version": "0.4.7",
       "description": "Akamai CLI for Secure Internet Access Enterprise (f.k.a. Enterprise Threat Protector)"
     }
   ]
diff --git a/test/test.py b/test/test.py
index bb21fce..e88676b 100644
--- a/test/test.py
+++ b/test/test.py
@@ -38,6 +38,8 @@ import os
 import random
 import re
 
+import json
+
 # Global variables
 
 encoding = 'utf-8'
@@ -58,6 +60,8 @@ def cli_command(self, *args):
         if os.environ.get('EDGERC_SECTION'):
             command.extend(["--section", os.environ['EDGERC_SECTION']])
         command.extend(*args)
+
+        print(f"\nEDGERC_SECTION={os.environ.get('EDGERC_SECTION', 'default')}")
         print("\nCOMMAND: ", " ".join(command))
         return command
 
@@ -83,21 +87,41 @@ def duplicate_count(filename):
     return total_count
 
 
-class TestEvents(CliETPTest):
+class BaseTestEvents(CliETPTest):
 
-    after = int(time.time() - 15 * 60)
+    after = int(time.time() - 30 * 60)
     before = int(time.time())
 
+    @staticmethod
+    def is_sorted(l):
+        return all(a <= b for a, b in zip(l, l[1:]))
+
+    @staticmethod
+    def is_seq_sorted(events_list):
+        time_seq = []
+        for e in events_list:
+            event = json.loads(e)
+            time_seq.append(event.get('query', {}).get("time"))
+
+        return TestEvents.is_sorted(time_seq)
+
+
     def test_event_threat(self):
         """
         Fetch threat events
         """
         cmd = self.cli_run("event", "threat", "--start", self.after, "--end", self.before)
-        stdout, stderr = cmd.communicate(timeout=60)
+        stdout, stderr = cmd.communicate(timeout=360)
         events = stdout.decode(encoding)
-        event_count = len(events.splitlines())
-        self.assertGreater(event_count, 0, "We expect at least one threat event")
-        self.assertEqual(cmd.returncode, 0, 'return code must be 0')
+        events_list = events.splitlines()
+
+        if cmd.returncode != 0:
+            print(stderr.decode(encoding))
+
+        self.assertEqual(cmd.returncode, 0, f'cli-etp return code must be 0, {cmd.returncode} returned')
+        self.assertGreater(len(events_list), 0, "We expect at least one threat event")
+        self.assertTrue(TestEvents.is_seq_sorted(events_list), "The list of events is not sorted by query.time")
+
 
     def test_event_aup(self):
         """
@@ -106,9 +130,48 @@ def test_event_aup(self):
         cmd = self.cli_run("event", "aup", "--start", self.after, "--end", self.before)
         stdout, stderr = cmd.communicate(timeout=120)
         events = stdout.decode(encoding)
-        event_count = len(events.splitlines())
-        self.assertGreater(event_count, 0, "We expect at least one AUP event")
+        events_list = events.splitlines()
+        event_count = len(events_list)
         self.assertEqual(cmd.returncode, 0, 'return code must be 0')
+        self.assertGreater(event_count, 0, "We expect at least one AUP event")
+        self.assertTrue(TestEvents.is_seq_sorted(events_list), "The list of events is not sorted by query.time")
+
+    def test_is_seq_sorted(self):
+        """
+        Test built-in function to ensure a sequence of events is sorted.
+        """
+        self.assertTrue(
+            TestEvents.is_seq_sorted(
+                [
+                    '{"query": {"time": "1"}}',
+                    '{"query": {"time": "2"}}',
+                    '{"query": {"time": "3"}}'
+                ]), "Positive Test")
+        self.assertFalse(
+            TestEvents.is_seq_sorted(
+                [
+                    '{"query": {"time": "2"}}',
+                    '{"query": {"time": "1"}}',
+                    '{"query": {"time": "3"}}'
+                ]), "Negative Test")
+
+    def test_event_dns(self):
+        """
+        Fetch DNS events (the most chatty one)
+        """
+        cmd = self.cli_run("event", "dns", "--start", self.after, "--end", self.before)
+        stdout, stderr = cmd.communicate(timeout=360)
+        events = stdout.decode(encoding)
+        events_list = events.splitlines()
+        print(f"Loaded {len(events_list)} security events (DNS).")
+
+        if cmd.returncode != 0:
+            print(stderr.decode(encoding))
+
+        self.assertEqual(cmd.returncode, 0, f'cli-etp return code must be 0, {cmd.returncode} returned')
+        self.assertGreater(len(events_list), 0, "We expect at least one threat event")
+        self.assertTrue(TestEvents.is_seq_sorted(events_list), "The list of events is not sorted by query.time")
+
 
     def test_event_aup_file(self):
         """
@@ -142,6 +205,17 @@ def test_event_netcon(self):
         self.assertEqual(cmd.returncode, 0, 'return code must be 0')
 
 
+class TestEvents(BaseTestEvents):
+    def setUp(self):
+        super().setUp()
+        del os.environ['CLIETP_FETCH_CONCURRENT']
+
+
+class TestEventsMaxThread(BaseTestEvents):
+    def setUp(self):
+        super().setUp()
+        os.environ['CLIETP_FETCH_CONCURRENT'] = "8"
+
 class TestCliETP(CliETPTest):