Skip to content

Commit

Permalink
Fix issue with group API returning partial matching result, not exact…
Browse files Browse the repository at this point in the history
… name match
  • Loading branch information
bitonio committed Aug 22, 2022
1 parent 977d8ef commit b95478f
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
test/report.html
24 changes: 16 additions & 8 deletions bin/akamai-mfa
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import csv


#: cli-mfa version, see also cli.json
__VERSION__ = "0.0.9.1"
__VERSION__ = "0.0.10"
#: Log formatting aligned with other CLIs
LOG_FMT = '%(asctime)s [%(levelname)s] %(threadName)s %(message)s'
#: Near real-time, 30s ago is the most recent by default
Expand Down Expand Up @@ -79,6 +79,7 @@ class cli():
def exit(code):
sys.exit(code)


class MFAConfig():
"""
Manage CLI MFA input parameters
Expand Down Expand Up @@ -126,7 +127,7 @@ class MFAConfig():

# ad-hoc implementation to support MFA customers
loaduserparser = subparsers.add_parser('importusers', help="Import users from a CSV file")
loaduserparser.add_argument("--file", "-f", help="CSV file used as input")
loaduserparser.add_argument("--file", "-f", required=True, help="CSV file used as input")
loaduserparser.add_argument("--ignore-header", "-i", dest="ignore_header", default=False, action="store_true",
help="Ignore the first line (if header is present)")
loaduserparser.add_argument("--fullname-format", "-n", dest="fullname_format", default="{firstname} {lastname}",
Expand Down Expand Up @@ -203,8 +204,11 @@ class BaseAPI(object):
def post(self, url, params=None, json=None):
url = f"{mfa_api_url}{url}"
api_response = self._session.post(url, params=params, json=json)
if api_response.status_code != 200:
raise Exception(f"Akamai MFA API response error HTTP/{api_response.status_code}, {api_response.text}")
return api_response.json()


class AkamaiMFAAuth(requests.auth.AuthBase):
"""
Akamai MFA API authentication for Requests.
Expand Down Expand Up @@ -315,7 +319,8 @@ class IdentityManagementAPI(BaseAPI):
def group_id(self, group_name):
"""
Return Group ID for a given group name
If the group name is not found or API return more than one result, return None
If the group name is not found return None
If multiple matches, raise an exception
Args:
group_name (string): Group name
Expand All @@ -325,11 +330,14 @@ class IdentityManagementAPI(BaseAPI):
"""
group_info = self.get("/api/v1/control/groups", params={'name': group_name})
matches = group_info.get('result', {}).get('page', [])
match_count = len(matches)
if match_count != 1:
return None
else:
return matches[0].get('id')
group_id = None
for m in matches:
if m.get('name') == group_name:
if group_id is None:
group_id = m.get('id')
else:
raise Exception(f"Ambiguity, more than one group matching name {group_name}")
return group_id

def create_group(self, group_name, group_summary=None):
"""Create a new group in MFA backend."""
Expand Down
2 changes: 1 addition & 1 deletion cli.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"commands": [
{
"name": "mfa",
"version": "0.0.9.1",
"version": "0.0.10",
"description": "Akamai CLI for MFA"
}
]
Expand Down
7 changes: 7 additions & 0 deletions test/nose2.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[unittest]
plugins = nose2_html_report.html_report
nose2.plugins.attrib

