From 11ed467cfd44f86b68de5364c431b12371fdd149 Mon Sep 17 00:00:00 2001 From: Antoine Drochon Date: Wed, 9 Jun 2021 17:45:00 -0700 Subject: [PATCH 1/4] Introduce two new parameters for tail mode Reduce the risk of loosing event in case of API/network slowness Some improvments in the test script (some bugs TO BE FIXED) Bump to 0.3.5 --- bin/akamai-etp | 85 +++++++++++++++++++++++++++++++------------------- bin/config.py | 18 +++++------ cli.json | 2 +- test/test.bash | 40 ++++++++++++++++++++++-- 4 files changed, 100 insertions(+), 45 deletions(-) diff --git a/bin/akamai-etp b/bin/akamai-etp index c662ad3..34bbe2f 100755 --- a/bin/akamai-etp +++ b/bin/akamai-etp @@ -35,37 +35,32 @@ from requests.compat import urljoin from akamai.edgegrid import EdgeGridAuth, EdgeRc from config import EdgeGridConfig -__version__ = "0.3.4" +__version__ = "0.3.5" -#: Data collection delay, default is 30 minutes -collection_delay_min = 30 #: Window span in ad-hoc mode, default is 15 min span_duration_min = 15 -#: How often we poll in --tail mode, default is 60 sec -poll_interval_sec = 60 session = requests.Session() verbose = False section_name = "default" +headers = {'content-type': "application/json;charset=UTF-8"} +extra_qs = None LOG = logging.getLogger(__name__) # If all parameters are set already, use them. Otherwise # use the config config = EdgeGridConfig({"verbose": False}, section_name) -verbose = getattr(config, "verbose", False) -# Set auth -session.auth = EdgeGridAuth( - client_token=config.client_token, - client_secret=config.client_secret, - access_token=config.access_token -) +#: Verbose mode, configurable with -v or --verbose +verbose = getattr(config, "verbose", False) +#: Fetch limit in seconds, configurable with --limit +fetch_limit = getattr(config, "limit", 3 * 60 * 60) +#: Poll interval (also defin how much data we get each time) +#: Default is 5 minutes, configurable with --poll +poll_interval_sec = getattr(config, "poll", 60) -session.headers.update({'User-Agent': f"{config.ua_prefix} cli-etp/{__version__}"}) -headers = {'content-type': "application/json;charset=UTF-8"} -baseurl = '%s://%s' % ('https', config.host) -extra_qs = None +baseurl = '%s://%s' % ('https', getattr(config, "host", "host-not-set-in-config")) class ETPListType(Enum): @@ -175,17 +170,18 @@ def fetch_events(config, output): """ stop_event = Event() event_count = 0 + byte_count = 0 def exit_gracefully(signum, frame): stop_event.set() if config.tail: # The window span is show so we can keep adding content by small increment - start = int(time.time()) - (collection_delay_min * 60) - poll_interval_sec + start = int(time.time()) - fetch_limit - poll_interval_sec end = start + poll_interval_sec signal.signal(signal.SIGTERM, exit_gracefully) signal.signal(signal.SIGINT, exit_gracefully) else: # Larger window span - start = int(time.time()) - (collection_delay_min * 60) - (span_duration_min * 60) + start = int(time.time()) - fetch_limit - (span_duration_min * 60) if config.start: start = config.start end = start + (span_duration_min * 60) @@ -196,6 +192,7 @@ def fetch_events(config, output): while not stop_event.is_set(): pageNumber = 1 numOfPages = 0 + event_interval_count = 0 while numOfPages == 0 or pageNumber <= numOfPages: post_data = { 'startTimeSec': start, @@ -215,7 +212,8 @@ def fetch_events(config, output): } LOG.info("{OPEN} API URL: %s" % event_url) LOG.info("{OPEN} API POST param %s" % post_data) - r = session.post(event_url, params=build_params(), json=post_data, headers=headers) + r = session.post(event_url, params=build_params(), json=post_data, headers=headers, timeout=min(300, poll_interval_sec*2)) + byte_count += len(r.content) LOG.info("{OPEN} API response code is HTTP/%s, body %s bytes" % (r.status_code, len(r.content))) if r.status_code != 200: LOG.error(r.content) @@ -233,9 +231,10 @@ def fetch_events(config, output): for e in response_data.get('dataRows', []): output.write("%s\n" % json.dumps(e)) event_count += 1 + event_interval_count += 1 output.flush() pageNumber += 1 - LOG.info("%s events reported so far." % event_count) + LOG.info(f"{event_interval_count} event(s) for current {poll_interval_sec}s interval [{start} -> {end}], {pageNumber - 1} page(s).") if not config.tail: break else: @@ -245,14 +244,23 @@ def fetch_events(config, output): # TODO: add a better/more resilient logic sleep_time = max(0, poll_interval_sec - (time.time() - timing_s)) if sleep_time == 0: - LOG.warn("Potential data gaps") - LOG.info("Sleeping for %.2f sec..." % sleep_time) - stop_event.wait(sleep_time) - start = int(time.time()) - (collection_delay_min * 60) - poll_interval_sec - end = start + poll_interval_sec - LOG.info("Next cycle will be from %s to %s..." % (start, end)) + LOG.warning(f"Drifting, consider increase the poll interval (currently {poll_interval_sec}s)") + if not stop_event.wait(sleep_time): + LOG.info("Sleeping for %.2f sec..." % sleep_time) + # prior to 0.3.5 + # start = int(time.time()) - fetch_limit - poll_interval_sec + # end = start + poll_interval_sec + # after 0.3.5 + start = end # next cycle resume where we finish this one + end = int(time.time()) - fetch_limit # the window ends at now - limit + LOG.info(f"Next cycle will be from {start} to {end} [{end - start}s]...") + except KeyboardInterrupt: + stop_event.set() + LOG.warning("Keyboard interrupt detected") + except requests.exceptions.ReadTimeout: + LOG.warning(f"Request timeout, consider increase poll interval (currently {poll_interval_sec}s)") finally: - LOG.info("%d event(s) fetched in total" % event_count) + LOG.info(f"{event_count} event(s) fetched in total, {byte_count} byte(s)") def list_add_or_delete(config): @@ -352,13 +360,29 @@ def main(): logging.basicConfig( filename=config.logfile, level=log_level(), - format='%(asctime)s [%(levelname)s] %(threadName)s %(message)s' + format='%(asctime)s %(levelname).1s %(message)s' ) LOG.info("Python %s" % sys.version) LOG.info("PID: %s" % os.getpid()) LOG.info("Command is: %s" % config.command) - LOG.info("ETP Config ID: %s" % config.etp_config_id) + LOG.info("ETP Config ID: %s" % getattr(config, 'etp_config_id', None)) + LOG.info(f"Tail fetch limit: {fetch_limit} seconds") + LOG.info(f"Tail poll interval: {poll_interval_sec} seconds") + + if not config.command: + config.parser.print_help() + elif config.command == "version": + print(__version__) + sys.exit(0) + + # Initialize Requests Session for the API calls + session.auth = EdgeGridAuth( + client_token=config.client_token, + client_secret=config.client_secret, + access_token=config.access_token + ) + session.headers.update({'User-Agent': f"{config.ua_prefix} cli-etp/{__version__}"}) if config.command == "event": if config.output is None: @@ -413,9 +437,6 @@ def main(): ioc.timeseries(config.domain) elif config.ioc_action == "changes": ioc.changes(config.domain) - elif config.command == "version": - print(__version__) - if __name__ == '__main__': main() \ No newline at end of file diff --git a/bin/config.py b/bin/config.py index 5732ac5..3a5a75d 100644 --- a/bin/config.py +++ b/bin/config.py @@ -19,15 +19,8 @@ import os import argparse import logging - -if sys.version_info[0] >= 3: - # python3 - from configparser import ConfigParser - import http.client as http_client -else: - # python2.7 - from ConfigParser import ConfigParser - import httplib as http_client +from configparser import ConfigParser +import http.client as http_client epilog = '''Copyright (C) Akamai Technologies, Inc\n''' \ @@ -57,6 +50,10 @@ def __init__(self, config_values, configuration, flags=None): help="""Do not stop when most recent log is reached,\n""" """rather to wait for additional data to be appended\n""" """to the input. --start and --end are ignored when used.""") + event_parser.add_argument('--poll', type=int, default=60, + help="How often we pull data in tail mode") + event_parser.add_argument('--limit', type=int, default=3*60*60, + help="Stop the most recent fetch to now minus specified seconds, default is 3 hours. Applicable to --tail") list_parser = subparsers.add_parser("list", help="Manage ETP security list", epilog=epilog, formatter_class=argparse.RawTextHelpFormatter) @@ -160,5 +157,6 @@ def __init__(self, config_values, configuration, flags=None): self.create_base_url() def create_base_url(self): - self.base_url = "https://%s" % self.host + if hasattr(self, 'host'): + self.base_url = "https://%s" % self.host \ No newline at end of file diff --git a/cli.json b/cli.json index 966ddc7..8afc519 100755 --- a/cli.json +++ b/cli.json @@ -5,7 +5,7 @@ "commands": [ { "name": "etp", - "version": "0.3.4", + "version": "0.3.5", "description": "Akamai CLI for Enterprise Threat Protector" } ] diff --git a/test/test.bash b/test/test.bash index ca13a0d..d6c824f 100755 --- a/test/test.bash +++ b/test/test.bash @@ -3,6 +3,31 @@ dir=$(cd .. && pwd -P) echo "Starting akamai cli etp tests..." +total_pass=0 +total_fail=0 + +function test_result() { + if [[ $1 == 0 ]]; then + pass "[PASS] $2" + total_pass=$(($total_pass + 1)) + else + error "[FAIL] $2" + total_fail=$(($total_fail + 1)) + fi +} + +function pass() { + GREEN='\033[0;32m' + NC='\033[0m' # No Color + printf "${GREEN}$1${NC}\n" +} + +function error() { + RED='\033[0;31m' + NC='\033[0m' # No Color + printf "${RED}$1${NC}\n" +} + if [ "$1" == "cli" ]; then # Native Akamai CLI interpreter='akamai etp -v' @@ -31,18 +56,26 @@ random_host3="host3-$random_ip.test.akamai.com" # Version -$interpreter version +$interpreter version +test_result $? "Display cli-etp version" # Pull events $interpreter event aup +test_result $? "Fetch recent AUP events" $interpreter event threat - +test_result $? "Fetch recent Threat events" # List management $interpreter list get +test_result $? "Fetch security lists" + random_listid=$($interpreter list get|sort -R| head -n 1|cut -f1 -d,) +test_result $? "Pick a random list to work with" + $interpreter list add $etp_config_id $random_ip +test_result $? "Add IP to the list $random_listid" + $interpreter list add $etp_config_id $random_ip2 $random_ip3 $interpreter list add $etp_config_id $random_host $interpreter list add $etp_config_id $random_host2 $random_host3 @@ -56,4 +89,7 @@ if type -t deactivate > /dev/null; then deactivate fi +error "Total error(s): $total_fail" +pass "Total success(es): $total_pass" + echo "Test completed." \ No newline at end of file From dcadcdb29101719ab16ea581fab20fc261b712bc Mon Sep 17 00:00:00 2001 From: Antoine Drochon Date: Thu, 10 Jun 2021 16:30:34 -0700 Subject: [PATCH 2/4] Fix missing exit --- bin/akamai-etp | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/akamai-etp b/bin/akamai-etp index 34bbe2f..14e2f07 100755 --- a/bin/akamai-etp +++ b/bin/akamai-etp @@ -372,6 +372,7 @@ def main(): if not config.command: config.parser.print_help() + sys.exit(0) elif config.command == "version": print(__version__) sys.exit(0) From 925fe4a1f9870c42e8efef4496958cd5a9b34986 Mon Sep 17 00:00:00 2001 From: Antoine Drochon Date: Thu, 10 Jun 2021 16:32:25 -0700 Subject: [PATCH 3/4] Initial testing with unittest/nose --- test/test.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 test/test.py diff --git a/test/test.py b/test/test.py new file mode 100644 index 0000000..274d103 --- /dev/null +++ b/test/test.py @@ -0,0 +1,108 @@ +# Copyright 2021 Akamai Technologies, Inc. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module replaces the old test.bash script +Tested with nose2: +```bash +cd test +nose2 --html-report -v +open report.html +``` +""" + +import unittest +import subprocess +import shlex +import time +from pathlib import Path + +# Global variables +encoding = 'utf-8' + +class CliETPTest(unittest.TestCase): + testdir = None + maindir = None + + def setUp(self): + self.testdir = Path(__file__).resolve().parent + self.maindir = Path(__file__).resolve().parent.parent + + def cli_command(self, *args): + command = shlex.split(f'python3 {self.maindir}/bin/akamai-etp') + command.extend(*args) + print("\nCOMMAND: ", command) + return command + + def cli_run(self, *args): + cmd = subprocess.Popen(self.cli_command(str(a) for a in args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return cmd + + def line_count(filename): + count = 0 + for line in open(filename).xreadlines(): + count += 1 + return count + + +class TestEvents(CliETPTest): + + after = int(time.time() - 15 * 60) + before = int(time.time()) + + def test_event_threat(self): + """ + Fetch threat events + """ + cmd = self.cli_run("event", "threat", "--start", self.after, "--end", self.before) + stdout, stderr = cmd.communicate(timeout=60) + events = stdout.decode(encoding) + event_count = len(events.splitlines()) + self.assertGreater(event_count, 0, "We expect at least one threat event") + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + + def test_event_aup(self): + """ + Fetch AUP events + """ + cmd = self.cli_run("event", "aup", "--start", self.after, "--end", self.before) + stdout, stderr = cmd.communicate(timeout=120) + events = stdout.decode(encoding) + event_count = len(events.splitlines()) + self.assertGreater(event_count, 0, "We expect at least one AUP event") + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + +class TestCliETP(CliETPTest): + + def test_no_edgerc(self): + """ + Call CLI with a bogus edgerc file, help should be displayed. + """ + cmd = self.cli_run('-e', 'file_not_exist') + stdout, stderr = cmd.communicate() + output = stdout.decode(encoding) + self.assertIn("usage: akamai etp", output) + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + + def test_cli_version(self): + """ + Ensure version of the CLI is displayed + """ + cmd = self.cli_run('version') + stdout, stderr = cmd.communicate() + self.assertRegex(stdout.decode(encoding), r'[0-9]+\.[0-9]+\.[0-9]+\n', 'Version should be x.y.z') + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + +if __name__ == '__main__': + unittest.main() From 2e51dee1744d9bd3136dc0bf9a4dfa61276ba1e9 Mon Sep 17 00:00:00 2001 From: Antoine Drochon Date: Thu, 10 Jun 2021 17:00:11 -0700 Subject: [PATCH 4/4] Additional tests --- test/test.py | 42 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/test/test.py b/test/test.py index 274d103..27fafa4 100644 --- a/test/test.py +++ b/test/test.py @@ -27,10 +27,14 @@ import shlex import time from pathlib import Path +import collections +import tempfile +import os # Global variables encoding = 'utf-8' + class CliETPTest(unittest.TestCase): testdir = None maindir = None @@ -51,10 +55,21 @@ def cli_run(self, *args): def line_count(filename): count = 0 - for line in open(filename).xreadlines(): - count += 1 + with open(filename) as f: + while next(f, False): + count += 1 return count + def duplicate_count(filename): + total_count = 0 + with open(filename) as infile: + counts = collections.Counter(l.strip() for l in infile) + for line, count in counts.most_common(): + if count > 1: + print(f"DUPLICATE[{count}] {line}") + total_count += 1 + return total_count + class TestEvents(CliETPTest): @@ -83,6 +98,28 @@ def test_event_aup(self): self.assertGreater(event_count, 0, "We expect at least one AUP event") self.assertEqual(cmd.returncode, 0, 'return code must be 0') + def test_event_aup_file(self): + """ + Fetch AUP events, export as a file + """ + output_handle, output_filename = tempfile.mkstemp() + try: + cmd = self.cli_run("event", "aup", "--start", self.after, "--end", self.before, '--output', output_filename) + stdout, stderr = cmd.communicate(timeout=120) + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + line_count = TestCliETP.line_count(output_filename) + print(f"Output contains {line_count} lines") + duplicate_count = CliETPTest.duplicate_count(output_filename) + self.assertGreater(line_count, 0, "We expect at least a few events") + print(f"We found {duplicate_count} duplicates") + self.assertEqual(duplicate_count, 0) + + finally: + if os.path.isfile(output_filename): + os.remove(output_filename) + + + class TestCliETP(CliETPTest): def test_no_edgerc(self): @@ -104,5 +141,6 @@ def test_cli_version(self): self.assertRegex(stdout.decode(encoding), r'[0-9]+\.[0-9]+\.[0-9]+\n', 'Version should be x.y.z') self.assertEqual(cmd.returncode, 0, 'return code must be 0') + if __name__ == '__main__': unittest.main()