diff --git a/CHANGELOG.md b/CHANGELOG.md index fc00be78f..825b02925 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## master - CURRENT + +## 3.11.0 - 06/05/2022 ### Added * Add `namecom` provider (#1212) diff --git a/lexicon/providers/namecheap.py b/lexicon/providers/namecheap.py index 2cd6e8e4f..b929b01b9 100644 --- a/lexicon/providers/namecheap.py +++ b/lexicon/providers/namecheap.py @@ -1,12 +1,11 @@ """Module provider for Namecheap""" import logging import sys -import tldextract - from typing import Dict, Optional, Tuple from xml.etree.ElementTree import Element, fromstring import requests +import tldextract from lexicon.exceptions import AuthenticationError from lexicon.providers.base import Provider as BaseProvider diff --git a/lexicon/providers/namecom.py b/lexicon/providers/namecom.py index 4bd86bec1..f7cc3264c 100644 --- a/lexicon/providers/namecom.py +++ b/lexicon/providers/namecom.py @@ -10,22 +10,24 @@ LOGGER = logging.getLogger(__name__) -NAMESERVER_DOMAINS = ['name.com'] +NAMESERVER_DOMAINS = ["name.com"] DUPLICATE_ERROR = { - 'message': 'Invalid Argument', - 'details': 'Parameter Value Error - Duplicate Record' + "message": "Invalid Argument", + "details": "Parameter Value Error - Duplicate Record", } def provider_parser(subparser): """Configure a subparser for Name.com.""" - subparser.add_argument('--auth-username', help='specify a username') - subparser.add_argument('--auth-token', help='specify an API token') + subparser.add_argument("--auth-username", help="specify a username") + subparser.add_argument("--auth-token", help="specify an API token") -class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods +class NamecomLoader( + object +): # pylint: disable=useless-object-inheritance,too-few-public-methods """Loader that handles pagination for the Name.com provider.""" def __init__(self, get, url, data_key, next_page=1): @@ -36,10 +38,10 @@ def __init__(self, get, url, data_key, next_page=1): def __iter__(self): while self.next_page: - response = self.get(self.url, {'page': self.next_page}) + response = self.get(self.url, {"page": self.next_page}) for data in response[self.data_key]: yield data - self.next_page = response.get('next_page') + self.next_page = response.get("next_page") class NamecomProvider(BaseProvider): @@ -47,162 +49,153 @@ class NamecomProvider(BaseProvider): def __init__(self, config): super(Provider, self).__init__(config) - self.api_endpoint = 'https://api.name.com/v4' + self.api_endpoint = "https://api.name.com/v4" self.session = Session() def _authenticate(self): self.session.auth = HTTPBasicAuth( - username=self._get_provider_option('auth_username'), - password=self._get_provider_option('auth_token') + username=self._get_provider_option("auth_username"), + password=self._get_provider_option("auth_token"), ) # checking domain existence domain_name = self.domain - for domain in NamecomLoader(self._get, '/domains', 'domains'): - if domain['domainName'] == domain_name: + for domain in NamecomLoader(self._get, "/domains", "domains"): + if domain["domainName"] == domain_name: self.domain_id = domain_name return - raise Exception('{} domain does not exist'.format(domain_name)) + raise Exception("{} domain does not exist".format(domain_name)) def _create_record(self, rtype, name, content): data = { - 'type': rtype, - 'host': self._relative_name(name), - 'answer': content, - 'ttl': self._get_lexicon_option('ttl') + "type": rtype, + "host": self._relative_name(name), + "answer": content, + "ttl": self._get_lexicon_option("ttl"), } - if rtype in ('MX', 'SRV'): + if rtype in ("MX", "SRV"): # despite the documentation says a priority is # required for MX and SRV, it's actually optional - priority = self._get_lexicon_option('priority') + priority = self._get_lexicon_option("priority") if priority: - data['priority'] = priority + data["priority"] = priority - url = '/domains/{}/records'.format(self.domain) + url = "/domains/{}/records".format(self.domain) try: - record_id = self._post(url, data)['id'] + record_id = self._post(url, data)["id"] except HTTPError as error: response = error.response - if response.status_code == 400 and \ - response.json() == DUPLICATE_ERROR: - LOGGER.warning( - 'create_record: duplicate record has been skipped' - ) + if response.status_code == 400 and response.json() == DUPLICATE_ERROR: + LOGGER.warning("create_record: duplicate record has been skipped") return True raise - LOGGER.debug('create_record: record %s has been created', record_id) + LOGGER.debug("create_record: record %s has been created", record_id) return record_id def _list_records(self, rtype=None, name=None, content=None): - url = '/domains/{}/records'.format(self.domain) + url = "/domains/{}/records".format(self.domain) records = [] - for raw in NamecomLoader(self._get, url, 'records'): + for raw in NamecomLoader(self._get, url, "records"): record = { - 'id': raw['id'], - 'type': raw['type'], - 'name': raw['fqdn'][:-1], - 'ttl': raw['ttl'], - 'content': raw['answer'], + "id": raw["id"], + "type": raw["type"], + "name": raw["fqdn"][:-1], + "ttl": raw["ttl"], + "content": raw["answer"], } records.append(record) - LOGGER.debug('list_records: retrieved %s records', len(records)) + LOGGER.debug("list_records: retrieved %s records", len(records)) if rtype: - records = [record for record in records if record['type'] == rtype] + records = [record for record in records if record["type"] == rtype] if name: name = self._full_name(name) - records = [record for record in records if record['name'] == name] + records = [record for record in records if record["name"] == name] if content: - records = [record for record in records - if record['content'] == content] + records = [record for record in records if record["content"] == content] - LOGGER.debug('list_records: filtered %s records', len(records)) + LOGGER.debug("list_records: filtered %s records", len(records)) return records def _update_record(self, identifier, rtype=None, name=None, content=None): if not identifier: if not (rtype and name): - raise ValueError( - 'Record identifier or rtype+name must be specified' - ) + raise ValueError("Record identifier or rtype+name must be specified") records = self._list_records(rtype, name) if not records: - raise Exception('There is no record to update') + raise Exception("There is no record to update") if len(records) > 1: - filtered_records = [record for record in records - if record['content'] == content] + filtered_records = [ + record for record in records if record["content"] == content + ] if filtered_records: records = filtered_records if len(records) > 1: raise Exception( - 'There are multiple records to update: {}'.format( - ', '.join(record['id'] for record in records) + "There are multiple records to update: {}".format( + ", ".join(record["id"] for record in records) ) ) - record_id = records[0]['id'] + record_id = records[0]["id"] else: record_id = identifier - data = {'ttl': self._get_lexicon_option('ttl')} + data = {"ttl": self._get_lexicon_option("ttl")} # even though the documentation says a type and an answer # are required, they are not required actually if rtype: - data['type'] = rtype + data["type"] = rtype if name: - data['host'] = self._relative_name(name) + data["host"] = self._relative_name(name) if content: - data['answer'] = content + data["answer"] = content - url = '/domains/{}/records/{}'.format(self.domain, record_id) - record_id = self._put(url, data)['id'] - logging.debug('update_record: record %s has been updated', record_id) + url = "/domains/{}/records/{}".format(self.domain, record_id) + record_id = self._put(url, data)["id"] + logging.debug("update_record: record %s has been updated", record_id) return record_id - def _delete_record(self, identifier=None, - rtype=None, name=None, content=None): + def _delete_record(self, identifier=None, rtype=None, name=None, content=None): if not identifier: if not (rtype and name): - raise ValueError( - 'Record identifier or rtype+name must be specified' - ) + raise ValueError("Record identifier or rtype+name must be specified") records = self._list_records(rtype, name, content) if not records: - LOGGER.warning('delete_record: there is no record to delete') + LOGGER.warning("delete_record: there is no record to delete") return False - record_ids = [record['id'] for record in records] + record_ids = [record["id"] for record in records] else: - record_ids = [identifier, ] + record_ids = [ + identifier, + ] for record_id in record_ids: - url = '/domains/{}/records/{}'.format(self.domain, record_id) + url = "/domains/{}/records/{}".format(self.domain, record_id) self._delete(url) - LOGGER.debug( - 'delete_record: record %s has been deleted', record_id - ) + LOGGER.debug("delete_record: record %s has been deleted", record_id) return True def _get_raw_record(self, record_id): - url = '/domains/{}/records/{}'.format(self.domain, record_id) + url = "/domains/{}/records/{}".format(self.domain, record_id) return self._get(url) - def _request(self, action='GET', url='/', data=None, query_params=None): - response = self.session.request(method=action, - url=self.api_endpoint + url, - json=data, - params=query_params) + def _request(self, action="GET", url="/", data=None, query_params=None): + response = self.session.request( + method=action, url=self.api_endpoint + url, json=data, params=query_params + ) response.raise_for_status() return response.json() diff --git a/lexicon/tests/providers/test_namecom.py b/lexicon/tests/providers/test_namecom.py index 7ca4f70e1..1a0a54b47 100644 --- a/lexicon/tests/providers/test_namecom.py +++ b/lexicon/tests/providers/test_namecom.py @@ -1,15 +1,16 @@ """Integration tests for Name.com""" import json from unittest import TestCase +from unittest.mock import ANY, Mock, call, patch import pytest -from unittest.mock import ANY, Mock, patch, call from requests import HTTPError from lexicon.config import DictConfigSource from lexicon.providers.namecom import provider_parser from lexicon.tests.providers.integration_tests import ( - IntegrationTestsV2, vcr_integration_test + IntegrationTestsV2, + vcr_integration_test, ) @@ -22,30 +23,29 @@ class NamecomProviderTests(TestCase, IntegrationTestsV2): # I don't think we really need some docstrings here. # pylint: disable=missing-function-docstring - provider_name = 'namecom' - domain = 'mim.pw' + provider_name = "namecom" + domain = "mim.pw" def _filter_headers(self): - return ['Authorization', 'Cookie'] + return ["Authorization", "Cookie"] def _filter_response(self, response): - headers = response['headers'] - headers.pop('Set-Cookie', None) - headers.pop('content-length', None) + headers = response["headers"] + headers.pop("Set-Cookie", None) + headers.pop("content-length", None) - if response['status']['code'] == 200: + if response["status"]["code"] == 200: try: - data = json.loads(response['body']['string'].decode()) + data = json.loads(response["body"]["string"].decode()) except ValueError: pass else: - if 'records' in data: - min_id = 10 ** 8 - data['records'] = [ - record for record in data['records'] - if record['id'] > min_id + if "records" in data: + min_id = 10**8 + data["records"] = [ + record for record in data["records"] if record["id"] > min_id ] - response['body']['string'] = json.dumps(data).encode() + response["body"]["string"] = json.dumps(data).encode() return response @@ -61,82 +61,102 @@ def test_provider_authentication_method(self): # Provider.create_record() # ############################ @vcr_integration_test - def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name + def test_provider_when_calling_create_record_for_MX_with_priority( + self, + ): # pylint: disable=invalid-name priority = 42 config = self._test_config() - config.add_config_source(DictConfigSource({'priority': priority}), 0) + config.add_config_source(DictConfigSource({"priority": priority}), 0) provider = self.provider_module.Provider(config) provider.authenticate() - record_id = provider.create_record('MX', 'mx.test1', self.domain) - assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access + record_id = provider.create_record("MX", "mx.test1", self.domain) + assert ( + provider._get_raw_record(record_id)["priority"] == priority + ) # pylint: disable=protected-access @vcr_integration_test - def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name + def test_provider_when_calling_create_record_for_MX_with_no_priority( + self, + ): # pylint: disable=invalid-name provider = self._construct_authenticated_provider() - record_id = provider.create_record('MX', 'mx.test2', self.domain) - assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access + record_id = provider.create_record("MX", "mx.test2", self.domain) + assert "priority" not in provider._get_raw_record( + record_id + ) # pylint: disable=protected-access @vcr_integration_test def test_provider_when_calling_create_record_should_fail_on_http_error(self): provider = self._construct_authenticated_provider() error = HTTPError(response=Mock()) - with patch.object(provider, '_request', side_effect=error): + with patch.object(provider, "_request", side_effect=error): with pytest.raises(HTTPError): - provider.create_record('TXT', 'httperror', 'HTTPError') + provider.create_record("TXT", "httperror", "HTTPError") ############################ # Provider.update_record() # ############################ @vcr_integration_test - def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long + def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail( + self, + ): # pylint: disable=line-too-long provider = self._construct_authenticated_provider() with pytest.raises(ValueError): provider.update_record(None) @vcr_integration_test - def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self): + def test_provider_when_calling_update_record_should_fail_if_no_record_to_update( + self, + ): provider = self._construct_authenticated_provider() with pytest.raises(Exception): - provider.update_record(None, 'TXT', 'missingrecord') + provider.update_record(None, "TXT", "missingrecord") @vcr_integration_test - def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self): + def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update( + self, + ): provider = self._construct_authenticated_provider() - provider.create_record('TXT', 'multiple.test', 'foo') - provider.create_record('TXT', 'multiple.test', 'bar') + provider.create_record("TXT", "multiple.test", "foo") + provider.create_record("TXT", "multiple.test", "bar") with pytest.raises(Exception): - provider.update_record(None, 'TXT', 'multiple.test', 'updated') + provider.update_record(None, "TXT", "multiple.test", "updated") @vcr_integration_test def test_provider_when_calling_update_record_filter_by_content_should_pass(self): provider = self._construct_authenticated_provider() - provider.create_record('TXT', 'multiple.test', 'foo') - provider.create_record('TXT', 'multiple.test', 'bar') - assert provider.update_record(None, 'TXT', 'multiple.test', 'foo') + provider.create_record("TXT", "multiple.test", "foo") + provider.create_record("TXT", "multiple.test", "bar") + assert provider.update_record(None, "TXT", "multiple.test", "foo") @vcr_integration_test - def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self): + def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass( + self, + ): provider = self._construct_authenticated_provider() - record_id = provider.create_record('TXT', 'update.test', 'foo') + record_id = provider.create_record("TXT", "update.test", "foo") assert provider.update_record(record_id) ############################ # Provider.delete_record() # ############################ @vcr_integration_test - def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long + def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail( + self, + ): # pylint: disable=line-too-long provider = self._construct_authenticated_provider() with pytest.raises(ValueError): provider.delete_record() @vcr_integration_test - @patch('lexicon.providers.namecom.LOGGER.warning') - def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning): + @patch("lexicon.providers.namecom.LOGGER.warning") + def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete( + self, warning + ): provider = self._construct_authenticated_provider() - provider.delete_record(None, 'TXT', 'missingrecord') + provider.delete_record(None, "TXT", "missingrecord") warning.assert_called_once() - assert call('delete_record: there is no record to delete') == warning.call_args + assert call("delete_record: there is no record to delete") == warning.call_args def test_subparser_configuration(): @@ -144,5 +164,5 @@ def test_subparser_configuration(): subparser = Mock() provider_parser(subparser) - subparser.add_argument.assert_any_call('--auth-username', help=ANY) - subparser.add_argument.assert_any_call('--auth-token', help=ANY) + subparser.add_argument.assert_any_call("--auth-username", help=ANY) + subparser.add_argument.assert_any_call("--auth-token", help=ANY) diff --git a/pyproject.toml b/pyproject.toml index 0b91ba8d8..d060cd00a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "dns-lexicon" -version = "3.10.0" +version = "3.11.0" description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way" license = "MIT" keywords = [