
Merge pull request #17 from akamai/EME-789
- Fix bug introduced with the concurrent API fetching that caused events to not be sorted.
- Added a corresponding unit test to prevent the regression in the future.
- Bump version to 0.4.7.
bitonio authored Mar 25, 2024
2 parents 84ff91a + f486a78 commit 3adfe23
Showing 3 changed files with 142 additions and 24 deletions.
74 changes: 59 additions & 15 deletions bin/akamai-etp
@@ -23,14 +23,15 @@ import json.decoder
import ipaddress
import time
import signal
from threading import Event
from threading import Event, Lock
from enum import Enum
import logging
from urllib.parse import parse_qs
import csv
import math
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from datetime import timedelta
import io

# 3rd party modules

@@ -40,7 +41,7 @@ from requests.compat import urljoin
from akamai.edgegrid import EdgeGridAuth, EdgeRc
from config import EdgeGridConfig

__version__ = "0.4.6"
__version__ = "0.4.7"

#: Window span in ad-hoc mode, default is 3 min
span_duration_min = 3
@@ -105,10 +106,10 @@ class ETPEventFetchStats(object):
#: Timedelta spent in network operation (Request/Response) for the poll interval
poll_elapsed = timedelta()

def inc_event(self):
"Increment the number of event by 1."
self.events += 1
self.poll_events += 1
def inc_event(self, increment: int = 1):
"Increment the number of events by `increment`."
self.events += increment
self.poll_events += increment

def inc_api_call(self):
self.api_calls += 1
@@ -125,6 +126,44 @@ self.poll_elapsed += d
self.poll_elapsed += d


class SortedPageOutput(object):
"""
Accumulates security events (individually dict), indexed per page number.
This is meant to used by // threads.
If some pages are missing, we hold off in flushing this to the output
"""

def __init__(self, output: io.TextIOWrapper) -> None:
self.eventByPage = dict()
self.output = output
self.last_flushed_page = 0
self.lock = Lock()

def append(self, page_number: int, events: list[dict]) -> None:
LOG.debug(f"Adding {len(events)} events from page #{page_number}")
self.eventByPage[page_number] = events
self.flush()

def flush(self) -> None:
"""
Flush available events from page arrived.
If a gap like page 2 and page 4 are arrived, we don't flush anything
"""
LOG.debug(f"flush() called last_flushed_page={self.last_flushed_page} {sorted(self.eventByPage.keys())}...")
with self.lock:  # Critical section: concurrent threads calling flush() will have to wait
for page_num in sorted(self.eventByPage.keys()):
if page_num == self.last_flushed_page + 1:
for event in self.eventByPage[page_num]:
self.output.write("%s\n" % json.dumps(event))
self.output.flush()
self.last_flushed_page = page_num
del self.eventByPage[page_num]
LOG.debug(f"Flushed till page {self.last_flushed_page}")
else:
LOG.debug(f"Current page {page_num} too ahead, waiting for page {self.last_flushed_page + 1}")
break
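
As an aside for reviewers, here is a minimal, hypothetical usage sketch of SortedPageOutput (not part of this commit): a page arriving out of order is buffered, and everything is written in page order once the gap is filled. The page payloads below are made up.

# Hypothetical illustration only; SortedPageOutput is the class defined above.
import sys

pages = SortedPageOutput(sys.stdout)
pages.append(2, [{"query": {"time": "2"}}])  # buffered: page 1 has not arrived yet
pages.append(1, [{"query": {"time": "1"}}])  # writes page 1, then page 2, in order
pages.flush()                                # safety no-op, everything already written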


def exit_fromresponse(response):
"""
Convert an HTTP Code into an CLI return code.
@@ -248,7 +287,7 @@ def exit_gracefully(signum, frame):
stop_event.set()


def fetch_event_page(start, end, page_number, thread_pool, pool_futures, stats, output):
def fetch_event_page(start, end, page_number, thread_pool, pool_futures, stats, output: SortedPageOutput):
"""
Fetch a single page of results from the ETP API.
It runs the API call and writes the output
@@ -288,17 +327,20 @@ def fetch_event_page(start, end, page_number, thread_pool, pool_futures, stats,

if r.status_code != 200:
stats.api_calls_fail += 1
LOG.error(f"API call failed with HTTP/{r.status_code}: {r.content}")
LOG.error(f"API call failed with HTTP/{r.status_code}: {r.content}. URL was {r.url}")
# Handle more gracefully 401 (auth error) on page 1 and
# abort the cli right away with return code 1
if page_number == 1 and r.status_code == 401:
exit(4) # 4xx translates into RC 4

stats.inc_poll_elapsed(r.elapsed)
LOG.info("{OPEN} API call took %.2f seconds, page #%s" % (r.elapsed.total_seconds(), page_number))
response_data = r.json()
stats.bytes += len(r.content)

for e in response_data.get('dataRows', []):
output.write("%s\n" % json.dumps(e))
stats.inc_event()
output.flush()
rows = response_data.get('dataRows', [])
output.append(page_number, rows)
stats.inc_event(len(rows))

if page_number == 1:
page_info = response_data.get("pageInfo", {})
@@ -347,11 +389,11 @@ def events_summary(start, end, config):
LOG.exception(f"Error fetching {config.event_type} summary...")


def fetch_events_concurrent(config, output):
def fetch_events_concurrent(config, output: io.TextIOWrapper):
"""
Fetch ETP security events
Unlike the old fetch_events (prior to cli-etp 0.4.1) this version is optimized
to fetch the pages with concurrency leveraging a pool of thread.
to fetch the pages with concurrency leveraging a pool of threads.
"""
stats = ETPEventFetchStats()

@@ -368,6 +410,7 @@ if config.end:
if config.end:
end = config.end

output_pages = SortedPageOutput(output)
pool_futures = []
concurrent_fetch = ThreadPoolExecutor(
max_workers=min(config.concurrent, MAX_FETCH_CONCURRENCY),
@@ -378,10 +421,11 @@ expected_event_count = events_summary(start, end, config)
expected_event_count = events_summary(start, end, config)
# 1st page fetch is executed in the main cli thread, optional subsequent
# ones are executed within the thread pool
fetch_event_page(start, end, 1, concurrent_fetch, pool_futures, stats, output)
fetch_event_page(start, end, 1, concurrent_fetch, pool_futures, stats, output_pages)
# Wait until all the API calls fetching pages are processed
try:
wait(pool_futures, return_when=ALL_COMPLETED)
output_pages.flush()
if not config.tail:
break
else:
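The overall pattern in fetch_events_concurrent is: page 1 is fetched on the main thread, the remaining pages are submitted to a ThreadPoolExecutor, and SortedPageOutput re-orders whatever completes out of order before writing. A stripped-down sketch of that flow, assuming a fetch_page(page_number) callable returning a list of event dicts (fetch_all_pages and fetch_page are illustrative names, not the repository's actual API):

# Simplified sketch of the concurrent-fetch / ordered-flush pattern.
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def fetch_all_pages(fetch_page, page_count, output_pages, max_workers=4):
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Page 1 on the calling thread, remaining pages in the pool.
        output_pages.append(1, fetch_page(1))
        for page in range(2, page_count + 1):
            futures.append(pool.submit(lambda p=page: output_pages.append(p, fetch_page(p))))
        wait(futures, return_when=ALL_COMPLETED)
    output_pages.flush()  # final flush once every page has arrived
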
2 changes: 1 addition & 1 deletion cli.json
@@ -5,7 +5,7 @@
"commands": [
{
"name": "etp",
"version": "0.4.6",
"version": "0.4.7",
"description": "Akamai CLI for Secure Internet Access Enterprise (f.k.a. Enterprise Threat Protector)"
}
]
90 changes: 82 additions & 8 deletions test/test.py
@@ -38,6 +38,8 @@
import os
import random
import re
import json


# Global variables
encoding = 'utf-8'
@@ -58,6 +60,8 @@ def cli_command(self, *args):
if os.environ.get('EDGERC_SECTION'):
command.extend(["--section", os.environ['EDGERC_SECTION']])
command.extend(*args)

print(f"\nEDGERC_SECTION={os.environ.get('EDGERC_SECTION', 'default')}")
print("\nCOMMAND: ", " ".join(command))
return command

@@ -83,21 +87,41 @@ def duplicate_count(filename):
return total_count


class TestEvents(CliETPTest):
class BaseTestEvents(CliETPTest):

after = int(time.time() - 15 * 60)
after = int(time.time() - 30 * 60)
before = int(time.time())

@staticmethod
def is_sorted(seq):
return all(a <= b for a, b in zip(seq, seq[1:]))

@staticmethod
def is_seq_sorted(events_list):
time_seq = []
for e in events_list:
event = json.loads(e)
time_seq.append(event.get('query', {}).get("time"))

return TestEvents.is_sorted(time_seq)


def test_event_threat(self):
"""
Fetch threat events
"""
cmd = self.cli_run("event", "threat", "--start", self.after, "--end", self.before)
stdout, stderr = cmd.communicate(timeout=60)
stdout, stderr = cmd.communicate(timeout=360)
events = stdout.decode(encoding)
event_count = len(events.splitlines())
self.assertGreater(event_count, 0, "We expect at least one threat event")
self.assertEqual(cmd.returncode, 0, 'return code must be 0')
events_list = events.splitlines()

if cmd.returncode != 0:
print(stderr.decode(encoding))

self.assertEqual(cmd.returncode, 0, f'cli-etp return code must be 0, {cmd.returncode} returned')
self.assertGreater(len(events_list), 0, "We expect at least one threat event")
self.assertTrue(TestEvents.is_seq_sorted(events_list), "The list of events is not sorted by query.time")


def test_event_aup(self):
"""
@@ -106,9 +130,48 @@ def test_event_aup(self):
cmd = self.cli_run("event", "aup", "--start", self.after, "--end", self.before)
stdout, stderr = cmd.communicate(timeout=120)
events = stdout.decode(encoding)
event_count = len(events.splitlines())
self.assertGreater(event_count, 0, "We expect at least one AUP event")
events_list = events.splitlines()
event_count = len(events_list)
self.assertEqual(cmd.returncode, 0, 'return code must be 0')
self.assertGreater(event_count, 0, "We expect at least one AUP event")
self.assertTrue(TestEvents.is_seq_sorted(events_list), "The list of events is not sorted by query.time")

def test_is_seq_sorted(self):
"""
Test the helper function that checks whether a sequence of events is sorted.
"""
self.assertTrue(
TestEvents.is_seq_sorted(
[
'{"query": {"time": "1"}}',
'{"query": {"time": "2"}}',
'{"query": {"time": "3"}}'
]), "Positive Test")
self.assertFalse(
TestEvents.is_seq_sorted(
[
'{"query": {"time": "2"}}',
'{"query": {"time": "1"}}',
'{"query": {"time": "3"}}'
]), "Negative Test")

def test_event_dns(self):
"""
Fetch DNS events (the chattiest event type)
"""
cmd = self.cli_run("event", "dns", "--start", self.after, "--end", self.before)
stdout, stderr = cmd.communicate(timeout=360)
events = stdout.decode(encoding)
events_list = events.splitlines()
print(f"Loaded {len(events_list)} security events (DNS).")

if cmd.returncode != 0:
print(stderr.decode(encoding))

self.assertEqual(cmd.returncode, 0, f'cli-etp return code must be 0, {cmd.returncode} returned')
self.assertGreater(len(events_list), 0, "We expect at least one DNS event")
self.assertTrue(TestEvents.is_seq_sorted(events_list), "The list of events is not sorted by query.time")


def test_event_aup_file(self):
"""
@@ -142,6 +205,17 @@ def test_event_netcon(self):
self.assertEqual(cmd.returncode, 0, 'return code must be 0')


class TestEvents(BaseTestEvents):
def setUp(self):
super().setUp()
os.environ.pop('CLIETP_FETCH_CONCURRENT', None)  # avoid KeyError if the variable is not set


class TestEventsMaxThread(BaseTestEvents):
def setUp(self):
super().setUp()
os.environ['CLIETP_FETCH_CONCURRENT'] = "8"


class TestCliETP(CliETPTest):

