diff --git a/CHANGELOG.md b/CHANGELOG.md index 940e26502..5b8aad419 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## master - CURRENT + +## 3.19.0 - 06/12/2024 ### Added * Add `regfish` provider (#2102) * Add `ionos` provider (#2127) diff --git a/pyproject.toml b/pyproject.toml index ae095655f..8ff6eae37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "dns-lexicon" -version = "3.18.0" +version = "3.19.0" description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way" license = "MIT" keywords = [ diff --git a/src/lexicon/_private/providers/ionos.py b/src/lexicon/_private/providers/ionos.py index 6fe087aaf..c1061e78e 100644 --- a/src/lexicon/_private/providers/ionos.py +++ b/src/lexicon/_private/providers/ionos.py @@ -2,65 +2,69 @@ from lexicon.interfaces import Provider as BaseProvider - -_ZONES_API = 'https://api.hosting.ionos.com/dns/v1/zones' +_ZONES_API = "https://api.hosting.ionos.com/dns/v1/zones" class Provider(BaseProvider): @staticmethod def get_nameservers(): - return ['ui-dns.com', 'ui-dns.org', 'ui-dns.de', 'ui-dns.biz'] + return ["ui-dns.com", "ui-dns.org", "ui-dns.de", "ui-dns.biz"] @staticmethod def configure_parser(parser): parser.add_argument( - '--api-key', + "--api-key", required=True, - help='IONOS api key: public prefix + period + key proper', + help="IONOS api key: public prefix + period + key proper", ) def authenticate(self): zones = self._get(_ZONES_API) for zone in zones: - if zone['name'] == self.domain: - self.domain_id = zone['id'] + if zone["name"] == self.domain: + self.domain_id = zone["id"] return - raise Exception('domain not found: ' + self.domain) + raise Exception("domain not found: " + self.domain) def create_record(self, rtype, name, content): self._post( - _ZONES_API + '/' + self.domain_id + '/records', - data=[{ - 'name': self._full_name(name), - 'type': rtype, - 'content': content, - 'ttl': self._get_lexicon_option('ttl'), - 'prio': 0, - 'disabled': False, - }], + _ZONES_API + "/" + self.domain_id + "/records", + data=[ + { + "name": self._full_name(name), + "type": rtype, + "content": content, + "ttl": self._get_lexicon_option("ttl"), + "prio": 0, + "disabled": False, + } + ], ) return True def list_records(self, rtype=None, name=None, content=None): query_params = {} if rtype: - query_params['recordType'] = rtype + query_params["recordType"] = rtype if name: - query_params['recordName'] = self._full_name(name) - data = self._get(_ZONES_API + '/' + self.domain_id, query_params) - records = data['records'] - records = [{ - 'type': r['type'], - 'name': r['name'], - 'ttl': r['ttl'], - 'content': r['content'], - 'id': r['id'], - } for r in records] + query_params["recordName"] = self._full_name(name) + data = self._get(_ZONES_API + "/" + self.domain_id, query_params) + records = data["records"] + records = [ + { + "type": r["type"], + "name": r["name"], + "ttl": r["ttl"], + "content": r["content"], + "id": r["id"], + } + for r in records + ] for record in records: self._clean_TXT_record(record) if content: - records = [r for r in records if r['content'] == content] + records = [r for r in records if r["content"] == content] return records def update_record(self, identifier, rtype, name, content): @@ -71,25 +75,20 @@ def delete_record(self, identifier=None, rtype=None, name=None, content=None): if identifier: identifiers = [identifier] else: - identifiers = [ - r['id'] - for r in self.list_records(rtype, name, content) - ] + identifiers = [r["id"] for r in self.list_records(rtype, name, content)] for identifier in identifiers: - self._delete( - _ZONES_API + '/' + self.domain_id + '/records/' + identifier - ) + self._delete(_ZONES_API + "/" + self.domain_id + "/records/" + identifier) return True - def _request(self, action='GET', url='/', data=None, query_params=None): + def _request(self, action="GET", url="/", data=None, query_params=None): response = requests.request( action, url, params=query_params, json=data, headers={ - 'accept': 'application/json', - 'x-api-key': self._get_provider_option('api_key'), + "accept": "application/json", + "x-api-key": self._get_provider_option("api_key"), }, ) response.raise_for_status() diff --git a/src/lexicon/_private/providers/regfish.py b/src/lexicon/_private/providers/regfish.py index 6838b8950..b4c034f4d 100644 --- a/src/lexicon/_private/providers/regfish.py +++ b/src/lexicon/_private/providers/regfish.py @@ -60,7 +60,9 @@ def create_record(self, rtype, name, content): payload = self._post("/dns/rr", data) except requests.exceptions.HTTPError as err: json_res = err.response.json() - if not json_res["code"] if "code" in json_res else 0 != 15003: # ResourceRecordAlreadyExists + if ( + not json_res["code"] if "code" in json_res else 0 != 15003 + ): # ResourceRecordAlreadyExists raise if payload["code"] if "code" in payload else 0 != 0: @@ -91,10 +93,14 @@ def list_records(self, rtype=None, name=None, content=None): # Apply filters on rtype, name, and content if they are provided if rtype or name or content: records = [ - record for record in records - if (not rtype or record['type'] == rtype) and - (not name or record['name'] == self._full_name(name)) and - (not content or record['content'] == self._format_content(rtype, content)) + record + for record in records + if (not rtype or record["type"] == rtype) + and (not name or record["name"] == self._full_name(name)) + and ( + not content + or record["content"] == self._format_content(rtype, content) + ) ] LOGGER.debug("list_records after filtering: %s", records) @@ -108,9 +114,13 @@ def update_record(self, identifier, rtype=None, name=None, content=None): if len(records) == 1: identifier = records[0]["id"] elif len(records) < 1: - raise Exception("No records found matching type and name - won't update") + raise Exception( + "No records found matching type and name - won't update" + ) else: - raise Exception("Multiple records found matching type and name - won't update") + raise Exception( + "Multiple records found matching type and name - won't update" + ) content = self._format_content(rtype, content) data = {} diff --git a/tests/providers/integration_tests.py b/tests/providers/integration_tests.py index ad994c7b0..b657de855 100644 --- a/tests/providers/integration_tests.py +++ b/tests/providers/integration_tests.py @@ -97,8 +97,9 @@ def __init__(self): self.provider_module = None def setup_method(self, _): - warnings.filterwarnings("ignore", category=ResourceWarning, - message="unclosed.*") + warnings.filterwarnings( + "ignore", category=ResourceWarning, message="unclosed.*" + ) self.provider_module = load_provider_module(self.provider_name) ########################################################################### diff --git a/tests/providers/test_ionos.py b/tests/providers/test_ionos.py index c8b14aa02..982198770 100644 --- a/tests/providers/test_ionos.py +++ b/tests/providers/test_ionos.py @@ -1,4 +1,5 @@ """Integration tests for IONOS""" + import json import os from unittest import TestCase @@ -12,24 +13,24 @@ class IONOSProviderTests(TestCase, IntegrationTestsV2): """Integration tests for IONOS provider""" - provider_name = 'ionos' - domain = os.environ.get('LEXICON_IONOS_DOMAIN', 'example.com') + provider_name = "ionos" + domain = os.environ.get("LEXICON_IONOS_DOMAIN", "example.com") def _filter_request(self, request): - request.uri = request.uri.replace(self.domain, 'example.com') + request.uri = request.uri.replace(self.domain, "example.com") if request.body: - body = request.body.decode('utf-8') - body = body.replace(self.domain, 'example.com') - request.body = body.encode('utf-8') + body = request.body.decode("utf-8") + body = body.replace(self.domain, "example.com") + request.body = body.encode("utf-8") return request def _filter_headers(self): - return ['x-api-key'] + return ["x-api-key"] def _filter_response(self, response): - for key in ['Set-Cookie', 'x-b3-traceid']: - response['headers'].pop(key, None) - body = response['body']['string'].decode('utf-8') + for key in ["Set-Cookie", "x-b3-traceid"]: + response["headers"].pop(key, None) + body = response["body"]["string"].decode("utf-8") try: data = json.loads(body) if isinstance(data, list): @@ -37,11 +38,13 @@ def _filter_response(self, response): body = json.dumps(data) except json.JSONDecodeError: pass - body = body.replace(self.domain, 'example.com') - response['body']['string'] = body.encode('utf-8') + body = body.replace(self.domain, "example.com") + response["body"]["string"] = body.encode("utf-8") return response def _is_unrelated_zone(self, entry): - return isinstance(entry, dict) \ - and 'name' in entry \ - and not entry['name'].endswith(self.domain) + return ( + isinstance(entry, dict) + and "name" in entry + and not entry["name"].endswith(self.domain) + ) diff --git a/uv.lock b/uv.lock index 2f698ed8e..b8e3c4180 100644 --- a/uv.lock +++ b/uv.lock @@ -434,7 +434,7 @@ wheels = [ [[package]] name = "dns-lexicon" -version = "3.18.0" +version = "3.19.0" source = { editable = "." } dependencies = [ { name = "beautifulsoup4" },