Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
leonlolly committed Jan 15, 2024
1 parent 1c820d1 commit f690d5b
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 223 deletions.
2 changes: 0 additions & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ logging-modules=logging
[SIMILARITIES]
min-similarity-lines=8
ignore-comments=yes
ignore-docstrings=yes
ignore-imports=no

[FORMAT]
Expand Down Expand Up @@ -79,7 +78,6 @@ contextmanager-decorators=contextlib.contextmanager

[MISCELLANEOUS]
notes=FIXME,XXX,TODO
disable=missing-function-docstring


[BASIC]
Expand Down
244 changes: 129 additions & 115 deletions wannadb_web/postgres/queries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Union

import bcrypt
from psycopg2 import sql

Expand All @@ -9,24 +11,28 @@ def getUserID(user: str):
result = execute_query(select_query, (user,))
if isinstance(result[0], int):
return int(result[0])
return Exception("No user found with that name")
return None


def getOrganisationID(organisation_name: str):
select_query = sql.SQL("SELECT id FROM organisations WHERE name = %s;")
return execute_query(select_query, (organisation_name,))


def getOrganisationName(organisation_id: int):
select_query = sql.SQL("SELECT name FROM organisations WHERE id = %s;")
response = execute_query(select_query, (organisation_id,))
if response is None:
return -1
return str(response[0])


def getMembersOfOrganisation(organisation_id: int):
select_query = sql.SQL("SELECT username FROM users WHERE id IN (SELECT userid FROM membership WHERE organisationid = %s);")
select_query = sql.SQL(
"SELECT username FROM users WHERE id IN (SELECT userid FROM membership WHERE organisationid = %s);")
return execute_query(select_query, (organisation_id,))


def getMemberIDsFromOrganisationID(organisationID: int):
select_query = sql.SQL("SELECT userid FROM membership WHERE organisationid = %s;")
return execute_query(select_query, (organisationID,))
Expand All @@ -36,6 +42,7 @@ def getUserNameSuggestion(prefix: str):
select_query = sql.SQL("SELECT username FROM users WHERE username LIKE %s;")
return execute_query(select_query, (prefix + "%",))


def getOrganisationIDsFromUserId(userID: int):
try:
select_query = sql.SQL("SELECT organisationid FROM membership WHERE userid = %s;")
Expand All @@ -50,6 +57,7 @@ def getOrganisationIDsFromUserId(userID: int):
except Exception as e:
return None, e


def getOrganisationFromUserId(user_id: int):
try:
select_query = sql.SQL(""" SELECT organisationid, o.name
Expand All @@ -68,21 +76,29 @@ def getOrganisationFromUserId(user_id: int):
except Exception as e:
return None, e


def checkPassword(user: str, password: str):
"""Checks if the password is correct for the given user
Returns:
user_id: int (if password is correct)
False: bool (if password is incorrect)
Exception: Exception (if something went wrong)
Raises:
None
"""
select_query = sql.SQL("SELECT password,id as pw FROM users WHERE username = %s ")
_password, _id = execute_query(select_query, (user,))[0]

try:
if _password:
stored_password = bytes(_password)
check = bcrypt.checkpw(password.encode('utf-8'), stored_password)
if check:
return int(_id)
result = execute_query(select_query, (user,))
_password, _id = result[0]

return False
if _password:
stored_password = bytes(_password)
check = bcrypt.checkpw(password.encode('utf-8'), stored_password)
if check:
return int(_id)

except Exception as e:
return e
return False


def checkOrganisationAuthorisation(organisationName: str, userName: str):
Expand All @@ -92,14 +108,12 @@ def checkOrganisationAuthorisation(organisationName: str, userName: str):
"organisationid = (Select id from organisations where name = (%s))")

result = execute_query(select_query, (organisationName, userName))
try:
if isinstance(result[0], int):
return int(result[0])
if result[0] is None:
return Exception("No authorisation found")
if isinstance(result[0], int):
return int(result[0])
if result[0] is None:
return None


except Exception as e:
return Exception("checkOrganisationAuthorisation failed because: \n", e)


def _getDocument(documentId: int):
Expand All @@ -108,22 +122,31 @@ def _getDocument(documentId: int):
where id = (%s)""")

result = execute_query(select_query, (documentId,))
try:
if result[0]:
if result[0][0]:
content = result[0][0]
return str(content)
else:
content = result[0][1]
return bytes(content)
else:
return None

except Exception as e:
print("_getDocument failed because: \n", e)
if result[0]:
if result[0][0]:
content = result[0][0]
return str(content)
else:
content = result[0][1]
return bytes(content)
else:
return None


def getDocument_by_name(document_name: str, organisation_id: int, user_id: int):
"""
Returns:
name: str
content: str or bytes
Raises:
Exception: if no document with that name is found
Exception: if multiple documents with that name are found
"""



