Skip to content

Commit

Permalink
giant refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
leonlolly committed Jan 23, 2024
1 parent c9c2c2f commit 305959e
Show file tree
Hide file tree
Showing 12 changed files with 555 additions and 525 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ FROM build as dev

#CMD [ "python", "app.py" ]

CMD ["mypy","--install-types", "--non-interactive"]

CMD ["flask", "--app", "app", "--debug", "run","--host","0.0.0.0", "--port", "8000" ]


Expand Down
22 changes: 0 additions & 22 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import logging
import os

from celery import Celery, Task
from flask import Flask, make_response, render_template_string
from flask_cors import CORS
from flask_debugtoolbar import DebugToolbarExtension

from wannadb.resources import ResourceManager
from wannadb_web.Redis.util import RedisConnection
from wannadb_web.routing.core import core_routes
from wannadb_web.routing.dev import dev_routes
from wannadb_web.routing.user import user_management
Expand All @@ -18,23 +14,6 @@
app = Flask(__name__)


def celery_init_app(_app: Flask) -> Celery:
_app.app_context()
RedisConnection()
ResourceManager()

class FlaskTask(Task):

def __call__(self, *args: object, **kwargs: object) -> object:
return self.run(*args, **kwargs)

celery_app = Celery(_app.name, task_cls=FlaskTask)
celery_app.config_from_object(_app.config) # Use the app's entire configuration
celery_app.set_default()
_app.extensions["celery"] = celery_app
return celery_app


