Skip to content

Commit

Permalink
Add functional tests
Browse files Browse the repository at this point in the history
There are set of simple functional tests.
It is neccesary to run it on box with Custodia installed.

Signed-off-by: Petr Čech <[email protected]>
  • Loading branch information
Petr Čech authored and tiran committed Aug 11, 2017
1 parent 0dccc81 commit a1aeade
Show file tree
Hide file tree
Showing 7 changed files with 645 additions and 0 deletions.
Empty file added tests/functional/__init__.py
Empty file.
128 changes: 128 additions & 0 deletions tests/functional/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright (C) 2017 Custodia Project Contributors - see LICENSE file
from __future__ import absolute_import

import os
import shutil
import socket
import subprocess
import sys
import time
from string import Template

import pytest

from custodia.client import CustodiaHTTPClient
from custodia.compat import url_escape
from custodia.server.args import parse_args
from custodia.server.config import parse_config


@pytest.mark.servertest
class CustodiaServerRunner(object):
request_headers = {'REMOTE_USER': 'me'}
test_dir = 'tests/functional/tmp'
custodia_client = None
env = None
process = None
args = None
config = None
custodia_conf = None
unique_number = 0

@classmethod
def setup_class(cls):
if os.path.isdir(cls.test_dir):
shutil.rmtree(cls.test_dir)
os.makedirs(cls.test_dir)

@classmethod
def teardown_class(cls):
shutil.rmtree(cls.test_dir)

def _wait_pid(self, process, wait):
timeout = time.time() + wait
while time.time() < timeout:
pid, _ = os.waitpid(process.pid, os.WNOHANG)
if pid == process.pid:
return True
time.sleep(0.1)
return False

def _wait_socket(self, process, wait):
timeout = time.time() + wait
while time.time() < timeout:
if process.poll() is not None:
raise AssertionError(
"Premature termination of Custodia server")
try:
s = socket.socket(family=socket.AF_UNIX)
s.connect(self.env['CUSTODIA_SOCKET'])
except OSError:
pass
else:
return True
time.sleep(0.1)
raise OSError('Timeout error')

def get_unique_number(self):
CustodiaServerRunner.unique_number = self.unique_number + 1
return CustodiaServerRunner.unique_number

@pytest.fixture(scope="class")
def simple_configuration(self):
with open('tests/functional/conf/template_simple.conf') as f:
configstr = f.read()

self.custodia_conf = os.path.join(self.test_dir, 'custodia.conf')
with (open(self.custodia_conf, 'w+')) as conffile:
t = Template(configstr)
conf = t.substitute({'TEST_DIR': self.test_dir})
conffile.write(conf)

self.args = parse_args([self.custodia_conf])
_, self.config = parse_config(self.args)
self.env = os.environ.copy()
self.env['CUSTODIA_SOCKET'] = self.config['server_socket']

@pytest.fixture(scope="session")
def dev_null(self, request):
fd = os.open(os.devnull, os.O_RDWR)

def close_dev_null():
os.close(fd)

request.addfinalizer(close_dev_null)
return fd

@pytest.fixture(scope="class")
def custodia_server(self, simple_configuration, request, dev_null):
# Don't write server messages to stdout unless we are in debug mode
# pylint: disable=no-member
if pytest.config.getoption('debug') or \
pytest.config.getoption('verbose'):
stdout = stderr = None
else:
stdout = stderr = dev_null
# pylint: enable=no-member

self.process = subprocess.Popen(
[sys.executable, '-m', 'custodia.server', self.custodia_conf],
stdout=stdout, stderr=stderr
)

self._wait_pid(self.process, 2)
self._wait_socket(self.process, 5)

arg = '{}/custodia.sock'.format(CustodiaServerRunner.test_dir)
url = 'http+unix://{}'.format(url_escape(arg, ''))
self.custodia_client = CustodiaHTTPClient(url)

def fin():
self.process.terminate()
if not self._wait_pid(self.process, 2):
self.process.kill()
if not self._wait_pid(self.process, 2):
raise AssertionError("Hard kill failed")

request.addfinalizer(fin)
return self.custodia_client
32 changes: 32 additions & 0 deletions tests/functional/conf/template_simple.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[DEFAULT]
logdir = ${TEST_DIR}
libdir = ${TEST_DIR}
rundir = ${TEST_DIR}
socketdir = ${TEST_DIR}

[global]
server_socket = ${TEST_DIR}/custodia.sock
auditlog = ${TEST_DIR}/custodia.audit.log
server_string = Test_Custodia_Server
debug = false

