Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
leonlolly committed Jan 16, 2024
1 parent a0fbee7 commit 74ee592
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 37 deletions.
1 change: 1 addition & 0 deletions backend-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ celery~=5.3.6
flower~=2.0.1
redis~=5.0.1
pickle5~=0.0.11
mypy==1.5.1
8 changes: 7 additions & 1 deletion wannadb_web/SQLite/Cache_DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@


class SQLiteCacheDBWrapper:

def __init__(self, user_id: int, db_file="wannadb_cache.db"):
"""Initialize the RedisCache instance for a specific user."""
self.db_identifier = f"{user_id}_{db_file}"
if db_file == ":memory:":
self.db_identifier = db_file
else:
self.db_identifier = f"{user_id}_{db_file}"
self.cache_db = SQLiteCacheDB(db_file=self.db_identifier)
if self.cache_db.conn is None:
raise Exception("Cache db could not be initialized")

def delete(self):
self.cache_db.conn.close()
Expand Down
7 changes: 3 additions & 4 deletions wannadb_web/postgres/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,14 @@ def addUser(user: str, password: str):
"""


pwBytes = password.encode('utf-8')
salt = bcrypt.gensalt()
pwHash = bcrypt.hashpw(pwBytes, salt)
# Needed this for the correct password check don't know why...
pwHash = pwHash.decode('utf-8')
pwHashcode = pwHash.decode('utf-8')

insert_data_query = sql.SQL("INSERT INTO users (username, password) VALUES (%s, %s) returning id;")
data_to_insert = (user, pwHash)
data_to_insert = (user, pwHashcode)
response = execute_transaction(insert_data_query, data_to_insert, commit=True)
if response is IntegrityError:
raise IntegrityError("User already exists")
Expand Down Expand Up @@ -220,7 +219,7 @@ def leaveOrganisation(organisationId: int, sessionToken: str):
userid = token.id

count_query = sql.SQL("SELECT COUNT(*) FROM membership WHERE userid = (%s) AND organisationid = (%s)")
count = execute_transaction(count_query,(userid, organisationId,), commit=True)
count = execute_transaction(count_query, (userid, organisationId,), commit=True)
count = int(count[0][0])
if count != 1:
return False, "You are not in this organisation"
Expand Down
17 changes: 9 additions & 8 deletions wannadb_web/routing/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,27 @@ def create_document():
}
"""
form = request.form
authorization = request.headers.get("authorization")
#authorization = form.get("authorization")
#authorization = request.headers.get("authorization")
authorization = form.get("authorization")
organisation_id = form.get("organisationId")
base_name = form.get("baseName")
document_ids = form.get("document_ids")
attributes = form.get("attributes")
attributes_string = form.get("attributes")
_token = tokenDecode(authorization)

attributes = []
for attribute_string in attributes_string:
attributes.append(Attribute(attribute_string))

statistics = Statistics(False)
user_id = _token.id

attributesDump = pickle.dumps(attributes)
statisticsDump = pickle.dumps(statistics)

# TODO BUG EXPected 5 arguments, got 7

task = create_document_base_task.apply_async(args=(user_id, document_ids, attributesDump, statisticsDump,))
#base_name,organisation_id))
task = create_document_base_task.apply_async(args=(user_id, document_ids, attributesDump, statisticsDump,
base_name,organisation_id))

return make_response({'task_id': task.id}, 202)

Expand All @@ -108,10 +111,8 @@ def task_status(task_id):# -> Any:
if task.status == "FAILURE":
return make_response(
{"state": "FAILURE", "meta": str(meta)}, 500)
print(meta)
if not isinstance(meta, bytes):
return make_response({"error": "task not correct"}, 404)

taskObject = TaskObject.from_dump(meta)
return make_response({"state": taskObject.state.value, "meta": taskObject.signals.to_json(), "msg": taskObject.msg},
200)
42 changes: 25 additions & 17 deletions wannadb_web/worker/Web_API.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import io
import json
import logging
import traceback
from typing import Optional

