-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(oonifindings): replace service tests to use postgresql
- Loading branch information
Showing
6 changed files
with
215 additions
and
78 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,6 @@ | |
import jwt | ||
|
||
from fastapi.testclient import TestClient | ||
from clickhouse_driver import Client as ClickhouseClient | ||
|
||
from oonifindings.common.config import Settings | ||
from oonifindings.common.auth import hash_email_address | ||
|
@@ -15,75 +14,46 @@ | |
THIS_DIR = Path(__file__).parent.resolve() | ||
|
||
|
||
def is_clickhouse_running(url):
    """Return True when a ClickHouse server at *url* answers a trivial query.

    Any failure (connection refused, driver missing, bad URL) is treated as
    "not running" — callers only use this as a readiness probe.
    """
    try:
        with ClickhouseClient.from_url(url) as client:
            client.execute("SELECT 1")
    except Exception:
        return False
    return True
|
||
|
||
@pytest.fixture(scope="session")
def clickhouse_server(docker_ip, docker_services):
    """Session-scoped fixture: wait for the dockerised ClickHouse and yield its URL."""
    exposed_port = docker_services.port_for("clickhouse", 9000)
    url = f"clickhouse://{docker_ip}:{exposed_port}"
    # Poll until the server answers SELECT 1, up to 30 seconds.
    docker_services.wait_until_responsive(
        timeout=30.0, pause=0.1, check=lambda: is_clickhouse_running(url)
    )
    yield url
|
||
|
||
def run_migration(path: Path, click: ClickhouseClient):
    """Execute each SQL statement in the migration file *path* against *click*.

    Lines starting with "--" are dropped, the remainder is split on ";" and
    each non-empty statement is executed in file order.
    NOTE(review): only full-line comments at column 0 are stripped, and a ";"
    inside a string literal would split a statement — acceptable for the
    controlled migration files in this repo.
    """
    kept_lines = [ln for ln in path.read_text().split("\n") if not ln.startswith("--")]
    for statement in "\n".join(kept_lines).split(";"):
        statement = statement.strip()
        if statement:
            click.execute(statement)
def make_override_get_settings(**kw):
    """Return a zero-argument callable producing Settings built from *kw*.

    Used to plug into FastAPI's dependency_overrides in the test fixtures;
    Settings is only constructed when the override is actually invoked.
    """
    def _override():
        return Settings(**kw)

    return _override
|
||
def create_db_for_fixture(conn_url):
    """Apply every migration file under tests/migrations to the database at
    *conn_url* and return the URL; skip the calling tests if anything fails."""
    migrations_dir = THIS_DIR / "migrations"
    try:
        with ClickhouseClient.from_url(conn_url) as client:
            for entry in migrations_dir.iterdir():
                run_migration(entry.resolve(), click=client)
            return conn_url
    except Exception:
        # Broken migrations make every dependent test meaningless — skip them.
        pytest.skip("database migration failed")
|
||
@pytest.fixture
def alembic_migration(postgresql):
    """Run all alembic migrations against the throwaway PostgreSQL instance
    provided by pytest-postgresql and yield its connection URL.

    NOTE(review): the scraped diff interleaves this new fixture with the
    removed clickhouse-based `db` fixture; this is the reconstructed
    post-commit version.
    """
    from alembic import command
    from alembic.config import Config

    # pytest-postgresql's server uses trust auth, hence the empty password.
    db_url = f"postgresql://{postgresql.info.user}:@{postgresql.info.host}:{postgresql.info.port}/{postgresql.info.dbname}"

    # Migrations live inside the installed package, next to the models.
    migrations_path = (
        Path(__file__).parent.parent / "src" / "oonifindings" / "common" / "alembic"
    ).resolve()

    alembic_cfg = Config()
    alembic_cfg.set_main_option("script_location", str(migrations_path))
    alembic_cfg.set_main_option("sqlalchemy.url", db_url)

    command.upgrade(alembic_cfg, "head")
    yield db_url
