From b95478f4e0d706164c71bc6663b648a98febc014 Mon Sep 17 00:00:00 2001 From: Antoine Drochon Date: Mon, 22 Aug 2022 14:13:23 -0700 Subject: [PATCH] Fix issue with group API returning partial matching result, not exact name match --- .gitignore | 2 + bin/akamai-mfa | 24 ++++-- cli.json | 2 +- test/nose2.cfg | 7 ++ test/requirements.txt | 2 + test/test.py | 165 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 .gitignore create mode 100644 test/nose2.cfg create mode 100644 test/requirements.txt create mode 100644 test/test.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c365b3f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +test/report.html diff --git a/bin/akamai-mfa b/bin/akamai-mfa index 32b84fe..61efb13 100755 --- a/bin/akamai-mfa +++ b/bin/akamai-mfa @@ -42,7 +42,7 @@ import csv #: cli-mfa version, see also cli.json -__VERSION__ = "0.0.9.1" +__VERSION__ = "0.0.10" #: Log formatting aligned with other CLIs LOG_FMT = '%(asctime)s [%(levelname)s] %(threadName)s %(message)s' #: Near real-time, 30s ago is the most recent by default @@ -79,6 +79,7 @@ class cli(): def exit(code): sys.exit(code) + class MFAConfig(): """ Manage CLI MFA input parameters @@ -126,7 +127,7 @@ class MFAConfig(): # ad-hoc implementation to support MFA customers loaduserparser = subparsers.add_parser('importusers', help="Import users from a CSV file") - loaduserparser.add_argument("--file", "-f", help="CSV file used as input") + loaduserparser.add_argument("--file", "-f", required=True, help="CSV file used as input") loaduserparser.add_argument("--ignore-header", "-i", dest="ignore_header", default=False, action="store_true", help="Ignore the first line (if header is present)") loaduserparser.add_argument("--fullname-format", "-n", dest="fullname_format", default="{firstname} {lastname}", @@ -203,8 +204,11 @@ class BaseAPI(object): def post(self, url, params=None, json=None): url = f"{mfa_api_url}{url}" api_response = self._session.post(url, params=params, json=json) + if api_response.status_code != 200: + raise Exception(f"Akamai MFA API response error HTTP/{api_response.status_code}, {api_response.text}") return api_response.json() + class AkamaiMFAAuth(requests.auth.AuthBase): """ Akamai MFA API authentication for Requests. @@ -315,7 +319,8 @@ class IdentityManagementAPI(BaseAPI): def group_id(self, group_name): """ Return Group ID for a given group name - If the group name is not found or API return more than one result, return None + If the group name is not found return None + If multiple matches, raise an exception Args: group_name (string): Group name @@ -325,11 +330,14 @@ class IdentityManagementAPI(BaseAPI): """ group_info = self.get("/api/v1/control/groups", params={'name': group_name}) matches = group_info.get('result', {}).get('page', []) - match_count = len(matches) - if match_count != 1: - return None - else: - return matches[0].get('id') + group_id = None + for m in matches: + if m.get('name') == group_name: + if group_id is None: + group_id = m.get('id') + else: + raise Exception(f"Ambiguity, more than one group matching name {group_name}") + return group_id def create_group(self, group_name, group_summary=None): """Create a new group in MFA backend.""" diff --git a/cli.json b/cli.json index c6ffa2f..a4fda4e 100644 --- a/cli.json +++ b/cli.json @@ -5,7 +5,7 @@ "commands": [ { "name": "mfa", - "version": "0.0.9.1", + "version": "0.0.10", "description": "Akamai CLI for MFA" } ] diff --git a/test/nose2.cfg b/test/nose2.cfg new file mode 100644 index 0000000..17582cc --- /dev/null +++ b/test/nose2.cfg @@ -0,0 +1,7 @@ +[unittest] +plugins = nose2_html_report.html_report + nose2.plugins.attrib + +[html-report] +always-on = True +path = report.html \ No newline at end of file diff --git a/test/requirements.txt b/test/requirements.txt new file mode 100644 index 0000000..51a6173 --- /dev/null +++ b/test/requirements.txt @@ -0,0 +1,2 @@ +nose2 +nose2-html-report \ No newline at end of file diff --git a/test/test.py b/test/test.py new file mode 100644 index 0000000..f75065b --- /dev/null +++ b/test/test.py @@ -0,0 +1,165 @@ +# Copyright 2022 Akamai Technologies, Inc. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module replaces the old test.bash script +Tested with nose2: + +```bash + +# Optional +EDGERC_SECTION=mysection +# End Optional + +cd test +nose2 -v +open report.html +``` + +For some specific test +``` +nose2 -v test.TestUserGroupManagement.test_import_csv +``` +""" + +import unittest +import subprocess +import shlex +from pathlib import Path +import collections +import time +import os +import tempfile + +# Global variables +encoding = 'utf-8' + + +class CliMFATest(unittest.TestCase): + testdir = None + maindir = None + + def setUp(self): + self.testdir = Path(__file__).resolve().parent + self.maindir = Path(__file__).resolve().parent.parent + + def cli_command(self, *args): + command = shlex.split(f'python3 {self.maindir}/bin/akamai-mfa') + if os.environ.get('EDGERC_SECTION'): + command.extend(["--section", os.environ['EDGERC_SECTION']]) + command.extend(*args) + print("\nCOMMAND: ", " ".join(command)) + return command + + def cli_run(self, *args): + cmd = subprocess.Popen(self.cli_command(str(a) for a in args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return cmd + + def line_count(filename): + count = 0 + with open(filename) as f: + while next(f, False): + count += 1 + return count + + def duplicate_count(filename): + total_count = 0 + with open(filename) as infile: + counts = collections.Counter(l.strip() for l in infile) + for line, count in counts.most_common(): + if count > 1: + print(f"DUPLICATE[{count}] {line}") + total_count += 1 + return total_count + + +class TestEvents(CliMFATest): + + after = int(time.time() - 15 * 60) + before = int(time.time()) + + def test_events(self): + """ + Fetch MFA events + """ + cmd = self.cli_run("event", "--start", self.after, "--end", self.before) + stdout, stderr = cmd.communicate(timeout=60) + events = stdout.decode(encoding) + # event_count = len(events.splitlines()) + # self.assertGreater(event_count, 0, "We expect at least one threat event") + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + + +class TestUserGroupManagement(CliMFATest): + + def test_invite(self): + cmd = self.cli_run('users', 'invite', '-g', 'NEW_GROUP') + stdout, stderr = cmd.communicate() + print(stdout) + print(stderr) + self.assertEquals(cmd.returncode, 0, 'CLI return code must be 0') + + def test_import_csv(self): + domain = "example.org" + # Mockup CSV file with 1st row as header + csv_content = """email,firstName,lastName,username,GROUP\n""" \ + f"""user1@{domain},User1,Lastname1,user1.lastname1,NEW_GROUP\n""" \ + f"""user2@{domain},User2,Lastname2,user2.lastname2,NEW_GROUP\n""" \ + f"""user3@{domain},User3,Lastname3,user3.lastname3,NEW_GROUP\n""" \ + f"""user4@{domain},User4,Lastname4_Kichirō,user4.lastname4,NEW_GROUP\n""" \ + f"""user5@{domain},User5,Lastname5,user5.lastname5,NEW_GROUP2\n""" \ + f"""user6@{domain},User6,Lastname6,user6.lastname6,NEW_GROUP2\n""" + + csv_fp, csv_filename = tempfile.mkstemp(text=True) + with os.fdopen(csv_fp, 'w+t') as f: + f.write(csv_content) + + cmd = self.cli_run('importusers', '--ignore-header', '-f', csv_filename) + stdout, stderr = cmd.communicate() + print(stdout) + print(stderr) + os.unlink(csv_filename) + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + + def test_csvfiledoesntexist(self): + cmd = self.cli_run('importusers', '-f', 'csv_file_not_exist') + stdout, stderr = cmd.communicate() + self.assertGreater(cmd.returncode, 0, 'CLI return code must be strictly positive') + + +class TestCliMFA(CliMFATest): + + def test_no_edgerc(self): + """ + Call CLI with a bogus edgerc file, help should be displayed. + """ + cmd = self.cli_run('--edgerc', 'file_not_exist') + stdout, stderr = cmd.communicate() + output = stdout.decode(encoding) + error = stderr.decode(encoding) + self.assertIn("ERROR: No section", error) + self.assertEqual(cmd.returncode, 1, 'return code must be 1') + + def test_cli_version(self): + """ + Ensure version of the CLI is displayed + """ + cmd = self.cli_run('version') + stdout, stderr = cmd.communicate() + self.assertRegex(stdout.decode(encoding), r'[0-9]+\.[0-9]+\.[0-9]+\n', 'Version should be x.y.z') + self.assertEqual(cmd.returncode, 0, 'return code must be 0') + + +if __name__ == '__main__': + unittest.main()