from wannadb import resources
from wannadb.configuration import Pipeline
from wannadb.data.data import Attribute, Document, DocumentBase
from wannadb.interaction import EmptyInteractionCallback, InteractionCallback
from wannadb.interaction import EmptyInteractionCallback
from wannadb.matching.distance import SignalsMeanDistance
from wannadb.matching.matching import RankingBasedMatcher
from wannadb.preprocessing.embedding import BERTContextSentenceEmbedder, RelativePositionEmbedder, \
Expand All @@ -17,7 +19,6 @@
from wannadb.preprocessing.normalization import CopyNormalizer
from wannadb.preprocessing.other_processing import ContextSentenceCacher
from wannadb.statistics import Statistics
from wannadb.status import StatusCallback
from wannadb_web.Redis.RedisCache import RedisCache
from wannadb_web.SQLite import Cache_DB
from wannadb_web.SQLite.Cache_DB import SQLiteCacheDBWrapper
Expand All @@ -30,18 +31,26 @@

class WannaDB_WebAPI:

def __init__(self, user_id: int, task_object:TaskObject):
def __init__(self, user_id: int, task_object: TaskObject, document_base_name: str, organisation_id: int):
logger.info("WannaDB_WebAPI initialized")
self.user_id = user_id
self.sqLiteCacheDBWrapper = SQLiteCacheDBWrapper(user_id, db_file=":memory:")
self.redisCache = RedisCache(user_id)
self.status_callback = task_object.status_callback
self.interaction_callback = task_object.interaction_callback
self.signals = task_object.signals
self.document_base_name = document_base_name
self.organisation_id = organisation_id
if resources.MANAGER is None:
self.signals.error.emit("Resource Manager not initialized!")
raise Exception("Resource Manager not initialized!")
if self.sqLiteCacheDBWrapper.cache_db.conn is None:
self.signals.error.emit("Cache db could not be initialized!")
raise Exception("Cache db could not be initialized!")

def create_document_base(self, documents: list[Document], attributes: list[Attribute], statistics: Statistics):
logger.debug("Called slot 'create_document_base'.")
self.signals.status.emit("Creating document base...", -1)
self.signals.status.emit("create_document_base")
try:
self.sqLiteCacheDBWrapper.reset_cache_db()