|
||
|
||
@pytest.fixture
def client_with_bad_settings():
    """TestClient wired to an unreachable database, for error-path tests.

    NOTE(review): the diff shows both the removed clickhouse_url and the new
    postgresql_url override; the postgres one is the post-commit version.
    """
    app.dependency_overrides[get_settings] = make_override_get_settings(
        postgresql_url="postgresql://badhost:9000"
    )
    client = TestClient(app)
    yield client
|
||
|
||
@pytest.fixture | ||
def client(db): | ||
def client(alembic_migration): | ||
app.dependency_overrides[get_settings] = make_override_get_settings( | ||
clickhouse_url=db, | ||
postgresql_url=alembic_migration, | ||
jwt_encryption_key="super_secure", | ||
prometheus_metrics_password="super_secure", | ||
account_id_hashing_key="super_secure" | ||
|
@@ -97,31 +67,33 @@ def create_jwt(payload: dict) -> str: | |
return jwt.encode(payload, "super_secure", algorithm="HS256") | ||
|
||
|
||
def create_session_token(account_id: str, email: str, role: str) -> str:
    """Build a signed session JWT for *account_id* / *email* carrying *role*.

    The token is valid immediately (nbf == iat == now), expires in 10 days,
    and its audience matches the service's "user_auth" check.

    NOTE(review): the diff interleaves the old 2-argument signature with the
    new 3-argument one; this is the new version, which all call sites in
    this commit use.
    """
    now = int(time.time())
    payload = {
        "nbf": now,
        "iat": now,
        "exp": now + 10 * 86400,
        "aud": "user_auth",
        "account_id": account_id,
        "email_address": email,
        "login_time": None,
        "role": role,
    }
    return create_jwt(payload)
|
||
|
||
@pytest.fixture
def client_with_user_role(client):
    """TestClient authenticated as a plain "user" role account.

    Depends on `client` only so its settings override is installed; a fresh
    TestClient is then created to carry the Authorization header.
    """
    client = TestClient(app)
    # NOTE(review): "[email protected]" is what the scrape shows — the real address
    # was likely redacted; confirm against the repository.
    jwt_token = create_session_token("0" * 16, "[email protected]", "user")
    client.headers = {"Authorization": f"Bearer {jwt_token}"}
    yield client
|
||
|
||
@pytest.fixture
def client_with_admin_role(client):
    """TestClient authenticated as an "admin" role account.

    Depends on `client` only so its settings override is installed; a fresh
    TestClient is then created to carry the Authorization header. The new
    version uses a distinct account id ("1" * 16) from the user fixture.
    """
    client = TestClient(app)
    # NOTE(review): "[email protected]" is what the scrape shows — the real address
    # was likely redacted; confirm against the repository.
    jwt_token = create_session_token("1" * 16, "[email protected]", "admin")
    client.headers = {"Authorization": f"Bearer {jwt_token}"}
    yield client
|
||
|
@@ -132,7 +104,7 @@ def client_with_hashed_email(client): | |
def _hashed_email(email: str, role: str):
    """Return a TestClient authenticated as the account whose id is the
    keyed hash of *email* (matching the service's account-id derivation)."""
    client = TestClient(app)
    account_id = hash_email_address(email, "super_secure")
    # New create_session_token signature: the email travels in the token too.
    jwt_token = create_session_token(account_id, email, role)
    client.headers = {"Authorization": f"Bearer {jwt_token}"}
    return client
|
||
|
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
from copy import deepcopy | ||
from datetime import datetime, timedelta | ||
import pathlib | ||
import pytest | ||
|
||
import sqlalchemy as sa | ||
from sqlalchemy.orm import sessionmaker | ||
from sqlalchemy import create_engine | ||
|
||
from oonifindings import models | ||
from oonifindings.routers.v1 import utcnow_seconds | ||
|
||
# A start time safely in the past so the sample finding is already "active".
sample_start_time = utcnow_seconds() + timedelta(minutes=-1)

# NOTE(review): address appears scrub-redacted in the scrape — confirm the
# real fixture value against the repository.
SAMPLE_EMAIL = "[email protected]"

# Minimal valid incident payload used by the migration tests below.
SAMPLE_OONIFINDING = {
    "title": "sample oonifinding",
    "short_description": "sample oonifinding description",
    "reported_by": "sample user",
    "email_address": SAMPLE_EMAIL,
    "text": "this is a sample oonifinding incident",
    "published": 0,
    "event_type": "incident",
    "start_time": sample_start_time,
    "ASNs": [],
    "CCs": ["IN", "TZ"],
    "tags": [],
    "test_names": ["webconnectivity"],
    "domains": ["www.google.com"],
    "links": [],
}
|
||
def config_alembic(db_url):
    """Build an alembic Config pointing at the package's migration scripts
    and at the database *db_url*."""
    from alembic.config import Config

    script_dir = (
        pathlib.Path(__file__).parent.parent / "src" / "oonifindings" / "common" / "alembic"
    ).resolve()

    cfg = Config()
    cfg.set_main_option("script_location", str(script_dir))
    cfg.set_main_option("sqlalchemy.url", db_url)
    return cfg
|
||
|
||
def upgrade_to_head(db_url):
    """Apply every pending alembic migration to the database at *db_url*."""
    from alembic import command

    cfg = config_alembic(db_url)
    command.upgrade(cfg, "head")
|
||
|
||
def get_db(pg_url):
    """Open and return a new SQLAlchemy session bound to the database at *pg_url*."""
    factory = sessionmaker(
        autocommit=False, autoflush=False, bind=create_engine(pg_url)
    )
    return factory()
|
||
|
||
def test_downgrade(postgresql):
    """An upgrade to head followed by a single-step downgrade must both apply."""
    from alembic import command

    info = postgresql.info
    db_url = f"postgresql://{info.user}:@{info.host}:{info.port}/{info.dbname}"

    command.upgrade(config_alembic(db_url), "head")
    command.downgrade(config_alembic(db_url), "-1")
|
||
|
||
def _insert_expecting_statement_error(db, finding, create_time):
    """Insert *finding* with the given create_time; the commit must fail with
    a StatementError, after which the session is rolled back for reuse."""
    with pytest.raises(sa.exc.StatementError):
        db.add(
            models.OONIFinding(
                **finding,
                incident_id="000000000",
                create_time=create_time,
                update_time=utcnow_seconds(),
                creator_account_id="000000000",
            )
        )
        db.commit()
    db.rollback()


def test_upgrade_to_head(postgresql):
    """Migrations reach head, the schema accepts a valid OONIFinding row, and
    invalid create_time values (non-date, tz-naive datetime, NULL) are rejected.
    """
    db_url = f"postgresql://{postgresql.info.user}:@{postgresql.info.host}:{postgresql.info.port}/{postgresql.info.dbname}"
    upgrade_to_head(db_url)
    db = get_db(db_url)

    finding = deepcopy(SAMPLE_OONIFINDING)

    # The original queried here and discarded the result; assert the intent:
    # a freshly migrated database starts with no findings.
    assert db.query(models.OONIFinding).first() is None

    db.add(
        models.OONIFinding(
            **finding,
            incident_id="000000000",
            create_time=utcnow_seconds(),
            update_time=utcnow_seconds(),
            creator_account_id="000000000",
        )
    )
    db.commit()

    assert db.query(models.OONIFinding).first()

    # NOTE(review): the original called db.close() here and then kept using
    # the session (SQLAlchemy transparently begins a new transaction); the
    # close is deferred to the end where it belongs.
    _insert_expecting_statement_error(db, finding, "NOT A DATE")
    _insert_expecting_statement_error(db, finding, datetime.now())
    _insert_expecting_statement_error(db, finding, None)

    db.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.