-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 22b4e3a
Showing
9 changed files
with
706 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
VIRTUAL_ENV_LOCATION := ./alcli_env | ||
VIRTUAL_ENV_ACTIVATE_CMD := $(VIRTUAL_ENV_LOCATION)/bin/activate | ||
|
||
.PHONY: dist install uninstall init | ||
.DEFAULT_GOAL := dist | ||
|
||
init: | ||
pip install -r requirements.txt | ||
|
||
test: | ||
python -m unittest discover -p '*_tests.py' -v -b | ||
|
||
lint: | ||
pycodestyle . | ||
|
||
dist: | ||
python setup.py sdist | ||
|
||
pypi_upload: dist | ||
twine upload --skip-existing dist/alcli-*.* | ||
|
||
pypi_test_upload: dist | ||
twine upload --skip-existing --repository-url https://test.pypi.org/legacy/ dist/alcli-*.* | ||
|
||
install: virtualenv | ||
. $(VIRTUAL_ENV_ACTIVATE_CMD); python setup.py install | ||
. $(VIRTUAL_ENV_ACTIVATE_CMD); python setup.py clean --all install clean --all | ||
|
||
uninstall: | ||
pip uninstall alcli -y | ||
|
||
virtualenv: | ||
python3 -m venv $(VIRTUAL_ENV_LOCATION) | ||
|
||
virtualenv2: | ||
virtualenv $(VIRTUAL_ENV_LOCATION) | ||
|
||
virtual_uninstall: | ||
rm -rf $(VIRTUAL_ENV_LOCATION) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
__version__ = '1.0.1' | ||
__author__ = 'Alert Logic, Inc.' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
|
||
import sys | ||
import os | ||
import json | ||
from json import JSONDecodeError | ||
import logging | ||
import argparse | ||
import pydoc | ||
|
||
from collections import OrderedDict | ||
from pydoc import pager | ||
|
||
|
||
import almdrlib | ||
from almdrlib.session import Session | ||
from almdrlib import __version__ as almdrlib_version | ||
from almdrlib.client import OpenAPIKeyWord | ||
|
||
from alcli.cliparser import ALCliArgsParser | ||
from alcli.cliparser import USAGE | ||
from alcli.clihelp import ALCliServiceHelpFormatter | ||
from alcli.clihelp import ALCliOperationHelpFormatter | ||
|
||
sys.path.insert(0, os.path.dirname(__file__)) | ||
|
||
logger = logging.getLogger('alcli.almdr_cli') | ||
LOG_FORMAT = ( | ||
'%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s') | ||
|
||
#almdrlib.set_logger('almdrlib.session', logging.DEBUG, format_string=LOG_FORMAT) | ||
|
||
class AlertLogicCLI(object): | ||
def __init__(self, session=None): | ||
self._session = session or Session() | ||
self._subparsers = None | ||
self._services = None | ||
self._arguments = None | ||
|
||
def main(self, args=None): | ||
args = args or sys.argv[1:] | ||
services = self._get_services() | ||
parser = self._create_parser(services) | ||
|
||
parsed_args, remaining = parser.parse_known_args(args) | ||
|
||
if parsed_args.service == 'help' or parsed_args.service is None: | ||
sys.stderr.write(f"usage: {USAGE}\n") | ||
return 128 | ||
|
||
if parsed_args.operation == 'help': | ||
help_page = ALCliServiceHelpFormatter(self._services[parsed_args.service]) | ||
pydoc.pipepager(help_page.format_page() + '\n', 'less -R') | ||
return 0 | ||
|
||
if hasattr(parsed_args, 'help') and \ | ||
hasattr(parsed_args, 'service') and \ | ||
hasattr(parsed_args, 'operation') and \ | ||
parsed_args.help == 'help': | ||
operation = self._services[parsed_args.service].operations.get(parsed_args.operation) | ||
help_page = ALCliOperationHelpFormatter(operation) | ||
pydoc.pipepager(help_page.format_page() + '\n', 'less -R') | ||
return 0 | ||
|
||
|
||
try: | ||
return services[parsed_args.service](remaining, parsed_args) | ||
except Exception as e: | ||
sys.stderr.write(f"Caught exception in main(). Error: {str(e)}\n") | ||
return 255 | ||
|
||
def _get_services(self): | ||
if self._services is None: | ||
self._services = OrderedDict() | ||
services_list = Session.list_services() | ||
self._services = { | ||
service_name: ServiceOperation(name=service_name, session=self._session) | ||
for service_name in services_list | ||
} | ||
|
||
return self._services | ||
|
||
def _create_parser(self, services): | ||
parser = ALCliArgsParser( | ||
services, | ||
almdrlib_version, | ||
"Alert Logic CLI Utility", | ||
prog="alcli") | ||
|
||
return parser | ||
|
||
class ServiceOperation(object): | ||
""" | ||
A service operation. For example: alcli aetuner would create | ||
a ServiceOperation object for aetuner service | ||
""" | ||
def __init__(self, name, session): | ||
self._name = name | ||
self._session = session | ||
self._service = None | ||
self._description = None | ||
self._operations = None | ||
|
||
def __call__(self, args, parsed_globals): | ||
operation_name = "" | ||
kwargs = {} | ||
for name, value in parsed_globals.__dict__.items(): | ||
if name == 'operation': | ||
operation_name = value | ||
elif name == 'service': | ||
continue | ||
else: | ||
kwargs[name] = value | ||
|
||
# service = self._session.client(self._name) | ||
operation = self.operations.get(operation_name, None) | ||
if operation: | ||
# Remove optional arguments that haven't been supplied | ||
op_args = {k:self._encode(operation, k, v) for (k,v) in kwargs.items() if v is not None} | ||
res = operation(**op_args) | ||
self._print_result(res.json()) | ||
|
||
@property | ||
def service(self): | ||
if self._service is None: | ||
self._service = self._session.client(self._name) | ||
return self._service | ||
|
||
@property | ||
def name(self): | ||
return self._name | ||
|
||
@property | ||
def session(self): | ||
return self._session | ||
|
||
@property | ||
def description(self): | ||
if self._description is None: | ||
self._description = self.service.description | ||
return self._description | ||
|
||
@property | ||
def operations(self): | ||
if self._operations is None: | ||
self._operations= self.service.operations | ||
return self._operations | ||
|
||
def _encode(self, operation, param_name, param_value): | ||
schema = operation.get_schema() | ||
parameter = schema[OpenAPIKeyWord.PARAMETERS][param_name] | ||
type = parameter[OpenAPIKeyWord.TYPE] | ||
if type in OpenAPIKeyWord.SIMPLE_DATA_TYPES: | ||
return param_value | ||
|
||
if type == OpenAPIKeyWord.OBJECT: | ||
try: | ||
return json.loads(param_value) | ||
except JSONDecodeError as e: | ||
# | ||
# this could be an argument encoded using a short form such as | ||
# key1=value,key2=value2 | ||
# | ||
result = dict( | ||
(k.strip(), v.strip()) | ||
for k, v in (kv.split('=') | ||
for kv in param_value.split(',')) | ||
) | ||
return result | ||
|
||
# TODO Raise an exception if we can't build a dictionary from the provided input | ||
return param_value | ||
|
||
def _print_result(self, result): | ||
print(f"{json.dumps(result, sort_keys=True, indent=4)}") | ||
return | ||
|
||
|
||
def main(): | ||
if os.environ.get("DEBUG"): | ||
print("Running in DEBUG mode") | ||
logging.basicConfig(level=logging.DEBUG) | ||
else: | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
session = Session() | ||
cli= AlertLogicCLI(session=session) | ||
cli.main() | ||
|
||
if __name__ == "__main__": | ||
main() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import sys | ||
import argparse | ||
from difflib import get_close_matches | ||
|
||
HELP_MESSAGE = ( | ||
"To see help text, you can run:\n" | ||
"\n" | ||
" alcli help\n" | ||
" alcli <command> help\n" | ||
" alcli <command> <subcommand> help\n" | ||
) | ||
USAGE = ( | ||
"alcli [options] <command> <subcommand> [<subcommand> ...] [parameters]\n" | ||
f"{HELP_MESSAGE}" | ||
) | ||
|
||
class CommandAction(argparse.Action): | ||
def __init__(self, option_strings, dest, command_table, **kwargs): | ||
self.command_table = command_table | ||
super(CommandAction, self).__init__( | ||
option_strings, dest, choices=self.choices, **kwargs | ||
) | ||
|
||
def __call__(self, parser, namespace, values, option_string=None): | ||
setattr(namespace, self.dest, values) | ||
|
||
@property | ||
def choices(self): | ||
return list(self.command_table.keys()) | ||
|
||
@choices.setter | ||
def choices(self, val): | ||
# argparse.Action will always try to set this value upon | ||
# instantiation, but this value should be dynamically | ||
# generated from the command table keys. So make this a | ||
# NOOP if argparse.Action tries to set this value. | ||
pass | ||
|
||
class CLIArgParser(argparse.ArgumentParser): | ||
Formatter = argparse.RawTextHelpFormatter | ||
|
||
# Number of choices per line | ||
ChoicesPerLine = 2 | ||
|
||
def _check_value(self, action, value): | ||
# converted value must be one of the choices (if specified) | ||
if action.choices is not None and value not in action.choices: | ||
msg = ['Invalid choice, valid choices are:\n'] | ||
for i in range(len(action.choices))[::self.ChoicesPerLine]: | ||
current = [] | ||
for choice in action.choices[i:i+self.ChoicesPerLine]: | ||
current.append('%-40s' % choice) | ||
msg.append(' | '.join(current)) | ||
possible = get_close_matches(value, action.choices, cutoff=0.8) | ||
if possible: | ||
extra = ['\n\nInvalid choice: %r, maybe you meant:\n' % value] | ||
for word in possible: | ||
extra.append(' * %s' % word) | ||
msg.extend(extra) | ||
raise argparse.ArgumentError(action, '\n'.join(msg)) | ||
|
||
def __init__(self, command_table, version_string, | ||
description, argument_table, prog=None): | ||
super(CLIArgParser, self).__init__( | ||
formatter_class=self.Formatter, | ||
add_help=False, | ||
conflict_handler='resolve', | ||
description=description, | ||
usage=USAGE, | ||
prog=prog) | ||
self._build(command_table, version_string, argument_table) | ||
|
||
def parse_known_args(self, args, namespace=None): | ||
parsed, remaining = super(CLIArgParser, self).parse_known_args(args, namespace) | ||
return parsed, remaining | ||
|
||
def _build(self, command_table, version_string, argument_table): | ||
self.add_argument('--version', action="version", | ||
version=version_string, | ||
help='Display the version of this tool') | ||
self.add_argument('--profile', action="store", | ||
help="Specify cnfiguration profile name to use") | ||
self.add_argument('command', action=CommandAction, | ||
command_table=command_table) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
class CLIArgument(object): | ||
def __init__(self, name): | ||
self._name = name | ||
|
||
@property | ||
def cli_name(self): | ||
if self._positional_arg: | ||
return self._name | ||
else: | ||
return '--' + self._name | ||
|
||
def add_to_parser(self, parser): | ||
cli_name = self.cli_name | ||
|
||
def add_to_parser(self, parser): | ||
kwargs = {} | ||
parser.add_argument(cli_name, **kwargs) | ||
|
||
@property | ||
def name(self): | ||
return self._name |
Oops, something went wrong.