[html-report]
always-on = True
path = report.html
2 changes: 2 additions & 0 deletions test/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nose2
nose2-html-report
165 changes: 165 additions & 0 deletions test/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright 2022 Akamai Technologies, Inc. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module replaces the old test.bash script
Tested with nose2:
```bash
# Optional
EDGERC_SECTION=mysection
# End Optional
cd test
nose2 -v
open report.html
```
For some specific test
```
nose2 -v test.TestUserGroupManagement.test_import_csv
```
"""

import unittest
import subprocess
import shlex
from pathlib import Path
import collections
import time
import os
import tempfile

# Global variables
encoding = 'utf-8'


class CliMFATest(unittest.TestCase):
testdir = None
maindir = None

def setUp(self):
self.testdir = Path(__file__).resolve().parent
self.maindir = Path(__file__).resolve().parent.parent

def cli_command(self, *args):
command = shlex.split(f'python3 {self.maindir}/bin/akamai-mfa')
if os.environ.get('EDGERC_SECTION'):
command.extend(["--section", os.environ['EDGERC_SECTION']])
command.extend(*args)
print("\nCOMMAND: ", " ".join(command))
return command

def cli_run(self, *args):
cmd = subprocess.Popen(self.cli_command(str(a) for a in args), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return cmd

def line_count(filename):
count = 0
with open(filename) as f:
while next(f, False):
count += 1
return count

def duplicate_count(filename):
total_count = 0
with open(filename) as infile:
counts = collections.Counter(l.strip() for l in infile)
for line, count in counts.most_common():
if count > 1:
print(f"DUPLICATE[{count}] {line}")
total_count += 1
return total_count


class TestEvents(CliMFATest):

after = int(time.time() - 15 * 60)
before = int(time.time())

def test_events(self):
"""
Fetch MFA events
"""
cmd = self.cli_run("event", "--start", self.after, "--end", self.before)
stdout, stderr = cmd.communicate(timeout=60)
events = stdout.decode(encoding)
# event_count = len(events.splitlines())
# self.assertGreater(event_count, 0, "We expect at least one threat event")
self.assertEqual(cmd.returncode, 0, 'return code must be 0')


class TestUserGroupManagement(CliMFATest):

def test_invite(self):
cmd = self.cli_run('users', 'invite', '-g', 'NEW_GROUP')
stdout, stderr = cmd.communicate()
print(stdout)
print(stderr)
self.assertEquals(cmd.returncode, 0, 'CLI return code must be 0')

def test_import_csv(self):
domain = "example.org"
# Mockup CSV file with 1st row as header
csv_content = """email,firstName,lastName,username,GROUP\n""" \
f"""user1@{domain},User1,Lastname1,user1.lastname1,NEW_GROUP\n""" \
f"""user2@{domain},User2,Lastname2,user2.lastname2,NEW_GROUP\n""" \
f"""user3@{domain},User3,Lastname3,user3.lastname3,NEW_GROUP\n""" \
f"""user4@{domain},User4,Lastname4_Kichirō,user4.lastname4,NEW_GROUP\n""" \
f"""user5@{domain},User5,Lastname5,user5.lastname5,NEW_GROUP2\n""" \
f"""user6@{domain},User6,Lastname6,user6.lastname6,NEW_GROUP2\n"""

csv_fp, csv_filename = tempfile.mkstemp(text=True)
with os.fdopen(csv_fp, 'w+t') as f:
f.write(csv_content)

cmd = self.cli_run('importusers', '--ignore-header', '-f', csv_filename)
stdout, stderr = cmd.communicate()
print(stdout)
print(stderr)
os.unlink(csv_filename)
self.assertEqual(cmd.returncode, 0, 'return code must be 0')

def test_csvfiledoesntexist(self):
cmd = self.cli_run('importusers', '-f', 'csv_file_not_exist')
stdout, stderr = cmd.communicate()
self.assertGreater(cmd.returncode, 0, 'CLI return code must be strictly positive')


class TestCliMFA(CliMFATest):

def test_no_edgerc(self):
"""
Call CLI with a bogus edgerc file, help should be displayed.
"""
cmd = self.cli_run('--edgerc', 'file_not_exist')
stdout, stderr = cmd.communicate()
output = stdout.decode(encoding)
error = stderr.decode(encoding)
self.assertIn("ERROR: No section", error)
self.assertEqual(cmd.returncode, 1, 'return code must be 1')

def test_cli_version(self):
"""
Ensure version of the CLI is displayed
"""
cmd = self.cli_run('version')
stdout, stderr = cmd.communicate()
self.assertRegex(stdout.decode(encoding), r'[0-9]+\.[0-9]+\.[0-9]+\n', 'Version should be x.y.z')
self.assertEqual(cmd.returncode, 0, 'return code must be 0')


if __name__ == '__main__':
unittest.main()

0 comments on commit b95478f

Please sign in to comment.