diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f7cbb6bd..503e636e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## master - CURRENT + +## 3.4.1 - 21/08/2020 ### Added * Add the Njalla provider diff --git a/lexicon/providers/njalla.py b/lexicon/providers/njalla.py index ca75f7cca..8d1f87a7e 100644 --- a/lexicon/providers/njalla.py +++ b/lexicon/providers/njalla.py @@ -1,58 +1,59 @@ """Module provider for Njalla""" from __future__ import absolute_import + import logging import requests -from lexicon.providers.base import Provider as BaseProvider +from lexicon.providers.base import Provider as BaseProvider LOGGER = logging.getLogger(__name__) NAMESERVER_DOMAINS = [ - '1-you.njalla.no', - '2-can.njalla.in', - '3-get.njalla.fo', + "1-you.njalla.no", + "2-can.njalla.in", + "3-get.njalla.fo", ] def provider_parser(subparser): """Module provider for Njalla""" - subparser.add_argument( - "--auth-token", help="specify API token for authentication") + subparser.add_argument("--auth-token", help="specify API token for authentication") class Provider(BaseProvider): """Provider class for Njalla""" + def __init__(self, config): super(Provider, self).__init__(config) self.domain_id = None - self.api_endpoint = 'https://njal.la/api/1/' + self.api_endpoint = "https://njal.la/api/1/" def _authenticate(self): params = { - 'domain': self.domain, + "domain": self.domain, } - result = self._api_call('get-domain', params) + result = self._api_call("get-domain", params) - if result['name'] != self.domain: - raise Exception('Domain not found') + if result["name"] != self.domain: + raise Exception("Domain not found") self.domain_id = self.domain # Create record. If record already exists with the same content, do nothing' def _create_record(self, rtype, name, content): params = { - 'domain': self.domain, - 'type': rtype, - 'name': name, - 'content': content, - 'ttl': 10800, + "domain": self.domain, + "type": rtype, + "name": name, + "content": content, + "ttl": 10800, } - if self._get_lexicon_option('ttl'): - params['ttl'] = self._get_lexicon_option('ttl') - result = self._api_call('add-record', params) + if self._get_lexicon_option("ttl"): + params["ttl"] = self._get_lexicon_option("ttl") + result = self._api_call("add-record", params) - LOGGER.debug('create_record: %s', result) + LOGGER.debug("create_record: %s", result) return result # List all records. Return an empty list if no records found @@ -60,24 +61,32 @@ def _create_record(self, rtype, name, content): # If possible filter during the query, otherwise filter after response is received. def _list_records(self, rtype=None, name=None, content=None): params = { - 'domain': self.domain, + "domain": self.domain, } - result = self._api_call('list-records', params) - - records = result['records'] - processed_records = [{ - 'id': record['id'], - 'type': record['type'], - 'name': self._full_name(record['name']), - 'ttl': record['ttl'], - 'content': record['content'], - } for record in records] - filtered_records = [record for record in processed_records if ( - (rtype is None or record['type'] == rtype) - and (name is None or record['name'] == self._full_name(name)) - and (content is None or record['content'] == content))] - - LOGGER.debug('list_records: %s', filtered_records) + result = self._api_call("list-records", params) + + records = result["records"] + processed_records = [ + { + "id": record["id"], + "type": record["type"], + "name": self._full_name(record["name"]), + "ttl": record["ttl"], + "content": record["content"], + } + for record in records + ] + filtered_records = [ + record + for record in processed_records + if ( + (rtype is None or record["type"] == rtype) + and (name is None or record["name"] == self._full_name(name)) + and (content is None or record["content"] == content) + ) + ] + + LOGGER.debug("list_records: %s", filtered_records) return filtered_records # Create or update a record. @@ -86,66 +95,66 @@ def _update_record(self, identifier, rtype=None, name=None, content=None): identifier = self._get_record_identifier(rtype=rtype, name=name) params = { - 'id': identifier, - 'domain': self.domain, - 'content': content, + "id": identifier, + "domain": self.domain, + "content": content, } - result = self._api_call('edit-record', params) + result = self._api_call("edit-record", params) - LOGGER.debug('update_record: %s', result) + LOGGER.debug("update_record: %s", result) return result # Delete an existing record. # If record does not exist, do nothing. def _delete_record(self, identifier=None, rtype=None, name=None, content=None): if not identifier: - identifier = self._get_record_identifier(rtype=rtype, name=name, content=content) + identifier = self._get_record_identifier( + rtype=rtype, name=name, content=content + ) params = { - 'domain': self.domain, - 'id': identifier, + "domain": self.domain, + "id": identifier, } - self._api_call('remove-record', params) + self._api_call("remove-record", params) - LOGGER.debug('delete_record: %s', True) + LOGGER.debug("delete_record: %s", True) return True # Helpers def _api_call(self, method, params): - if self._get_provider_option('auth_token') is None: - raise Exception('Must provide API token') + if self._get_provider_option("auth_token") is None: + raise Exception("Must provide API token") data = { - 'method': method, - 'params': params, + "method": method, + "params": params, } - response = self._request('POST', - "", - data) + response = self._request("POST", "", data) - if 'error' in response.keys(): - error = response['error'] - raise Exception('%d: %s' % (error['code'], error['message'])) + if "error" in response.keys(): + error = response["error"] + raise Exception("%d: %s" % (error["code"], error["message"])) - return response['result'] + return response["result"] def _get_record_identifier(self, rtype=None, name=None, content=None): records = self._list_records(rtype=rtype, name=name, content=content) if len(records) == 1: - return records[0]['id'] + return records[0]["id"] - raise Exception('Unambiguous record could not be found.') + raise Exception("Unambiguous record could not be found.") def _request(self, action="GET", url="/", data=None, query_params=None): if data is None: data = {} if query_params is None: query_params = {} - token = self._get_provider_option('auth_token') + token = self._get_provider_option("auth_token") headers = { - 'Authorization': 'Njalla ' + token, - 'Content-Type': 'application/json', - 'Accept': 'application/json', + "Authorization": "Njalla " + token, + "Content-Type": "application/json", + "Accept": "application/json", } response = requests.request( action, diff --git a/lexicon/tests/providers/test_njalla.py b/lexicon/tests/providers/test_njalla.py index c46cc5fb7..411534e84 100644 --- a/lexicon/tests/providers/test_njalla.py +++ b/lexicon/tests/providers/test_njalla.py @@ -2,6 +2,7 @@ from unittest import TestCase import pytest + from lexicon.tests.providers.integration_tests import IntegrationTestsV2 # Hook into testing framework by inheriting unittest.TestCase and reuse @@ -11,14 +12,17 @@ class NjallaProviderTests(TestCase, IntegrationTestsV2): """TestCase for Njalla""" - provider_name = 'njalla' - domain = 'example.com' + + provider_name = "njalla" + domain = "example.com" def _filter_headers(self): - return ['Authorization'] + return ["Authorization"] @pytest.mark.skip(reason="provider allows duplicate records") - def test_provider_when_calling_create_record_with_duplicate_records_should_be_noop(self): + def test_provider_when_calling_create_record_with_duplicate_records_should_be_noop( + self, + ): return @pytest.mark.skip(reason="provider does not recognize record sets") diff --git a/pyproject.toml b/pyproject.toml index 04d8d3f6e..1cb1c2902 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "poetry.masonry.api" [tool.poetry] name = "dns-lexicon" -version = "3.4.0" +version = "3.4.1" description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way" license = "MIT" keywords = [