# Combine Flask and Celery configs
app.config.from_mapping(
SECRET_KEY='secret!',
Expand All @@ -52,7 +31,6 @@ def __call__(self, *args: object, **kwargs: object) -> object:
toolbar = DebugToolbarExtension(app)


celery = celery_init_app(app)

# Register the blueprints
app.register_blueprint(main_routes)
Expand Down
17 changes: 17 additions & 0 deletions celery_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import logging
import os

from celery import Celery

from wannadb_web.worker.tasks import BaseTask, TestTask, InitManager, CreateDocumentBase

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

app = Celery(__name__)

app.conf.broker_url = os.environ.get("CELERY_BROKER_URL")

app.register_task(BaseTask)
app.register_task(TestTask)
app.register_task(InitManager)
app.register_task(CreateDocumentBase)
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
dockerfile: Dockerfile
target: worker
tty: true
command: ['celery', '-A', 'app.celery', 'worker', '-l', 'info']
command: ['celery', '-A', 'celery_app', 'worker', '-l', 'info']
env_file:
- wannadb_web/.env/.dev
volumes:
Expand All @@ -42,7 +42,7 @@ services:
dockerfile: Dockerfile
target: worker
tty: true
command: ['celery', '-A', 'app.celery', 'flower']
command: ['celery', '-A', 'celery_app', 'flower']
env_file:
- wannadb_web/.env/.dev
volumes:
Expand Down
20 changes: 13 additions & 7 deletions wannadb_web/Redis/RedisCache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Union
import logging

from wannadb_web.Redis import util
Expand All @@ -9,20 +9,26 @@
class RedisCache:
def __init__(self, user_id: int) -> None:
"""Initialize the RedisCache instance for a specific user."""
self.redis_client = util.Redis_Connection.redis_client
self.redis_client = util.connectRedis()
self.user_space_key = f"user:{str(user_id)}"

def set(self, key: str, value: str) -> None:
def set(self, key: str, value: Union[str, bytes, int, float]) -> None:
"""Set a key-value pair in the user-specific space."""
user_key = f"{self.user_space_key}:{key}"
self.redis_client.set(user_key, value)
self.redis_client.set(name=user_key, value=value)

def get(self, key: str) -> Optional[str]:
def get(self, key: str) -> Optional[Union[str, bytes, int, float]]:
"""Get the value associated with a key in the user-specific space."""
user_key = f"{self.user_space_key}:{key}"
return self.redis_client.get(user_key)

def delete(self, key: str) -> None:
"""Delete the key-value pair associated with a key in the user-specific space."""
user_key = f"{self.user_space_key}:{key}"
self.redis_client.delete(user_key)


def close(self) -> None:
"""Close the Redis connection for the user-specific space."""
self
pass
self.redis_client.close()
self.redis_client = None
38 changes: 4 additions & 34 deletions wannadb_web/Redis/util.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import os
from typing import Optional
import logging
import os

import redis

CACHE_HOST = os.environ.get("CACHE_HOST", "127.0.0.1")
CACHE_PORT = int(os.environ.get("CACHE_PORT", 6379))
CACHE_DB = int(os.environ.get("CACHE_DB", 0))
CACHE_PASSWORD = os.environ.get("CACHE_PASSWORD")
CACHE_PASSWORD = int(os.environ.get("CACHE_PASSWORD", 0))

logger = logging.getLogger(__name__)
print(CACHE_HOST, CACHE_PORT, CACHE_DB, CACHE_PASSWORD)

Redis_Connection: Optional["RedisConnection"] = None
logger = logging.getLogger(__name__)


def connectRedis():
Expand All @@ -25,32 +24,3 @@ def connectRedis():
return redis_client
except Exception as e:
raise Exception("Redis connection failed because:", e)


class RedisConnection:
def __init__(self) -> None:
"""Initialize the Redis_Connection manager."""
global Redis_Connection
if Redis_Connection is not None:
logger.error("There can only be one Redis_Connection!")
raise RuntimeError("There can only be one Redis_Connection!")
else:
Redis_Connection = self
self.redis_client = connectRedis()
logger.info("Initialized the Redis_Connection.")

def __enter__(self) -> "RedisConnection":
"""Enter the Redis_Connection context."""
logger.info("Entered the Redis_Connection.")
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit the Redis_Connection context."""
logger.info("Kill all Redis connections")
global Redis_Connection
if Redis_Connection is None:
logger.error("Redis_Connection is None!")
raise RuntimeError("Redis_Connection is None!")
Redis_Connection.redis_client.close()
Redis_Connection = None
logger.info("Exited the resource manager.")
61 changes: 31 additions & 30 deletions wannadb_web/routing/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
"""
import logging.config
import pickle
from typing import Optional

from celery.result import AsyncResult
from flask import Blueprint, make_response, jsonify, url_for, request
from flask import Blueprint, make_response, request

from wannadb.data.data import Attribute
from wannadb.statistics import Statistics
from wannadb_web.Redis.RedisCache import RedisCache
from wannadb_web.util import tokenDecode
from wannadb_web.worker.data import nugget_to_json
from wannadb_web.worker.tasks import create_document_base_task, long_task, update_document_base
from wannadb_web.worker.util import TaskObject
from wannadb_web.worker.tasks import CreateDocumentBase

core_routes = Blueprint('core_routes', __name__, url_prefix='/core')

Expand Down Expand Up @@ -72,9 +72,10 @@ def create_document():
form = request.form
# authorization = request.headers.get("authorization")
authorization = form.get("authorization")
organisation_id = form.get("organisationId")
organisation_id: Optional[int] = form.get("organisationId")
base_name = form.get("baseName")
document_ids = form.get("document_ids")
document_ids: Optional[list[int]] = form.get("document_ids")
document_ids = [2, 3]
attributes_string = form.get("attributes")
if (organisation_id is None or base_name is None or document_ids is None or attributes_string is None
or authorization is None):
Expand All @@ -94,11 +95,12 @@ def create_document():
attributesDump = pickle.dumps(attributes)
statisticsDump = pickle.dumps(statistics)

task = create_document_base_task.apply_async(args=(user_id, document_ids, attributesDump, statisticsDump,
task = CreateDocumentBase(user_id).apply_async(args=(document_ids, attributesDump, statisticsDump,
base_name, organisation_id))

return make_response({'task_id': task.id}, 202)


@core_routes.route('/document_base/attributes', methods=['UPDATE'])
def document_base():
"""
Expand Down Expand Up @@ -144,30 +146,29 @@ def document_base():
attributesDump = pickle.dumps(attributes)
statisticsDump = pickle.dumps(statistics)

task = update_document_base.apply_async(args=(user_id, attributesDump, statisticsDump,
base_name, organisation_id))

return make_response({'task_id': task.id}, 202)


@core_routes.route('/longtask', methods=['POST'])
def longtask():
task = long_task.apply_async()
return jsonify(str(task.id)), 202, {'Location': url_for('core_routes.task_status',
task_id=task.id)}
# @core_routes.route('/longtask', methods=['POST'])
# def longtask():
# task = long_task.apply_async()
# return jsonify(str(task.id)), 202, {'Location': url_for('core_routes.task_status',
# task_id=task.id)}


@core_routes.route('/status/<task_id>')
def task_status(task_id):
@core_routes.route('/status/<task_id>', methods=['GET'])
def task_status(task_id: str):
task: AsyncResult = AsyncResult(task_id)
meta = task.info
if meta is None:
return make_response({"error": "task not found"}, 404)
if task.status == "FAILURE":
return make_response(
{"state": "FAILURE", "meta": str(meta)}, 500)
if not isinstance(meta, bytes):
return make_response({"error": "task not correct"}, 404)
taskObject = TaskObject.from_dump(meta)
return make_response({"state": taskObject.state.value, "meta": taskObject.signals.to_json()},
200)
status = task.status
print(task.info)
if status == "FAILURE":
return make_response({"state": "FAILURE", "meta": str(task.result)}, 500)
if status == "SUCCESS":
return make_response({"state": "SUCCESS", "meta": str(task.result)}, 200)
if status is None:
return make_response({"error": "task not found"}, 500)
return make_response({"state": task.status, "meta": str(task.result)}, 202)


@core_routes.route('/status/<task_id>', methods=['POST'])
def task_update(task_id: str):
redis_client = RedisCache(int(task_id)).redis_client
redis_client.set("input", "test")
1 change: 1 addition & 0 deletions wannadb_web/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Authorisation(Enum):


def tokenEncode(obj: dict[str, Any]):
obj["exp"] = datetime.datetime.now() + datetime.timedelta(hours=1)
return jwt.encode(obj, _jwtkey, algorithm="HS256")


Expand Down
Loading

0 comments on commit 305959e

Please sign in to comment.