Skip to content

Commit

Permalink
Session Management Tested
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnGrubba committed Jul 23, 2024
1 parent 69d31b5 commit faebd98
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 8 deletions.
43 changes: 40 additions & 3 deletions src/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
import bcrypt
from tools import users_collection, sessions_collection
from bson import ObjectId


@pytest.fixture
Expand All @@ -24,16 +25,52 @@ def fixtureuser() -> dict:
return user_data


@pytest.fixture
def fixtureuser2() -> dict:
users_collection.find_one_and_delete({"email": "[email protected]"})
hashed_pswd = bcrypt.hashpw("Kennwort1!".encode("utf-8"), bcrypt.gensalt(5)).decode(
"utf-8"
)
user_data = {
"password": hashed_pswd,
"email": "[email protected]",
"username": "FixtureUser1",
"createdAt": datetime.datetime.now(),
}
result = users_collection.insert_one(user_data)
user_data["password"] = "Kennwort1!"
user_data["_id"] = str(result.inserted_id)
return user_data


@pytest.fixture
def fixturesessiontoken_user(fixtureuser) -> tuple[str, dict]:
sessions_collection.delete_many({})
# Generate a new session token
session_token = str(uuid.uuid4())
# Persist the session
session_id = sessions_collection.insert_one(
{
"session_token": session_token,
"user_id": ObjectId(fixtureuser["_id"]),
"createdAt": datetime.datetime.now(),
"device_information": {},
}
)
return session_token, fixtureuser, session_id.inserted_id


@pytest.fixture
def fixturesessiontoken_user2(fixtureuser2) -> tuple[str, dict]:
# Generate a new session token
session_token = str(uuid.uuid4())
# Persist the session
sessions_collection.insert_one(
session_id = sessions_collection.insert_one(
{
"session_token": session_token,
"user_id": fixtureuser["_id"],
"user_id": ObjectId(fixtureuser2["_id"]),
"createdAt": datetime.datetime.now(),
"device_information": {},
}
)
return session_token, fixtureuser
return session_token, fixtureuser2, session_id.inserted_id
8 changes: 7 additions & 1 deletion src/api/profile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@


# Unauthorized Route Test
def test_unauthorized_profile(fixtureuser):
def test_unauthorized_profile():
response = client.get("/profile")
assert response.status_code == 401


def test_get_profile_invalid_session():
client.cookies.set("session", "invalid")
response = client.get("/profile")
assert response.status_code == 401

Expand Down
36 changes: 36 additions & 0 deletions src/api/sessions_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from fastapi.testclient import TestClient

from .main import app


client = TestClient(app)


# SUCCESSFUL TESTS
def test_get_sessions(fixturesessiontoken_user):
client.cookies.set("session", fixturesessiontoken_user[0])
response = client.get("/sessions")
resp_js = response.json()
assert response.status_code == 200
assert isinstance(resp_js.get("sessions"), list)
assert len(resp_js.get("sessions")) == 1
# Don't leak out session_token
assert resp_js.get("sessions")[0].get("session_token") is None


def test_delete_session(fixturesessiontoken_user):
client.cookies.set("session", fixturesessiontoken_user[0])
response = client.delete("/sessions/" + str(fixturesessiontoken_user[2]))
assert response.status_code == 204


def test_delete_session_otheracc(fixturesessiontoken_user, fixturesessiontoken_user2):
client.cookies.set("session", fixturesessiontoken_user[0])
response = client.delete("/sessions/" + str(fixturesessiontoken_user2[2]))
assert response.status_code == 404


def test_delete_session_nonexistent(fixturesessiontoken_user):
client.cookies.set("session", fixturesessiontoken_user[0])
response = client.delete("/sessions/test")
assert response.status_code == 404
17 changes: 13 additions & 4 deletions src/crud/sessions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import uuid
from tools.db import sessions_collection
from tools import sessions_collection
import datetime
from tools.conf import SessionConfig
from fastapi import Request
from user_agents import parse
from bson import ObjectId
from bson import ObjectId, errors
from fastapi import HTTPException


def create_login_session(user_id: ObjectId, request: Request) -> str:
Expand Down Expand Up @@ -82,7 +83,7 @@ def delete_session(session_token: str) -> bool:
)


def get_user_sessions(user_id: str) -> list:
def get_user_sessions(user_id: ObjectId) -> list:
"""
Get all sessions for a user.
Expand All @@ -92,6 +93,10 @@ def get_user_sessions(user_id: str) -> list:
Returns:
list: List of Sessions
"""
try:
user_id = ObjectId(user_id)
except errors.InvalidId:
raise HTTPException(status_code=404, detail="User not found.")
return list(sessions_collection.find({"user_id": user_id}))


Expand All @@ -105,4 +110,8 @@ def get_session_by_id(session_id: str) -> dict:
Returns:
dict: Session Information
"""
return sessions_collection.find_one({"_id": ObjectId(session_id)})
try:
object_id = ObjectId(session_id)
except errors.InvalidId:
raise HTTPException(status_code=404, detail="Session not found.")
return sessions_collection.find_one({"_id": object_id})
2 changes: 2 additions & 0 deletions src/tools/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def bson_to_json(data: bson.BSON) -> dict:
# Iterate over the keys and convert the _id to a string
for key in original_json:
if isinstance(original_json[key], dict):
if len(list(original_json[key].keys())) == 0:
continue
if list(original_json[key].keys())[0] == "$oid":
original_json[key] = str(original_json[key]["$oid"])
elif list(original_json[key].keys())[0] == "$date":
Expand Down

0 comments on commit faebd98

Please sign in to comment.