Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functional tests for auth. plugins #232

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 231 additions & 35 deletions tests/functional/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# Copyright (C) 2017 Custodia Project Contributors - see LICENSE file
from __future__ import absolute_import

import grp
import os
import pwd
import shutil
import socket
import subprocess
import sys
import time
from enum import Enum
from string import Template

import pytest
Expand All @@ -17,8 +20,105 @@
from custodia.server.config import parse_config


def wait_pid(process, wait):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you move the functions out of the class? If the test class doesn't suite your needs, please talk to @raildo and modify the base class.

timeout = time.time() + wait
while time.time() < timeout:
pid, _ = os.waitpid(process.pid, os.WNOHANG)
if pid == process.pid:
return True
time.sleep(0.1)
return False


def wait_socket(process, custodia_socket, wait):
timeout = time.time() + wait
while time.time() < timeout:
if process.poll() is not None:
raise AssertionError(
"Premature termination of Custodia server")
try:
s = socket.socket(family=socket.AF_UNIX)
s.connect(custodia_socket)
except OSError:
pass
else:
return True
time.sleep(0.1)
raise OSError('Timeout error')


def translate_meta_uid(meta_uid):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead using translate_meta_uid(), I'd rather see some constants, e.g.:

USER_ID, USER_NAME, USER_WRONG_ID, USER_WRONG_NAME = get_test_user()

current_uid = None

if meta_uid == "correct_id":
current_uid = pwd.getpwuid(os.geteuid()).pw_uid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kinda pointless, SOCK_PEERCRED always returns the effective uid and gid.


if meta_uid == "incorrect_id":
actual_uid = pwd.getpwuid(os.geteuid()).pw_uid
for uid in [x.pw_uid for x in pwd.getpwall()]:
if uid != actual_uid:
current_uid = uid
break

if meta_uid == "correct_name":
current_uid = pwd.getpwuid(os.geteuid()).pw_name

if meta_uid == "incorrect_name":
actual_name = pwd.getpwuid(os.geteuid()).pw_name
for name in [x.pw_name for x in pwd.getpwall()]:
if name != actual_name:
current_uid = name
break

if meta_uid == "ignore":
current_uid = -1

return current_uid


def translate_meta_gid(meta_gid):
current_gid = None

if meta_gid == "correct_id":
current_gid = grp.getgrgid(os.getegid()).gr_gid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dito, os.getegid() is enough.


if meta_gid == "incorrect_id":
actual_user = pwd.getpwuid(os.geteuid()).pw_name
actual_gid = grp.getgrgid(os.getegid()).gr_gid
for gid in [g.gr_gid for g in grp.getgrall() if
actual_user not in g.gr_mem]:
if gid != actual_gid:
current_gid = gid
break

if meta_gid == "correct_name":
current_gid = grp.getgrgid(os.getegid()).gr_name

if meta_gid == "incorrect_name":
actual_user = pwd.getpwuid(os.geteuid()).pw_name
actual_group = grp.getgrgid(os.getegid()).gr_name
for name in [g.gr_name for g in grp.getgrall() if
actual_user not in g.gr_mem]:
if name != actual_group:
current_gid = name
break

if meta_gid == "ignore":
current_gid = -1

return current_gid


class UniqueNumber(object):
unique_number = 0

def get_unique_number(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function is not necessary. If you add request to any test argument list, then you can access the test name as request.node.name. The name also contains the parameter values. This gives you an unique name that reflects the test case, too.

UniqueNumber.unique_number += 1
return UniqueNumber.unique_number


@pytest.mark.servertest
class CustodiaServerRunner(object):
class CustodiaServerRunner(UniqueNumber):
request_headers = {'REMOTE_USER': 'me'}
test_dir = 'tests/functional/tmp'
custodia_client = None
Expand All @@ -27,7 +127,6 @@ class CustodiaServerRunner(object):
args = None
config = None
custodia_conf = None
unique_number = 0

@classmethod
def setup_class(cls):
Expand All @@ -39,35 +138,6 @@ def setup_class(cls):
def teardown_class(cls):
shutil.rmtree(cls.test_dir)

def _wait_pid(self, process, wait):
timeout = time.time() + wait
while time.time() < timeout:
pid, _ = os.waitpid(process.pid, os.WNOHANG)
if pid == process.pid:
return True
time.sleep(0.1)
return False

def _wait_socket(self, process, wait):
timeout = time.time() + wait
while time.time() < timeout:
if process.poll() is not None:
raise AssertionError(
"Premature termination of Custodia server")
try:
s = socket.socket(family=socket.AF_UNIX)
s.connect(self.env['CUSTODIA_SOCKET'])
except OSError:
pass
else:
return True
time.sleep(0.1)
raise OSError('Timeout error')

def get_unique_number(self):
CustodiaServerRunner.unique_number = self.unique_number + 1
return CustodiaServerRunner.unique_number

@pytest.fixture(scope="class")
def simple_configuration(self):
with open('tests/functional/conf/template_simple.conf') as f:
Expand Down Expand Up @@ -110,19 +180,145 @@ def custodia_server(self, simple_configuration, request, dev_null):
stdout=stdout, stderr=stderr
)

