diff --git a/app/config.py b/app/config.py index 0b63a07..10668e0 100644 --- a/app/config.py +++ b/app/config.py @@ -3,7 +3,7 @@ class Settings(BaseSettings): pflink_mongodb: MongoDsn = 'mongodb://localhost:27017' - version: str = "4.0.3" + version: str = "4.0.4" mongo_username: str = "admin" mongo_password: str = "admin" log_level: str = "DEBUG" diff --git a/app/controllers/cube.py b/app/controllers/cube.py index 392b88b..8c9c2ef 100644 --- a/app/controllers/cube.py +++ b/app/controllers/cube.py @@ -1,3 +1,6 @@ +import pymongo.results +from app.config import settings +from pymongo import MongoClient from app.controllers.subprocesses.utils import ( get_cube_url_from_pfdcm, do_cube_create_user, @@ -6,6 +9,21 @@ from app.models import cube import requests import json +from typing import List + +MONGO_DETAILS = str(settings.pflink_mongodb) +client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password) +database = client.database +cube_collection = database.get_collection("cubes_collection") + + +# helper methods to add and retrieve + +def cube_add_helper(cube_data: cube.CubeService) -> dict: + return { + "_id": cube_data["service_name"], + "service_URL": cube_data["service_URL"], + } async def get_plugins(pfdcm: str, cube_name: str): @@ -50,3 +68,36 @@ def get_cube_client(pfdcm: str, cube_name: str): raise Exception(ex) +def add_cube_service(cube_service_data: cube.CubeService) -> dict: + """ + DB constraint: Only unique names allowed + """ + try: + cube_svc: pymongo.results.InsertOneResult = cube_collection.insert_one(cube_add_helper(cube_service_data)) + if cube_svc.acknowledged: + inserted_cube_svc: dict = cube_collection.find_one({"_id": cube_svc.inserted_id}) + return inserted_cube_svc + else: + raise Exception("Could not store new record.") + except Exception as ex: + return {"error": str(ex)} + + +def delete_cube_service(service_name: str) -> bool: + """Remove an existing cube service""" + pass + + +def update_cube_service(service_name: str, new_data: dict) -> dict: + """Update an existing cube service""" + pass + + +def retrieve_cube_service(service_name: str) -> dict: + """Retrieve an existing cube service entry from the DB""" + pass + + +def retrieve_cube_services() -> List[str]: + """Retrieve all the cube services present in the DB""" + pass diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 6dd5cc0..46b854a 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -28,7 +28,7 @@ dictConfig(log.log_config) logger = logging.getLogger('pflink-logger') -d = {'workername': 'WORKFLOW_MGR', 'key' : "",'log_color': "\33[33m"} +d = {'workername': 'WORKFLOW_MGR', 'key': "", 'log_color': "\33[33m"} def define_parameters(): @@ -40,6 +40,7 @@ def define_parameters(): parser.add_argument('--test', default=False, action='store_true') return parser + def shorten(s, width=100, placeholder='[...]'): """ Validate a given feed name for size = 100 chars @@ -47,6 +48,7 @@ def shorten(s, width=100, placeholder='[...]'): """ return s[:width] if len(s) <= width else s[:width - len(placeholder)] + placeholder + def str_to_param_dict(params: str) -> dict: """ Convert CLI arguments passed as string to a dictionary of parameters @@ -69,7 +71,6 @@ def str_to_param_dict(params: str) -> dict: class WorkflowError(Exception): def __init__(self, message, errors): - # Call the base class constructor with the parameters it needs super(WorkflowError, self).__init__(message) @@ -81,6 +82,7 @@ class WorkflowManager: """ This module manages different states of a workflow by constantly checking the status of a workflow in the DB. """ + def __init__(self, args): """ Initialize class variables @@ -99,7 +101,7 @@ def run(self): self.manage_workflow(key, self.args.test) response = self.__workflow.response logger.info(f"Workflow manager exited with status {response.status} and\ - current workflow state as {response.workflow_state}",extra=d) + current workflow state as {response.workflow_state}", extra=d) def fetch_and_load(self, db_key: str, test: bool): """ @@ -209,7 +211,7 @@ def get_feed_name(self) -> str: feed_name = shorten(feed_name) return feed_name except WorkflowError as er: - raise WorkflowError("Feed name could not be created.",er) + raise WorkflowError("Feed name could not be created.", er) def run_plugin_or_pipeline_instance(self, prev_id: str): """ @@ -223,7 +225,7 @@ def run_plugin_or_pipeline_instance(self, prev_id: str): plugin_params = str_to_param_dict(request.workflow_info.plugin_params) plugin_params["previous_id"] = prev_id plugin_id = self.get_plugin_id(request.workflow_info.plugin_name, - request.workflow_info.plugin_version) + request.workflow_info.plugin_version) self.run_plugin(plugin_id, prev_id, plugin_params) if request.workflow_info.pipeline_name: pipeline_id = self.get_pipeline_id(request.workflow_info.pipeline_name) @@ -291,7 +293,7 @@ def get_plugin_id(self, name: str, version: str = "") -> str: plugin_id: str = self.__client.getPluginId(plugin_search_params) return plugin_id except Exception as er: - raise WorkflowError("Plugin could not be found.",er) + raise WorkflowError("Plugin could not be found.", er) def get_data_path(self, request: WorkflowRequestSchema) -> str: """ @@ -305,7 +307,7 @@ def get_data_path(self, request: WorkflowRequestSchema) -> str: data_path = self.__client.getSwiftPath(pacs_search_params) return data_path except WorkflowError as er: - raise WorkflowError("Data path could not be found.",er) + raise WorkflowError("Data path could not be found.", er) def get_pipeline_id(self, name: str) -> str: """Method to search for a particular pipeline and return its ID""" @@ -314,7 +316,7 @@ def get_pipeline_id(self, name: str) -> str: pipeline_id = self.__client.getPipelineId(pipeline_search_params) return str(pipeline_id) except WorkflowError as er: - raise WorkflowError("Pipeline could not be found.",er) + raise WorkflowError("Pipeline could not be found.", er) def run_pipeline(self, pipeline_id: str, name: str, prev_id: str) -> dict: """Run a pipeline instance of a previous plugin instance ID""" @@ -324,7 +326,7 @@ def run_pipeline(self, pipeline_id: str, name: str, prev_id: str) -> dict: feed_resp: dict = self.__client.createWorkflow(pipeline_id, pipeline_params) return feed_resp except WorkflowError as er: - raise WorkflowError("Pipeline could not be run.",er) + raise WorkflowError("Pipeline could not be run.", er) def run_plugin(self, plugin_id: str, prev_id: str, plugin_params: dict) -> str: """Run a plugin instance on a previous plugin instance ID""" @@ -334,7 +336,7 @@ def run_plugin(self, plugin_id: str, prev_id: str, plugin_params: dict) -> str: resp = self.__client.createFeed(plugin_id, plugin_params) return resp except WorkflowError as err: - raise WorkflowError("Plugin could not be run.",err) + raise WorkflowError("Plugin could not be run.", err) def create_new_feed(self, feed_name: str, data_path: str, dircopy_id: str) -> (str, str): """ @@ -346,7 +348,7 @@ def create_new_feed(self, feed_name: str, data_path: str, dircopy_id: str) -> (s feed_response = self.__client.createFeed(dircopy_id, feed_params) return feed_response["feed_id"], feed_response["id"] except WorkflowError as er: - raise WorkflowError("Feed could not be created.",er) + raise WorkflowError("Feed could not be created.", er) def task_producer(self): """ @@ -437,5 +439,3 @@ def main(): Main entry point of this script """ main() - - diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 7b7a5e2..cf86182 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -39,8 +39,7 @@ def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): collection = test_collection if test else workflow_collection index = collection.create_index([('$**', TEXT)], name='search_index', default_language='english') - workflows = [] - # query, rank, response = search.compound_queries(search_params) + workflows = collection.aggregate( [ {"$match": {"$text": {"$search": search_params.keywords}}}, diff --git a/app/models/cube.py b/app/models/cube.py index 06f3bf8..c621c12 100644 --- a/app/models/cube.py +++ b/app/models/cube.py @@ -1,4 +1,5 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, validator +from typing import List class Plugin(BaseModel): @@ -7,5 +8,24 @@ class Plugin(BaseModel): version: str = Field(...) +class CubeService(BaseModel): + """This class represents a CUBE service""" + service_name: str = Field(...) + service_URL: str = Field(...) + @validator('*') + def check_for_empty_string(cls, v): + assert v != '', "Empty strings not allowed." + return v + +class CubeServiceResponse(BaseModel): + """This class represents a CUBE service response from `pflink`""" + data: dict + message: str = "" + + +class CubeServiceCollection(BaseModel): + """This class represents the collection of CUBE services available""" + data: List[str] + message: str = ""