select_query = sql.SQL("""SELECT name,content,content_byte
FROM documents d
JOIN membership m ON d.organisationid = m.organisationid
Expand Down Expand Up @@ -154,53 +177,48 @@ def getDocument(document_id: int, user_id: int):
""")

result = execute_query(select_query, (document_id, user_id,))
try:
if len(result) > 0:
for document in result:
name = document[0]
if document[1]:
content = document[1]
return str(name), str(content)
elif document[2]:
content = document[2]
return str(name), bytes(content)
else:
return None
except Exception as e:
print("getDocument failed because:\n", e)

def getDocumentsForOrganization(organisation_id: int):
try:
select_query = sql.SQL("""SELECT id, name,content,content_byte
FROM documents
WHERE organisationid = (%s)
""")
result = execute_query(select_query, (organisation_id,))

if result == None or len(result) == 0:
return []

doc_array = []

if len(result) > 0:
for document in result:
id = document[0]
name = document[1]
content = '';
if document[2]:
name = document[0]
if document[1]:
content = document[1]
return str(name), str(content)
elif document[2]:
content = document[2]
elif document[3]:
content = document[3]
doc_array.append({
"id": id,
"name": name,
"content": content
})
return doc_array
return str(name), bytes(content)
else:
return None

except Exception as e:
print("getDocumentsForOrganization failed because:\n", e)

def getDocumentsForOrganization(organisation_id: int):

select_query = sql.SQL("""SELECT id, name,content,content_byte
FROM documents
WHERE organisationid = (%s)
""")
result = execute_query(select_query, (organisation_id,))

if result is None or len(result) == 0:
return []

doc_array = []

for document in result:
id = document[0]
name = document[1]
content = ''
if document[2]:
content = document[2]
elif document[3]:
content = document[3]
doc_array.append({
"id": id,
"name": name,
"content": content
})
return doc_array


def updateDocumentContent(doc_id: int, new_content):
try:
select_query = sql.SQL("""SELECT content, content_byte
Expand All @@ -213,9 +231,9 @@ def updateDocumentContent(doc_id: int, new_content):
content_type = "content"
if result[0][0] == None:
content_type = "content_byte"
update_query = sql.SQL("UPDATE documents SET "+content_type+" = (%s) WHERE id = (%s)")
update_query = sql.SQL("UPDATE documents SET " + content_type + " = (%s) WHERE id = (%s)")
execute_transaction(update_query, (new_content, doc_id,), commit=True, fetch=False)
return True
return True
except Exception as e:
print("updateDocumentContent failed because:\n", e)
return False
Expand All @@ -229,26 +247,24 @@ def getDocuments(document_ids: list[int], user_id: int):
({",".join(str(_id) for _id in document_ids)})
""")
result = execute_query(select_query, (user_id,))
try:
if isinstance(result, list) and isinstance(result[0], tuple):
if len(result) > 0:
if result[0][1]:
documents = []
for document in result:
name = document[0]
content = document[1]
documents.append((str(name), str(content)))
return documents
elif result[0][2]:
b_documents = []
for document in result:
name = document[0]
content = document[2]
b_documents.append((str(name), bytes(content)))
return b_documents
return Exception("no documents found")
except Exception as e:
return Exception("getDocuments failed because:\n", e)
if isinstance(result, list) and isinstance(result[0], tuple):
if len(result) > 0:
if result[0][1]:
documents = []
for document in result:
name = document[0]
content = document[1]
documents.append((str(name), str(content)))
return documents
elif result[0][2]:
b_documents = []
for document in result:
name = document[0]
content = document[2]
b_documents.append((str(name), bytes(content)))
return b_documents
return []



def getDocument_ids(organisation_id: int, user_id: int):
Expand All @@ -259,23 +275,21 @@ def getDocument_ids(organisation_id: int, user_id: int):
""")

result = execute_query(select_query, (organisation_id, user_id,))
try:
if isinstance(result, list) and isinstance(result[0], tuple):
if len(result) > 0:
if result[0][1]:
documents = []
for document in result:
name = document[0]
content = document[1]
documents.append((str(name), str(content)))
return documents
elif result[0][2]:
b_documents = []
for document in result:
name = document[0]
content = document[2]
b_documents.append((str(name), bytes(content)))
return b_documents
return []
except Exception as e:
return Exception("getDocuments failed because:\n", e)
if isinstance(result, list) and isinstance(result[0], tuple):
if len(result) > 0:
if result[0][1]:
documents = []
for document in result:
name = document[0]
content = document[1]
documents.append((str(name), str(content)))
return documents
elif result[0][2]:
b_documents = []
for document in result:
name = document[0]
content = document[2]
b_documents.append((str(name), bytes(content)))
return b_documents
return []

Loading

0 comments on commit f690d5b

Please sign in to comment.