diff --git a/lftools/deploy.py b/lftools/deploy.py
index 5583b46c..ef3c952d 100755
--- a/lftools/deploy.py
+++ b/lftools/deploy.py
@@ -9,6 +9,7 @@
# http://www.eclipse.org/legal/epl-v10.html
##############################################################################
"""Library of functions for deploying artifacts to Nexus."""
+from __future__ import annotations
import concurrent.futures
import datetime
@@ -26,6 +27,9 @@
import sys
import tempfile
import zipfile
+from concurrent.futures import Future
+from typing import Any, Dict, List, Optional, Tuple
+from xml.dom.minidom import Document, Node
import boto3
import requests
@@ -33,44 +37,44 @@
from botocore.exceptions import ClientError
from defusedxml.minidom import parseString
-log = logging.getLogger(__name__)
+log: logging.Logger = logging.getLogger(__name__)
logging.getLogger("botocore").setLevel(logging.CRITICAL)
-def _compress_text(dir):
+def _compress_text(directory: str) -> None:
"""Compress all text files in directory."""
- save_dir = os.getcwd()
- os.chdir(dir)
+ save_dir: str = os.getcwd()
+ os.chdir(directory)
- compress_types = [
+ compress_types: List[str] = [
"**/*.html",
"**/*.log",
"**/*.txt",
"**/*.xml",
]
- paths = []
+ paths: List[str] = []
for _type in compress_types:
- search = os.path.join(dir, _type)
+ search: str = os.path.join(directory, _type)
paths.extend(glob.glob(search, recursive=True))
for _file in paths:
# glob may follow symlink paths that open can't find
if os.path.exists(_file):
- log.debug("Compressing file {}".format(_file))
- with open(_file, "rb") as src, gzip.open("{}.gz".format(_file), "wb") as dest:
+ log.debug(f"Compressing file {_file}")
+ with open(_file, "rb") as src, gzip.open(f"{_file}.gz", "wb") as dest:
shutil.copyfileobj(src, dest)
os.remove(_file)
else:
- log.info("Could not open path from glob {}".format(_file))
+ log.info(f"Could not open path from glob {_file}")
os.chdir(save_dir)
-def _format_url(url):
+def _format_url(url: str) -> str:
"""Ensure url starts with http and trim trailing '/'s."""
- start_pattern = re.compile("^(http|https)://")
+ start_pattern: re.Pattern = re.compile("^(http|https)://")
if not start_pattern.match(url):
- url = "http://{}".format(url)
+ url = f"http://{url}"
if url.endswith("/"):
url = url.rstrip("/")
@@ -78,41 +82,40 @@ def _format_url(url):
return url
-def _log_error_and_exit(*msg_list):
+def _log_error_and_exit(*msg_list: str) -> None:
"""Print error message, and exit."""
for msg in msg_list:
log.error(msg)
sys.exit(1)
-def _request_post(url, data, headers):
+def _request_post(url: str, data: Optional[Any], headers: Optional[Dict[str, str]]) -> requests.Response:
"""Execute a request post, return the resp."""
- resp = {}
try:
- resp = requests.post(url, data=data, headers=headers)
+ resp: requests.Response = requests.post(url, data=data, headers=headers)
except requests.exceptions.MissingSchema:
log.debug("in _request_post. MissingSchema")
- _log_error_and_exit("Not valid URL: {}".format(url))
+ _log_error_and_exit(f"Not valid URL: {url}")
except requests.exceptions.ConnectionError:
log.debug("in _request_post. ConnectionError")
- _log_error_and_exit("Could not connect to URL: {}".format(url))
+ _log_error_and_exit(f"Could not connect to URL: {url}")
except requests.exceptions.InvalidURL:
log.debug("in _request_post. InvalidURL")
- _log_error_and_exit("Invalid URL: {}".format(url))
+ _log_error_and_exit(f"Invalid URL: {url}")
return resp
-def _get_filenames_in_zipfile(_zipfile):
+def _get_filenames_in_zipfile(_zipfile: str) -> List[str]:
"""Return a list with file names."""
- files = zipfile.ZipFile(_zipfile).infolist()
+ files: List[zipfile.ZipInfo] = zipfile.ZipFile(_zipfile).infolist()
return [f.filename for f in files]
-def _request_post_file(url, file_to_upload, parameters=None):
+def _request_post_file(url: str, file_to_upload: str, parameters: Optional[Any] = None) -> requests.Response:
"""Execute a request post, return the resp."""
- resp = {}
+ resp: requests.Response = requests.Response()
try:
- upload_file = open(file_to_upload, "rb")
+ upload_file: io.BufferedReader = open(file_to_upload, "rb")
except FileNotFoundError:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file_to_upload)
@@ -123,32 +126,33 @@ def _request_post_file(url, file_to_upload, parameters=None):
else:
resp = requests.post(url, data=upload_file.read())
except requests.exceptions.MissingSchema:
- raise requests.HTTPError("Not valid URL: {}".format(url))
+ raise requests.HTTPError(f"Not valid URL: {url}", response=resp)
except requests.exceptions.ConnectionError:
- raise requests.HTTPError("Could not connect to URL: {}".format(url))
+ raise requests.HTTPError(f"Could not connect to URL: {url}", response=resp)
except requests.exceptions.InvalidURL:
- raise requests.HTTPError("Invalid URL: {}".format(url))
+ raise requests.HTTPError(f"Invalid URL: {url}", response=resp)
if resp.status_code == 400:
- raise requests.HTTPError("Repository is read only")
+ raise requests.HTTPError("Repository is read only", response=resp)
elif resp.status_code == 404:
- raise requests.HTTPError("Did not find repository.")
+ raise requests.HTTPError("Did not find repository.", response=resp)
if not str(resp.status_code).startswith("20"):
raise requests.HTTPError(
"Failed to upload to Nexus with status code: {}.\n{}\n{}".format(
resp.status_code, resp.text, file_to_upload
- )
+ ),
+ response=resp,
)
return resp
-def _request_put_file(url, file_to_upload, parameters=None):
+def _request_put_file(url: str, file_to_upload: str, parameters: Optional[Any] = None) -> bool:
"""Execute a request put, return the resp."""
- resp = {}
+ resp: requests.Response = requests.Response()
try:
- upload_file = open(file_to_upload, "rb")
+ upload_file: io.BufferedReader = open(file_to_upload, "rb")
except FileNotFoundError:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file_to_upload)
@@ -159,53 +163,60 @@ def _request_put_file(url, file_to_upload, parameters=None):
else:
resp = requests.put(url, data=upload_file)
except requests.exceptions.MissingSchema:
- raise requests.HTTPError("Not valid URL format. Check for https:// etc..: {}".format(url))
+ raise requests.HTTPError(f"Not valid URL format. Check for https:// etc..: {url}", response=resp)
except requests.exceptions.ConnectTimeout:
- raise requests.HTTPError("Timed out connecting to {}".format(url))
+ raise requests.HTTPError(f"Timed out connecting to {url}", response=resp)
except requests.exceptions.ReadTimeout:
- raise requests.HTTPError("Timed out waiting for the server to reply ({})".format(url))
+ raise requests.HTTPError(f"Timed out waiting for the server to reply ({url})", response=resp)
except requests.exceptions.ConnectionError:
- raise requests.HTTPError("A connection error occurred ({})".format(url))
+ raise requests.HTTPError(f"A connection error occurred ({url})", response=resp)
except requests.exceptions.InvalidURL:
- raise requests.HTTPError("Invalid URL format: {}".format(url))
+ raise requests.HTTPError(f"Invalid URL format: {url}", response=resp)
except requests.RequestException as e:
log.error(e)
if resp.status_code == 201:
return True
if resp.status_code == 400:
- raise requests.HTTPError("Repository is read only")
+ raise requests.HTTPError("Repository is read only", response=resp)
if resp.status_code == 401:
- raise requests.HTTPError("Invalid repository credentials")
+ raise requests.HTTPError("Invalid repository credentials", response=resp)
if resp.status_code == 404:
- raise requests.HTTPError("Did not find repository.")
+ raise requests.HTTPError("Did not find repository.", response=resp)
if not str(resp.status_code).startswith("20"):
raise requests.HTTPError(
"Failed to upload to Nexus with status code: {}.\n{}\n{}".format(
resp.status_code, resp.text, file_to_upload
- )
+ ),
+ response=resp,
)
+ return False
-def _get_node_from_xml(xml_data, tag_name):
+
+def _get_node_from_xml(xml_data: str, tag_name: str) -> str:
"""Extract tag data from xml data."""
log.debug("xml={}".format(xml_data))
try:
- dom1 = parseString(xml_data)
- childnode = dom1.getElementsByTagName(tag_name)[0]
+ dom1: Document = parseString(xml_data)
+ childnode: Node = dom1.getElementsByTagName(tag_name)[0]
except Exception:
- _log_error_and_exit("Received bad XML, can not find tag {}".format(tag_name), xml_data)
- return childnode.firstChild.data
+ _log_error_and_exit(f"Received bad XML, can not find tag {tag_name}", xml_data)
+ if childnode.firstChild:
+ return str(childnode.firstChild.data) # type: ignore
+ else:
+ _log_error_and_exit(f"No data in {tag_name}", xml_data)
+ return ""
-def _remove_duplicates_and_sort(lst):
+def _remove_duplicates_and_sort(lst: List[str]) -> List[str]:
# Remove duplicates from list, and sort it
no_dups_lst = list(dict.fromkeys(lst))
no_dups_lst.sort()
- duplicated_list = []
+ duplicated_list: List[str] = []
for i in range(len(no_dups_lst)):
if lst.count(no_dups_lst[i]) > 1:
duplicated_list.append(no_dups_lst[i])
@@ -214,7 +225,7 @@ def _remove_duplicates_and_sort(lst):
return no_dups_lst
-def copy_archives(workspace, pattern=None):
+def copy_archives(workspace: str, pattern: Optional[List[str]] = None) -> None:
"""Copy files matching PATTERN in a WORKSPACE to the current directory.
The best way to use this function is to cd into the directory you wish to
@@ -230,51 +241,51 @@ def copy_archives(workspace, pattern=None):
:arg str pattern: Space-separated list of Unix style glob patterns.
(default: None)
"""
- archives_dir = os.path.join(workspace, "archives")
- dest_dir = os.getcwd()
+ archives_dir: str = os.path.join(workspace, "archives")
+ dest_dir: str = os.getcwd()
- log.debug("Copying files from {} with pattern '{}' to {}.".format(workspace, pattern, dest_dir))
- log.debug("archives_dir = {}".format(archives_dir))
+ log.debug(f"Copying files from {workspace} with pattern '{pattern}' to {dest_dir}.")
+ log.debug(f"archives_dir = {archives_dir}")
if os.path.exists(archives_dir):
if os.path.isfile(archives_dir):
- log.error("Archives {} is a file, not a directory.".format(archives_dir))
+ log.error(f"Archives {archives_dir} is a file, not a directory.")
raise OSError(errno.ENOENT, "Not a directory", archives_dir)
else:
- log.debug("Archives dir {} does exist.".format(archives_dir))
+ log.debug(f"Archives dir {archives_dir} does exist.")
for file_or_dir in os.listdir(archives_dir):
f = os.path.join(archives_dir, file_or_dir)
try:
- log.debug("Moving {}".format(f))
+ log.debug(f"Moving {f}")
shutil.move(f, dest_dir)
except shutil.Error as e:
log.error(e)
raise OSError(errno.EPERM, "Could not move to", archives_dir)
else:
- log.error("Archives dir {} does not exist.".format(archives_dir))
+ log.error(f"Archives dir {archives_dir} does not exist.")
raise OSError(errno.ENOENT, "Missing directory", archives_dir)
if pattern is None:
return
- no_dups_pattern = _remove_duplicates_and_sort(pattern)
+ no_dups_pattern: List[str] = _remove_duplicates_and_sort(pattern)
- paths = []
+ paths: List[str] = []
for p in no_dups_pattern:
if p == "": # Skip empty patterns as they are invalid
continue
- search = os.path.join(workspace, p)
+ search: str = os.path.join(workspace, p)
paths.extend(glob.glob(search, recursive=True))
- log.debug("Files found: {}".format(paths))
+ log.debug(f"Files found: {paths}")
- no_dups_paths = _remove_duplicates_and_sort(paths)
+ no_dups_paths: List[str] = _remove_duplicates_and_sort(paths)
for src in no_dups_paths:
if len(os.path.basename(src)) > 255:
log.warn("Filename {} is over 255 characters. Skipping...".format(os.path.basename(src)))
- dest = os.path.join(dest_dir, src[len(workspace) + 1 :])
- log.debug("{} -> {}".format(src, dest))
+ dest: str = os.path.join(dest_dir, src[len(workspace) + 1 :])
+ log.debug(f"{src} -> {dest}")
if os.path.isfile(src):
try:
@@ -284,20 +295,20 @@ def copy_archives(workspace, pattern=None):
os.makedirs(os.path.dirname(dest))
shutil.move(src, dest)
else:
- log.info("Not copying directories: {}.".format(src))
+ log.info(f"Not copying directories: {src}.")
# Create a temp file to handle empty dirs in AWS S3 buckets.
if os.environ.get("S3_BUCKET") is not None:
- now = datetime.datetime.now()
- p = now.strftime("_%d%m%Y_%H%M%S_")
+ now: datetime.datetime = datetime.datetime.now()
+ prefix: str = now.strftime("_%d%m%Y_%H%M%S_")
for dirpath, dirnames, files in os.walk(dest_dir):
if not files:
- fd, tmp = tempfile.mkstemp(prefix=p, dir=dirpath)
+ fd, tmp = tempfile.mkstemp(prefix=prefix, dir=dirpath)
os.close(fd)
log.debug("temp file created in dir: {}.".format(dirpath))
-def deploy_archives(nexus_url, nexus_path, workspace, pattern=None):
+def deploy_archives(nexus_url: str, nexus_path: str, workspace: str, pattern: Optional[List[str]] = None) -> None:
"""Archive files to a Nexus site repository named logs.
Provides 2 ways to archive files:
@@ -320,15 +331,15 @@ def deploy_archives(nexus_url, nexus_path, workspace, pattern=None):
archive. (optional)
"""
nexus_url = _format_url(nexus_url)
- previous_dir = os.getcwd()
- work_dir = tempfile.mkdtemp(prefix="lftools-da.")
+ previous_dir: str = os.getcwd()
+ work_dir: str = tempfile.mkdtemp(prefix="lftools-da.")
os.chdir(work_dir)
- log.debug("workspace: {}, work_dir: {}".format(workspace, work_dir))
+ log.debug(f"workspace: {workspace}, work_dir: {work_dir}")
copy_archives(workspace, pattern)
_compress_text(work_dir)
- archives_zip = shutil.make_archive("{}/archives".format(workspace), "zip")
+ archives_zip: str = shutil.make_archive(f"{workspace}/archives", "zip")
log.debug("archives zip: {}".format(archives_zip))
deploy_nexus_zip(nexus_url, "logs", nexus_path, archives_zip)
@@ -336,7 +347,7 @@ def deploy_archives(nexus_url, nexus_path, workspace, pattern=None):
shutil.rmtree(work_dir)
-def deploy_logs(nexus_url, nexus_path, build_url):
+def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None:
"""Deploy logs to a Nexus site repository named logs.
Fetches logs and system information and pushes them to Nexus
@@ -355,16 +366,16 @@ def deploy_logs(nexus_url, nexus_path, build_url):
via the $BUILD_URL environment variable.
"""
nexus_url = _format_url(nexus_url)
- previous_dir = os.getcwd()
- work_dir = tempfile.mkdtemp(prefix="lftools-dl.")
+ previous_dir: str = os.getcwd()
+ work_dir: str = tempfile.mkdtemp(prefix="lftools-dl.")
os.chdir(work_dir)
- log.debug("work_dir: {}".format(work_dir))
+ log.debug(f"work_dir: {work_dir}")
build_details = open("_build-details.log", "w+")
build_details.write("build-url: {}".format(build_url))
with open("_sys-info.log", "w+") as sysinfo_log:
- sys_cmds = []
+ sys_cmds: List[List[str]] = []
log.debug("Platform: {}".format(sys.platform))
if sys.platform == "linux" or sys.platform == "linux2":
@@ -381,7 +392,7 @@ def deploy_logs(nexus_url, nexus_path, build_url):
for c in sys_cmds:
try:
- output = subprocess.check_output(c).decode("utf-8")
+ output: str = subprocess.check_output(c).decode("utf-8")
except OSError: # TODO: Switch to FileNotFoundError when Python < 3.5 support is dropped.
log.debug("Command not found: {}".format(c))
continue
@@ -393,7 +404,7 @@ def deploy_logs(nexus_url, nexus_path, build_url):
build_details.close()
# Magic string used to trim console logs at the appropriate level during wget
- MAGIC_STRING = "-----END_OF_BUILD-----"
+ MAGIC_STRING: str = "-----END_OF_BUILD-----"
log.info(MAGIC_STRING)
resp = requests.get("{}/consoleText".format(_format_url(build_url)))
@@ -406,7 +417,7 @@ def deploy_logs(nexus_url, nexus_path, build_url):
_compress_text(work_dir)
- console_zip = tempfile.NamedTemporaryFile(prefix="lftools-dl", delete=True)
+ console_zip: tempfile._TemporaryFileWrapper[bytes] = tempfile.NamedTemporaryFile(prefix="lftools-dl", delete=True)
log.debug("console-zip: {}".format(console_zip.name))
shutil.make_archive(console_zip.name, "zip", work_dir)
deploy_nexus_zip(nexus_url, "logs", nexus_path, "{}.zip".format(console_zip.name))
@@ -416,7 +427,9 @@ def deploy_logs(nexus_url, nexus_path, build_url):
shutil.rmtree(work_dir)
-def deploy_s3(s3_bucket, s3_path, build_url, workspace, pattern=None):
+def deploy_s3(
+ s3_bucket: str, s3_path: str, build_url: str, workspace: str, pattern: Optional[List[str]] = None
+) -> None:
"""Add logs and archives to temp directory to be shipped to S3 bucket.
Fetches logs and system information and pushes them and archives to S3
@@ -437,68 +450,45 @@ def deploy_s3(s3_bucket, s3_path, build_url, workspace, pattern=None):
archive. (optional)
"""
- def _upload_to_s3(file):
- extra_args = {"ContentType": "text/plain"}
- text_html_extra_args = {"ContentType": "text/html", "ContentEncoding": mimetypes.guess_type(file)[1]}
- text_plain_extra_args = {"ContentType": "text/plain", "ContentEncoding": mimetypes.guess_type(file)[1]}
- app_xml_extra_args = {"ContentType": "application/xml", "ContentEncoding": mimetypes.guess_type(file)[1]}
+ def _upload_to_s3(file: str) -> bool:
+ guess: Tuple[Optional[str], Optional[str]] = mimetypes.guess_type(file)
+
+ extra_args: Dict[str, str] = {"Content-Type": "text/plain"}
+
+ if guess[0]:
+ extra_args["Content-Type"] = guess[0]
+
+ if guess[1]:
+ extra_args["Content-Encoding"] = guess[1]
+
if file == "_tmpfile":
- for dir in (logs_dir, silo_dir, jenkins_node_dir):
+ for directory in (logs_dir, silo_dir, jenkins_node_dir):
try:
- s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(dir, file))
+ s3.Bucket(s3_bucket).upload_file(file, f"{directory}{file}")
except ClientError as e:
log.error(e)
return False
+
return True
- if mimetypes.guess_type(file)[0] is None and mimetypes.guess_type(file)[1] is None:
- try:
- s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args)
- except ClientError as e:
- log.error(e)
- return False
- return True
- elif mimetypes.guess_type(file)[0] is None or mimetypes.guess_type(file)[0] in "text/plain":
- extra_args = text_plain_extra_args
- try:
- s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args)
- except ClientError as e:
- log.error(e)
- return False
- return True
- elif mimetypes.guess_type(file)[0] in "text/html":
- extra_args = text_html_extra_args
- try:
- s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args)
- except ClientError as e:
- log.error(e)
- return False
- return True
- elif mimetypes.guess_type(file)[0] in "application/xml":
- extra_args = app_xml_extra_args
- try:
- s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args)
- except ClientError as e:
- log.error(e)
- return False
- return True
- else:
- try:
- s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args)
- except ClientError as e:
- log.error(e)
- return False
- return True
- previous_dir = os.getcwd()
- work_dir = tempfile.mkdtemp(prefix="lftools-dl.")
+ try:
+ s3.Bucket(s3_bucket).upload_file(file, f"{s3_path}{file}", ExtraArgs=extra_args)
+ except ClientError as e:
+ log.error(e)
+ return False
+
+ return True
+
+ previous_dir: str = os.getcwd()
+ work_dir: str = tempfile.mkdtemp(prefix="lftools-dl.")
os.chdir(work_dir)
s3_bucket = s3_bucket.lower()
s3 = boto3.resource("s3")
- logs_dir = s3_path.split("/")[0] + "/"
- silo_dir = s3_path.split("/")[1] + "/"
- jenkins_node_dir = logs_dir + silo_dir + s3_path.split("/")[2] + "/"
+ logs_dir: str = s3_path.split("/")[0] + "/"
+ silo_dir: str = s3_path.split("/")[1] + "/"
+ jenkins_node_dir: str = logs_dir + silo_dir + s3_path.split("/")[2] + "/"
- log.debug("work_dir: {}".format(work_dir))
+ log.debug(f"work_dir: {work_dir}")
# Copy archive files to tmp dir
copy_archives(workspace, pattern)
@@ -538,7 +528,7 @@ def _upload_to_s3(file):
sysinfo_log.close()
# Magic string used to trim console logs at the appropriate level during wget
- MAGIC_STRING = "-----END_OF_BUILD-----"
+ MAGIC_STRING: str = "-----END_OF_BUILD-----"
log.info(MAGIC_STRING)
resp = requests.get("{}/consoleText".format(_format_url(build_url)))
@@ -560,8 +550,8 @@ def _upload_to_s3(file):
_compress_text(work_dir)
# Create file list to upload
- file_list = []
- files = glob.glob("**/*", recursive=True)
+ file_list: List[str] = []
+ files: List[str] = glob.glob("**/*", recursive=True)
for file in files:
if os.path.isfile(file):
file_list.append(file)
@@ -571,13 +561,13 @@ def _upload_to_s3(file):
# Perform s3 upload
for file in file_list:
- log.info("Attempting to upload file {}".format(file))
+ log.info(f"Attempting to upload file {file}")
if _upload_to_s3(file):
- log.info("Successfully uploaded {}".format(file))
+ log.info(f"Successfully uploaded {file}")
else:
- log.error("FAILURE: Uploading {} failed".format(file))
+ log.error(f"FAILURE: Uploading {file} failed")
- log.info("Finished deploying from {} to {}/{}".format(work_dir, s3_bucket, s3_path))
+ log.info(f"Finished deploying from {work_dir} to {s3_bucket}/{s3_path}")
log.info("#######################################################")
# Cleanup
@@ -588,7 +578,7 @@ def _upload_to_s3(file):
# shutil.rmtree(work_dir)
-def deploy_nexus_zip(nexus_url, nexus_repo, nexus_path, zip_file):
+def deploy_nexus_zip(nexus_url: str, nexus_repo: str, nexus_path: str, zip_file: str) -> None:
""""Deploy zip file containing artifacts to Nexus using requests.
This function simply takes a zip file preformatted in the correct
@@ -615,23 +605,23 @@ def deploy_nexus_zip(nexus_url, nexus_repo, nexus_path, zip_file):
tst_path \
tests/fixtures/deploy/zip-test-files/test.zip
"""
- url = "{}/service/local/repositories/{}/content-compressed/{}".format(
+ url: str = "{}/service/local/repositories/{}/content-compressed/{}".format(
_format_url(nexus_url), nexus_repo, nexus_path
)
- log.debug("Uploading {} to {}".format(zip_file, url))
+ log.debug(f"Uploading {zip_file} to {url}")
try:
- resp = _request_post_file(url, zip_file)
+ resp: requests.Response = _request_post_file(url, zip_file)
except requests.HTTPError as e:
- files = _get_filenames_in_zipfile(zip_file)
- log.info("Uploading {} failed. It contained the following files".format(zip_file))
+ files: List[str] = _get_filenames_in_zipfile(zip_file)
+ log.info(f"Uploading {zip_file} failed. It contained the following files")
for f in files:
- log.info(" {}".format(f))
- raise requests.HTTPError(e)
- log.debug("{}: {}".format(resp.status_code, resp.text))
+ log.info(f" {f}")
+ raise requests.HTTPError(e, response=resp)
+ log.debug(f"{resp.status_code}: {resp.text}")
-def nexus_stage_repo_create(nexus_url, staging_profile_id):
+def nexus_stage_repo_create(nexus_url: str, staging_profile_id: str) -> str:
"""Create a Nexus staging repo.
Parameters:
@@ -646,7 +636,7 @@ def nexus_stage_repo_create(nexus_url, staging_profile_id):
"""
nexus_url = "{0}/service/local/staging/profiles/{1}/start".format(_format_url(nexus_url), staging_profile_id)
- log.debug("Nexus URL = {}".format(nexus_url))
+ log.debug(f"Nexus URL = {nexus_url}")
xml = """
@@ -656,32 +646,32 @@ def nexus_stage_repo_create(nexus_url, staging_profile_id):
"""
- headers = {"Content-Type": "application/xml"}
- resp = _request_post(nexus_url, xml, headers)
+ headers: Dict[str, str] = {"Content-Type": "application/xml"}
+ resp: requests.Response = _request_post(nexus_url, xml, headers)
- log.debug("resp.status_code = {}".format(resp.status_code))
- log.debug("resp.text = {}".format(resp.text))
+ log.debug(f"resp.status_code = {resp.status_code}")
+ log.debug(f"resp.text = {resp.text}")
if re.search("nexus-error", resp.text):
- error_msg = _get_node_from_xml(resp.text, "msg")
+ error_msg: str = _get_node_from_xml(resp.text, "msg")
if re.search(".*profile with id:.*does not exist.", error_msg):
- _log_error_and_exit("Staging profile id {} not found.".format(staging_profile_id))
+ _log_error_and_exit(f"Staging profile id {staging_profile_id} not found.")
_log_error_and_exit(error_msg)
if resp.status_code == 405:
_log_error_and_exit("HTTP method POST is not supported by this URL", nexus_url)
if resp.status_code == 404:
- _log_error_and_exit("Did not find nexus site: {}".format(nexus_url))
+ _log_error_and_exit(f"Did not find nexus site: {nexus_url}")
if not resp.status_code == 201:
- _log_error_and_exit("Failed with status code {}".format(resp.status_code), resp.text)
+ _log_error_and_exit(f"Failed with status code {resp.status_code}", resp.text)
staging_repo_id = _get_node_from_xml(resp.text, "stagedRepositoryId")
- log.debug("staging_repo_id = {}".format(staging_repo_id))
+ log.debug(f"staging_repo_id = {staging_repo_id}")
return staging_repo_id
-def nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id):
+def nexus_stage_repo_close(nexus_url: str, staging_profile_id: str, staging_repo_id: str) -> None:
"""Close a Nexus staging repo.
Parameters:
@@ -695,8 +685,8 @@ def nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id):
"""
nexus_url = "{0}/service/local/staging/profiles/{1}/finish".format(_format_url(nexus_url), staging_profile_id)
- log.debug("Nexus URL = {}".format(nexus_url))
- log.debug("staging_repo_id = {}".format(staging_repo_id))
+ log.debug(f"Nexus URL = {nexus_url}")
+ log.debug(f"staging_repo_id = {staging_repo_id}")
xml = """
@@ -709,19 +699,19 @@ def nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id):
staging_repo_id
)
- headers = {"Content-Type": "application/xml"}
- resp = _request_post(nexus_url, xml, headers)
+ headers: Dict[str, str] = {"Content-Type": "application/xml"}
+ resp: requests.Response = _request_post(nexus_url, xml, headers)
- log.debug("resp.status_code = {}".format(resp.status_code))
- log.debug("resp.text = {}".format(resp.text))
+ log.debug(f"resp.status_code = {resp.status_code}")
+ log.debug(f"resp.text = {resp.text}")
if re.search("nexus-error", resp.text):
- error_msg = _get_node_from_xml(resp.text, "msg")
+ error_msg: str = _get_node_from_xml(resp.text, "msg")
else:
error_msg = resp.text
if resp.status_code == 404:
- _log_error_and_exit("Did not find nexus site: {}".format(nexus_url))
+ _log_error_and_exit(f"Did not find nexus site: {nexus_url}")
if re.search("invalid state: closed", error_msg):
_log_error_and_exit("Staging repository is already closed.")
@@ -729,12 +719,19 @@ def nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id):
_log_error_and_exit("Staging repository do not exist.")
if not resp.status_code == 201:
- _log_error_and_exit("Failed with status code {}".format(resp.status_code), resp.text)
+ _log_error_and_exit(f"Failed with status code {resp.status_code}", resp.text)
def upload_maven_file_to_nexus(
- nexus_url, nexus_repo_id, group_id, artifact_id, version, packaging, file, classifier=None
-):
+ nexus_url: str,
+ nexus_repo_id: str,
+ group_id: str,
+ artifact_id: str,
+ version: str,
+ packaging: str,
+ file: str,
+ classifier: Optional[str] = None,
+) -> None:
"""Upload file to Nexus as a Maven artifact.
This function will upload an artifact to Nexus while providing all of
@@ -759,26 +756,26 @@ def upload_maven_file_to_nexus(
"""
url = "{}/service/local/artifact/maven/content".format(_format_url(nexus_url))
- log.info("Uploading URL: {}".format(url))
+ log.info(f"Uploading URL: {url}")
params = {}
- params.update({"r": (None, "{}".format(nexus_repo_id))})
- params.update({"g": (None, "{}".format(group_id))})
- params.update({"a": (None, "{}".format(artifact_id))})
- params.update({"v": (None, "{}".format(version))})
- params.update({"p": (None, "{}".format(packaging))})
+ params.update({"r": (None, f"{nexus_repo_id}")})
+ params.update({"g": (None, f"{group_id}")})
+ params.update({"a": (None, f"{artifact_id}")})
+ params.update({"v": (None, f"{version}")})
+ params.update({"p": (None, f"{packaging}")})
if classifier:
- params.update({"c": (None, "{}".format(classifier))})
+ params.update({"c": (None, f"{classifier}")})
- log.debug("Maven Parameters: {}".format(params))
+ log.debug(f"Maven Parameters: {params}")
- resp = _request_post_file(url, file, params)
+ resp: requests.Response = _request_post_file(url, file, params)
if re.search("nexus-error", resp.text):
- error_msg = _get_node_from_xml(resp.text, "msg")
- raise requests.HTTPError("Nexus Error: {}".format(error_msg))
+ error_msg: str = _get_node_from_xml(resp.text, "msg")
+ raise requests.HTTPError(f"Nexus Error: {error_msg}", response=resp)
-def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False, workers=2):
+def deploy_nexus(nexus_repo_url: str, deploy_dir: str, snapshot: bool = False, workers: int = 2) -> None:
"""Deploy a local directory of files to a Nexus repository.
One purpose of this is so that we can get around the problematic
@@ -802,17 +799,17 @@ def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False, workers=2):
tests/fixtures/deploy/zip-test-files
"""
- def _get_filesize(file):
+ def _get_filesize(file: str) -> str:
bytesize = os.path.getsize(file)
if bytesize == 0:
return "0B"
- suffix = ("b", "kb", "mb", "gb")
- i = int(math.floor(math.log(bytesize, 1024)))
- p = math.pow(1024, i)
- s = round(bytesize / p, 2)
- return "{} {}".format(s, suffix[i])
+ suffix: List[str] = ["b", "kb", "mb", "gb"]
+ i: int = int(math.floor(math.log(bytesize, 1024)))
+ p: int = int(math.pow(1024, i))
+ s: int = int(round(bytesize / p, 2))
+ return f"{s} {suffix[i]}"
- def _deploy_nexus_upload(file):
+ def _deploy_nexus_upload(file: str) -> bool:
# Fix file path, and call _request_put_file.
nexus_url_with_file = "{}/{}".format(_format_url(nexus_repo_url), file)
log.info("Attempting to upload {} ({})".format(file, _get_filesize(file)))
@@ -821,13 +818,13 @@ def _deploy_nexus_upload(file):
else:
return False
- file_list = []
- previous_dir = os.getcwd()
+ file_list: List[str] = []
+ previous_dir: str = os.getcwd()
os.chdir(deploy_dir)
- files = glob.glob("**/*", recursive=True)
+ files: List[str] = glob.glob("**/*", recursive=True)
for file in files:
if os.path.isfile(file):
- base_name = os.path.basename(file)
+ base_name: str = os.path.basename(file)
# Skip blacklisted files
if base_name == "_remote.repositories" or base_name == "resolver-status.properties":
@@ -840,38 +837,37 @@ def _deploy_nexus_upload(file):
file_list.append(file)
log.info("#######################################################")
- log.info("Deploying directory {} to {}".format(deploy_dir, nexus_repo_url))
+ log.info(f"Deploying directory {deploy_dir} to {nexus_repo_url}")
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# this creates a dict where the key is the Future object, and the value is the file name
# see concurrent.futures.Future for more info
- futures = {executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list}
+ futures: Dict[Future[Any], str] = {
+ executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list
+ }
for future in concurrent.futures.as_completed(futures):
- filename = futures[future]
+ filename: str = futures[future]
try:
- data = future.result()
- # remove pyflake warning
- if data == data:
- pass
+ future.result()
except Exception as e:
- log.error("Uploading {}: {}".format(filename, e))
+ log.error(f"Uploading {filename}: {e}")
# wait until all threads complete (successfully or not)
# then log the results of the upload threads
concurrent.futures.wait(futures)
for k, v in futures.items():
if k.result():
- log.info("Successfully uploaded {}".format(v))
+ log.info(f"Successfully uploaded {v}")
else:
- log.error("FAILURE: Uploading {} failed".format(v))
+ log.error(f"FAILURE: Uploading {v} failed")
- log.info("Finished deploying {} to {}".format(deploy_dir, nexus_repo_url))
+ log.info(f"Finished deploying {deploy_dir} to {nexus_repo_url}")
log.info("#######################################################")
os.chdir(previous_dir)
-def deploy_nexus_stage(nexus_url, staging_profile_id, deploy_dir):
+def deploy_nexus_stage(nexus_url: str, staging_profile_id: str, deploy_dir: str) -> None:
"""Deploy Maven artifacts to Nexus staging repo.
Parameters:
@@ -890,18 +886,18 @@ def deploy_nexus_stage(nexus_url, staging_profile_id, deploy_dir):
~/LF/work/lftools-dev/lftools/shell
Completed uploading files to aaf-1005.
"""
- staging_repo_id = nexus_stage_repo_create(nexus_url, staging_profile_id)
- log.info("Staging repository {} created.".format(staging_repo_id))
+ staging_repo_id: str = nexus_stage_repo_create(nexus_url, staging_profile_id)
+ log.info(f"Staging repository {staging_repo_id} created.")
deploy_nexus_url = "{0}/service/local/staging/deployByRepositoryId/{1}".format(
_format_url(nexus_url), staging_repo_id
)
- sz_m2repo = sum(os.path.getsize(f) for f in os.listdir(deploy_dir) if os.path.isfile(f))
- log.debug("Staging repository upload size: {} bytes".format(sz_m2repo))
+ sz_m2repo: int = sum(os.path.getsize(f) for f in os.listdir(deploy_dir) if os.path.isfile(f))
+ log.debug(f"Staging repository upload size: {sz_m2repo} bytes")
log.debug("Nexus Staging URL: {}".format(_format_url(deploy_nexus_url)))
deploy_nexus(deploy_nexus_url, deploy_dir)
nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id)
- log.info("Completed uploading files to {}.".format(staging_repo_id))
+ log.info(f"Completed uploading files to {staging_repo_id}.")