# Accepts any request that specifies an arbitrary REMOTE_USER header
[auth:header]
handler = SimpleHeaderAuth
header = REMOTE_USER

# Allow requests for all paths under '/' and '/secrets/'
[authz:paths]
handler = SimplePathAuthz
paths = / /secrets/

# Store secrets in a sqlite database called custodia.db in the table 'secrets'
[store:simple]
handler = SqliteStore
dburi = ${TEST_DIR}/custodia.db
table = secrets

# Serve starting from '/' and using the 'simple' store and the 'Root' handler
[/]
handler = Root
store = simple
49 changes: 49 additions & 0 deletions tests/functional/test_basics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (C) 2017 Custodia Project Contributors - see LICENSE file

from __future__ import absolute_import

import json

from .base import CustodiaServerRunner


class TestBasics(CustodiaServerRunner):
def test_default_answer(self, custodia_server):
resp = custodia_server.get('http://localhost/',
headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'message' in data
assert data['message'] == 'Quis custodiet ipsos custodes?'

def test_server_string(self, custodia_server):
resp = custodia_server.get('http://localhost/',
headers=self.request_headers)
assert resp.status_code == 200
assert 'Server' in resp.headers
assert resp.headers['Server'] == 'Test_Custodia_Server'
data = json.loads(resp.text)
assert 'message' in data
assert data['message'] == 'Quis custodiet ipsos custodes?'

def test_raw_data_method(self, custodia_server):
resp = custodia_server.post('secrets/bucket/',
headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.put('secrets/bucket/mykey',
json={"type": "simple",
"value": 'P@ssw0rd'},
headers={'Content-Type':
'application/octet-stream',
'REMOTE_USER': 'me'})
assert resp.status_code == 201

resp = custodia_server.get('secrets/bucket/mykey', headers={
'Content-Type': 'application/octet-stream', 'REMOTE_USER': 'me'})
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'type' in data
assert data['type'] == 'simple'
assert 'value' in data
assert data['value'] == 'P@ssw0rd'
155 changes: 155 additions & 0 deletions tests/functional/test_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright (C) 2017 Custodia Project Contributors - see LICENSE file

from __future__ import absolute_import

import json

from .base import CustodiaServerRunner


class TestContainer(CustodiaServerRunner):
def test_create_container(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

def test_create_container_again(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 200

def test_create_container_invalid_format(self, custodia_server):
invalid_container = 'secrets/bucket{}'.format(self.get_unique_number())

resp = custodia_server.post(invalid_container,
headers=self.request_headers)
assert resp.status_code == 405

def test_create_container_forbidden_key(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())

resp = custodia_server.post(container, headers={})
assert resp.status_code == 403

def test_list_container(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())
mykey = '{}mykey'.format(container)
yourkey = '{}yourkey'.format(container)

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
assert resp.text == '[]'

resp = custodia_server.put(mykey, json={"type": "simple",
"value": 'P@ssw0rd'},
headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'mykey' in data

resp = custodia_server.put(yourkey, json={"type": "simple",
"value": 'AnotherP@ssw0rd'},
headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'mykey' in data
assert 'yourkey' in data

def test_remove_container(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
assert resp.text == '[]'

resp = custodia_server.delete(container, headers=self.request_headers)
assert resp.status_code == 204

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 404

def test_remove_container_not_empty(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())
mykey = '{}mykey'.format(container)

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.put(mykey, json={"type": "simple",
"value": 'P@ssw0rd'},
headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'mykey' in data

resp = custodia_server.delete(container, headers=self.request_headers)
assert resp.status_code == 409

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'mykey' in data

def test_remove_container_not_found(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
assert resp.text == '[]'

resp = custodia_server.delete(container, headers=self.request_headers)
assert resp.status_code == 204

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 404

resp = custodia_server.delete(container, headers=self.request_headers)
assert resp.status_code == 404

def test_remove_container_forbidden_key(self, custodia_server):
container = 'secrets/bucket{}/'.format(self.get_unique_number())
mykey = '{}mykey'.format(container)

resp = custodia_server.post(container, headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.put(mykey, json={"type": "simple",
"value": 'P@ssw0rd'},
headers=self.request_headers)
assert resp.status_code == 201

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'mykey' in data

resp = custodia_server.delete(container, headers={})
assert resp.status_code == 403

resp = custodia_server.get(container, headers=self.request_headers)
assert resp.status_code == 200
data = json.loads(resp.text)
assert 'mykey' in data
Loading

0 comments on commit a1aeade

Please sign in to comment.