diff --git a/README.md b/README.md index b4974bb..3a2554d 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,23 @@ -# Akamai CLI: MFA +# Akamai CLI: MFA -Welcome to the Akamai MFA module for Akamai CLI. +Welcome to the Akamai MFA package for Akamai CLI, *cli-mfa* for short. For more information about Akamai MFA, see https://www.akamai.com/mfa ## Table of contents -- [Akamai CLI: MFA](#akamai-cli-mfa) - - [Pre-requisites](#pre-requisites) - - [Akamai CLI](#akamai-cli) - - [Python](#python) - - [Getting started](#getting-started) - - [Field documentation](#field-documentation) - - [Command examples](#command-examples) - - [Streaming Akamai MFA events to a SIEM](#streaming-akamai-mfa-events-to-a-siem) - - [Support](#support) +- [Pre-requisites](#pre-requisites) + - [Akamai CLI](#akamai-cli) + - [Python 3](#python-3) +- [Getting started](#getting-started) + - [API Credentials to interact with Akamai MFA configuration](#api-credentials-to-interact-with-akamai-mfa-configuration) + - [API Credentials to fetch authentication events](#api-credentials-to-fetch-authentication-events) + - [Advanced usage](#advanced-usage) +- [Command examples](#command-examples) + - [General information and inline help](#general-information-and-inline-help) + - [Fetch authentification events](#fetch-authentification-events) + - [MFA identity management (users, groups...)](#mfa-identity-management-users-groups) +- [Streaming Akamai MFA events to a SIEM](#streaming-akamai-mfa-events-to-a-siem) +- [Support](#support) ## Pre-requisites @@ -27,16 +31,46 @@ Download the CLI from [https://techdocs.akamai.com/developer/docs/about-clis](ht For more information, please visit the [Getting Started video](https://www.youtube.com/watch?v=BbojoaTTT3A). -### Python +### Python 3 -Beyond Akamai CLI pre-requisites, `cli-mfa` requires Python 3.6 or greater on your system, as well as `pip`. +Beyond Akamai CLI pre-requisites, `cli-mfa` requires Python 3.7 or greater on your system, as well as Python Package manager `pip`. You can verify by opening a shell and type `python --version` and `pip --version` If you don't have Python on your system, go to [https://www.python.org](https://www.python.org). ## Getting started -You'll need to configure an logging integration in [Akamai Control Center](https://control.akamai.com). +`cli-mfa` allows to interact with different Akamai MFA components: + +- Configuration, to manage your various Akamai MFA setup (users, group, policy, ...) +- Logging Integration, to pull authentication events + +Each comes with its set of API credentials, so depending on the operation you're looking for, you may need one or two sets of credentials. Instructions provided below. + +### API Credentials to interact with Akamai MFA configuration + +For any other *cli-mfa* operations you will need you Akamai {OPEN} credentials. + +In [Akamai Control Center](https://control.akamai.com), make sure you create an API user +with the _Akamai MFA_ (`/amfa`) with `READ-WRITE` or `READ` permission. +If you choose `READ`, *cli-mfa* will be allowed to perform only API HTTP `GET` class. + +Upon user credential creation, you'll get a `.edgerc` file with 4 parameters. + +The value of the parameter is a integer you can obtain by navigating in Akamai Control Center: + +Example of `.edgerc` file: +``` +[default] +client_secret = client-secret-goes-here +host = akab-xxxx.luna.akamaiapis.net +access_token = your-access-token +client_token = your-client-token +``` + +### API Credentials to fetch authentication events + +To be able to use the command `akamai mfa events` you'll need to configure an logging integration in [Akamai Control Center](https://control.akamai.com). - Use left navigation (mega menu) and select Enterprise Center - Open **MFA** > **Integrations** @@ -45,7 +79,7 @@ You'll need to configure an logging integration in [Akamai Control Center](https - Set a name, e.g. *cli-mfa* - Click and **Save and Deploy** -Now, copy both Integration ID and Signing Key +Now, copy both **Integration ID** and **Signing Key** Add them both into your `~/.edgerc` file, either in the [default] section or one of your choice: @@ -55,25 +89,63 @@ mfa_integration_id = app_12345abcdef mfa_signing_key = some-random-key ``` -If you are working with multiple tenants, create a different integration credentials in each tenant and place them into different section of the `.edgerc` file. +### Advanced usage -## Field documentation +If you are working with multiple tenants, create a different integration credentials in each tenant and place them into different section of the `.edgerc` file. -Output is using JSON formatting, you'll find all the details about each attribute on our dedicated -section on [techdocs.akamai.com](https://techdocs.akamai.com/mfa/docs/field-sequence) +To verify your configuration, you may use `akamai mfa info`, see example below. ## Command examples -Inline general help +### General information and inline help + +General help: ``` % akamai mfa --help ``` -Inline help for auth event +Help about fetching Akamai MFA authentication events: ``` % akamai mfa event --help ``` +Information about your *cli-mfa* configuration +``` +% akamai mfa info +``` +output: +```json +{ + "general": { + "cli-mfa_version": "1.2.3", + "python": "3.8.15 (default, Oct 11 2022, 21:52:37)", + "akamai_cli": "1.5.1", + "edgerc_file": "~/.edgerc", + "edgerc_section": "default" + }, + "amfa-logging-api": { + "mfa_integration_id": "app_12345abcdef", + "mfa_signing_key": "************************abcd" + }, + "akamai-open-api": { + "host": "akab-xxxx.luna.akamaiapis.net", + "access_token": "your-access-token", + "client_token": "your-client-token", + "client_secret": "**********client-secret-goes-here", + "contract_id": "1-123-456" + } +} +``` + +Version of `cli-mfa` + +``` +% akamai mfa version +1.2.3 +``` + +### Fetch authentification events + Try to pull MFA security events with the following examples. When ``--start`` is omitted, start is set to 5 minutes ago. When ``--end`` is omitted, end takes now minutes 30 seconds. @@ -82,11 +154,11 @@ When ``--end`` is omitted, end takes now minutes 30 seconds. % akamai mfa event ``` -Version of `cli-mfa` +### MFA identity management (users, groups...) +List of all the users: ``` -% akamai mfa version -1.2.3 +% akamai users list ``` ## Streaming Akamai MFA events to a SIEM diff --git a/bin/akamai-mfa b/bin/akamai-mfa index 61efb13..5fa3dba 100755 --- a/bin/akamai-mfa +++ b/bin/akamai-mfa @@ -14,457 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" -cli-mfa -Command Line Input tool for Akamai MFA -:url: https://github.com/akamai/cli-mfa -:author: Antoine Drochon - -Field description can be found at: -https://learn.akamai.com/en-us/webhelp/enterprise-mfa/akamai-mfa-logs-from-splunk-application/GUID-0F17296F-90F3-483E-AFDE-F98FBC51A8AC.html - -""" - -from ctypes import ArgumentError -import logging -import requests -import hashlib -import hmac -import time -import datetime -from http.client import HTTPConnection -import argparse -import configparser import sys import os -import json -import csv - - -#: cli-mfa version, see also cli.json -__VERSION__ = "0.0.10" -#: Log formatting aligned with other CLIs -LOG_FMT = '%(asctime)s [%(levelname)s] %(threadName)s %(message)s' -#: Near real-time, 30s ago is the most recent by default -MOST_RECENT_PADDING = 30 - -log_file = None -logger = logging.getLogger() -mfa_api_url = "https://mfa.akamai.com" -mfa_api_ver = "v1" -tail_pull_interval = 60 # Default is 60 seconds -epilog = '''Copyright (C) Akamai Technologies, Inc\n''' \ - '''Visit http://github.com/akamai/cli-mfa for detailed documentation''' - -class cli(): - """ - TODO: share this module with other cli-* - """ - - @staticmethod - def print(s): - sys.stdout.write("%s\n" % s) - sys.stdout.flush() - - @staticmethod - def print_error(s): - sys.stderr.write("%s\n" % s) - sys.stderr.flush() - - @staticmethod - def current_command(): - return "akamai mfa " + " ".join(sys.argv[1:]) - - @staticmethod - def exit(code): - sys.exit(code) - - -class MFAConfig(): - """ - Manage CLI MFA input parameters - """ - - CONFIG_KEYS = [ - 'mfa_integration_id', - 'mfa_signing_key', - 'mfa_api_integration_id', - 'mfa_api_signing_key' - ] - - def __init__(self): - - self.mfa_integration_id = None - self.mfa_signing_key = None - - # 1. Scan parameters from the CLI arguments +sys.path.append(os.path.abspath('../src')) - self.parser = argparse.ArgumentParser(prog="akamai mfa", epilog=epilog, - description='Process command line options.', - formatter_class=argparse.RawTextHelpFormatter) - - subparsers = self.parser.add_subparsers(dest='command', help='Main command') - subparsers.add_parser('version', help="Display CLI-MFA version") - eventparser = subparsers.add_parser('event', help="Dump MFA events") - eventparser.add_argument("--start", "-s", default=None, type=int, help="Scan for events after this epoch") - eventparser.add_argument("--end", "-e", default=None, type=int, help="Scan for events before this epoch") - eventparser.add_argument("--tail", "-f", default=False, action="store_true", - help="""Do not stop when most recent log is reached, rather - wait for additional data to be appended to the input.""") - eventparser.add_argument("--noreceipt", default=False, action="store_true", - help="Discard the receipt attribute to save log space") - - # User sub parser, will replace the loaduserparser down the road - user_parser = subparsers.add_parser('users', help="User operations (search, import, etc...)") - # userparser.add_argument("action", choices=['search', 'import', 'invite']) - user_action_parser = user_parser.add_subparsers(dest="action", help='User operations (search, import, etc...)') - - # usersearch_parser = user_action_parser.add_parser("search", help="Search users") - # usersearch_parser.add_argument('-g', '--group', dest="group", help='Limit search to users in this group') - # usersearch_parser.add_argument(dest='filter', default="*", help="Pattern to search user, default is *") - invite_parser = user_action_parser.add_parser("invite", help="Send enrollement invite over email") - invite_parser.add_argument('-g', '--group', dest="group", help='Send invite to member of this group') - - # ad-hoc implementation to support MFA customers - loaduserparser = subparsers.add_parser('importusers', help="Import users from a CSV file") - loaduserparser.add_argument("--file", "-f", required=True, help="CSV file used as input") - loaduserparser.add_argument("--ignore-header", "-i", dest="ignore_header", default=False, action="store_true", - help="Ignore the first line (if header is present)") - loaduserparser.add_argument("--fullname-format", "-n", dest="fullname_format", default="{firstname} {lastname}", - help="Full name formatting (default is '{firstname} {lastname}')") - - self.parser.add_argument("--edgerc", type=str, default="~/.edgerc", - help='Location of the credentials file (default is "~/.edgerc")') - - self.parser.add_argument("--section", default="default", help="Section inside .edgerc, default is [default]") - self.parser.add_argument("--debug", '-d', action="store_true", default=False, help="Debug mode") - self.parser.add_argument("--user-agent-prefix", dest='ua_prefix', default='Akamai-CLI', help=argparse.SUPPRESS) - - try: - scanned_cli_args = self.parser.parse_args() - cli_args = vars(scanned_cli_args) - for option in cli_args: - setattr(self, option, cli_args[option]) - except Exception as e: - logging.exception(e) - sys.exit(1) - - # 2. Load MFA params from .edgerc - edgerc_config = configparser.ConfigParser() - edgerc_config.read(os.path.expanduser(self.edgerc)) - if not edgerc_config.has_section(self.section): - err_msg = "ERROR: No section named %s was found in your .edgerc file\n" % self.section - err_msg += "ERROR: Please generate credentials for the script functionality\n" - err_msg += "ERROR: and run 'python gen_edgerc.py %s' to generate the credential file\n" % self.edgerc - sys.exit(err_msg) - for key, value in edgerc_config.items(self.section): - if key in MFAConfig.CONFIG_KEYS: - setattr(self, key, value) - - # And the environment variables - if os.getenv('MFA_INTEGRATION_ID'): - self.integration_id = os.getenv('MFA_INTEGRATION_ID') - if os.getenv('MFA_SIGNING_KEY'): - self.signing_key = os.getenv('MFA_SIGNING_KEY') - if os.getenv('MFA_API_INTEGRATION_ID'): - self.api_integration_id = os.getenv('MFA_API_INTEGRATION_ID') - if os.getenv('MFA_API_SIGNING_KEY'): - self.api_signing_key = os.getenv('MFA_API_SIGNING_KEY') - - self.validate() - - def validate(self): - if not hasattr(self, 'mfa_integration_id'): - raise RuntimeError("Missing mfa_integration_id") - if not hasattr(self, 'mfa_signing_key'): - raise RuntimeError("Missing mfa_signing_key") - - def display_help(self): - self.parser.print_help() - - -class BaseAPI(object): - - api_version = None # Specific API can be overriden on derivated class - - def __init__(self, config, signing_key=None, integration_id=None): - self.config = config - self._session = requests.Session() - self._session.headers.update({'User-Agent': f'{config.ua_prefix} cli-mfa/{__VERSION__}'}) - if not signing_key or not integration_id: # use the default integration creds - self._session.auth = AkamaiMFAAuth(config.mfa_signing_key, config.mfa_integration_id, self.api_version) - else: - self._session.auth = AkamaiMFAAuth(signing_key, integration_id, self.api_version) - - def get(self, url, params=None): - url = f"{mfa_api_url}{url}" - api_response = self._session.get(url, params=params) - return api_response.json() - - def post(self, url, params=None, json=None): - url = f"{mfa_api_url}{url}" - api_response = self._session.post(url, params=params, json=json) - if api_response.status_code != 200: - raise Exception(f"Akamai MFA API response error HTTP/{api_response.status_code}, {api_response.text}") - return api_response.json() - - -class AkamaiMFAAuth(requests.auth.AuthBase): - """ - Akamai MFA API authentication for Requests. - """ - - def __init__(self, signing_key, integration_id, api_version=None): - """ - Args: - config (MFAConfig): cli-mfa config - api_version (_type_): API version in the backend - optional - """ - self._signing_key = signing_key - self._integration_id = integration_id - self._content_type_json = {'Content-Type': 'application/json'} - self._api_version = '2021-07-15' - if api_version: - self._api_version = api_version - - def get_signature(self, t): - signature = hmac.new( - key=self._signing_key.encode("utf-8"), - msg=str(t).encode("utf-8"), - digestmod=hashlib.sha256).hexdigest() - return signature - - def __call__(self, r): - now = str(int(time.time())) - signature = self.get_signature(now) - self._headers = { - 'X-Pushzero-Id': self._integration_id, - 'X-Pushzero-Signature': signature, - 'X-Pushzero-Signature-Time': now, - 'X-Api-Version': self._api_version} - r.headers.update(self._headers) - r.headers.update(self._content_type_json) - return r - - -class EventAPI(object): - - @staticmethod - def pull_events(): - session = requests.Session() - session.headers.update({'User-Agent': f'{config.ua_prefix} cli-mfa/{__VERSION__}'}) - session.auth = AkamaiMFAAuth(config.mfa_signing_key, config.mfa_integration_id) - - api_url = f'{mfa_api_url}/api/{mfa_api_ver}/control/reports/auths' - scan_end = datetime.datetime.utcnow() - datetime.timedelta(seconds=MOST_RECENT_PADDING) - scan_start = scan_end - datetime.timedelta(minutes=5) - if config.end: - scan_end = datetime.datetime.utcfromtimestamp(config.end) - if config.start: - scan_start = datetime.datetime.utcfromtimestamp(config.start) - - while True: # main loop, used with the --tail/-f mode - loop_start = time.time() - continuation_token = None - params = { - 'after': scan_start.isoformat(), - 'before': scan_end.isoformat() - } - - while True: # Iterate until continuation_token attribute is missing - payload = {} - if continuation_token: - payload = {'continuation_token': continuation_token} - r = session.post(api_url, params=params, json=payload) - - api_response = r.json() - continuation_token = api_response.get('continuation_token') - scanned_events = api_response.get('result', {}).get('data', []) - - for mfa_event in scanned_events: - if config.noreceipt: - mfa_event.pop('receipt') - print(json.dumps(mfa_event)) - sys.stdout.flush() - - if not continuation_token: - break - - if config.tail: - wait = tail_pull_interval - (time.time() - loop_start) - logging.debug("Wait %s sec..." % wait) - time.sleep(wait) - scan_start = scan_end # next iteration we stich, start is the previous end - scan_end = datetime.datetime.utcnow() - datetime.timedelta(seconds=MOST_RECENT_PADDING) - else: - break - - -class IdentityManagementAPI(BaseAPI): - - api_version = "2022-02-22" +import amfa.main # noqa: E402 +if __name__ == "__main__": """ - Manage users/groups on Akamai MFA - - Args: - object (_type_): _description_ + cli-mfa + Command Line Input tool for Akamai MFA + :url: https://github.com/akamai/cli-mfa + :author: Antoine Drochon """ - def list_groups(self): - """ - Fetch the list of groups visible in Akamai MFA. - """ - return self.get("/api/v1/control/groups") - - def group_id(self, group_name): - """ - Return Group ID for a given group name - If the group name is not found return None - If multiple matches, raise an exception - - Args: - group_name (string): Group name - - Returns: - string: ID of the group found - """ - group_info = self.get("/api/v1/control/groups", params={'name': group_name}) - matches = group_info.get('result', {}).get('page', []) - group_id = None - for m in matches: - if m.get('name') == group_name: - if group_id is None: - group_id = m.get('id') - else: - raise Exception(f"Ambiguity, more than one group matching name {group_name}") - return group_id - - def create_group(self, group_name, group_summary=None): - """Create a new group in MFA backend.""" - payload = { - "name": group_name, - "summary": group_summary - } - return self.post("/api/v1/control/groups", json=payload) - - def create_users(self, users): - payload = {"ignore_conflicts": True, "users": users} - logger.debug("create_users payload %s" % payload) - return self.post("/api/v1/control/users/bulk/create", json=payload) - - def associate_users_to_group(self, usernames, group_id): - logger.debug(f"Adding users {usernames} to group {group_id}") - return self.post(f"/api/v1/control/groups/{group_id}/users/bulk/associate", json=usernames) - - def import_users_from_csv(self, csv_filename, ignore_header, fullname_format): - """ - Read a CSV file containing user identity, and one group - Create the users, groups and association in the Akamai MFA backend. - - Args: - csv_filename (_type_): path to the csv file - ignore_header (_type_): ignore the first line of the file - fullname_format (_type_): How the user Fullname should be formatted - """ - new_users = [] - scanned_groups = set() - user_groupname_map = {} - - # Parse the CSV file to prepare the API calls - with open(csv_filename) as csvfile: - reader = csv.reader(csvfile) - if ignore_header: - next(reader) - for row in reader: - newuser = { - 'full_name': fullname_format.format(firstname=row[1], lastname=row[2]), - 'last_name': row[2], - 'email': row[0], - 'username': row[3] - } - user_groupname_map[row[3]] = row[4] - scanned_groups.add(row[4]) - new_users.append(newuser) - - # First, let's figure out groups, existing vs. ones in the input file - count_group_added = 0 - count_group_existing = 0 - existing_groups = self.list_groups() - groups_map = {g.get('id'): g.get('name') for g in existing_groups.get('result').get('page')} - reverse_groups_map = {g.get('name'): g.get('id') for g in existing_groups.get('result').get('page')} - for g in scanned_groups: - if g not in groups_map.values(): - r = self.create_group(g, f"Created with command: {cli.current_command()}") - groups_map[r.get('result').get('id')] = r.get('result').get('name') - reverse_groups_map[r.get('result').get('name')] = r.get('result').get('id') - count_group_added += 1 - else: - count_group_existing += 1 - logger.debug(f"Group {g} was already present, not added.") - logger.debug("Final groups: %s" % groups_map) - cli.print(f"{count_group_added} group(s) added, {count_group_existing} group(s) were already existing") - - # Second, bulk user insertion - new_users_response = self.create_users(new_users) - logger.debug("new_users: %s" % new_users_response) - count_user_added = len(new_users_response.get('result', {}).get('created', [])) - count_user_exist = len(new_users_response.get('result', {}).get('existing', [])) - cli.print(f"{count_user_added} user(s) added, {count_user_exist} user(s) where already existing and left unchanged") - - # Third, associate user to group - for new_user in new_users_response.get('result', {}).get('created', []): - group_id = reverse_groups_map[user_groupname_map[new_user.get('username')]] - self.associate_users_to_group([new_user.get('id')], group_id) - - def enroll_users(self, group_name): - """ - Sends off emails to users in a list of groups. - """ - if not isinstance(group_name, str): - raise ArgumentError("groups must be an string") - - group_id = self.group_id(group_name) - if group_id is None: - cli.print_error("Group %s not found." % group_name) - cli.exit(2) - payload = {'exclude_enrolled_users': True} - payload["groups"] = [group_id] - cli.print("Sending enrollment email to ununrolled users in group %s..." % group_name) - response = self.post("/api/v1/control/email/enroll_users", json=payload) - logger.debug(response) - - -if __name__ == "__main__": - - config = MFAConfig() - - logging.basicConfig(filename=log_file, level=logging.INFO, format=LOG_FMT) - - if config.debug: - HTTPConnection.debuglevel = 1 - logger.setLevel(logging.DEBUG) - requests_log = logging.getLogger("urllib3") - requests_log.setLevel(logging.DEBUG) - requests_log.propagate = True - - if config.command is None: - config.display_help() - sys.exit(1) - elif config.command == "version": - print(__VERSION__) - sys.exit(0) - elif config.command == 'event': - EventAPI.pull_events() - elif config.command == 'users': - identity = IdentityManagementAPI(config, config.mfa_api_signing_key, config.mfa_api_integration_id) - if config.action == "invite": - identity.enroll_users(config.group) - else: - cli.write("not supported") - cli.exit(1) - elif config.command == 'importusers': - logger.debug("starting import...") - identity = IdentityManagementAPI(config, config.mfa_api_signing_key, config.mfa_api_integration_id) - identity.import_users_from_csv(config.file, config.ignore_header, config.fullname_format) - else: - raise ValueError(f"Unsupported command: {config.command}") + amfa.main.run() diff --git a/cli.json b/cli.json index a4fda4e..9b49b33 100644 --- a/cli.json +++ b/cli.json @@ -1,12 +1,12 @@ { "requirements": { - "python": "3.6.0" + "python": "3.7.0" }, "commands": [ { "name": "mfa", - "version": "0.0.10", - "description": "Akamai CLI for MFA" + "version": "0.1.0", + "description": "Akamai CLI for Akamai MFA" } ] } diff --git a/src/amfa/__init__.py b/src/amfa/__init__.py new file mode 100644 index 0000000..7f5e709 --- /dev/null +++ b/src/amfa/__init__.py @@ -0,0 +1,6 @@ +import logging + +#: cli-mfa version, see also cli.json +__VERSION__ = "0.1.0" + +logger = logging.getLogger(__name__) diff --git a/src/amfa/cli.py b/src/amfa/cli.py new file mode 100644 index 0000000..87b57a8 --- /dev/null +++ b/src/amfa/cli.py @@ -0,0 +1,28 @@ +# Python modules +import sys +from math import ceil + + +def print(s): + sys.stdout.write("%s\n" % s) + sys.stdout.flush() + + +def print_error(s): + sys.stderr.write("%s\n" % s) + sys.stderr.flush() + + +def current_command(): + return "akamai mfa " + " ".join(sys.argv[1:]) + + +def exit(code): + sys.exit(code) + + +def mask_string(s, percentage=0.9): + if s is None: + return None + mask_chars = ceil(len(s) * percentage) + return f'{"*" * mask_chars}{s[mask_chars:]}' diff --git a/src/amfa/config.py b/src/amfa/config.py new file mode 100644 index 0000000..1f016a6 --- /dev/null +++ b/src/amfa/config.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 + +# Copyright 2022 Akamai Technologies, Inc. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import configparser +import os +import logging +import sys + +from . import __VERSION__ +from . import cli + + +#: Log formatting aligned with other CLIs +LOG_FMT = '%(asctime)s [%(levelname)s] %(threadName)s %(message)s' +#: Near real-time, 30s ago is the most recent by default +MOST_RECENT_PADDING = 30 + +epilog = '''Copyright (C) Akamai Technologies, Inc\n''' \ + '''Visit http://github.com/akamai/cli-mfa for detailed documentation''' + +#: How often we pull data in --tail mode (default is 60 seconds) +tail_pull_interval = 60 + + +class MFAConfig(): + """ + Manage CLI MFA input parameters + """ + + CONFIG_KEYS = [ + 'mfa_integration_id', # SIEM Log Integration + 'mfa_signing_key', # SIEM Log Integration + 'mfa_api_integration_id', # Service API prior to {OPEN} launch - experimental + 'mfa_api_signing_key', # Service API prior to {OPEN} launch - experimental + 'client_secret', # {OPEN} + 'host', # {OPEN} + 'access_token', # {OPEN} + 'client_token', # {OPEN} + 'contract_id', # {OPEN} + 'accountkey' # {OPEN} + ] + + def __init__(self): + + self.mfa_integration_id = None + self.mfa_signing_key = None + self.mfa_api_integration_id = None + self.mfa_api_signing_key = None + + # 1. Scan parameters from the CLI arguments + + self.parser = argparse.ArgumentParser(prog="akamai mfa", epilog=epilog, + description='Process command line options.', + formatter_class=argparse.RawTextHelpFormatter) + + subparsers = self.parser.add_subparsers(dest='command', help='Main command') + subparsers.add_parser('version', help="Display CLI-MFA version") + subparsers.add_parser('info', help="Show CLI-MFA configuration informations") + eventparser = subparsers.add_parser('event', help="Dump MFA events") + eventparser.add_argument("--start", "-s", default=None, type=int, help="Scan for events after this epoch") + eventparser.add_argument("--end", "-e", default=None, type=int, help="Scan for events before this epoch") + eventparser.add_argument("--tail", "-f", default=False, action="store_true", + help="""Do not stop when most recent log is reached, rather + wait for additional data to be appended to the input.""") + eventparser.add_argument("--noreceipt", default=False, action="store_true", + help="Discard the receipt attribute to save log space") + + # User sub parser, will replace the loaduserparser down the road + user_parser = subparsers.add_parser('users', help="User operations (search, import, etc...)") + # userparser.add_argument("action", choices=['search', 'import', 'invite']) + user_action_parser = user_parser.add_subparsers(dest="action", help='User operations (search, import, etc...)') + + # usersearch_parser = user_action_parser.add_parser("search", help="Search users") + # usersearch_parser.add_argument('-g', '--group', dest="group", help='Limit search to users in this group') + # usersearch_parser.add_argument(dest='filter', default="*", help="Pattern to search user, default is *") + invite_parser = user_action_parser.add_parser("invite", help="Send enrollement invite over email") + invite_parser.add_argument('-g', '--group', dest="group", help='Send invite to member of this group') + listuser_parser = user_action_parser.add_parser("list", help="List users") + listuser_parser.add_argument('--json', action="store_true", help='Format list of users as JSON') + listuser_parser.add_argument('--include-devices', dest="include_devices", default=False, action="store_true", + help='Include device details for listed users') + + # ad-hoc implementation to support MFA customers + loaduserparser = subparsers.add_parser('importusers', help="Import users from a CSV file") + loaduserparser.add_argument("--file", "-f", required=True, help="CSV file used as input") + loaduserparser.add_argument("--ignore-header", "-i", dest="ignore_header", default=False, action="store_true", + help="Ignore the first line (if header is present)") + loaduserparser.add_argument("--fullname-format", "-n", dest="fullname_format", default="{firstname} {lastname}", + help="Full name formatting (default is '{firstname} {lastname}')") + + self.parser.add_argument("--edgerc", type=str, default="~/.edgerc", + help='Location of the credentials file (default is "~/.edgerc")') + + self.parser.add_argument("--section", '-c', + help="Section inside .edgerc, default is [default] ($AKAMAI_EDGERC_SECTION)", + default=os.environ.get("AKAMAI_EDGERC_SECTION", "default")) + self.parser.add_argument("--accountkey", "--account-key", help="Account Switch Key", + default=os.environ.get('AKAMAI_EDGERC_ACCOUNT_KEY')) + self.parser.add_argument("--debug", '-d', action="store_true", default=False, help="Debug mode") + self.parser.add_argument("--logfile", dest="logfile", help="Path to log file") + self.parser.add_argument("--loglevel", dest="loglevel", choices=["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"], + default="WARNING", help="Log level") + self.parser.add_argument("--user-agent-prefix", dest='ua_prefix', default='Akamai-CLI', help=argparse.SUPPRESS) + + try: + scanned_cli_args = self.parser.parse_args() + cli_args = vars(scanned_cli_args) + for option in cli_args: + setattr(self, option, cli_args[option]) + except Exception as e: + logging.exception(e) + sys.exit(1) + + # 2. Load MFA params from .edgerc + edgerc_config = configparser.ConfigParser() + edgerc_config.read(os.path.expanduser(self.edgerc)) + if not edgerc_config.has_section(self.section): + err_msg = "ERROR: No section named %s was found in your .edgerc file\n" % self.section + err_msg += "ERROR: Please generate credentials for the script functionality\n" + err_msg += "ERROR: and run 'python gen_edgerc.py %s' to generate the credential file\n" % self.edgerc + sys.exit(err_msg) + for key, value in edgerc_config.items(self.section): + if key in MFAConfig.CONFIG_KEYS: + setattr(self, key, value) + + # And the environment variables + if os.getenv('MFA_INTEGRATION_ID'): + self.integration_id = os.getenv('MFA_INTEGRATION_ID') + if os.getenv('MFA_SIGNING_KEY'): + self.signing_key = os.getenv('MFA_SIGNING_KEY') + if os.getenv('MFA_API_INTEGRATION_ID'): + self.mfa_api_integration_id = os.getenv('MFA_API_INTEGRATION_ID') + if os.getenv('MFA_API_SIGNING_KEY'): + self.mfa_api_signing_key = os.getenv('MFA_API_SIGNING_KEY') + + def display_help(self): + self.parser.print_help() + + def info(self): + """ + Display information about the current CLI. + Helpful for troubleshooting + """ + config_info = { + "general": { + "cli-mfa_version": __VERSION__, + "python": sys.version.replace("\n", " "), + "akamai_cli": os.environ.get("AKAMAI_CLI_VERSION"), + "edgerc_file": self.edgerc, + "edgerc_section": self.section + } + } + + # Logging Integration API + if self.mfa_integration_id or self.mfa_signing_key: + config_info["amfa-logging-api"] = {} + if self.mfa_integration_id: + config_info["amfa-logging-api"]["mfa_integration_id"] = self.mfa_integration_id + if self.mfa_signing_key: + config_info["amfa-logging-api"]["mfa_signing_key"] = cli.mask_string(self.mfa_signing_key) + + # Akamai MFA Service API (unsupported, do not use) + if self.mfa_api_integration_id or self.mfa_api_signing_key: + config_info["amfa-service-api"] = {} + if self.mfa_api_integration_id: + config_info["amfa-service-api"]["mfa_api_integration_id"] = self.mfa_api_integration_id + if self.mfa_api_signing_key: + config_info["amfa-service-api"]["mfa_api_signing_key"] = cli.mask_string(self.mfa_api_signing_key) + + # {OPEN} API + config_info["akamai-open-api"] = {} + if hasattr(self, "host"): + config_info["akamai-open-api"]["host"] = self.host + if hasattr(self, "access_token"): + config_info["akamai-open-api"]["access_token"] = cli.mask_string(self.access_token) + if hasattr(self, "client_token"): + config_info["akamai-open-api"]["client_token"] = cli.mask_string(self.client_token) + if hasattr(self, "client_secret"): + config_info["akamai-open-api"]["client_secret"] = cli.mask_string(self.client_secret) + if hasattr(self, "accountkey") and self.accountkey: + config_info["akamai-open-api"]["accountkey"] = self.accountkey + if hasattr(self, "contract_id") and self.contract_id: + config_info["akamai-open-api"]["contract_id"] = self.contract_id + + return config_info diff --git a/src/amfa/core.py b/src/amfa/core.py new file mode 100644 index 0000000..5bd0ed8 --- /dev/null +++ b/src/amfa/core.py @@ -0,0 +1,380 @@ +from ctypes import ArgumentError +import logging +import hashlib +import hmac +import time +import datetime + +import sys +import json +import csv + +# 3rd party modules +import requests +from akamai.edgegrid import EdgeGridAuth + +# local modules +from . import cli +from . import config +from . import __VERSION__ +from . import logger + + +class BaseAPIHelper(object): + + api_version = None # Specific API can be overriden on derivated class + + def __init__(self, cli_config): + self.config = cli_config + self._session = requests.Session() + self._session.headers.update({'User-Agent': self.user_agent}) + self.baseurl = "" + + @property + def user_agent(self): + return f'{self.config.ua_prefix} cli-mfa/{__VERSION__}' + + def prepare_querystring(self, qs_args): + """Allow inherting class to implement logic to inject querystring arguments.""" + return qs_args + + def get(self, url, params=None): + url = f"{self.baseurl}{url}" + api_response = self._session.get(url, params=self.prepare_querystring(params)) + if api_response.status_code != requests.codes.ok: + error_msg = f"FATAL: API call {api_response.request.method} {api_response.url} " \ + f"returned HTTP/{api_response.status_code}" + cli.print_error(error_msg) + cli.print_error(api_response.text) + cli.exit(1) + return api_response.json() + + def post(self, url, params=None, json=None): + url = f"{self.baseurl}{url}" + api_response = self._session.post(url, params=self.prepare_querystring(params), json=json) + if api_response.status_code != 200: + raise Exception(f"Akamai MFA API response error HTTP/{api_response.status_code}, {api_response.text}") + return api_response.json() + + +class MFAServiceAPIHelper(BaseAPIHelper): + """ + Akamai MFA service API (experimental) + DO NOT USE, use {OPEN} API through `OpenAPIHelper` whenever possible. + """ + + mfa_api_url = "https://mfa.akamai.com" + mfa_api_ver = "v1" + + def __init__(self, cli_config, signing_key=None, integration_id=None): + super().__init__(cli_config) + self.baseurl = MFAServiceAPIHelper.mfa_api_url + if not signing_key or not integration_id: # use the default integration creds + self._session.auth = AkamaiMFAAuth(self.config.mfa_signing_key, self.config.mfa_integration_id, + self.api_version) + else: + self._session.auth = AkamaiMFAAuth(signing_key, integration_id, self.api_version) + + +class MFALoggingAPIHelper(BaseAPIHelper): + """ + Akamai MFA Logging Integration API + """ + + mfa_api_url = "https://mfa.akamai.com" + mfa_api_ver = "v1" + + def __init__(self, cli_config): + super().__init__(cli_config) + self.baseurl = MFAServiceAPIHelper.mfa_api_url + self._session.auth = AkamaiMFAAuth(self.config.mfa_signing_key, self.config.mfa_integration_id) + + +class OpenAPIHelper(BaseAPIHelper): + """ + Akamai MFA {OPEN} API + See https://techdocs.akamai.com/mfa/reference/api for more information + """ + def __init__(self, cli_config): + super().__init__(cli_config) + self.baseurl = f"https://{self.config.host}" + self._session.auth = EdgeGridAuth( + client_token=self.config.client_token, + client_secret=self.config.client_secret, + access_token=self.config.access_token + ) + + def prepare_querystring(self, qs_args): + args = super().prepare_querystring(qs_args) + if isinstance(args, dict): + final_params = qs_args.copy() + else: + final_params = {} + if self.config.accountkey: + final_params.update({"accountSwitchKey": self.config.accountkey}) + if hasattr(self.config, "contract_id") and self.config.contract_id: + final_params.update({"contractId": self.config.contract_id}) + final_params.update({"ua": self.user_agent}) + return final_params + + +class AkamaiMFAAuth(requests.auth.AuthBase): + """ + Akamai MFA Integration authentication for Requests. + """ + + def __init__(self, signing_key, integration_id, api_version=None): + """ + Args: + config (MFAConfig): cli-mfa config + api_version (_type_): API version in the backend - optional + """ + self._signing_key = signing_key + self._integration_id = integration_id + self._content_type_json = {'Content-Type': 'application/json'} + self._api_version = '2021-07-15' + if api_version: + self._api_version = api_version + + def get_signature(self, t): + signature = hmac.new( + key=self._signing_key.encode("utf-8"), + msg=str(t).encode("utf-8"), + digestmod=hashlib.sha256).hexdigest() + return signature + + def __call__(self, r): + now = str(int(time.time())) + signature = self.get_signature(now) + self._headers = { + 'X-Pushzero-Id': self._integration_id, + 'X-Pushzero-Signature': signature, + 'X-Pushzero-Signature-Time': now, + 'X-Api-Version': self._api_version} + r.headers.update(self._headers) + r.headers.update(self._content_type_json) + return r + + +class EventAPI(object): + + def __init__(self, cli_config): + self.config = cli_config + + def pull_events(self): + api_helper = MFALoggingAPIHelper(self.config) + api_url = f'/api/{api_helper.mfa_api_ver}/control/reports/auths' + scan_end = datetime.datetime.utcnow() - datetime.timedelta(seconds=config.MOST_RECENT_PADDING) + scan_start = scan_end - datetime.timedelta(minutes=5) + if self.config.end: + scan_end = datetime.datetime.utcfromtimestamp(self.config.end) + if self.config.start: + scan_start = datetime.datetime.utcfromtimestamp(self.config.start) + + while True: # main loop, used with the --tail/-f mode + loop_start = time.time() + continuation_token = None + params = { + 'after': scan_start.isoformat(), + 'before': scan_end.isoformat() + } + + while True: # Iterate until continuation_token attribute is missing + payload = {} + if continuation_token: + payload = {'continuation_token': continuation_token} + api_response = api_helper.post(api_url, params=params, json=payload) + + continuation_token = api_response.get('continuation_token') + scanned_events = api_response.get('result', {}).get('data', []) + + for mfa_event in scanned_events: + if self.config.noreceipt: + mfa_event.pop('receipt') + print(json.dumps(mfa_event)) + sys.stdout.flush() + + if not continuation_token: + break + + if self.config.tail: + wait = self.config.tail_pull_interval - (time.time() - loop_start) + logging.debug("Wait %s sec..." % wait) + time.sleep(wait) + scan_start = scan_end # next iteration we stich, start is the previous end + scan_end = datetime.datetime.utcnow() - datetime.timedelta(seconds=config.MOST_RECENT_PADDING) + else: + break + + +class IdentityManagementAPI(object): + """ + Manage users/groups on Akamai MFA + """ + + api_version = "2022-02-22" + + def __init__(self, cli_config): + """ + This class use two different API set with different auth logic. + We prepare them here based on what's available in the .edgerc file. + """ + self.service_api_helper = None + self.open_api_helper = None + self.config = cli_config + if cli_config.mfa_api_signing_key and cli_config.mfa_api_integration_id: + self.service_api_helper = MFAServiceAPIHelper( + cli_config, + cli_config.mfa_api_signing_key, + cli_config.mfa_api_integration_id + ) + if cli_config.host: + self.open_api_helper = OpenAPIHelper(cli_config) + + def list_groups(self): + """ + Fetch the list of groups visible in Akamai MFA. + """ + return self.service_api_helper.get("/api/v1/control/groups") + + def group_id(self, group_name): + """ + Return Group ID for a given group name + If the group name is not found return None + If multiple matches, raise an exception + + Args: + group_name (string): Group name + + Returns: + string: ID of the group found + """ + group_info = self.service_api_helper.get("/api/v1/control/groups", params={'name': group_name}) + matches = group_info.get('result', {}).get('page', []) + group_id = None + for m in matches: + if m.get('name') == group_name: + if group_id is None: + group_id = m.get('id') + else: + raise Exception(f"Ambiguity, more than one group matching name {group_name}") + return group_id + + def create_group(self, group_name, group_summary=None): + """Create a new group in MFA backend.""" + payload = { + "name": group_name, + "summary": group_summary + } + return self.service_api_helper.post("/api/v1/control/groups", json=payload) + + def create_users(self, users): + payload = {"ignore_conflicts": True, "users": users} + logger.debug("create_users payload %s" % payload) + return self.service_api_helper.post("/api/v1/control/users/bulk/create", json=payload) + + def associate_users_to_group(self, usernames, group_id): + logger.debug(f"Adding users {usernames} to group {group_id}") + return self.service_api_helper.post(f"/api/v1/control/groups/{group_id}/users/bulk/associate", json=usernames) + + def import_users_from_csv(self, csv_filename, ignore_header, fullname_format): + """ + Read a CSV file containing user identity, and one group + Create the users, groups and association in the Akamai MFA backend. + + Args: + csv_filename (_type_): path to the csv file + ignore_header (_type_): ignore the first line of the file + fullname_format (_type_): How the user Fullname should be formatted + """ + new_users = [] + scanned_groups = set() + user_groupname_map = {} + + # Parse the CSV file to prepare the API calls + with open(csv_filename) as csvfile: + reader = csv.reader(csvfile) + if ignore_header: + next(reader) + for row in reader: + newuser = { + 'full_name': fullname_format.format(firstname=row[1], lastname=row[2]), + 'last_name': row[2], + 'email': row[0], + 'username': row[3] + } + user_groupname_map[row[3]] = row[4] + scanned_groups.add(row[4]) + new_users.append(newuser) + + # First, let's figure out groups, existing vs. ones in the input file + count_group_added = 0 + count_group_existing = 0 + existing_groups = self.list_groups() + groups_map = {g.get('id'): g.get('name') for g in existing_groups.get('result').get('page')} + reverse_groups_map = {g.get('name'): g.get('id') for g in existing_groups.get('result').get('page')} + for g in scanned_groups: + if g not in groups_map.values(): + r = self.create_group(g, f"Created with command: {cli.current_command()}") + groups_map[r.get('result').get('id')] = r.get('result').get('name') + reverse_groups_map[r.get('result').get('name')] = r.get('result').get('id') + count_group_added += 1 + else: + count_group_existing += 1 + logger.debug(f"Group {g} was already present, not added.") + logger.debug("Final groups: %s" % groups_map) + cli.print(f"{count_group_added} group(s) added, {count_group_existing} group(s) were already existing") + + # Second, bulk user insertion + new_users_response = self.create_users(new_users) + logger.debug("new_users: %s" % new_users_response) + cli.print(f"{len(new_users_response.get('result'))} user(s) added or changed") + + # Third, associate user to group + for new_user in new_users_response.get('result', []): + group_id = reverse_groups_map[user_groupname_map[new_user.get('username')]] + self.associate_users_to_group([new_user.get('id')], group_id) + + def enroll_users(self, group_name): + """ + Sends off emails to users in a list of groups. + """ + if not isinstance(group_name, str): + raise ArgumentError("groups must be an string") + + group_id = self.group_id(group_name) + if group_id is None: + cli.print_error("Group %s not found." % group_name) + cli.exit(2) + payload = {'exclude_enrolled_users': True} + payload["groups"] = [group_id] + cli.print("Sending enrollment email to ununrolled users in group %s..." % group_name) + response = self.service_api_helper.post("/api/v1/control/email/enroll_users", json=payload) + logger.debug(response) + + def list_users(self): + """ + List the users synchronized with the MFA tenant. + """ + page = 1 + page_size = 2048 #: Maximum allowed by the API + total_page = None + total_users = 0 + + while total_page is None or page <= total_page: + logger.info(f"Page {page} of {total_page if total_page else 'unknown'}") + params = {'page': page, 'pageSize': page_size} + if self.config.include_devices: + params.update({"includeDevices": "true"}) + scan_users = self.open_api_helper.get('/amfa/v1/users', params) + total_page = scan_users.get('totalPages', 1) + page += 1 + for u in scan_users.get('users', []): + if self.config.json: + cli.print(json.dumps(u, indent=4)) + else: + cli.print(f"{u.get('userId')},{u.get('username')},{u.get('userStatus')}") + total_users += 1 + + if not self.config.json: + cli.print(f"# Total users exported: {total_users}") diff --git a/src/amfa/main.py b/src/amfa/main.py new file mode 100644 index 0000000..e4913ce --- /dev/null +++ b/src/amfa/main.py @@ -0,0 +1,64 @@ +import logging +from http.client import HTTPConnection +import sys +import json + +# cli-mfa modules +from . import logger, __VERSION__ +from . import cli +from . import config +from . import core + + +def prepare_logging(cli_config): + logging.basicConfig(filename=cli_config.logfile, + level=logging.getLevelName(cli_config.loglevel), + format=config.LOG_FMT) + + if cli_config.debug: + HTTPConnection.debuglevel = 1 + logger.setLevel(logging.DEBUG) + requests_log = logging.getLogger("urllib3") + requests_log.setLevel(logging.DEBUG) + requests_log.propagate = True + + +def run(): + cli_config = config.MFAConfig() + prepare_logging(cli_config) + + if cli_config.command is None: + cli_config.display_help() + sys.exit(1) + elif cli_config.command == "version": + print(__VERSION__) + sys.exit(0) + elif cli_config.command == "info": + cli.print(json.dumps(cli_config.info(), indent=4)) + elif cli_config.command == 'event': + event_api = core.EventAPI(cli_config) + event_api.pull_events() + elif cli_config.command == 'users': + identity = core.IdentityManagementAPI(cli_config) + if cli_config.action == "invite": + identity.enroll_users(cli_config.group) + elif cli_config.action == "list": + identity.list_users() + pass + else: + cli.print_error("not supported") + cli.exit(1) + elif cli_config.command == 'importusers': + logger.debug("starting import...") + identity = core.IdentityManagementAPI(cli_config) + identity.import_users_from_csv(cli_config.file, cli_config.ignore_header, cli_config.fullname_format) + else: + raise ValueError(f"Unsupported command: {cli_config.command}") + + +""" +Handy handler to be able to run command as module: + > python3 -m amfa.main info +""" +if __name__ == "__main__": + run() diff --git a/test/test.py b/test/test.py index f75065b..f61c41b 100644 --- a/test/test.py +++ b/test/test.py @@ -19,7 +19,7 @@ ```bash # Optional -EDGERC_SECTION=mysection +AKAMAI_EDGERC_SECTION=mysection # End Optional cd test @@ -56,8 +56,7 @@ def setUp(self): def cli_command(self, *args): command = shlex.split(f'python3 {self.maindir}/bin/akamai-mfa') - if os.environ.get('EDGERC_SECTION'): - command.extend(["--section", os.environ['EDGERC_SECTION']]) + # command.extend(["--somearg", os.environ['somevariable']]) command.extend(*args) print("\nCOMMAND: ", " ".join(command)) return command @@ -66,6 +65,19 @@ def cli_run(self, *args): cmd = subprocess.Popen(self.cli_command(str(a) for a in args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) return cmd + def cli_debug_print_output(self, h, line_prefix=""): + "Handy function to print the CLI output." + line_count = 1 + for line in h.decode().splitlines(): + print(f"{line_count:4} {line_prefix} {line}") + line_count += 1 + + def cli_debug_output(self, return_code, stdout, stderr=None): + if return_code != 0 and stderr: + self.cli_debug_print_output(stderr, "stderr>") + else: + self.cli_debug_print_output(stdout, "stdout>") + def line_count(filename): count = 0 with open(filename) as f: @@ -76,7 +88,7 @@ def line_count(filename): def duplicate_count(filename): total_count = 0 with open(filename) as infile: - counts = collections.Counter(l.strip() for l in infile) + counts = collections.Counter(ln.strip() for ln in infile) for line, count in counts.most_common(): if count > 1: print(f"DUPLICATE[{count}] {line}") @@ -86,28 +98,36 @@ def duplicate_count(filename): class TestEvents(CliMFATest): - after = int(time.time() - 15 * 60) + after = int(time.time() - 3 * 60 * 60) before = int(time.time()) def test_events(self): - """ - Fetch MFA events - """ + "Fetch MFA authentication events for last 3 hours." cmd = self.cli_run("event", "--start", self.after, "--end", self.before) stdout, stderr = cmd.communicate(timeout=60) + if cmd.returncode != 0: + print(f"STDERR> {stderr.decode()} {stdout.decode()}