Skip to content

Commit

Permalink
Version 3.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
adferrand committed Aug 21, 2020
1 parent 3e439ae commit 3795fff
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 69 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## master - CURRENT

## 3.4.1 - 21/08/2020
### Added
* Add the Njalla provider

Expand Down
137 changes: 73 additions & 64 deletions lexicon/providers/njalla.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,92 @@
"""Module provider for Njalla"""
from __future__ import absolute_import

import logging

import requests
from lexicon.providers.base import Provider as BaseProvider

from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = [
'1-you.njalla.no',
'2-can.njalla.in',
'3-get.njalla.fo',
"1-you.njalla.no",
"2-can.njalla.in",
"3-get.njalla.fo",
]


def provider_parser(subparser):
"""Module provider for Njalla"""
subparser.add_argument(
"--auth-token", help="specify API token for authentication")
subparser.add_argument("--auth-token", help="specify API token for authentication")


class Provider(BaseProvider):
"""Provider class for Njalla"""

def __init__(self, config):
super(Provider, self).__init__(config)
self.domain_id = None
self.api_endpoint = 'https://njal.la/api/1/'
self.api_endpoint = "https://njal.la/api/1/"

def _authenticate(self):
params = {
'domain': self.domain,
"domain": self.domain,
}
result = self._api_call('get-domain', params)
result = self._api_call("get-domain", params)

if result['name'] != self.domain:
raise Exception('Domain not found')
if result["name"] != self.domain:
raise Exception("Domain not found")

self.domain_id = self.domain

# Create record. If record already exists with the same content, do nothing'
def _create_record(self, rtype, name, content):
params = {
'domain': self.domain,
'type': rtype,
'name': name,
'content': content,
'ttl': 10800,
"domain": self.domain,
"type": rtype,
"name": name,
"content": content,
"ttl": 10800,
}
if self._get_lexicon_option('ttl'):
params['ttl'] = self._get_lexicon_option('ttl')
result = self._api_call('add-record', params)
if self._get_lexicon_option("ttl"):
params["ttl"] = self._get_lexicon_option("ttl")
result = self._api_call("add-record", params)

LOGGER.debug('create_record: %s', result)
LOGGER.debug("create_record: %s", result)
return result

# List all records. Return an empty list if no records found
# type, name and content are used to filter records.
# If possible filter during the query, otherwise filter after response is received.
def _list_records(self, rtype=None, name=None, content=None):
params = {
'domain': self.domain,
"domain": self.domain,
}
result = self._api_call('list-records', params)

records = result['records']
processed_records = [{
'id': record['id'],
'type': record['type'],
'name': self._full_name(record['name']),
'ttl': record['ttl'],
'content': record['content'],
} for record in records]
filtered_records = [record for record in processed_records if (
(rtype is None or record['type'] == rtype)
and (name is None or record['name'] == self._full_name(name))
and (content is None or record['content'] == content))]

LOGGER.debug('list_records: %s', filtered_records)
result = self._api_call("list-records", params)

records = result["records"]
processed_records = [
{
"id": record["id"],
"type": record["type"],
"name": self._full_name(record["name"]),
"ttl": record["ttl"],
"content": record["content"],
}
for record in records
]
filtered_records = [
record
for record in processed_records
if (
(rtype is None or record["type"] == rtype)
and (name is None or record["name"] == self._full_name(name))
and (content is None or record["content"] == content)
)
]

LOGGER.debug("list_records: %s", filtered_records)
return filtered_records

# Create or update a record.
Expand All @@ -86,66 +95,66 @@ def _update_record(self, identifier, rtype=None, name=None, content=None):
identifier = self._get_record_identifier(rtype=rtype, name=name)

params = {
'id': identifier,
'domain': self.domain,
'content': content,
"id": identifier,
"domain": self.domain,
"content": content,
}
result = self._api_call('edit-record', params)
result = self._api_call("edit-record", params)

LOGGER.debug('update_record: %s', result)
LOGGER.debug("update_record: %s", result)
return result

# Delete an existing record.
# If record does not exist, do nothing.
def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
if not identifier:
identifier = self._get_record_identifier(rtype=rtype, name=name, content=content)
identifier = self._get_record_identifier(
rtype=rtype, name=name, content=content
)

params = {
'domain': self.domain,
'id': identifier,
"domain": self.domain,
"id": identifier,
}
self._api_call('remove-record', params)
self._api_call("remove-record", params)

LOGGER.debug('delete_record: %s', True)
LOGGER.debug("delete_record: %s", True)
return True

# Helpers
def _api_call(self, method, params):
if self._get_provider_option('auth_token') is None:
raise Exception('Must provide API token')
if self._get_provider_option("auth_token") is None:
raise Exception("Must provide API token")

data = {
'method': method,
'params': params,
"method": method,
"params": params,
}
response = self._request('POST',
"",
data)
response = self._request("POST", "", data)

if 'error' in response.keys():
error = response['error']
raise Exception('%d: %s' % (error['code'], error['message']))
if "error" in response.keys():
error = response["error"]
raise Exception("%d: %s" % (error["code"], error["message"]))

return response['result']
return response["result"]

def _get_record_identifier(self, rtype=None, name=None, content=None):
records = self._list_records(rtype=rtype, name=name, content=content)
if len(records) == 1:
return records[0]['id']
return records[0]["id"]

raise Exception('Unambiguous record could not be found.')
raise Exception("Unambiguous record could not be found.")

def _request(self, action="GET", url="/", data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
token = self._get_provider_option('auth_token')
token = self._get_provider_option("auth_token")
headers = {
'Authorization': 'Njalla ' + token,
'Content-Type': 'application/json',
'Accept': 'application/json',
"Authorization": "Njalla " + token,
"Content-Type": "application/json",
"Accept": "application/json",
}
response = requests.request(
action,
Expand Down
12 changes: 8 additions & 4 deletions lexicon/tests/providers/test_njalla.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest import TestCase

import pytest

from lexicon.tests.providers.integration_tests import IntegrationTestsV2

# Hook into testing framework by inheriting unittest.TestCase and reuse
Expand All @@ -11,14 +12,17 @@

class NjallaProviderTests(TestCase, IntegrationTestsV2):
"""TestCase for Njalla"""
provider_name = 'njalla'
domain = 'example.com'

provider_name = "njalla"
domain = "example.com"

def _filter_headers(self):
return ['Authorization']
return ["Authorization"]

@pytest.mark.skip(reason="provider allows duplicate records")
def test_provider_when_calling_create_record_with_duplicate_records_should_be_noop(self):
def test_provider_when_calling_create_record_with_duplicate_records_should_be_noop(
self,
):
return

@pytest.mark.skip(reason="provider does not recognize record sets")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "poetry.masonry.api"

[tool.poetry]
name = "dns-lexicon"
version = "3.4.0"
version = "3.4.1"
description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way"
license = "MIT"
keywords = [
Expand Down

0 comments on commit 3795fff

Please sign in to comment.