self._wait_pid(self.process, 2)
self._wait_socket(self.process, 5)
wait_pid(self.process, 2)
wait_socket(self.process, self.env['CUSTODIA_SOCKET'], 5)

arg = '{}/custodia.sock'.format(CustodiaServerRunner.test_dir)
url = 'http+unix://{}'.format(url_escape(arg, ''))
self.custodia_client = CustodiaHTTPClient(url)

def fin():
self.process.terminate()
if not self._wait_pid(self.process, 2):
if not wait_pid(self.process, 2):
self.process.kill()
if not self._wait_pid(self.process, 2):
if not wait_pid(self.process, 2):
raise AssertionError("Hard kill failed")

request.addfinalizer(fin)
return self.custodia_client


@pytest.mark.servertest
class CustodiaTestEnvironment(UniqueNumber):
test_dir = 'tests/functional/tmp_auth_plugin'

@classmethod
def setup_class(cls):
if os.path.isdir(cls.test_dir):
shutil.rmtree(cls.test_dir)
os.makedirs(cls.test_dir)

@classmethod
def teardown_class(cls):
shutil.rmtree(cls.test_dir)

def reset_environment(self):
if os.path.isdir(self.test_dir):
shutil.rmtree(self.test_dir)
os.makedirs(self.test_dir)


class AuthPlugin(Enum):
SimpleCredsAuth = 1
SimpleHeaderAuth = 2
SimpleAuthKeys = 3
SimpleClientCert = 4


class CustodiaServer(object):
def __init__(self, test_dir, conf_params):
self.process = None
self.custodia_client = None
self.test_dir = test_dir
self.custodia_conf = os.path.join(self.test_dir, 'custodia.conf')
self.params = conf_params

self.out_fd = os.open(os.devnull, os.O_RDWR)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaks a fd when enter fails. My original code used a global scoped fixture to have a single FD for all tests.


self._create_configuration()

self.args = parse_args([self.custodia_conf])
_, self.config = parse_config(self.args)
self.env = os.environ.copy()
self.env['CUSTODIA_SOCKET'] = self.config['server_socket']

