From e575e6d28839c2e6e9c22d10ef2262f5b92806ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C4=8Cech?= Date: Tue, 15 Aug 2017 14:03:14 +0200 Subject: [PATCH 1/5] tests: Refactor of CustodiaServerRunner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is preparation extension of functional tests. Signed-off-by: Petr Čech --- tests/functional/base.py | 72 ++++++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/tests/functional/base.py b/tests/functional/base.py index 4bb0d30..7eaace4 100644 --- a/tests/functional/base.py +++ b/tests/functional/base.py @@ -17,8 +17,44 @@ from custodia.server.config import parse_config +def wait_pid(process, wait): + timeout = time.time() + wait + while time.time() < timeout: + pid, _ = os.waitpid(process.pid, os.WNOHANG) + if pid == process.pid: + return True + time.sleep(0.1) + return False + + +def wait_socket(process, custodia_socket, wait): + timeout = time.time() + wait + while time.time() < timeout: + if process.poll() is not None: + raise AssertionError( + "Premature termination of Custodia server") + try: + s = socket.socket(family=socket.AF_UNIX) + s.connect(custodia_socket) + except OSError: + pass + else: + return True + time.sleep(0.1) + raise OSError('Timeout error') + + +class UniqueNumber(object): + + unique_number = 0 + + def get_unique_number(self): + UniqueNumber.unique_number += 1 + return UniqueNumber.unique_number + + @pytest.mark.servertest -class CustodiaServerRunner(object): +class CustodiaServerRunner(UniqueNumber): request_headers = {'REMOTE_USER': 'me'} test_dir = 'tests/functional/tmp' custodia_client = None @@ -27,7 +63,6 @@ class CustodiaServerRunner(object): args = None config = None custodia_conf = None - unique_number = 0 @classmethod def setup_class(cls): @@ -39,35 +74,6 @@ def setup_class(cls): def teardown_class(cls): shutil.rmtree(cls.test_dir) - def _wait_pid(self, process, wait): - timeout = time.time() + wait - while time.time() < timeout: - pid, _ = os.waitpid(process.pid, os.WNOHANG) - if pid == process.pid: - return True - time.sleep(0.1) - return False - - def _wait_socket(self, process, wait): - timeout = time.time() + wait - while time.time() < timeout: - if process.poll() is not None: - raise AssertionError( - "Premature termination of Custodia server") - try: - s = socket.socket(family=socket.AF_UNIX) - s.connect(self.env['CUSTODIA_SOCKET']) - except OSError: - pass - else: - return True - time.sleep(0.1) - raise OSError('Timeout error') - - def get_unique_number(self): - CustodiaServerRunner.unique_number = self.unique_number + 1 - return CustodiaServerRunner.unique_number - @pytest.fixture(scope="class") def simple_configuration(self): with open('tests/functional/conf/template_simple.conf') as f: @@ -110,8 +116,8 @@ def custodia_server(self, simple_configuration, request, dev_null): stdout=stdout, stderr=stderr ) - self._wait_pid(self.process, 2) - self._wait_socket(self.process, 5) + wait_pid(self.process, 2) + wait_socket(self.process, self.env['CUSTODIA_SOCKET'], 5) arg = '{}/custodia.sock'.format(CustodiaServerRunner.test_dir) url = 'http+unix://{}'.format(url_escape(arg, '')) From 1a6e5242880fc36c2793fb351dda6b2bb019f51d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C4=8Cech?= Date: Tue, 15 Aug 2017 14:06:06 +0200 Subject: [PATCH 2/5] tests: Functional tests for SimpleCredsAuth plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This test suite tries entire configuration matrix of SimpleCredsAuth plugin. Signed-off-by: Petr Čech --- tests/functional/base.py | 143 +++++++++++++++++- .../conf/template_simple_creds_auth.conf | 32 ++++ tests/functional/test_plugin_auth.py | 50 ++++++ 3 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 tests/functional/conf/template_simple_creds_auth.conf create mode 100644 tests/functional/test_plugin_auth.py diff --git a/tests/functional/base.py b/tests/functional/base.py index 7eaace4..7e36f79 100644 --- a/tests/functional/base.py +++ b/tests/functional/base.py @@ -1,7 +1,9 @@ # Copyright (C) 2017 Custodia Project Contributors - see LICENSE file from __future__ import absolute_import +import grp import os +import pwd import shutil import socket import subprocess @@ -44,6 +46,68 @@ def wait_socket(process, custodia_socket, wait): raise OSError('Timeout error') +def translate_meta_uid(meta_uid): + current_uid = None + + if meta_uid == "correct_id": + current_uid = pwd.getpwuid(os.geteuid()).pw_uid + + if meta_uid == "incorrect_id": + actual_uid = pwd.getpwuid(os.geteuid()).pw_uid + for uid in [x.pw_uid for x in pwd.getpwall()]: + if uid != actual_uid: + current_uid = uid + break + + if meta_uid == "correct_name": + current_uid = pwd.getpwuid(os.geteuid()).pw_name + + if meta_uid == "incorrect_name": + actual_name = pwd.getpwuid(os.geteuid()).pw_name + for name in [x.pw_name for x in pwd.getpwall()]: + if name != actual_name: + current_uid = name + break + + if meta_uid == "ignore": + current_uid = -1 + + return current_uid + + +def translate_meta_gid(meta_gid): + current_gid = None + + if meta_gid == "correct_id": + current_gid = grp.getgrgid(os.getegid()).gr_gid + + if meta_gid == "incorrect_id": + actual_user = pwd.getpwuid(os.geteuid()).pw_name + actual_gid = grp.getgrgid(os.getegid()).gr_gid + for gid in [g.gr_gid for g in grp.getgrall() if + actual_user not in g.gr_mem]: + if gid != actual_gid: + current_gid = gid + break + + if meta_gid == "correct_name": + current_gid = grp.getgrgid(os.getegid()).gr_name + + if meta_gid == "incorrect_name": + actual_user = pwd.getpwuid(os.geteuid()).pw_name + actual_group = grp.getgrgid(os.getegid()).gr_name + for name in [g.gr_name for g in grp.getgrall() if + actual_user not in g.gr_mem]: + if name != actual_group: + current_gid = name + break + + if meta_gid == "ignore": + current_gid = -1 + + return current_gid + + class UniqueNumber(object): unique_number = 0 @@ -125,10 +189,85 @@ def custodia_server(self, simple_configuration, request, dev_null): def fin(): self.process.terminate() - if not self._wait_pid(self.process, 2): + if not wait_pid(self.process, 2): self.process.kill() - if not self._wait_pid(self.process, 2): + if not wait_pid(self.process, 2): raise AssertionError("Hard kill failed") request.addfinalizer(fin) return self.custodia_client + + +@pytest.mark.servertest +class CustodiaTestEnvironment(UniqueNumber): + test_dir = 'tests/functional/tmp_auth_plugin' + + @classmethod + def setup_class(cls): + if os.path.isdir(cls.test_dir): + shutil.rmtree(cls.test_dir) + os.makedirs(cls.test_dir) + + @classmethod + def teardown_class(cls): + shutil.rmtree(cls.test_dir) + + +class CustodiaServerWithSimpleCredsAuth(object): + def __init__(self, test_dir, meta_uid, meta_gid): + self.process = None + self.custodia_client = None + self.test_dir = test_dir + + self.out_fd = os.open(os.devnull, os.O_RDWR) + + with open( + 'tests/functional/conf/template_simple_creds_auth.conf') as f: + configstr = f.read() + + self.custodia_conf = os.path.join(self.test_dir, 'custodia.conf') + with (open(self.custodia_conf, 'w+')) as conffile: + t = Template(configstr) + conf = t.substitute( + {'TEST_DIR': self.test_dir, + 'UID': translate_meta_uid(meta_uid), + 'GID': translate_meta_gid(meta_gid)}) + conffile.write(conf) + + self.args = parse_args([self.custodia_conf]) + _, self.config = parse_config(self.args) + self.env = os.environ.copy() + self.env['CUSTODIA_SOCKET'] = self.config['server_socket'] + + def __enter__(self): + # Don't write server messages to stdout unless we are in debug mode + # pylint: disable=no-member + if pytest.config.getoption('debug') or \ + pytest.config.getoption('verbose'): + stdout = stderr = None + else: + stdout = stderr = self.out_fd + # pylint: enable=no-member + + self.process = subprocess.Popen( + [sys.executable, '-m', 'custodia.server', self.custodia_conf], + stdout=stdout, stderr=stderr + ) + + wait_pid(self.process, 2) + wait_socket(self.process, self.env['CUSTODIA_SOCKET'], 5) + + arg = '{}/custodia.sock'.format(self.test_dir) + url = 'http+unix://{}'.format(url_escape(arg, '')) + self.custodia_client = CustodiaHTTPClient(url) + + return self.custodia_client + + def __exit__(self, *args): + os.remove(self.custodia_conf) + self.process.terminate() + if not wait_pid(self.process, 2): + self.process.kill() + if not wait_pid(self.process, 2): + raise AssertionError("Hard kill failed") + os.close(self.out_fd) diff --git a/tests/functional/conf/template_simple_creds_auth.conf b/tests/functional/conf/template_simple_creds_auth.conf new file mode 100644 index 0000000..c61da67 --- /dev/null +++ b/tests/functional/conf/template_simple_creds_auth.conf @@ -0,0 +1,32 @@ +[DEFAULT] +logdir = ${TEST_DIR} +libdir = ${TEST_DIR} +rundir = ${TEST_DIR} +socketdir = ${TEST_DIR} + +[global] +server_socket = ${TEST_DIR}/custodia.sock +auditlog = ${TEST_DIR}/custodia.audit.log +server_string = Test_Custodia_Server +debug = false + +[auth:creds] +handler = SimpleCredsAuth +uid = ${UID} +gid = ${GID} + +# Allow requests for all paths under '/' and '/secrets/' +[authz:paths] +handler = SimplePathAuthz +paths = / /secrets/ + +# Store secrets in a sqlite database called custodia.db in the table 'secrets' +[store:simple] +handler = SqliteStore +dburi = ${TEST_DIR}/custodia.db +table = secrets + +# Serve starting from '/' and using the 'simple' store and the 'Root' handler +[/] +handler = Root +store = simple diff --git a/tests/functional/test_plugin_auth.py b/tests/functional/test_plugin_auth.py new file mode 100644 index 0000000..7c2ef3f --- /dev/null +++ b/tests/functional/test_plugin_auth.py @@ -0,0 +1,50 @@ +# Copyright (C) 2017 Custodia Project Contributors - see LICENSE file + +from __future__ import absolute_import + +import pytest + +from .base import CustodiaServerWithSimpleCredsAuth, CustodiaTestEnvironment + + +class TestBasicsAuthPlugins(CustodiaTestEnvironment): + @pytest.mark.parametrize("meta_uid,meta_gid,expected_access", [ + ('correct_id', 'correct_id', 'granted'), + ('correct_id', 'incorrect_id', 'granted'), + ('correct_id', 'correct_name', 'granted'), + ('correct_id', 'incorrect_name', 'granted'), + ('correct_id', 'ignore', 'granted'), + ('incorrect_id', 'correct_id', 'granted'), + ('incorrect_id', 'incorrect_id', 'denied'), + ('incorrect_id', 'correct_name', 'granted'), + ('incorrect_id', 'incorrect_name', 'denied'), + ('incorrect_id', 'ignore', 'denied'), + ('correct_name', 'correct_id', 'granted'), + ('correct_name', 'incorrect_id', 'granted'), + ('correct_name', 'correct_name', 'granted'), + ('correct_name', 'incorrect_name', 'granted'), + ('correct_name', 'ignore', 'granted'), + ('incorrect_name', 'correct_id', 'granted'), + ('incorrect_name', 'incorrect_id', 'denied'), + ('incorrect_name', 'correct_name', 'granted'), + ('incorrect_name', 'incorrect_name', 'denied'), + ('incorrect_name', 'ignore', 'denied'), + ('ignore', 'correct_id', 'granted'), + ('ignore', 'incorrect_id', 'denied'), + ('ignore', 'correct_name', 'granted'), + ('ignore', 'incorrect_name', 'denied'), + ('ignore', 'ignore', 'denied'), + ]) + def test_default_answer_simple_creds_auth(self, meta_uid, meta_gid, + expected_access): + + with CustodiaServerWithSimpleCredsAuth(self.test_dir, meta_uid, + meta_gid) as server: + + container = 'secrets/bucket{}/'.format(self.get_unique_number()) + + resp = server.post(container, headers={}) + if expected_access == 'granted': + assert resp.status_code == 201 + else: + assert resp.status_code == 403 From 1bf56357f3b822645aaf3cd1b165ca66a7a1c13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C4=8Cech?= Date: Wed, 16 Aug 2017 15:04:03 +0200 Subject: [PATCH 3/5] tests: Functional tests for SimpleHeaderAuth plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This test suite tests configuration options of SimpleHeaderAuth plugin. Signed-off-by: Petr Čech --- tests/functional/base.py | 54 +++++++++++++------ .../conf/template_simple_header_auth.conf | 32 +++++++++++ tests/functional/test_plugin_auth.py | 38 +++++++++++-- 3 files changed, 106 insertions(+), 18 deletions(-) create mode 100644 tests/functional/conf/template_simple_header_auth.conf diff --git a/tests/functional/base.py b/tests/functional/base.py index 7e36f79..eff7c4d 100644 --- a/tests/functional/base.py +++ b/tests/functional/base.py @@ -9,6 +9,7 @@ import subprocess import sys import time +from enum import Enum from string import Template import pytest @@ -109,7 +110,6 @@ def translate_meta_gid(meta_gid): class UniqueNumber(object): - unique_number = 0 def get_unique_number(self): @@ -213,32 +213,56 @@ def teardown_class(cls): shutil.rmtree(cls.test_dir) -class CustodiaServerWithSimpleCredsAuth(object): - def __init__(self, test_dir, meta_uid, meta_gid): +class AuthPlugin(Enum): + SimpleCredsAuth = 1 + SimpleHeaderAuth = 2 + + +class CustodiaServer(object): + def __init__(self, test_dir, conf_params): self.process = None self.custodia_client = None self.test_dir = test_dir + self.custodia_conf = os.path.join(self.test_dir, 'custodia.conf') + self.params = conf_params self.out_fd = os.open(os.devnull, os.O_RDWR) - with open( - 'tests/functional/conf/template_simple_creds_auth.conf') as f: - configstr = f.read() - - self.custodia_conf = os.path.join(self.test_dir, 'custodia.conf') - with (open(self.custodia_conf, 'w+')) as conffile: - t = Template(configstr) - conf = t.substitute( - {'TEST_DIR': self.test_dir, - 'UID': translate_meta_uid(meta_uid), - 'GID': translate_meta_gid(meta_gid)}) - conffile.write(conf) + self._create_configuration() self.args = parse_args([self.custodia_conf]) _, self.config = parse_config(self.args) self.env = os.environ.copy() self.env['CUSTODIA_SOCKET'] = self.config['server_socket'] + def _get_conf_template(self): + if self.params['auth_type'] == AuthPlugin.SimpleCredsAuth: + return 'tests/functional/conf/template_simple_creds_auth.conf' + if self.params['auth_type'] == AuthPlugin.SimpleHeaderAuth: + return 'tests/functional/conf/template_simple_header_auth.conf' + + def _create_configuration(self): + with open(self._get_conf_template()) as f: + configstr = f.read() + + if self.params['auth_type'] == AuthPlugin.SimpleCredsAuth: + with (open(self.custodia_conf, 'w+')) as conffile: + t = Template(configstr) + conf = t.substitute( + {'TEST_DIR': self.test_dir, + 'UID': translate_meta_uid(self.params['meta_uid']), + 'GID': translate_meta_gid(self.params['meta_gid'])}) + conffile.write(conf) + + if self.params['auth_type'] == AuthPlugin.SimpleHeaderAuth: + with (open(self.custodia_conf, 'w+')) as conffile: + t = Template(configstr) + conf = t.substitute( + {'TEST_DIR': self.test_dir, + 'HEADER': self.params['header_name'], + 'VALUE': self.params['header_value']}) + conffile.write(conf) + def __enter__(self): # Don't write server messages to stdout unless we are in debug mode # pylint: disable=no-member diff --git a/tests/functional/conf/template_simple_header_auth.conf b/tests/functional/conf/template_simple_header_auth.conf new file mode 100644 index 0000000..addd5ab --- /dev/null +++ b/tests/functional/conf/template_simple_header_auth.conf @@ -0,0 +1,32 @@ +[DEFAULT] +logdir = ${TEST_DIR} +libdir = ${TEST_DIR} +rundir = ${TEST_DIR} +socketdir = ${TEST_DIR} + +[global] +server_socket = ${TEST_DIR}/custodia.sock +auditlog = ${TEST_DIR}/custodia.audit.log +server_string = Test_Custodia_Server +debug = false + +[auth:header] +handler = SimpleHeaderAuth +header = ${HEADER} +value = ${VALUE} + +# Allow requests for all paths under '/' and '/secrets/' +[authz:paths] +handler = SimplePathAuthz +paths = / /secrets/ + +# Store secrets in a sqlite database called custodia.db in the table 'secrets' +[store:simple] +handler = SqliteStore +dburi = ${TEST_DIR}/custodia.db +table = secrets + +# Serve starting from '/' and using the 'simple' store and the 'Root' handler +[/] +handler = Root +store = simple diff --git a/tests/functional/test_plugin_auth.py b/tests/functional/test_plugin_auth.py index 7c2ef3f..c0aabb1 100644 --- a/tests/functional/test_plugin_auth.py +++ b/tests/functional/test_plugin_auth.py @@ -4,7 +4,7 @@ import pytest -from .base import CustodiaServerWithSimpleCredsAuth, CustodiaTestEnvironment +from .base import AuthPlugin, CustodiaServer, CustodiaTestEnvironment class TestBasicsAuthPlugins(CustodiaTestEnvironment): @@ -38,8 +38,11 @@ class TestBasicsAuthPlugins(CustodiaTestEnvironment): def test_default_answer_simple_creds_auth(self, meta_uid, meta_gid, expected_access): - with CustodiaServerWithSimpleCredsAuth(self.test_dir, meta_uid, - meta_gid) as server: + params = {'auth_type': AuthPlugin.SimpleCredsAuth, + 'meta_uid': meta_uid, + 'meta_gid': meta_gid} + + with CustodiaServer(self.test_dir, params) as server: container = 'secrets/bucket{}/'.format(self.get_unique_number()) @@ -48,3 +51,32 @@ def test_default_answer_simple_creds_auth(self, meta_uid, meta_gid, assert resp.status_code == 201 else: assert resp.status_code == 403 + + # TODO: After https://github.com/latchset/custodia/pull/230 + # this should be extend by comma-separated cases + @pytest.mark.parametrize("conf_n,conf_v,call_n,call_v,expected_access", [ + ('REMOTE_USER', 'me', 'REMOTE_USER', 'me', 'granted'), + ('REMOTE_USER', 'me', 'REMOTE_USER', 'you', 'denied'), + ('REMOTE_AUTH_USER', 'me', 'REMOTE_AUTH_USER', 'me', 'granted'), + ('REMOTE_AUTH_USER', 'me', 'REMOTE_USER', 'me', 'denied'), + ('REMOTE_USER', 'me you he', 'REMOTE_USER', 'me', 'granted'), + ('REMOTE_USER', 'me you he', 'REMOTE_USER', 'you', 'granted'), + ('REMOTE_USER', 'me you he', 'REMOTE_USER', 'he', 'granted'), + ('REMOTE_USER', 'me you he', 'REMOTE_USER', 'she', 'denied'), + ]) + def test_default_answer_simple_header_auth(self, conf_n, conf_v, call_n, + call_v, expected_access): + + params = {'auth_type': AuthPlugin.SimpleHeaderAuth, + 'header_name': conf_n, + 'header_value': conf_v} + + with CustodiaServer(self.test_dir, params) as server: + + container = 'secrets/bucket{}/'.format(self.get_unique_number()) + + resp = server.post(container, headers={call_n: call_v}) + if expected_access == 'granted': + assert resp.status_code == 201 + else: + assert resp.status_code == 403 From 6196bb95036728c2234251d942fe27683b6a4196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C4=8Cech?= Date: Thu, 17 Aug 2017 14:15:52 +0200 Subject: [PATCH 4/5] tests: Functional tests for SimpleAuthKeys plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Petr Čech --- tests/functional/base.py | 18 ++++++++ .../conf/template_simple_auth_keys_auth.conf | 32 ++++++++++++++ tests/functional/test_plugin_auth.py | 44 +++++++++++++++++++ 3 files changed, 94 insertions(+) create mode 100644 tests/functional/conf/template_simple_auth_keys_auth.conf diff --git a/tests/functional/base.py b/tests/functional/base.py index eff7c4d..7b658d6 100644 --- a/tests/functional/base.py +++ b/tests/functional/base.py @@ -212,10 +212,16 @@ def setup_class(cls): def teardown_class(cls): shutil.rmtree(cls.test_dir) + def reset_environment(self): + if os.path.isdir(self.test_dir): + shutil.rmtree(self.test_dir) + os.makedirs(self.test_dir) + class AuthPlugin(Enum): SimpleCredsAuth = 1 SimpleHeaderAuth = 2 + SimpleAuthKeys = 3 class CustodiaServer(object): @@ -240,6 +246,8 @@ def _get_conf_template(self): return 'tests/functional/conf/template_simple_creds_auth.conf' if self.params['auth_type'] == AuthPlugin.SimpleHeaderAuth: return 'tests/functional/conf/template_simple_header_auth.conf' + if self.params['auth_type'] == AuthPlugin.SimpleAuthKeys: + return 'tests/functional/conf/template_simple_auth_keys_auth.conf' def _create_configuration(self): with open(self._get_conf_template()) as f: @@ -263,6 +271,15 @@ def _create_configuration(self): 'VALUE': self.params['header_value']}) conffile.write(conf) + if self.params['auth_type'] == AuthPlugin.SimpleAuthKeys: + with (open(self.custodia_conf, 'w+')) as conffile: + t = Template(configstr) + conf = t.substitute( + {'TEST_DIR': self.test_dir, + 'STORE_NAMESPACE': self.params['store_namespace'], + 'STORE': self.params['store']}) + conffile.write(conf) + def __enter__(self): # Don't write server messages to stdout unless we are in debug mode # pylint: disable=no-member @@ -295,3 +312,4 @@ def __exit__(self, *args): if not wait_pid(self.process, 2): raise AssertionError("Hard kill failed") os.close(self.out_fd) + os.remove(self.env['CUSTODIA_SOCKET']) diff --git a/tests/functional/conf/template_simple_auth_keys_auth.conf b/tests/functional/conf/template_simple_auth_keys_auth.conf new file mode 100644 index 0000000..1161653 --- /dev/null +++ b/tests/functional/conf/template_simple_auth_keys_auth.conf @@ -0,0 +1,32 @@ +[DEFAULT] +logdir = ${TEST_DIR} +libdir = ${TEST_DIR} +rundir = ${TEST_DIR} +socketdir = ${TEST_DIR} + +[global] +server_socket = ${TEST_DIR}/custodia.sock +auditlog = ${TEST_DIR}/custodia.audit.log +server_string = Test_Custodia_Server +debug = false + +[auth:sak] +handler = SimpleAuthKeys +store_namespace = ${STORE_NAMESPACE} +store = ${STORE} + +# Allow requests for all paths under '/' and '/secrets/' +[authz:paths] +handler = SimplePathAuthz +paths = / /secrets/ + +# Store secrets in a sqlite database called custodia.db in the table 'secrets' +[store:simple] +handler = SqliteStore +dburi = ${TEST_DIR}/custodia.db +table = secrets + +# Serve starting from '/' and using the 'simple' store and the 'Root' handler +[/] +handler = Root +store = simple diff --git a/tests/functional/test_plugin_auth.py b/tests/functional/test_plugin_auth.py index c0aabb1..895398e 100644 --- a/tests/functional/test_plugin_auth.py +++ b/tests/functional/test_plugin_auth.py @@ -80,3 +80,47 @@ def test_default_answer_simple_header_auth(self, conf_n, conf_v, call_n, assert resp.status_code == 201 else: assert resp.status_code == 403 + + @pytest.mark.parametrize("conf_k,conf_p,call_k,call_p,expected_access", [ + ('qid', 'P@ssw0rd', 'qid', 'P@ssw0rd', 'granted'), + ('qid', 'P@ssw0rd', 'qid_incorrect', 'P@ssw0rd', 'denied'), + ('qid', 'P@ssw0rd', 'qid', 'P@ssw0rd_incrorrect', 'denied'), + ]) + def test_default_answer_simple_auth_keys_auth(self, conf_k, conf_p, call_k, + call_p, expected_access): + + self.reset_environment() + + # For setup AuthKeys plugin we need authenticate via SimpleHeaderAuth + params = {'auth_type': AuthPlugin.SimpleHeaderAuth, + 'header_name': 'REMOTE_USER', + 'header_value': 'me'} + + with CustodiaServer(self.test_dir, params) as server: + # Create container + container = 'secrets/sak/' + resp = server.post(container, headers={'REMOTE_USER': 'me'}) + assert resp.status_code == 201 + + # Save Autheys + key = '{}{}'.format(container, conf_k) + resp = server.put(key, json={"type": "simple", + "value": conf_p}, + headers={'REMOTE_USER': 'me'}) + assert resp.status_code == 201 + + # Testing of AuthKeys plugin + params = {'auth_type': AuthPlugin.SimpleAuthKeys, + 'store_namespace': 'keys/sak', + 'store': 'simple'} + + with CustodiaServer(self.test_dir, params) as server: + container = 'secrets/bucket{}/'.format(self.get_unique_number()) + + resp = server.post(container, + headers={'CUSTODIA_AUTH_ID': call_k, + 'CUSTODIA_AUTH_KEY': call_p}) + if expected_access == 'granted': + assert resp.status_code == 201 + else: + assert resp.status_code == 403 From b6d26ca094800bb3fad43dffdf2d08da40269238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C4=8Cech?= Date: Fri, 18 Aug 2017 13:30:33 +0200 Subject: [PATCH 5/5] tests: Functional tests for SimpleClientCertAuth plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Petr Čech --- tests/functional/base.py | 9 ++++++ .../conf/template_simple_client_cert.conf | 31 +++++++++++++++++++ tests/functional/test_plugin_auth.py | 27 ++++++++++++++++ 3 files changed, 67 insertions(+) create mode 100644 tests/functional/conf/template_simple_client_cert.conf diff --git a/tests/functional/base.py b/tests/functional/base.py index 7b658d6..6585fb4 100644 --- a/tests/functional/base.py +++ b/tests/functional/base.py @@ -222,6 +222,7 @@ class AuthPlugin(Enum): SimpleCredsAuth = 1 SimpleHeaderAuth = 2 SimpleAuthKeys = 3 + SimpleClientCert = 4 class CustodiaServer(object): @@ -248,6 +249,8 @@ def _get_conf_template(self): return 'tests/functional/conf/template_simple_header_auth.conf' if self.params['auth_type'] == AuthPlugin.SimpleAuthKeys: return 'tests/functional/conf/template_simple_auth_keys_auth.conf' + if self.params['auth_type'] == AuthPlugin.SimpleClientCert: + return 'tests/functional/conf/template_simple_client_cert.conf' def _create_configuration(self): with open(self._get_conf_template()) as f: @@ -280,6 +283,12 @@ def _create_configuration(self): 'STORE': self.params['store']}) conffile.write(conf) + if self.params['auth_type'] == AuthPlugin.SimpleClientCert: + with (open(self.custodia_conf, 'w+')) as conffile: + t = Template(configstr) + conf = t.substitute({'TEST_DIR': self.test_dir}) + conffile.write(conf) + def __enter__(self): # Don't write server messages to stdout unless we are in debug mode # pylint: disable=no-member diff --git a/tests/functional/conf/template_simple_client_cert.conf b/tests/functional/conf/template_simple_client_cert.conf new file mode 100644 index 0000000..ae7d81b --- /dev/null +++ b/tests/functional/conf/template_simple_client_cert.conf @@ -0,0 +1,31 @@ +[DEFAULT] +logdir = ${TEST_DIR} +libdir = ${TEST_DIR} +rundir = ${TEST_DIR} +socketdir = ${TEST_DIR} + +[global] +server_socket = ${TEST_DIR}/custodia.sock +auditlog = ${TEST_DIR}/custodia.audit.log +server_string = Test_Custodia_Server +tls_cafile = ../ca/custodia-ca.pem +debug = false + +[auth:client] +handler = SimpleClientCertAuth + +# Allow requests for all paths under '/' and '/secrets/' +[authz:paths] +handler = SimplePathAuthz +paths = / /secrets/ + +# Store secrets in a sqlite database called custodia.db in the table 'secrets' +[store:simple] +handler = SqliteStore +dburi = ${TEST_DIR}/custodia.db +table = secrets + +# Serve starting from '/' and using the 'simple' store and the 'Root' handler +[/] +handler = Root +store = simple diff --git a/tests/functional/test_plugin_auth.py b/tests/functional/test_plugin_auth.py index 895398e..834314c 100644 --- a/tests/functional/test_plugin_auth.py +++ b/tests/functional/test_plugin_auth.py @@ -2,6 +2,9 @@ from __future__ import absolute_import +from cryptography import x509 +from cryptography.hazmat.backends import default_backend + import pytest from .base import AuthPlugin, CustodiaServer, CustodiaTestEnvironment @@ -124,3 +127,27 @@ def test_default_answer_simple_auth_keys_auth(self, conf_k, conf_p, call_k, assert resp.status_code == 201 else: assert resp.status_code == 403 + + def test_default_answer_simple_client_cert_auth(self): + + params = {'auth_type': AuthPlugin.SimpleClientCert} + + expected_access = True + + with open('tests/ca/custodia-ca.pem', 'rb') as pem_file: + pem_data = pem_file.read() + + cert = x509.load_pem_x509_certificate(pem_data, default_backend()) + + with CustodiaServer(self.test_dir, params) as server: + + container = 'secrets/bucket{}/'.format(self.get_unique_number()) + + resp = server.post(container, + headers={ + 'CUSTODIA_CERT_AUTH': str( + cert.public_key())}) + if expected_access == 'granted': + assert resp.status_code == 201 + else: + assert resp.status_code == 403