Expand All @@ -51,10 +60,9 @@ def create_document_base(self, documents: list[Document], attributes: list[Attri
if not document_base.validate_consistency():
logger.error("Document base is inconsistent!")
error = "Document base is inconsistent!"
return error

# load default preprocessing phase
self.signals.status.emit("Loading preprocessing phase...", -1)
self.signals.status.emit("Loading preprocessing phase...")

# noinspection PyTypeChecker
preprocessing_phase = Pipeline([
Expand All @@ -73,20 +81,21 @@ def create_document_base(self, documents: list[Document], attributes: list[Attri
preprocessing_phase(document_base, EmptyInteractionCallback(), self.status_callback, statistics)

self.signals.document_base_to_ui.emit(document_base)
self.signals.statistics_to_ui.emit(statistics)
self.signals.finished.emit("Finished!")
self.signals.statistics.emit(statistics)
self.signals.finished.emit(1)
self.signals.status.emit("Finished!")

except Exception as e:
self.signals.error.emit(e)
traceback_str = traceback.format_exc()
self.signals.error.emit(str(e) + "\n" + traceback_str)

def load_document_base_from_bson(self, document_id: int, user_id: int):
def load_document_base_from_bson(self):
logger.debug("Called function 'load_document_base_from_bson'.")
wrapper_cache_db: Optional[SQLiteCacheDBWrapper] = None
try:
wrapper_cache_db = Cache_DB.Cache_Manager.user(user_id)
cache_db = wrapper_cache_db.cache_db
self.sqLiteCacheDBWrapper.reset_cache_db()

document = getDocument(document_id, user_id)
document = getDocument(self.document_id, user_id)
get
if isinstance(document, str):
logger.error("document is not a DocumentBase!")
return -1
Expand All @@ -96,7 +105,6 @@ def load_document_base_from_bson(self, document_id: int, user_id: int):
logger.error("Document base is inconsistent!")
return -1

wrapper_cache_db.reset_cache_db()

for attribute in document_base.attributes:
cache_db.create_table_by_name(attribute.name)
Expand All @@ -112,10 +120,10 @@ def load_document_base_from_bson(self, document_id: int, user_id: int):
if wrapper_cache_db is not None:
wrapper_cache_db.disconnect()

def save_document_base_to_bson(self, name: str, organisation_id: int, document_base: DocumentBase, user_id: int):
def save_document_base_to_bson(self, document_base_name: str, organisation_id: int, document_base: DocumentBase, user_id: int):
logger.debug("Called function 'save_document_base_to_bson'.")
try:
document_id = addDocument(name, document_base.to_bson(), organisation_id, user_id)
document_id = addDocument(document_base_name, document_base.to_bson(), organisation_id, user_id)
if document_id is None:
logger.error("Document base could not be saved to BSON!")
elif document_id == -1:
Expand Down
64 changes: 58 additions & 6 deletions wannadb_web/worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
import logging
import pickle
import random
import time

from celery import current_app

from wannadb.data.data import Document, Attribute
from wannadb.resources import ResourceManager
from wannadb.statistics import Statistics
from wannadb_web.Redis.util import RedisConnection
from wannadb_web.postgres.queries import getDocuments
from wannadb_web.util import tokenDecode
from wannadb_web.worker.Web_API import WannaDB_WebAPI
from wannadb_web.worker.util import TaskObject, State, TaskUpdate
from wannadb_web.worker.util import State, TaskUpdate
from wannadb_web.worker.util import TaskObject


class U:

def update_state(*args, **kwargs):
print('update_state called with args: ', args, ' and kwargs: ', kwargs)
print("meta: ", TaskObject.from_dump(kwargs.get("meta")).signals.to_json())


logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# RedisConnection()
# ResourceManager()
# authorization = (
# "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyIjoibGVvbiIsImlkIjoxfQ.YM9gwcXeFSku-bz4RUKkymYvA6Af13sxH-BRlnjCCEA")
# _token = tokenDecode(authorization)
# _base_name = "base_name"
# document_ids = [2, 3]
# attribute = Attribute("a")
# statistics = Statistics(False)
# user_id = 1
# attributesDump = pickle.dumps([attribute])
# statisticsDump = pickle.dumps(statistics)
# uuuuuuuu = U()


@current_app.task(bind=True)
def create_document_base_task(self, user_id, document_ids: [int], attributes_dump: bytes, statistics_dump: bytes):
def create_document_base_task(self, user_id, document_ids: list[int], attributes_dump: bytes, statistics_dump: bytes,
base_name: str, organisation_id: int):
"""
define values
"""

attributes: list[Attribute] = pickle.loads(attributes_dump)
statistics: Statistics = pickle.loads(statistics_dump)

Expand All @@ -34,10 +59,17 @@ def task_callback_fn(state: str, meta: TaskObject):

task_object = TaskObject(task_callback)

api = WannaDB_WebAPI(1, task_object)
"""
init api
"""

api = WannaDB_WebAPI(1, task_object, base_name, organisation_id)

task_object.update(state=State.PENDING, msg="api created")
try:
"""
decoding
"""
if not isinstance(attributes[0], Attribute):
task_object.update(State.FAILURE, "Invalid attributes")
raise Exception("Invalid attributes")
Expand All @@ -54,13 +86,33 @@ def task_callback_fn(state: str, meta: TaskObject):
documents.append(Document(doc[0], doc[1]))
else:
print("No documents found")
# raise Exception("No documents found")
"""
Creating document base
"""

api.create_document_base(documents, attributes, statistics)
if task_object.signals.error.msg:
task_object.update(State.FAILURE, api.signals)

"""
saving document base
"""

#api.save_document_base_to_bson()

"""
response
"""

if task_object.signals.finished.msg:
task_object.update(State.SUCCESS, task_object.signals.finished.msg)
else:
task_object.update(State.ERROR, "task_object signals not set?")
return task_object.to_dump()

except Exception as e:
self.update_state(state=State.FAILURE.value, meta={'exception': str(e)})
task_object.update(State.FAILURE, "Exception: " + str(e))
task_object.to_dump()


@current_app.task(bind=True)
Expand Down
2 changes: 1 addition & 1 deletion wannadb_web/worker/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def update(self, state: State, msg=""):
self.msg = msg
self.task_update_fn(self.state.value, self)
else:
raise Exception("update error State is none")
raise Exception(f"update error State is {type(state)}")

def to_dump(self):
state = self.state
Expand Down

0 comments on commit 74ee592

Please sign in to comment.