def _get_conf_template(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this approach. You should rather create specialized subclasses in each test module:

class SimpleCredsCustodiaServer(CustodiaServer):
    template = 'tests/functional/conf/template_simple_creds_auth.conf'

    def extra_parameters(self):
        return {
            'UID': translate_meta_uid(self.params['meta_uid']),
            'GID': translate_meta_gid(self.params['meta_gid'])
         }

if self.params['auth_type'] == AuthPlugin.SimpleCredsAuth:
return 'tests/functional/conf/template_simple_creds_auth.conf'
if self.params['auth_type'] == AuthPlugin.SimpleHeaderAuth:
return 'tests/functional/conf/template_simple_header_auth.conf'
if self.params['auth_type'] == AuthPlugin.SimpleAuthKeys:
return 'tests/functional/conf/template_simple_auth_keys_auth.conf'
if self.params['auth_type'] == AuthPlugin.SimpleClientCert:
return 'tests/functional/conf/template_simple_client_cert.conf'

def _create_configuration(self):
with open(self._get_conf_template()) as f:
configstr = f.read()

if self.params['auth_type'] == AuthPlugin.SimpleCredsAuth:
with (open(self.custodia_conf, 'w+')) as conffile:
t = Template(configstr)
conf = t.substitute(
{'TEST_DIR': self.test_dir,
'UID': translate_meta_uid(self.params['meta_uid']),
'GID': translate_meta_gid(self.params['meta_gid'])})
conffile.write(conf)

if self.params['auth_type'] == AuthPlugin.SimpleHeaderAuth:
with (open(self.custodia_conf, 'w+')) as conffile:
t = Template(configstr)
conf = t.substitute(
{'TEST_DIR': self.test_dir,
'HEADER': self.params['header_name'],
'VALUE': self.params['header_value']})
conffile.write(conf)

if self.params['auth_type'] == AuthPlugin.SimpleAuthKeys:
with (open(self.custodia_conf, 'w+')) as conffile:
t = Template(configstr)
conf = t.substitute(
{'TEST_DIR': self.test_dir,
'STORE_NAMESPACE': self.params['store_namespace'],
'STORE': self.params['store']})
conffile.write(conf)

if self.params['auth_type'] == AuthPlugin.SimpleClientCert:
with (open(self.custodia_conf, 'w+')) as conffile:
t = Template(configstr)
conf = t.substitute({'TEST_DIR': self.test_dir})
conffile.write(conf)

def __enter__(self):
# Don't write server messages to stdout unless we are in debug mode
# pylint: disable=no-member
if pytest.config.getoption('debug') or \
pytest.config.getoption('verbose'):
stdout = stderr = None
else:
stdout = stderr = self.out_fd
# pylint: enable=no-member

self.process = subprocess.Popen(
[sys.executable, '-m', 'custodia.server', self.custodia_conf],
stdout=stdout, stderr=stderr
)

wait_pid(self.process, 2)
wait_socket(self.process, self.env['CUSTODIA_SOCKET'], 5)

arg = '{}/custodia.sock'.format(self.test_dir)
url = 'http+unix://{}'.format(url_escape(arg, ''))
self.custodia_client = CustodiaHTTPClient(url)

return self.custodia_client

def __exit__(self, *args):
os.remove(self.custodia_conf)
self.process.terminate()
if not wait_pid(self.process, 2):
self.process.kill()
if not wait_pid(self.process, 2):
raise AssertionError("Hard kill failed")
os.close(self.out_fd)
os.remove(self.env['CUSTODIA_SOCKET'])
32 changes: 32 additions & 0 deletions tests/functional/conf/template_simple_auth_keys_auth.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[DEFAULT]
logdir = ${TEST_DIR}
libdir = ${TEST_DIR}
rundir = ${TEST_DIR}
socketdir = ${TEST_DIR}

[global]
server_socket = ${TEST_DIR}/custodia.sock
auditlog = ${TEST_DIR}/custodia.audit.log
server_string = Test_Custodia_Server
debug = false

[auth:sak]
handler = SimpleAuthKeys
store_namespace = ${STORE_NAMESPACE}
store = ${STORE}

# Allow requests for all paths under '/' and '/secrets/'
[authz:paths]
handler = SimplePathAuthz
paths = / /secrets/

# Store secrets in a sqlite database called custodia.db in the table 'secrets'
[store:simple]
handler = SqliteStore
dburi = ${TEST_DIR}/custodia.db
table = secrets

# Serve starting from '/' and using the 'simple' store and the 'Root' handler
[/]
handler = Root
store = simple
31 changes: 31 additions & 0 deletions tests/functional/conf/template_simple_client_cert.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[DEFAULT]
logdir = ${TEST_DIR}
libdir = ${TEST_DIR}
rundir = ${TEST_DIR}
socketdir = ${TEST_DIR}

[global]
server_socket = ${TEST_DIR}/custodia.sock
auditlog = ${TEST_DIR}/custodia.audit.log
server_string = Test_Custodia_Server
tls_cafile = ../ca/custodia-ca.pem
debug = false

[auth:client]
handler = SimpleClientCertAuth

# Allow requests for all paths under '/' and '/secrets/'
[authz:paths]
handler = SimplePathAuthz
paths = / /secrets/

# Store secrets in a sqlite database called custodia.db in the table 'secrets'
[store:simple]
handler = SqliteStore
dburi = ${TEST_DIR}/custodia.db
table = secrets

# Serve starting from '/' and using the 'simple' store and the 'Root' handler
[/]
handler = Root
store = simple
Loading