From 11f8eb452a3c3b93b363a0e4635028839067e47b Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Mon, 7 Aug 2023 15:01:43 -0400 Subject: [PATCH 01/19] Added date-timestamp in workflow records --- app/config.py | 2 +- app/controllers/subprocesses/utils.py | 3 +++ app/controllers/subprocesses/wf_manager.py | 12 +++++++++++- app/controllers/workflow.py | 18 ++++++++++++++++++ app/models/workflow.py | 3 +++ app/routes/testing.py | 11 ++++++++++- app/routes/workflow.py | 10 +++++++++- requirements.txt | 1 + 8 files changed, 56 insertions(+), 4 deletions(-) diff --git a/app/config.py b/app/config.py index eb9e7c0..36bbef7 100644 --- a/app/config.py +++ b/app/config.py @@ -3,7 +3,7 @@ class Settings(BaseSettings): pflink_mongodb: MongoDsn = 'mongodb://localhost:27017' - version: str = "3.6.0" + version: str = "3.6.1" class Auth(BaseSettings): diff --git a/app/controllers/subprocesses/utils.py b/app/controllers/subprocesses/utils.py index a725eef..dc7a898 100644 --- a/app/controllers/subprocesses/utils.py +++ b/app/controllers/subprocesses/utils.py @@ -1,3 +1,4 @@ +import datetime import hashlib import json import requests @@ -41,6 +42,7 @@ def workflow_retrieve_helper(workflow: dict) -> WorkflowDBSchema: return WorkflowDBSchema( key=workflow["_id"], fingerprint=workflow["fingerprint"], + creation_time=datetime.datetime.min if not workflow.get("creation_time") else workflow["creation_time"], request=request, response=workflow["response"], stale=workflow["stale"], @@ -59,6 +61,7 @@ def workflow_add_helper(workflow: WorkflowDBSchema) -> dict: return { "_id": workflow.key, "fingerprint": workflow.fingerprint, + "creation_time": workflow.creation_time, "request": d_request, "response": workflow.response.__dict__, "stale": workflow.stale, diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 0e16e12..d55f875 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -8,11 +8,12 @@ import time import requests from app.controllers.subprocesses.python_chris_client import PythonChrisClient +from app.controllers.subprocesses.subprocess_helper import get_process_count from app.models.workflow import ( Error, State, WorkflowRequestSchema, - WorkflowInfoSchema, + WorkflowDBSchema, ) from app.controllers.subprocesses.utils import ( request_to_dict, @@ -51,6 +52,7 @@ def manage_workflow(db_key: str, test: bool): return request = workflow.request + pfdcm_url = retrieve_pfdcm_url(request.pfdcm_info.pfdcm_service) cube_url = get_cube_url_from_pfdcm(pfdcm_url, request.pfdcm_info.cube_service) @@ -58,6 +60,7 @@ def manage_workflow(db_key: str, test: bool): workflow.started = True update_workflow(key, workflow) MAX_RETRIES -= 1 + logging.info(f"RETRY#{MAX_RETRIES}") match workflow.response.workflow_state: @@ -97,10 +100,16 @@ def manage_workflow(db_key: str, test: bool): workflow.response.status = False update_workflow(key, workflow) + update_status(request) time.sleep(10) workflow = retrieve_workflow(key) + # Reset workflow if pflink reached MAX no. 
of retries + if MAX_RETRIES==0: + workflow.started = False + update_workflow(key, workflow) + def update_status(request: WorkflowRequestSchema): """ @@ -212,6 +221,7 @@ def do_cube_create_feed(request: WorkflowRequestSchema, cube_url: str) -> dict: plugin_search_params = {"name": "pl-dircopy"} plugin_id = client.getPluginId(plugin_search_params) + logging.info(f"Creating a new feed with feed name: {feed_name}") # create a feed feed_params = {'title': feed_name, 'dir': data_path} feed_response = client.createFeed(plugin_id, feed_params) diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index aa8fbd4..12c25a8 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -11,6 +11,7 @@ UserResponseSchema, State, ) +from app.controllers.subprocesses.subprocess_helper import get_process_count from app.controllers.subprocesses import utils from app.config import settings @@ -60,6 +61,17 @@ async def delete_workflows(test: bool = False): return {"Message": f"{delete_count} record(s) deleted!"} +async def delete_workflow(workflow_key: str, test: bool = False): + """ + Delete a workflow record from DB + """ + collection = test_collection if test else workflow_collection + delete_count = 0 + for workflow in collection.find(): + collection.delete_one({"_id": workflow_key}) + delete_count += 1 + return {"Message": f"{delete_count} record(s) deleted!"} + def request_to_hash(request: WorkflowRequestSchema) -> str: """ Create a hash key using md5 hash function on a workflow request object @@ -143,6 +155,9 @@ def manage_workflow(str_data: str, mode: str): """ Manage a workflow request in a separate subprocess """ + proc_count = get_process_count("wf_manager", str_data) + logging.info(f"{proc_count} subprocess of workflow manager running on the system.") + if proc_count > 0: return d_cmd = ["python", "app/controllers/subprocesses/wf_manager.py", "--data", str_data] if mode: d_cmd.append(mode) @@ -154,6 +169,9 @@ def update_workflow_status(str_data: str, mode: str): """ Update the current status of a workflow request in a separate process """ + proc_count = get_process_count("status", str_data) + logging.info(f"{proc_count} subprocess of status manager running on the system.") + if proc_count>0: return d_cmd = ["python", "app/controllers/subprocesses/status.py", "--data", str_data] if mode: d_cmd.append(mode) diff --git a/app/models/workflow.py b/app/models/workflow.py index f6ef016..7a42ba5 100644 --- a/app/models/workflow.py +++ b/app/models/workflow.py @@ -1,3 +1,5 @@ +import datetime +import pytz from pydantic import BaseModel, Field, ValidationError, validator from enum import Enum from app.models.auth import User @@ -156,6 +158,7 @@ class WorkflowDBSchema(BaseModel): """The DB model of a workflow object""" key: str = "" fingerprint: str = "" + creation_time: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) request: WorkflowRequestSchema response: WorkflowStatusResponseSchema stale: bool = True diff --git a/app/routes/testing.py b/app/routes/testing.py index 83a788c..07452c5 100644 --- a/app/routes/testing.py +++ b/app/routes/testing.py @@ -42,7 +42,7 @@ async def test_get_workflows(): return workflows -@router.delete("", response_description="All workflows deleted") +@router.delete("/list", response_description="All workflows deleted") async def test_delete_workflows(): """ Delete all workflow records from the test database table @@ -51,3 +51,12 @@ async def test_delete_workflows(): return response +@router.delete("", response_description="Selected 
workflow deleted successfully") +async def test_delete_workflow(workflow_key: str): + """ + Delete a single workflow record from the test database table + """ + response = await workflow.delete_workflow(workflow_key, test=True) + return response + + diff --git a/app/routes/workflow.py b/app/routes/workflow.py index f3af2df..cb99edc 100644 --- a/app/routes/workflow.py +++ b/app/routes/workflow.py @@ -31,10 +31,18 @@ async def get_workflows(): return workflows -@router.delete("", response_description="All workflows deleted") +@router.delete("/list", response_description="All workflows deleted") async def delete_workflows(): """ Delete all workflow records from the prod database table """ response = await workflow.delete_workflows() + return response + +@router.delete("", response_description="Selected workflow deleted successfully") +async def delete_workflow(workflow_key: str): + """ + Delete a single workflow record from the prod database table + """ + response = await workflow.delete_workflow(workflow_key) return response \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 51fe0eb..8233771 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ requests python-chrisclient asyncio psutil +pytz # dev pytest==7.2.0 From b5ba7cc3b92de233f0de3abc44c90beb1cfe1c42 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Mon, 7 Aug 2023 18:52:50 -0400 Subject: [PATCH 02/19] WIP changes for paramter based searching --- app/routes/workflow.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/app/routes/workflow.py b/app/routes/workflow.py index cb99edc..0ba20c1 100644 --- a/app/routes/workflow.py +++ b/app/routes/workflow.py @@ -4,6 +4,7 @@ WorkflowRequestSchema, WorkflowStatusResponseSchema, ) +from app.controllers.subprocesses import utils router = APIRouter() @@ -23,12 +24,21 @@ async def create_workflow(data: WorkflowRequestSchema) -> WorkflowStatusResponse @router.get("/list", response_description="All workflows retrieved") -async def get_workflows(): +async def get_workflows(search_params: dict): """ Fetch all workflows currently present in the database """ - workflows = workflow.retrieve_workflows() + workflows = workflow.retrieve_workflows(search_params) return workflows + + +@router.get("", response_description="Workflow retrieved successfully") +async def get_workflows(workflow_key: str): + """ + Fetch workflow recorded by using hash of a request. 
+ """ + workflow = utils.retrieve_workflow(workflow_key) + return workflow @router.delete("/list", response_description="All workflows deleted") @@ -45,4 +55,4 @@ async def delete_workflow(workflow_key: str): Delete a single workflow record from the prod database table """ response = await workflow.delete_workflow(workflow_key) - return response \ No newline at end of file + return response From 3f3d6c6d04f621b028ad808dedd49eadc6f800f6 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Tue, 15 Aug 2023 17:25:27 -0400 Subject: [PATCH 03/19] Added search filters --- app/controllers/workflow.py | 32 ++++++++++++++----- app/models/workflow.py | 30 ++++++++++++++++++ app/routes/testing.py | 5 +-- app/routes/workflow.py | 62 +++++++++++++++++++++++++++++++++++-- 4 files changed, 117 insertions(+), 12 deletions(-) diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 12c25a8..229288f 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -1,3 +1,5 @@ +import datetime + from pymongo import MongoClient import json import logging @@ -10,6 +12,10 @@ Error, UserResponseSchema, State, + WorkflowSearchSchema, + PFDCMInfoSchema, + PACSqueryCore, + WorkflowInfoSchema, ) from app.controllers.subprocesses.subprocess_helper import get_process_count from app.controllers.subprocesses import utils @@ -33,12 +39,25 @@ # Retrieve all workflows present in the DB -def retrieve_workflows(test: bool = False): +def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): collection = test_collection if test else workflow_collection workflows = [] - for workflow in collection.find(): - workflows.append(utils.workflow_retrieve_helper(workflow)) - return workflows + if search_params.cube_username: + workflows = collection.find({"request.cube_user_info.username": search_params.cube_username}) + elif search_params.pipeline_name: + workflows = collection.find({"request.workflow_info.pipeline_name": search_params.pipeline_name}) + elif search_params.plugin_name: + workflows = collection.find({"request.workflow_info.plugin_name": search_params.plugin_name}) + elif search_params.plugin_version: + workflows = collection.find({"request.workflow_info.plugin_version": search_params.plugin_version}) + elif search_params.plugin_params: + workflows = collection.find({"request.workflow_info.plugin_params": search_params.plugin_params}) + elif search_params.date: + workflows = collection.find({"date": search_params.date}) + search_results = [] + for wrkflo in workflows: search_results.append(wrkflo['_id']) + + return search_results # Add new workflow in the DB @@ -67,9 +86,8 @@ async def delete_workflow(workflow_key: str, test: bool = False): """ collection = test_collection if test else workflow_collection delete_count = 0 - for workflow in collection.find(): - collection.delete_one({"_id": workflow_key}) - delete_count += 1 + collection.delete_one({"_id": workflow_key}) + delete_count += 1 return {"Message": f"{delete_count} record(s) deleted!"} def request_to_hash(request: WorkflowRequestSchema) -> str: diff --git a/app/models/workflow.py b/app/models/workflow.py index 7a42ba5..8d345a8 100644 --- a/app/models/workflow.py +++ b/app/models/workflow.py @@ -163,3 +163,33 @@ class WorkflowDBSchema(BaseModel): response: WorkflowStatusResponseSchema stale: bool = True started: bool = False + + +class WorkflowSearchSchema(BaseModel): + """A schema to search Workflow DB records""" + AccessionNumber: str = "" + PatientID: str = "" + PatientName: str = "" + 
PatientBirthDate: str = "" + PatientAge: str = "" + PatientSex: str = "" + StudyDate: str = "" + StudyDescription: str = "" + StudyInstanceUID: str = "" + Modality: str = "" + ModalitiesInStudy: str = "" + PerformedStationAETitle: str = "" + NumberOfSeriesRelatedInstances: str = "" + InstanceNumber: str = "" + SeriesDate: str = "" + SeriesDescription: str = "" + SeriesInstanceUID: str = "" + ProtocolName: str = "" + AcquisitionProtocolDescription: str = "" + AcquisitionProtocolName: str = "" + plugin_name: str = "" + plugin_version: str = "" + plugin_params: str = "" + pipeline_name: str = "" + cube_username: str = "" + date: str = "" diff --git a/app/routes/testing.py b/app/routes/testing.py index 07452c5..8135ba0 100644 --- a/app/routes/testing.py +++ b/app/routes/testing.py @@ -2,6 +2,7 @@ from app.models.workflow import ( WorkflowRequestSchema, WorkflowStatusResponseSchema, + WorkflowSearchSchema, ) from app.controllers import workflow router = APIRouter() @@ -34,11 +35,11 @@ async def test_create_workflow( @router.get("/list", response_description="All workflows retrieved") -async def test_get_workflows(): +async def test_get_workflows(search_params: WorkflowSearchSchema): """ Fetch all workflows currently present in the database """ - workflows = workflow.retrieve_workflows(test=True) + workflows = workflow.retrieve_workflows(search_params, test=True) return workflows diff --git a/app/routes/workflow.py b/app/routes/workflow.py index cb99edc..8d4f679 100644 --- a/app/routes/workflow.py +++ b/app/routes/workflow.py @@ -3,6 +3,7 @@ from app.models.workflow import ( WorkflowRequestSchema, WorkflowStatusResponseSchema, + WorkflowSearchSchema, ) router = APIRouter() @@ -22,12 +23,67 @@ async def create_workflow(data: WorkflowRequestSchema) -> WorkflowStatusResponse return response -@router.get("/list", response_description="All workflows retrieved") -async def get_workflows(): +@router.get("/search", response_description="All workflows retrieved") +async def get_workflows( + AccessionNumber: str = "", + PatientID: str = "", + PatientName: str = "", + PatientBirthDate: str = "", + PatientAge: str = "", + PatientSex: str = "", + StudyDate: str = "", + StudyDescription: str = "", + StudyInstanceUID: str = "", + Modality: str = "", + ModalitiesInStudy: str = "", + PerformedStationAETitle: str = "", + NumberOfSeriesRelatedInstances: str = "", + InstanceNumber: str = "", + SeriesDate: str = "", + SeriesDescription: str = "", + SeriesInstanceUID: str = "", + ProtocolName: str = "", + AcquisitionProtocolDescription: str = "", + AcquisitionProtocolName: str = "", + plugin_name: str = "", + plugin_version: str = "", + plugin_params: str = "", + pipeline_name: str = "", + cube_username: str = "", + date: str = "", +): """ Fetch all workflows currently present in the database """ - workflows = workflow.retrieve_workflows() + search_params = WorkflowSearchSchema( + AccessionNumber = AccessionNumber, + PatientID = PatientID, + PatientName = PatientName, + PatientBirthDate = PatientBirthDate, + PatientAge = PatientAge, + PatientSex = PatientSex, + StudyDate = StudyDate, + StudyDescription = StudyDescription, + StudyInstanceUID = StudyInstanceUID, + Modality = Modality, + ModalitiesInStudy = ModalitiesInStudy, + PerformedStationAETitle = PerformedStationAETitle, + NumberOfSeriesRelatedInstances = NumberOfSeriesRelatedInstances, + InstanceNumber = InstanceNumber, + SeriesDate = SeriesDate, + SeriesDescription = SeriesDescription, + SeriesInstanceUID = SeriesInstanceUID, + ProtocolName = ProtocolName, + 
AcquisitionProtocolDescription = AcquisitionProtocolDescription, + AcquisitionProtocolName = AcquisitionProtocolName, + plugin_name = plugin_name, + plugin_version = plugin_version, + plugin_params = plugin_params, + pipeline_name = pipeline_name, + cube_username = cube_username, + date = date, + ) + workflows = workflow.retrieve_workflows(search_params) return workflows From 0545f15bf1df1accf30a23eaa90602ef69c343d6 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Wed, 16 Aug 2023 17:50:48 -0400 Subject: [PATCH 04/19] Added search queries --- app/controllers/search.py | 27 +++++++++++++++++++++++++++ app/controllers/workflow.py | 17 +++++++++++++++-- app/routes/workflow.py | 11 ++++++----- docker-compose.yml | 2 +- requirements.txt | 2 +- 5 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 app/controllers/search.py diff --git a/app/controllers/search.py b/app/controllers/search.py new file mode 100644 index 0000000..585caad --- /dev/null +++ b/app/controllers/search.py @@ -0,0 +1,27 @@ +from app.models.workflow import WorkflowSearchSchema + +def compound_queries(query_params: WorkflowSearchSchema): + query = { "$match": { "$text": + {"$search": query_params.cube_username} + + } + } + response = { + "$project": { + "_id": 1, + "request.cube_user_info.username": 1 + } + } + return query, response + +def index_search(query_params: dict): + query = { + "$match": { + "index": "some_index", + "text": { + "query": "chris", + "path": "request.cube_user_info.username" + } + } + } + return query \ No newline at end of file diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 229288f..0dbb5cf 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -1,6 +1,6 @@ import datetime -from pymongo import MongoClient +from pymongo import MongoClient, TEXT import json import logging import subprocess @@ -17,6 +17,7 @@ PACSqueryCore, WorkflowInfoSchema, ) +from app.controllers import search from app.controllers.subprocesses.subprocess_helper import get_process_count from app.controllers.subprocesses import utils from app.config import settings @@ -39,7 +40,7 @@ # Retrieve all workflows present in the DB -def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): +def retrieve_workflows_by(search_params: WorkflowSearchSchema, test: bool = False): collection = test_collection if test else workflow_collection workflows = [] if search_params.cube_username: @@ -59,6 +60,18 @@ def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): return search_results +def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): + collection = test_collection if test else workflow_collection + index = collection.create_index([('request.cube_user_info.username',TEXT)], name='search_index', default_language='english') + collection.create_index("some_index") + workflows = [] + query, response = search.compound_queries(search_params) + workflows = collection.aggregate([query, response]) + search_results = [] + for wrkflo in workflows: search_results.append(str(wrkflo)) + + return search_results + # Add new workflow in the DB def add_workflow(workflow_data: WorkflowDBSchema, test: bool = False) -> WorkflowDBSchema: diff --git a/app/routes/workflow.py b/app/routes/workflow.py index 2d39bab..bd2cf5c 100644 --- a/app/routes/workflow.py +++ b/app/routes/workflow.py @@ -89,7 +89,7 @@ async def get_workflows( @router.get("", response_description="Workflow retrieved successfully") -async def 
get_workflows(workflow_key: str): +async def get_workflow(workflow_key: str): """ Fetch workflow recorded by using hash of a request. """ @@ -97,13 +97,14 @@ async def get_workflows(workflow_key: str): return workflow -@router.delete("/list", response_description="All workflows deleted") -async def delete_workflows(): + +# @router.delete("/list", response_description="All workflows deleted") +# async def delete_workflows(): """ Delete all workflow records from the prod database table """ - response = await workflow.delete_workflows() - return response +# response = await workflow.delete_workflows() +# return response @router.delete("", response_description="Selected workflow deleted successfully") async def delete_workflow(workflow_key: str): diff --git a/docker-compose.yml b/docker-compose.yml index 8f74cf8..8805053 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,7 @@ services: pflink: pflink-db: - image: mongo + image: mongo:4.4.6 environment: - PUID=1000 - PGID=1000 diff --git a/requirements.txt b/requirements.txt index 8233771..8932941 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ -motor==2.5.1 pydantic==1.10.11 requests python-chrisclient asyncio psutil pytz +pymongo # dev pytest==7.2.0 From 5eaaeb44c643ba83475974a169a1eadbdbe94ed1 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Fri, 18 Aug 2023 17:16:20 -0400 Subject: [PATCH 05/19] added keyword searh and ranks --- app/controllers/search.py | 20 +++++++++----- app/controllers/workflow.py | 17 ++++++++---- app/models/workflow.py | 26 +----------------- app/routes/workflow.py | 54 +++---------------------------------- 4 files changed, 29 insertions(+), 88 deletions(-) diff --git a/app/controllers/search.py b/app/controllers/search.py index 585caad..8904162 100644 --- a/app/controllers/search.py +++ b/app/controllers/search.py @@ -1,18 +1,24 @@ from app.models.workflow import WorkflowSearchSchema def compound_queries(query_params: WorkflowSearchSchema): - query = { "$match": { "$text": - {"$search": query_params.cube_username} + query = { "$match": { "$text": + {"$search": query_params.keywords} + + + }} + + rank = { + "$sort": {"score": {"$meta": "textScore"}} + } - } - } response = { "$project": { - "_id": 1, - "request.cube_user_info.username": 1 + "_id": 1 } } - return query, response + + + return query, rank, response def index_search(query_params: dict): query = { diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 0dbb5cf..cdc2bfb 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -62,13 +62,20 @@ def retrieve_workflows_by(search_params: WorkflowSearchSchema, test: bool = Fals def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): collection = test_collection if test else workflow_collection - index = collection.create_index([('request.cube_user_info.username',TEXT)], name='search_index', default_language='english') - collection.create_index("some_index") + index = collection.create_index([('$**',TEXT)], + name='search_index', default_language='english') workflows = [] - query, response = search.compound_queries(search_params) - workflows = collection.aggregate([query, response]) + query, rank, response = search.compound_queries(search_params) + workflows = collection.aggregate( + [ + { "$match": {"$text": { "$search": search_params.keywords } } }, + { "$project": { "_id": 1 , "score": { "$meta": "textScore" }} }, + {"$sort": {"score": -1}}, + ] +) search_results = [] - for wrkflo in 
workflows: search_results.append(str(wrkflo)) + for wrkflo in workflows: + search_results.append(str(wrkflo)) return search_results diff --git a/app/models/workflow.py b/app/models/workflow.py index 8d345a8..8f06846 100644 --- a/app/models/workflow.py +++ b/app/models/workflow.py @@ -167,29 +167,5 @@ class WorkflowDBSchema(BaseModel): class WorkflowSearchSchema(BaseModel): """A schema to search Workflow DB records""" - AccessionNumber: str = "" - PatientID: str = "" - PatientName: str = "" - PatientBirthDate: str = "" - PatientAge: str = "" - PatientSex: str = "" - StudyDate: str = "" - StudyDescription: str = "" - StudyInstanceUID: str = "" - Modality: str = "" - ModalitiesInStudy: str = "" - PerformedStationAETitle: str = "" - NumberOfSeriesRelatedInstances: str = "" - InstanceNumber: str = "" - SeriesDate: str = "" - SeriesDescription: str = "" - SeriesInstanceUID: str = "" - ProtocolName: str = "" - AcquisitionProtocolDescription: str = "" - AcquisitionProtocolName: str = "" - plugin_name: str = "" - plugin_version: str = "" - plugin_params: str = "" - pipeline_name: str = "" - cube_username: str = "" + keywords: str = "" date: str = "" diff --git a/app/routes/workflow.py b/app/routes/workflow.py index bd2cf5c..64e15f4 100644 --- a/app/routes/workflow.py +++ b/app/routes/workflow.py @@ -26,62 +26,14 @@ async def create_workflow(data: WorkflowRequestSchema) -> WorkflowStatusResponse @router.get("/search", response_description="All workflows retrieved") async def get_workflows( - AccessionNumber: str = "", - PatientID: str = "", - PatientName: str = "", - PatientBirthDate: str = "", - PatientAge: str = "", - PatientSex: str = "", - StudyDate: str = "", - StudyDescription: str = "", - StudyInstanceUID: str = "", - Modality: str = "", - ModalitiesInStudy: str = "", - PerformedStationAETitle: str = "", - NumberOfSeriesRelatedInstances: str = "", - InstanceNumber: str = "", - SeriesDate: str = "", - SeriesDescription: str = "", - SeriesInstanceUID: str = "", - ProtocolName: str = "", - AcquisitionProtocolDescription: str = "", - AcquisitionProtocolName: str = "", - plugin_name: str = "", - plugin_version: str = "", - plugin_params: str = "", - pipeline_name: str = "", - cube_username: str = "", + keywords: str = "", date: str = "", ): """ - Fetch all workflows currently present in the database + Fetch all workflows currently present in the database matching the search criteria """ search_params = WorkflowSearchSchema( - AccessionNumber = AccessionNumber, - PatientID = PatientID, - PatientName = PatientName, - PatientBirthDate = PatientBirthDate, - PatientAge = PatientAge, - PatientSex = PatientSex, - StudyDate = StudyDate, - StudyDescription = StudyDescription, - StudyInstanceUID = StudyInstanceUID, - Modality = Modality, - ModalitiesInStudy = ModalitiesInStudy, - PerformedStationAETitle = PerformedStationAETitle, - NumberOfSeriesRelatedInstances = NumberOfSeriesRelatedInstances, - InstanceNumber = InstanceNumber, - SeriesDate = SeriesDate, - SeriesDescription = SeriesDescription, - SeriesInstanceUID = SeriesInstanceUID, - ProtocolName = ProtocolName, - AcquisitionProtocolDescription = AcquisitionProtocolDescription, - AcquisitionProtocolName = AcquisitionProtocolName, - plugin_name = plugin_name, - plugin_version = plugin_version, - plugin_params = plugin_params, - pipeline_name = pipeline_name, - cube_username = cube_username, + keywords = keywords, date = date, ) workflows = workflow.retrieve_workflows(search_params) From fd49eb79226fb1f04e94328878a67b608856bd49 Mon Sep 17 00:00:00 2001 
From: "sandipsamal117@gmail.com" Date: Mon, 21 Aug 2023 18:07:16 -0400 Subject: [PATCH 06/19] Added service retry flag to handle connection errors --- app/config.py | 2 +- app/controllers/subprocesses/wf_manager.py | 9 +++++++++ app/models/workflow.py | 1 + 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/app/config.py b/app/config.py index 36bbef7..c638933 100644 --- a/app/config.py +++ b/app/config.py @@ -3,7 +3,7 @@ class Settings(BaseSettings): pflink_mongodb: MongoDsn = 'mongodb://localhost:27017' - version: str = "3.6.1" + version: str = "3.7.0" class Auth(BaseSettings): diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index d55f875..1766fb0 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -105,6 +105,15 @@ def manage_workflow(db_key: str, test: bool): time.sleep(10) workflow = retrieve_workflow(key) + # Reset workflow status if max service_retry is not reached + if workflow.service_retry > 0 and not workflow.response.status: + logging.info(f"ERROR: {workflow.response.error} . {workflow.service_retry} retries left.") + workflow.service_retry -= 1 + workflow.response.error = "" + workflow.response.status = True + update_workflow(key, workflow) + + # Reset workflow if pflink reached MAX no. of retries if MAX_RETRIES==0: workflow.started = False diff --git a/app/models/workflow.py b/app/models/workflow.py index 8f06846..e920326 100644 --- a/app/models/workflow.py +++ b/app/models/workflow.py @@ -161,6 +161,7 @@ class WorkflowDBSchema(BaseModel): creation_time: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) request: WorkflowRequestSchema response: WorkflowStatusResponseSchema + service_retry: int = 5 stale: bool = True started: bool = False From 19c144c3483423fb0dff3327393ff68414997459 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Wed, 23 Aug 2023 08:49:45 -0400 Subject: [PATCH 07/19] log formatting and error logging --- app/controllers/subprocesses/status.py | 18 ++++----- app/controllers/subprocesses/wf_manager.py | 29 +++++++------- app/controllers/workflow.py | 15 +++----- app/log_config.py | 23 +++++++++++ app/main.py | 8 +++- app/models/log.py | 44 ++++++++++++++++++++++ 6 files changed, 103 insertions(+), 34 deletions(-) create mode 100644 app/log_config.py create mode 100644 app/models/log.py diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 6909a9f..d6e5293 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -6,7 +6,9 @@ import logging import random import requests - +from logging.config import dictConfig +from app.models.log import LogConfig +from app.log_config import log_config from app.models.workflow import ( State, WorkflowRequestSchema, @@ -26,12 +28,8 @@ ) from app.controllers.subprocesses.subprocess_helper import get_process_count -log_format = "%(asctime)s: %(message)s" -logging.basicConfig( - format=log_format, - level=logging.INFO, - datefmt="%H:%M:%S" -) +dictConfig(log_config) +logger = logging.getLogger('pflink-logger') parser = argparse.ArgumentParser(description='Process arguments passed through CLI') parser.add_argument('--data', type=str) @@ -50,7 +48,7 @@ def update_workflow_status(key: str, test: bool): if is_status_subprocess_running(workflow): return - logging.info(f"WORKING on updating the status for {key}, locking--") + logger.info(f"WORKING on updating the status for {key}, locking DB flag") update_status_flag(key, 
workflow, False, test) if test: @@ -60,7 +58,7 @@ def update_workflow_status(key: str, test: bool): workflow.response = update_workflow_progress(updated_status) update_status_flag(key, workflow, True, test) - logging.info(f"UPDATED status for {key}, releasing lock") + logger.info(f"UPDATED status for {key}, releasing lock") def update_workflow_progress(response: WorkflowStatusResponseSchema): @@ -175,6 +173,7 @@ def _get_pfdcm_status(request: WorkflowRequestSchema): d_response["service_name"] = request.pfdcm_info.pfdcm_service return d_response except Exception as ex: + logger.error(f"{Error.pfdcm.value} {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}") return {"error": Error.pfdcm.value + f" {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}"} @@ -208,6 +207,7 @@ def _get_feed_status(request: WorkflowRequestSchema, feed_id: str) -> dict: resp["errored_plugins"] = str(l_error) return resp except Exception as ex: + logger.error(f"{Error.cube.value} {str(ex)}") return {"error": Error.cube.value + str(ex)} diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 1766fb0..285c710 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -9,6 +9,9 @@ import requests from app.controllers.subprocesses.python_chris_client import PythonChrisClient from app.controllers.subprocesses.subprocess_helper import get_process_count +from logging.config import dictConfig +from app.models.log import LogConfig +from app.log_config import log_config from app.models.workflow import ( Error, State, @@ -25,13 +28,8 @@ do_cube_create_user, retrieve_pfdcm_url, ) - -log_format = "%(asctime)s: %(message)s" -logging.basicConfig( - format=log_format, - level=logging.INFO, - datefmt="%H:%M:%S" -) +dictConfig(log_config) +logger = logging.getLogger('pflink-logger') parser = argparse.ArgumentParser(description='Process arguments') parser.add_argument('--data', type=str) @@ -60,20 +58,23 @@ def manage_workflow(db_key: str, test: bool): workflow.started = True update_workflow(key, workflow) MAX_RETRIES -= 1 - logging.info(f"RETRY#{MAX_RETRIES}") + logger.info(f"RETRY#{MAX_RETRIES}") match workflow.response.workflow_state: case State.INITIALIZING: if workflow.stale: + logger.info("Requesting PACS retrieve.") do_pfdcm_retrieve(request, pfdcm_url) case State.RETRIEVING: if workflow.response.state_progress == "100%" and workflow.stale: + logger.info("Requesting PACS push.") do_pfdcm_push(request, pfdcm_url) case State.PUSHING: if workflow.response.state_progress == "100%" and workflow.stale: + logger.info("Requesting PACS register.") do_pfdcm_register(request, pfdcm_url) case State.REGISTERING: @@ -85,7 +86,7 @@ def manage_workflow(db_key: str, test: bool): workflow.response.feed_id = feed_id update_workflow(key, workflow) except Exception as ex: - logging.info(Error.feed.value) + logger.error(Error.feed.value) workflow.response.error = Error.feed.value + str(ex) workflow.response.status = False update_workflow(key, workflow) @@ -95,7 +96,7 @@ def manage_workflow(db_key: str, test: bool): try: do_cube_start_analysis(pl_inst_id, request, cube_url) except Exception as ex: - logging.info(Error.analysis.value + str(ex)) + logger.error(Error.analysis.value + str(ex)) workflow.response.error = Error.analysis.value + str(ex) workflow.response.status = False update_workflow(key, workflow) @@ -107,7 +108,7 @@ def manage_workflow(db_key: str, test: bool): # Reset workflow status if max service_retry is not reached 
if workflow.service_retry > 0 and not workflow.response.status: - logging.info(f"ERROR: {workflow.response.error} . {workflow.service_retry} retries left.") + logger.error(f"{workflow.response.error} . {workflow.service_retry} retries left.") workflow.service_retry -= 1 workflow.response.error = "" workflow.response.status = True @@ -180,7 +181,7 @@ def pfdcm_do(verb: str, then_args: dict, request: WorkflowRequestSchema, url: st response = requests.post(pfdcm_dicom_api, json=body, headers=headers) et = time.time() elapsed_time = et - st - logging.info(f'Execution time to {verb}:{elapsed_time} seconds') + logger.debug(f'Execution time to {verb}:{elapsed_time} seconds') def do_pfdcm_retrieve(dicom: WorkflowRequestSchema, pfdcm_url: str): @@ -230,7 +231,7 @@ def do_cube_create_feed(request: WorkflowRequestSchema, cube_url: str) -> dict: plugin_search_params = {"name": "pl-dircopy"} plugin_id = client.getPluginId(plugin_search_params) - logging.info(f"Creating a new feed with feed name: {feed_name}") + logger.info(f"Creating a new feed with feed name: {feed_name}") # create a feed feed_params = {'title': feed_name, 'dir': data_path} feed_response = client.createFeed(plugin_id, feed_params) @@ -259,7 +260,7 @@ def __run_plugin_instance(previous_id: str, request: WorkflowRequestSchema, clie # convert CLI params from string to a JSON dictionary feed_params = str_to_param_dict(request.workflow_info.plugin_params) feed_params["previous_id"] = previous_id - logging.info(f"Creating new analysis with parameters: {feed_params}") + logger.debug(f"Creating new analysis with parameters: {feed_params}") feed_resp = client.createFeed(plugin_id, feed_params) diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index cdc2bfb..da1fe3f 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -28,13 +28,7 @@ workflow_collection = database.get_collection("workflows_collection") test_collection = database.get_collection("tests_collection") -log_format = "%(asctime)s: %(message)s" -logging.basicConfig( - format=log_format, - level=logging.INFO, - datefmt="%H:%M:%S" -) - +logger = logging.getLogger('pflink-logger') # DB methods @@ -69,7 +63,7 @@ def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): workflows = collection.aggregate( [ { "$match": {"$text": { "$search": search_params.keywords } } }, - { "$project": { "_id": 1 , "score": { "$meta": "textScore" }} }, + { "$project": {"_id": 1 , "score": { "$meta": "textScore" }} }, {"$sort": {"score": -1}}, ] ) @@ -110,6 +104,7 @@ async def delete_workflow(workflow_key: str, test: bool = False): delete_count += 1 return {"Message": f"{delete_count} record(s) deleted!"} + def request_to_hash(request: WorkflowRequestSchema) -> str: """ Create a hash key using md5 hash function on a workflow request object @@ -194,7 +189,7 @@ def manage_workflow(str_data: str, mode: str): Manage a workflow request in a separate subprocess """ proc_count = get_process_count("wf_manager", str_data) - logging.info(f"{proc_count} subprocess of workflow manager running on the system.") + logger.debug(f"{proc_count} subprocess of workflow manager running on the system.") if proc_count > 0: return d_cmd = ["python", "app/controllers/subprocesses/wf_manager.py", "--data", str_data] if mode: @@ -208,7 +203,7 @@ def update_workflow_status(str_data: str, mode: str): Update the current status of a workflow request in a separate process """ proc_count = get_process_count("status", str_data) - logging.info(f"{proc_count} subprocess of 
status manager running on the system.") + logger.debug(f"{proc_count} subprocess of status manager running on the system.") if proc_count>0: return d_cmd = ["python", "app/controllers/subprocesses/status.py", "--data", str_data] if mode: diff --git a/app/log_config.py b/app/log_config.py new file mode 100644 index 0000000..cd0d4da --- /dev/null +++ b/app/log_config.py @@ -0,0 +1,23 @@ + +log_config = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "default": { + "()": "uvicorn.logging.DefaultFormatter", + "fmt": "%(levelprefix)s %(asctime)s %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + + }, + }, + "handlers": { + "default": { + "formatter": "default", + "class": "logging.StreamHandler", + "stream": "ext://sys.stderr", + }, + }, + "loggers": { + "pflink-logger": {"handlers": ["default"], "level": "DEBUG"}, + }, +} \ No newline at end of file diff --git a/app/main.py b/app/main.py index 71680c0..c152eef 100644 --- a/app/main.py +++ b/app/main.py @@ -8,6 +8,9 @@ from app.routes.cube import router as CubeRouter from app.config import settings from app.controllers import auth +from logging.config import dictConfig +from app.models.log import LogConfig +from app.log_config import log_config description = """ `pflink` is an application to interact with `CUBE` and `pfdcm` 🚀 @@ -62,7 +65,10 @@ "the records from DB" } ] - + +#dictConfig(LogConfig().dict()) +dictConfig(log_config) + app = FastAPI( title='pflink', version=settings.version, diff --git a/app/models/log.py b/app/models/log.py new file mode 100644 index 0000000..bdafcd9 --- /dev/null +++ b/app/models/log.py @@ -0,0 +1,44 @@ +from pydantic import BaseSettings +import logging + + +class LogConfig(BaseSettings): + """Logging configuration to be set for the server""" + grey = "\x1b[38;20m" + yellow = "\x1b[33;20m" + red = "\x1b[31;20m" + bold_red = "\x1b[31;1m" + reset = "\x1b[0m" + format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s %(pathname)s.%(funcName)s:%(lineno)d)" + + FORMATS = { + logging.DEBUG: grey + format + reset, + logging.INFO: grey + format + reset, + logging.WARNING: yellow + format + reset, + logging.ERROR: red + format + reset, + logging.CRITICAL: bold_red + format + reset + } + LOGGER_NAME: str = "pflink" + # LOG_FORMAT: str = "%(levelprefix)s | %(asctime)s | %(message)s" + LOG_LEVEL: str = "DEBUG" + + # Logging config + version = 1 + disable_existing_loggers = False + formatters = { + "default": { + "()": "uvicorn.logging.DefaultFormatter", + "fmt": format, + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + } + handlers = { + "default": { + "formatter": "default", + "class": "logging.StreamHandler", + "stream": "ext://sys.stderr", + }, + } + loggers = { + LOGGER_NAME: {"handlers": ["default"], "level": LOG_LEVEL}, + } From 2bb8552309ef495e8ca3d23c1d1bd4f4f3f44343 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Wed, 23 Aug 2023 19:38:56 -0400 Subject: [PATCH 08/19] Added key and worker name to logs --- app/controllers/subprocesses/status.py | 22 ++++++++-- app/controllers/subprocesses/utils.py | 2 + app/controllers/subprocesses/wf_manager.py | 47 +++++++++++++++------- app/controllers/workflow.py | 21 ++++++++-- app/log_config.py | 3 +- 5 files changed, 70 insertions(+), 25 deletions(-) diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index d6e5293..327284d 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -6,6 +6,8 @@ import logging import random import requests +import time +import 
pprint from logging.config import dictConfig from app.models.log import LogConfig from app.log_config import log_config @@ -30,6 +32,8 @@ from app.controllers.subprocesses.subprocess_helper import get_process_count dictConfig(log_config) logger = logging.getLogger('pflink-logger') +d = {'workername': 'STATUS_MGR', 'log_color': "\33[%dm", 'key': ""} + parser = argparse.ArgumentParser(description='Process arguments passed through CLI') parser.add_argument('--data', type=str) @@ -48,7 +52,7 @@ def update_workflow_status(key: str, test: bool): if is_status_subprocess_running(workflow): return - logger.info(f"WORKING on updating the status for {key}, locking DB flag") + logger.info(f"UPDATING the status for {key}, locking DB flag", extra=d) update_status_flag(key, workflow, False, test) if test: @@ -58,7 +62,7 @@ def update_workflow_status(key: str, test: bool): workflow.response = update_workflow_progress(updated_status) update_status_flag(key, workflow, True, test) - logger.info(f"UPDATED status for {key}, releasing lock") + logger.info(f"UPDATED status for {key}, releasing lock", extra=d) def update_workflow_progress(response: WorkflowStatusResponseSchema): @@ -168,12 +172,19 @@ def _get_pfdcm_status(request: WorkflowRequestSchema): "json_response": True } } + pretty_json = pprint.pformat(pfdcm_body) + logger.debug(f"POSTing the below request at {pfdcm_status_url} to get status: {pretty_json}", extra=d) + st = time.time() response = requests.post(pfdcm_status_url, json=pfdcm_body, headers=headers) + et = time.time() + elapsed_time = et - st + logger.debug(f'Execution time to get status:{elapsed_time} seconds', extra=d) d_response = json.loads(response.text) + #logger.debug(f"Response from pfdcm: {d_response}") d_response["service_name"] = request.pfdcm_info.pfdcm_service return d_response except Exception as ex: - logger.error(f"{Error.pfdcm.value} {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}") + logger.error(f"{Error.pfdcm.value} {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}", extra=d) return {"error": Error.pfdcm.value + f" {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}"} @@ -200,14 +211,16 @@ def _get_feed_status(request: WorkflowRequestSchema, feed_id: str) -> dict: feed_name = substitute_dicom_tags(requested_feed_name, pacs_details) # search for feed + logger.debug(f"Request CUBE at {cube_url} for feed id: {feed_id} and feed name: {feed_name}", extra=d) resp = cl.getFeed({"id": feed_id, "name_exact": feed_name}) + logger.debug(f"Response from CUBE : {resp}", extra=d) if resp["errored_jobs"] or resp["cancelled_jobs"]: l_inst_resp = cl.getPluginInstances({"feed_id": feed_id}) l_error = [d_instance['plugin_name'] for d_instance in l_inst_resp['data'] if d_instance['status']=='finishedWithError' or d_instance['status'] == 'cancelled'] resp["errored_plugins"] = str(l_error) return resp except Exception as ex: - logger.error(f"{Error.cube.value} {str(ex)}") + logger.error(f"{Error.cube.value} {str(ex)}", extra=d) return {"error": Error.cube.value + str(ex)} @@ -397,4 +410,5 @@ def __get_progress_from_text(progress: str): """ dict_data = json.loads(args.data) wf_key = dict_to_hash(dict_data) + d['key'] = wf_key update_workflow_status(wf_key, args.test) diff --git a/app/controllers/subprocesses/utils.py b/app/controllers/subprocesses/utils.py index dc7a898..4afee1e 100644 --- a/app/controllers/subprocesses/utils.py +++ b/app/controllers/subprocesses/utils.py @@ -45,6 +45,7 @@ def workflow_retrieve_helper(workflow: dict) -> WorkflowDBSchema: 
creation_time=datetime.datetime.min if not workflow.get("creation_time") else workflow["creation_time"], request=request, response=workflow["response"], + service_retry=workflow["service_retry"], stale=workflow["stale"], started=workflow["started"], ) @@ -64,6 +65,7 @@ def workflow_add_helper(workflow: WorkflowDBSchema) -> dict: "creation_time": workflow.creation_time, "request": d_request, "response": workflow.response.__dict__, + "service_retry": workflow.service_retry, "stale": workflow.stale, "started": workflow.started, } diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 285c710..564ca8b 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -7,6 +7,7 @@ import subprocess import time import requests +import pprint from app.controllers.subprocesses.python_chris_client import PythonChrisClient from app.controllers.subprocesses.subprocess_helper import get_process_count from logging.config import dictConfig @@ -30,6 +31,7 @@ ) dictConfig(log_config) logger = logging.getLogger('pflink-logger') +d = {'workername': 'WORKFLOW_MGR', 'key' : ""} parser = argparse.ArgumentParser(description='Process arguments') parser.add_argument('--data', type=str) @@ -47,6 +49,10 @@ def manage_workflow(db_key: str, test: bool): workflow = retrieve_workflow(db_key) if workflow.started or not workflow.response.status or test: # Do nothing and return + reason = f"Workflow request failed. Error: {workflow.response.error}" if not workflow.response.status \ + else f"Workflow already started. The current status is: {workflow.response.workflow_state}" + logger.warning(f"Cannot restart this workflow request. : {reason}" + f". Kindly delete this request to restart using the delete API end point: {key}", extra=d) return request = workflow.request @@ -55,26 +61,27 @@ def manage_workflow(db_key: str, test: bool): cube_url = get_cube_url_from_pfdcm(pfdcm_url, request.pfdcm_info.cube_service) while not workflow.response.workflow_state == State.ANALYZING and MAX_RETRIES > 0 and workflow.response.status: + logger.debug(f"Fetching request status from DB. 
Current status is {workflow.response.workflow_state}", extra=d) workflow.started = True update_workflow(key, workflow) MAX_RETRIES -= 1 - logger.info(f"RETRY#{MAX_RETRIES}") + logger.debug(f"{MAX_RETRIES} iterations left.", extra=d) match workflow.response.workflow_state: case State.INITIALIZING: if workflow.stale: - logger.info("Requesting PACS retrieve.") + logger.info("Requesting PACS retrieve.", extra=d) do_pfdcm_retrieve(request, pfdcm_url) case State.RETRIEVING: if workflow.response.state_progress == "100%" and workflow.stale: - logger.info("Requesting PACS push.") + logger.info("Requesting PACS push.", extra=d) do_pfdcm_push(request, pfdcm_url) case State.PUSHING: if workflow.response.state_progress == "100%" and workflow.stale: - logger.info("Requesting PACS register.") + logger.info("Requesting PACS register.", extra=d) do_pfdcm_register(request, pfdcm_url) case State.REGISTERING: @@ -86,7 +93,7 @@ def manage_workflow(db_key: str, test: bool): workflow.response.feed_id = feed_id update_workflow(key, workflow) except Exception as ex: - logger.error(Error.feed.value) + logger.error(Error.feed.value, extra=d) workflow.response.error = Error.feed.value + str(ex) workflow.response.status = False update_workflow(key, workflow) @@ -96,19 +103,19 @@ def manage_workflow(db_key: str, test: bool): try: do_cube_start_analysis(pl_inst_id, request, cube_url) except Exception as ex: - logger.error(Error.analysis.value + str(ex)) + logger.error(Error.analysis.value + str(ex), extra=d) workflow.response.error = Error.analysis.value + str(ex) workflow.response.status = False update_workflow(key, workflow) - + logger.info(f"Calling status update subprocess for request# {key}", extra=d) update_status(request) time.sleep(10) workflow = retrieve_workflow(key) # Reset workflow status if max service_retry is not reached if workflow.service_retry > 0 and not workflow.response.status: - logger.error(f"{workflow.response.error} . {workflow.service_retry} retries left.") + logger.warning(f"Retrying request.{workflow.service_retry}/5 retries left.", extra=d) workflow.service_retry -= 1 workflow.response.error = "" workflow.response.status = True @@ -117,8 +124,10 @@ def manage_workflow(db_key: str, test: bool): # Reset workflow if pflink reached MAX no. of retries if MAX_RETRIES==0: + logger.debug(f"Maximum retry limit reached. 
Resetting request flag to NOT STARTED.", extra=d) workflow.started = False update_workflow(key, workflow) + logger.info("Exiting manager subprocess", extra=d) def update_status(request: WorkflowRequestSchema): @@ -128,10 +137,15 @@ def update_status(request: WorkflowRequestSchema): """ d_data = request_to_dict(request) str_data = json.dumps(d_data) - process = subprocess.Popen( - ['python', - 'app/controllers/subprocesses/status.py', - "--data", str_data]) + proc_count = get_process_count("status", str_data) + logger.debug(f"{proc_count} subprocess of status manager running on the system.", extra=d) + if proc_count > 0: + logger.info(f"No new status subprocess started.", extra=d) + return + d_cmd = ["python", "app/controllers/subprocesses/status.py", "--data", str_data] + pretty_cmd = pprint.pformat(d_cmd) + logger.debug(f"New status subprocess started with command: {pretty_cmd}", extra=d) + process = subprocess.Popen(d_cmd) def pfdcm_do(verb: str, then_args: dict, request: WorkflowRequestSchema, url: str): @@ -177,11 +191,13 @@ def pfdcm_do(verb: str, then_args: dict, request: WorkflowRequestSchema, url: st "json_response": False } } + pretty_json = pprint.pformat(body) + logger.debug(f"POSTing the below request at {pfdcm_dicom_api} to {verb}: {pretty_json}", extra=d) st = time.time() response = requests.post(pfdcm_dicom_api, json=body, headers=headers) et = time.time() elapsed_time = et - st - logger.debug(f'Execution time to {verb}:{elapsed_time} seconds') + logger.debug(f'Execution time to {verb}:{elapsed_time} seconds', extra=d) def do_pfdcm_retrieve(dicom: WorkflowRequestSchema, pfdcm_url: str): @@ -231,7 +247,7 @@ def do_cube_create_feed(request: WorkflowRequestSchema, cube_url: str) -> dict: plugin_search_params = {"name": "pl-dircopy"} plugin_id = client.getPluginId(plugin_search_params) - logger.info(f"Creating a new feed with feed name: {feed_name}") + logger.info(f"Creating a new feed with feed name: {feed_name}", extra=d) # create a feed feed_params = {'title': feed_name, 'dir': data_path} feed_response = client.createFeed(plugin_id, feed_params) @@ -260,7 +276,7 @@ def __run_plugin_instance(previous_id: str, request: WorkflowRequestSchema, clie # convert CLI params from string to a JSON dictionary feed_params = str_to_param_dict(request.workflow_info.plugin_params) feed_params["previous_id"] = previous_id - logger.debug(f"Creating new analysis with parameters: {feed_params}") + logger.debug(f"Creating new analysis with plugin: {plugin_search_params} and parameters: {feed_params}", extra=d) feed_resp = client.createFeed(plugin_id, feed_params) @@ -303,5 +319,6 @@ def str_to_param_dict(params: str) -> dict: """ d_data = json.loads(args.data) key = dict_to_hash(d_data) + d['key'] = key manage_workflow(key, args.test) diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index da1fe3f..2c3b121 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -4,6 +4,7 @@ import json import logging import subprocess +import pprint from app.models.workflow import ( WorkflowRequestSchema, @@ -29,6 +30,7 @@ test_collection = database.get_collection("tests_collection") logger = logging.getLogger('pflink-logger') +d = {'workername': 'PFLINK', 'log_color': '\33[%dm', 'key': ""} # DB methods @@ -128,6 +130,7 @@ async def post_workflow( """ # create a hash key using the request db_key = request_to_hash(request) + d['key'] = db_key workflow = utils.retrieve_workflow(db_key, test) if not workflow: fingerprint = get_fingerprint(request) @@ -189,9 +192,14 @@ def 
manage_workflow(str_data: str, mode: str): Manage a workflow request in a separate subprocess """ proc_count = get_process_count("wf_manager", str_data) - logger.debug(f"{proc_count} subprocess of workflow manager running on the system.") - if proc_count > 0: return + logger.debug(f"{proc_count} subprocess of workflow manager running on the system.", extra=d) + if proc_count > 0: + logger.info(f"No new manager subprocess started.", extra=d) + return + d_cmd = ["python", "app/controllers/subprocesses/wf_manager.py", "--data", str_data] + pretty_cmd = pprint.pformat(d_cmd) + logger.debug(f"New manager subprocess started with command: {pretty_cmd}", extra=d) if mode: d_cmd.append(mode) subproc = subprocess.Popen(d_cmd) @@ -203,9 +211,14 @@ def update_workflow_status(str_data: str, mode: str): Update the current status of a workflow request in a separate process """ proc_count = get_process_count("status", str_data) - logger.debug(f"{proc_count} subprocess of status manager running on the system.") - if proc_count>0: return + logger.debug(f"{proc_count} subprocess of status manager running on the system.", extra=d) + if proc_count>0: + logger.info(f"No new status subprocess started.", extra=d) + return + d_cmd = ["python", "app/controllers/subprocesses/status.py", "--data", str_data] + pretty_cmd = pprint.pformat(d_cmd) + logger.debug(f"New status subprocess started with command: {pretty_cmd}", extra=d) if mode: d_cmd.append(mode) subproc = subprocess.Popen(d_cmd) diff --git a/app/log_config.py b/app/log_config.py index cd0d4da..3406938 100644 --- a/app/log_config.py +++ b/app/log_config.py @@ -1,11 +1,10 @@ - log_config = { "version": 1, "disable_existing_loggers": False, "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": "%(levelprefix)s %(asctime)s %(message)s", + "fmt": "%(levelprefix)s | %(workername)s | %(asctime)s | key: %(key)s | %(message)s", "datefmt": "%Y-%m-%d %H:%M:%S", }, From a623facd18ec465504e92f78641cf959cba33352 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Thu, 24 Aug 2023 16:57:16 -0400 Subject: [PATCH 09/19] Formatted json objects and added colored logs --- app/controllers/subprocesses/status.py | 11 +++++++---- app/controllers/subprocesses/wf_manager.py | 16 +++++++++++----- app/controllers/workflow.py | 4 +++- app/log_config.py | 2 +- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 327284d..2572b26 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -32,7 +32,7 @@ from app.controllers.subprocesses.subprocess_helper import get_process_count dictConfig(log_config) logger = logging.getLogger('pflink-logger') -d = {'workername': 'STATUS_MGR', 'log_color': "\33[%dm", 'key': ""} +d = {'workername': 'STATUS_MGR', 'log_color': "\33[36m", 'key': ""} parser = argparse.ArgumentParser(description='Process arguments passed through CLI') @@ -52,7 +52,7 @@ def update_workflow_status(key: str, test: bool): if is_status_subprocess_running(workflow): return - logger.info(f"UPDATING the status for {key}, locking DB flag", extra=d) + logger.info(f"Working on fetching the current status, locking DB flag", extra=d) update_status_flag(key, workflow, False, test) if test: @@ -61,8 +61,10 @@ def update_workflow_status(key: str, test: bool): updated_status = get_current_status(workflow.request, workflow.response) workflow.response = update_workflow_progress(updated_status) + pretty_response = 
pprint.pformat(workflow.response.__dict__) + logger.debug(f"Updated response: {pretty_response}", extra=d) update_status_flag(key, workflow, True, test) - logger.info(f"UPDATED status for {key}, releasing lock", extra=d) + logger.info(f"Finished writing updated status to the DB, releasing lock", extra=d) def update_workflow_progress(response: WorkflowStatusResponseSchema): @@ -213,7 +215,8 @@ def _get_feed_status(request: WorkflowRequestSchema, feed_id: str) -> dict: # search for feed logger.debug(f"Request CUBE at {cube_url} for feed id: {feed_id} and feed name: {feed_name}", extra=d) resp = cl.getFeed({"id": feed_id, "name_exact": feed_name}) - logger.debug(f"Response from CUBE : {resp}", extra=d) + pretty_response = pprint.pformat(resp) + logger.debug(f"Response from CUBE : {pretty_response}", extra=d) if resp["errored_jobs"] or resp["cancelled_jobs"]: l_inst_resp = cl.getPluginInstances({"feed_id": feed_id}) l_error = [d_instance['plugin_name'] for d_instance in l_inst_resp['data'] if d_instance['status']=='finishedWithError' or d_instance['status'] == 'cancelled'] diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 564ca8b..a359dec 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -31,7 +31,7 @@ ) dictConfig(log_config) logger = logging.getLogger('pflink-logger') -d = {'workername': 'WORKFLOW_MGR', 'key' : ""} +d = {'workername': 'WORKFLOW_MGR', 'key' : "",'log_color': "\33[33m"} parser = argparse.ArgumentParser(description='Process arguments') parser.add_argument('--data', type=str) @@ -44,6 +44,7 @@ def manage_workflow(db_key: str, test: bool): Manage workflow: Schedule task based on status from the DB """ + SLEEP_TIME = 10 MAX_RETRIES = 50 pl_inst_id = 0 workflow = retrieve_workflow(db_key) @@ -52,7 +53,7 @@ def manage_workflow(db_key: str, test: bool): reason = f"Workflow request failed. Error: {workflow.response.error}" if not workflow.response.status \ else f"Workflow already started. The current status is: {workflow.response.workflow_state}" logger.warning(f"Cannot restart this workflow request. : {reason}" - f". Kindly delete this request to restart using the delete API end point: {key}", extra=d) + f". Kindly delete this request to restart using the delete API end point", extra=d) return request = workflow.request @@ -61,7 +62,7 @@ def manage_workflow(db_key: str, test: bool): cube_url = get_cube_url_from_pfdcm(pfdcm_url, request.pfdcm_info.cube_service) while not workflow.response.workflow_state == State.ANALYZING and MAX_RETRIES > 0 and workflow.response.status: - logger.debug(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}", extra=d) + workflow.started = True update_workflow(key, workflow) MAX_RETRIES -= 1 @@ -108,10 +109,13 @@ def manage_workflow(db_key: str, test: bool): workflow.response.status = False update_workflow(key, workflow) - logger.info(f"Calling status update subprocess for request# {key}", extra=d) + logger.info(f"Calling status update subprocess.", extra=d) update_status(request) - time.sleep(10) + logger.info(f"Sleeping for {SLEEP_TIME} seconds", extra=d) + time.sleep(SLEEP_TIME) + workflow = retrieve_workflow(key) + logger.debug(f"Fetching request status from DB. 
Current status is {workflow.response.workflow_state}", extra=d) # Reset workflow status if max service_retry is not reached if workflow.service_retry > 0 and not workflow.response.status: @@ -129,6 +133,8 @@ def manage_workflow(db_key: str, test: bool): update_workflow(key, workflow) logger.info("Exiting manager subprocess", extra=d) + logger.info(f"Exiting while loop. End of workflow_manager.", extra=d) + def update_status(request: WorkflowRequestSchema): """ diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 2c3b121..f0d0608 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -30,7 +30,7 @@ test_collection = database.get_collection("tests_collection") logger = logging.getLogger('pflink-logger') -d = {'workername': 'PFLINK', 'log_color': '\33[%dm', 'key': ""} +d = {'workername': 'PFLINK' ,'log_color': "\33[32m", 'key': ""} # DB methods @@ -168,6 +168,8 @@ def create_new_workflow( response = WorkflowStatusResponseSchema() new_workflow = WorkflowDBSchema(key=key, fingerprint=fingerprint, request=request, response=response) workflow = add_workflow(new_workflow, test) + pretty_response = pprint.pformat(workflow.response.__dict__) + logger.info(f"New workflow record created. Initial workflow response: {pretty_response}", extra=d) return workflow diff --git a/app/log_config.py b/app/log_config.py index 3406938..b7c116e 100644 --- a/app/log_config.py +++ b/app/log_config.py @@ -4,7 +4,7 @@ "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": "%(levelprefix)s | %(workername)s | %(asctime)s | key: %(key)s | %(message)s", + "fmt": "%(log_color)s%(levelprefix)s | %(workername)s | %(asctime)s | key: %(key)s | %(message)s\33[0m", "datefmt": "%Y-%m-%d %H:%M:%S", }, From 1b6427562ec3b03235437bc4f43b7b0592880b6a Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Thu, 24 Aug 2023 17:02:19 -0400 Subject: [PATCH 10/19] Deleted additional log model and updated log --- app/controllers/subprocesses/status.py | 1 - app/controllers/subprocesses/wf_manager.py | 3 +- app/main.py | 2 - app/models/log.py | 44 ---------------------- 4 files changed, 1 insertion(+), 49 deletions(-) delete mode 100644 app/models/log.py diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 2572b26..cf56913 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -9,7 +9,6 @@ import time import pprint from logging.config import dictConfig -from app.models.log import LogConfig from app.log_config import log_config from app.models.workflow import ( State, diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index a359dec..4d44a3b 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -11,7 +11,6 @@ from app.controllers.subprocesses.python_chris_client import PythonChrisClient from app.controllers.subprocesses.subprocess_helper import get_process_count from logging.config import dictConfig -from app.models.log import LogConfig from app.log_config import log_config from app.models.workflow import ( Error, @@ -203,7 +202,7 @@ def pfdcm_do(verb: str, then_args: dict, request: WorkflowRequestSchema, url: st response = requests.post(pfdcm_dicom_api, json=body, headers=headers) et = time.time() elapsed_time = et - st - logger.debug(f'Execution time to {verb}:{elapsed_time} seconds', extra=d) + logger.debug(f'Execution time to request {verb}:{elapsed_time} seconds', 
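
Taken together, the SLEEP_TIME/MAX_RETRIES changes above give the manager a bounded polling loop: act on the current state, hand status updates to the status subprocess, sleep, and re-read the record from the DB. A compressed sketch of that shape (retrieve_workflow and advance_state stand in for the repo's DB helper and per-state actions; their exact signatures are assumptions):

    import time

    SLEEP_TIME = 10   # seconds between polls
    MAX_RETRIES = 50  # iterations before the manager gives up


    def poll_until_analyzing(key, retrieve_workflow, advance_state):
        retries = MAX_RETRIES
        workflow = retrieve_workflow(key)
        # "analyzing" stands in for State.ANALYZING in the real code
        while workflow.response.workflow_state != "analyzing" and retries > 0 and workflow.response.status:
            retries -= 1
            advance_state(workflow)      # e.g. request retrieve/push/register via pfdcm
            time.sleep(SLEEP_TIME)       # give the status subprocess time to update the DB
            workflow = retrieve_workflow(key)
        return workflow
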
extra=d) def do_pfdcm_retrieve(dicom: WorkflowRequestSchema, pfdcm_url: str): diff --git a/app/main.py b/app/main.py index c152eef..41fce29 100644 --- a/app/main.py +++ b/app/main.py @@ -9,7 +9,6 @@ from app.config import settings from app.controllers import auth from logging.config import dictConfig -from app.models.log import LogConfig from app.log_config import log_config description = """ @@ -66,7 +65,6 @@ } ] -#dictConfig(LogConfig().dict()) dictConfig(log_config) app = FastAPI( diff --git a/app/models/log.py b/app/models/log.py deleted file mode 100644 index bdafcd9..0000000 --- a/app/models/log.py +++ /dev/null @@ -1,44 +0,0 @@ -from pydantic import BaseSettings -import logging - - -class LogConfig(BaseSettings): - """Logging configuration to be set for the server""" - grey = "\x1b[38;20m" - yellow = "\x1b[33;20m" - red = "\x1b[31;20m" - bold_red = "\x1b[31;1m" - reset = "\x1b[0m" - format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s %(pathname)s.%(funcName)s:%(lineno)d)" - - FORMATS = { - logging.DEBUG: grey + format + reset, - logging.INFO: grey + format + reset, - logging.WARNING: yellow + format + reset, - logging.ERROR: red + format + reset, - logging.CRITICAL: bold_red + format + reset - } - LOGGER_NAME: str = "pflink" - # LOG_FORMAT: str = "%(levelprefix)s | %(asctime)s | %(message)s" - LOG_LEVEL: str = "DEBUG" - - # Logging config - version = 1 - disable_existing_loggers = False - formatters = { - "default": { - "()": "uvicorn.logging.DefaultFormatter", - "fmt": format, - "datefmt": "%Y-%m-%d %H:%M:%S", - }, - } - handlers = { - "default": { - "formatter": "default", - "class": "logging.StreamHandler", - "stream": "ext://sys.stderr", - }, - } - loggers = { - LOGGER_NAME: {"handlers": ["default"], "level": LOG_LEVEL}, - } From e833ea2a30c321cb373a2d121e0b9be9616e45cf Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Tue, 29 Aug 2023 15:32:51 -0400 Subject: [PATCH 11/19] Model updates --- app/controllers/subprocesses/status.py | 9 ++++--- app/controllers/subprocesses/wf_manager.py | 7 ++++-- app/controllers/workflow.py | 28 ++++------------------ 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index cf56913..22e21c3 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -185,7 +185,8 @@ def _get_pfdcm_status(request: WorkflowRequestSchema): d_response["service_name"] = request.pfdcm_info.pfdcm_service return d_response except Exception as ex: - logger.error(f"{Error.pfdcm.value} {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}", extra=d) + logger.error(f"{Error.pfdcm.value} {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}", + extra=d) return {"error": Error.pfdcm.value + f" {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}"} @@ -216,9 +217,10 @@ def _get_feed_status(request: WorkflowRequestSchema, feed_id: str) -> dict: resp = cl.getFeed({"id": feed_id, "name_exact": feed_name}) pretty_response = pprint.pformat(resp) logger.debug(f"Response from CUBE : {pretty_response}", extra=d) - if resp["errored_jobs"] or resp["cancelled_jobs"]: + if resp.get("errored_jobs") or resp.get("cancelled_jobs"): l_inst_resp = cl.getPluginInstances({"feed_id": feed_id}) - l_error = [d_instance['plugin_name'] for d_instance in l_inst_resp['data'] if d_instance['status']=='finishedWithError' or d_instance['status'] == 'cancelled'] + l_error = [d_instance['plugin_name'] for d_instance in 
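
The errored/cancelled handling in _get_feed_status boils down to one query plus a filter over plugin-instance states. Isolated (client stands in for the repo's PythonChrisClient instance; the response shape follows the hunk above):

    def errored_plugins(client, feed_id: str) -> list:
        # Names of all plugin instances in the feed that ended badly.
        l_inst_resp = client.getPluginInstances({"feed_id": feed_id})
        return [
            d_instance["plugin_name"]
            for d_instance in l_inst_resp["data"]
            if d_instance["status"] in ("finishedWithError", "cancelled")
        ]

Stringifying the resulting list, as the patch does with str(l_error), keeps the status response JSON-serializable without changing its schema.
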
l_inst_resp['data'] + if d_instance['status']=='finishedWithError' or d_instance['status'] == 'cancelled'] resp["errored_plugins"] = str(l_error) return resp except Exception as ex: @@ -335,6 +337,7 @@ def _parse_response( status.workflow_state = State.FEED_DELETED status.status = False status.error = Error.feed_deleted.value + status.state_progress = "0%" return status diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 4d44a3b..765f42b 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -110,11 +110,13 @@ def manage_workflow(db_key: str, test: bool): logger.info(f"Calling status update subprocess.", extra=d) update_status(request) + logger.info(f"Sleeping for {SLEEP_TIME} seconds", extra=d) time.sleep(SLEEP_TIME) workflow = retrieve_workflow(key) - logger.debug(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}", extra=d) + logger.debug(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}", + extra=d) # Reset workflow status if max service_retry is not reached if workflow.service_retry > 0 and not workflow.response.status: @@ -281,7 +283,8 @@ def __run_plugin_instance(previous_id: str, request: WorkflowRequestSchema, clie # convert CLI params from string to a JSON dictionary feed_params = str_to_param_dict(request.workflow_info.plugin_params) feed_params["previous_id"] = previous_id - logger.debug(f"Creating new analysis with plugin: {plugin_search_params} and parameters: {feed_params}", extra=d) + logger.debug(f"Creating new analysis with plugin: {plugin_search_params} and parameters: {feed_params}", + extra=d) feed_resp = client.createFeed(plugin_id, feed_params) diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index f0d0608..ca2a7ef 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -36,25 +36,6 @@ # Retrieve all workflows present in the DB -def retrieve_workflows_by(search_params: WorkflowSearchSchema, test: bool = False): - collection = test_collection if test else workflow_collection - workflows = [] - if search_params.cube_username: - workflows = collection.find({"request.cube_user_info.username": search_params.cube_username}) - elif search_params.pipeline_name: - workflows = collection.find({"request.workflow_info.pipeline_name": search_params.pipeline_name}) - elif search_params.plugin_name: - workflows = collection.find({"request.workflow_info.plugin_name": search_params.plugin_name}) - elif search_params.plugin_version: - workflows = collection.find({"request.workflow_info.plugin_version": search_params.plugin_version}) - elif search_params.plugin_params: - workflows = collection.find({"request.workflow_info.plugin_params": search_params.plugin_params}) - elif search_params.date: - workflows = collection.find({"date": search_params.date}) - search_results = [] - for wrkflo in workflows: search_results.append(wrkflo['_id']) - - return search_results def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): collection = test_collection if test else workflow_collection @@ -149,12 +130,12 @@ async def post_workflow( return create_response_with_error(error_type, workflow.response) mode, str_data = get_suproc_params(test, request) - # run workflow manager subprocess on the workflow - sub_mng = manage_workflow(str_data, mode) - # run status_update subprocess on the workflow sub_updt = update_workflow_status(str_data, mode) # 
debug_process(sub_updt) + + # run workflow manager subprocess on the workflow + sub_mng = manage_workflow(str_data, mode) return workflow.response @@ -247,7 +228,8 @@ def check_for_duplicates(request_hash: str, test: bool = False): if workflows: for workflow in workflows: record = utils.workflow_retrieve_helper(workflow) - user_response = UserResponseSchema(username=record.request.cube_user_info.username, response=record.response.__dict__) + user_response = UserResponseSchema(username=record.request.cube_user_info.username, + response=record.response.__dict__) user_responses.append(user_response) return user_responses From 4d0542ff2fd89da164f67a433c7e9dbbb56f853f Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Wed, 30 Aug 2023 15:29:16 -0400 Subject: [PATCH 12/19] Code walk changes and ndjson logs --- app/config.py | 27 ++++++++++++++++++++++ app/controllers/pfdcm.py | 15 +++--------- app/controllers/subprocesses/status.py | 4 ++-- app/controllers/subprocesses/wf_manager.py | 4 ++-- app/controllers/workflow.py | 12 ++++++---- app/log_config.py | 22 ------------------ app/main.py | 5 ++-- 7 files changed, 44 insertions(+), 45 deletions(-) delete mode 100644 app/log_config.py diff --git a/app/config.py b/app/config.py index c638933..18dcc2d 100644 --- a/app/config.py +++ b/app/config.py @@ -12,5 +12,32 @@ class Auth(BaseSettings): password: str = 'pflink1234' +class LogConfig(BaseSettings): + level: str = "INFO" + log_config = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "default": { + "()": "uvicorn.logging.DefaultFormatter", + "fmt": '%(log_color)s {"level":"%(levelprefix)s", "worker":"%(workername)s", "timestamp":"%(asctime)s", "key":"%(key)s", "msg":"%(message)s"}\33[0m', + "datefmt": "%Y-%m-%d %H:%M:%S", + + }, + }, + "handlers": { + "default": { + "formatter": "default", + "class": "logging.StreamHandler", + "stream": "ext://sys.stderr", + }, + }, + "loggers": { + "pflink-logger": {"handlers": ["default"], "level": level}, + }, + } + + settings = Settings() auth = Auth() +log = LogConfig() diff --git a/app/controllers/pfdcm.py b/app/controllers/pfdcm.py index 4b8bdd4..ea8a255 100644 --- a/app/controllers/pfdcm.py +++ b/app/controllers/pfdcm.py @@ -14,20 +14,13 @@ def pfdcm_helper(pfdcm) -> dict: - key = str_to_hash(pfdcm["service_name"]) return { - "_id": key, + "_id": pfdcm["service_name"], "service_name": pfdcm["service_name"], "service_address": pfdcm["service_address"], } -def str_to_hash(str_data: str) -> str: - hash_request = hashlib.md5(str_data.encode()) - key = hash_request.hexdigest() - return key - - # Retrieve all pfdcm records present in the database async def retrieve_pfdcms(): pfdcms = [pfdcm["service_name"] for pfdcm in pfdcm_collection.find()] @@ -41,8 +34,7 @@ async def add_pfdcm(pfdcm_data: dict) -> dict: """ try: pfdcm = pfdcm_collection.insert_one(pfdcm_helper(pfdcm_data)) - new_pfdcm = pfdcm_collection.find_one({"_id": pfdcm.inserted_id}) - return pfdcm_helper(new_pfdcm) + return pfdcm_helper(pfdcm) except: return {} @@ -141,9 +133,8 @@ async def delete_pfdcm(service_name: str): Delete a pfdcm record from the DB """ delete_count = 0 - key = str_to_hash(service_name) for pfdcm in pfdcm_collection.find(): - if pfdcm["_id"] == key: + if pfdcm["_id"] == service_name: pfdcm_collection.delete_one({"_id": pfdcm["_id"]}) delete_count += 1 return {"Message": f"{delete_count} record(s) deleted!"} diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 22e21c3..4dae175 100644 --- 
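
The duplicate check above hinges on the request fingerprint: a hash over only the fields that define equivalent work, so the same study submitted by two users collides on purpose. A sketch of that idea (the field names follow the request schema shown in these diffs; the repo's actual implementation may differ in detail):

    import hashlib
    import json


    def get_fingerprint(request: dict) -> str:
        # Keep only the parts that identify the work itself; user identity
        # and service bookkeeping are deliberately left out.
        essential = {
            "pfdcm_info": request.get("pfdcm_info"),
            "PACS_directive": request.get("PACS_directive"),
            "workflow_info": request.get("workflow_info"),
        }
        return hashlib.md5(json.dumps(essential, sort_keys=True).encode()).hexdigest()
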
a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -9,7 +9,7 @@ import time import pprint from logging.config import dictConfig -from app.log_config import log_config +from app.config import log from app.models.workflow import ( State, WorkflowRequestSchema, @@ -29,7 +29,7 @@ ) from app.controllers.subprocesses.subprocess_helper import get_process_count -dictConfig(log_config) +dictConfig(log.log_config) logger = logging.getLogger('pflink-logger') d = {'workername': 'STATUS_MGR', 'log_color': "\33[36m", 'key': ""} diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 765f42b..a79e82b 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -11,7 +11,7 @@ from app.controllers.subprocesses.python_chris_client import PythonChrisClient from app.controllers.subprocesses.subprocess_helper import get_process_count from logging.config import dictConfig -from app.log_config import log_config +from app.config import log from app.models.workflow import ( Error, State, @@ -28,7 +28,7 @@ do_cube_create_user, retrieve_pfdcm_url, ) -dictConfig(log_config) +dictConfig(log.log_config) logger = logging.getLogger('pflink-logger') d = {'workername': 'WORKFLOW_MGR', 'key' : "",'log_color': "\33[33m"} diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index ca2a7ef..c7a1131 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -113,7 +113,13 @@ async def post_workflow( db_key = request_to_hash(request) d['key'] = db_key workflow = utils.retrieve_workflow(db_key, test) - if not workflow: + + mode, str_data = get_suproc_params(test, request) + if workflow: + # if there is an existing record in the DB, just run a status subprocess + sub_updt = update_workflow_status(str_data, mode) + + else: fingerprint = get_fingerprint(request) duplicates = check_for_duplicates(fingerprint, test) if duplicates and not request.ignore_duplicate: @@ -124,14 +130,12 @@ async def post_workflow( return response workflow = create_new_workflow(db_key, fingerprint, request, test) + # 'error_type' is an optional test-only parameter that forces the workflow to error out # at a given error state if error_type: return create_response_with_error(error_type, workflow.response) - mode, str_data = get_suproc_params(test, request) - # run status_update subprocess on the workflow - sub_updt = update_workflow_status(str_data, mode) # debug_process(sub_updt) # run workflow manager subprocess on the workflow diff --git a/app/log_config.py b/app/log_config.py deleted file mode 100644 index b7c116e..0000000 --- a/app/log_config.py +++ /dev/null @@ -1,22 +0,0 @@ -log_config = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "default": { - "()": "uvicorn.logging.DefaultFormatter", - "fmt": "%(log_color)s%(levelprefix)s | %(workername)s | %(asctime)s | key: %(key)s | %(message)s\33[0m", - "datefmt": "%Y-%m-%d %H:%M:%S", - - }, - }, - "handlers": { - "default": { - "formatter": "default", - "class": "logging.StreamHandler", - "stream": "ext://sys.stderr", - }, - }, - "loggers": { - "pflink-logger": {"handlers": ["default"], "level": "DEBUG"}, - }, -} \ No newline at end of file diff --git a/app/main.py b/app/main.py index 41fce29..4caf129 100644 --- a/app/main.py +++ b/app/main.py @@ -6,10 +6,9 @@ from app.routes.testing import router as WorkflowTestRouter from app.routes.auth import router as AuthRouter from app.routes.cube import router as 
CubeRouter -from app.config import settings +from app.config import settings, log from app.controllers import auth from logging.config import dictConfig -from app.log_config import log_config description = """ `pflink` is an application to interact with `CUBE` and `pfdcm` 🚀 @@ -65,7 +64,7 @@ } ] -dictConfig(log_config) +dictConfig(log.log_config) app = FastAPI( title='pflink', From 9cdb99d29abf8711b71470f94d349daa6827c133 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Thu, 31 Aug 2023 11:55:39 -0400 Subject: [PATCH 13/19] Updated logs to ndjson format --- app/config.py | 2 +- app/controllers/pfdcm.py | 14 ++++++++++---- app/routes/pfdcm.py | 5 ++--- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/app/config.py b/app/config.py index 18dcc2d..43625c1 100644 --- a/app/config.py +++ b/app/config.py @@ -20,7 +20,7 @@ class LogConfig(BaseSettings): "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": '%(log_color)s {"level":"%(levelprefix)s", "worker":"%(workername)s", "timestamp":"%(asctime)s", "key":"%(key)s", "msg":"%(message)s"}\33[0m', + "fmt": '%(log_color)s {"levelname":"%(levelname)s", "worker":"%(workername)s", "timestamp":"%(asctime)s", "key":"%(key)s", "msg":"%(message)s"}\33[0m', "datefmt": "%Y-%m-%d %H:%M:%S", }, diff --git a/app/controllers/pfdcm.py b/app/controllers/pfdcm.py index ea8a255..e93c03d 100644 --- a/app/controllers/pfdcm.py +++ b/app/controllers/pfdcm.py @@ -1,6 +1,8 @@ import json import hashlib import httpx +import pymongo.results + from app.config import settings from pymongo import MongoClient @@ -33,10 +35,14 @@ async def add_pfdcm(pfdcm_data: dict) -> dict: DB constraint: Only unique names allowed """ try: - pfdcm = pfdcm_collection.insert_one(pfdcm_helper(pfdcm_data)) - return pfdcm_helper(pfdcm) - except: - return {} + pfdcm: pymongo.results.InsertOneResult = pfdcm_collection.insert_one(pfdcm_helper(pfdcm_data)) + if pfdcm.acknowledged: + inserted_pfdcm: dict = pfdcm_collection.find_one({"_id": pfdcm.inserted_id}) + return inserted_pfdcm + else: + raise Exception("Could not store new record.") + except Exception as ex: + return {"error": str(ex)} # Retrieve a pfdcm record with a matching service name diff --git a/app/routes/pfdcm.py b/app/routes/pfdcm.py index 85c6665..4a19781 100644 --- a/app/routes/pfdcm.py +++ b/app/routes/pfdcm.py @@ -21,9 +21,8 @@ async def add_pfdcm_data(pfdcm_data: PfdcmQuerySchema = Body(...)) -> PfdcmQuery """ pfdcm_data = jsonable_encoder(pfdcm_data) new_pfdcm = await pfdcm.add_pfdcm(pfdcm_data) - if not new_pfdcm: - return PfdcmQueryResponseSchema(data={}, message=f"service_name must be unique." 
- f" {pfdcm_data['service_name']} already exists.") + if new_pfdcm.get("error"): + return PfdcmQueryResponseSchema(data={}, message=new_pfdcm["error"]) return PfdcmQueryResponseSchema(data=new_pfdcm, message="New record created.") From 93a1558caf0116084c8087e6ac531afab2ec4663 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Thu, 31 Aug 2023 17:45:02 -0400 Subject: [PATCH 14/19] updated mongo & modified error handling in pfdcm services --- app/config.py | 7 +++-- app/controllers/pfdcm.py | 7 ++--- app/controllers/search.py | 12 ++++---- app/controllers/subprocesses/utils.py | 2 +- app/controllers/subprocesses/wf_manager.py | 7 ++--- app/controllers/workflow.py | 35 +++++++++++----------- app/models/workflow.py | 1 - app/routes/pfdcm.py | 12 ++++---- app/routes/workflow.py | 4 +-- docker-compose.yml | 17 ++++++++--- 10 files changed, 55 insertions(+), 49 deletions(-) diff --git a/app/config.py b/app/config.py index 43625c1..41f7db4 100644 --- a/app/config.py +++ b/app/config.py @@ -4,6 +4,8 @@ class Settings(BaseSettings): pflink_mongodb: MongoDsn = 'mongodb://localhost:27017' version: str = "3.7.0" + mongo_username: str = "admin" + mongo_password: str = "admin" class Auth(BaseSettings): @@ -13,14 +15,15 @@ class Auth(BaseSettings): class LogConfig(BaseSettings): - level: str = "INFO" + level: str = "DEBUG" log_config = { "version": 1, "disable_existing_loggers": False, "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": '%(log_color)s {"levelname":"%(levelname)s", "worker":"%(workername)s", "timestamp":"%(asctime)s", "key":"%(key)s", "msg":"%(message)s"}\33[0m', + "fmt": '%(log_color)s {"levelname":"%(levelname)s", "worker":"%(workername)s", "timestamp":"%(' + 'asctime)s", "key":"%(key)s", "msg":"%(message)s"}\33[0m', "datefmt": "%Y-%m-%d %H:%M:%S", }, diff --git a/app/controllers/pfdcm.py b/app/controllers/pfdcm.py index e93c03d..0a92da7 100644 --- a/app/controllers/pfdcm.py +++ b/app/controllers/pfdcm.py @@ -1,5 +1,4 @@ import json -import hashlib import httpx import pymongo.results @@ -7,7 +6,7 @@ from pymongo import MongoClient MONGO_DETAILS = str(settings.pflink_mongodb) -client = MongoClient(MONGO_DETAILS) +client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password) database = client.database pfdcm_collection = database.get_collection("pfdcms_collection") @@ -64,8 +63,8 @@ async def hello_pfdcm(pfdcm_name: str) -> dict: response = await client.get(pfdcm_hello_api) d_results = json.loads(response.text) return d_results - except: - return {"error": f"Unable to reach {pfdcm_url}."} + except Exception as ex: + return {"error": f"Unable to reach {pfdcm_url}.{ex}"} async def about_pfdcm(service_name: str) -> dict: diff --git a/app/controllers/search.py b/app/controllers/search.py index 8904162..9019290 100644 --- a/app/controllers/search.py +++ b/app/controllers/search.py @@ -1,11 +1,11 @@ from app.models.workflow import WorkflowSearchSchema -def compound_queries(query_params: WorkflowSearchSchema): - query = { "$match": { "$text": - {"$search": query_params.keywords} +def compound_queries(query_params: WorkflowSearchSchema): + query = {"$match": {"$text": + {"$search": query_params.keywords} - }} + }} rank = { "$sort": {"score": {"$meta": "textScore"}} @@ -17,9 +17,9 @@ def compound_queries(query_params: WorkflowSearchSchema): } } - return query, rank, response + def index_search(query_params: dict): query = { "$match": { @@ -30,4 +30,4 @@ def index_search(query_params: dict): } } } - return query \ No newline at end 
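
Patch 14 above pairs credentialed Mongo connections with an explicit acknowledged-write check, so a silent insert failure surfaces as an error payload instead of a half-created record. A condensed sketch (URL and credentials are placeholders):

    import pymongo.results
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017", username="admin", password="admin")
    pfdcm_collection = client.database.get_collection("pfdcms_collection")


    def add_pfdcm(pfdcm_data: dict) -> dict:
        try:
            result: pymongo.results.InsertOneResult = pfdcm_collection.insert_one(pfdcm_data)
            if result.acknowledged:
                return pfdcm_collection.find_one({"_id": result.inserted_id})
            raise Exception("Could not store new record.")
        except Exception as ex:
            # The route layer turns this into an HTTPException(400).
            return {"error": str(ex)}
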
of file + return query diff --git a/app/controllers/subprocesses/utils.py b/app/controllers/subprocesses/utils.py index 4afee1e..483a0f5 100644 --- a/app/controllers/subprocesses/utils.py +++ b/app/controllers/subprocesses/utils.py @@ -14,7 +14,7 @@ ) MONGO_DETAILS = str(settings.pflink_mongodb) -client = MongoClient(MONGO_DETAILS) +client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password) database = client.database diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index a79e82b..07fae14 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -16,7 +16,6 @@ Error, State, WorkflowRequestSchema, - WorkflowDBSchema, ) from app.controllers.subprocesses.utils import ( request_to_dict, @@ -49,9 +48,9 @@ def manage_workflow(db_key: str, test: bool): workflow = retrieve_workflow(db_key) if workflow.started or not workflow.response.status or test: # Do nothing and return - reason = f"Workflow request failed. Error: {workflow.response.error}" if not workflow.response.status \ - else f"Workflow already started. The current status is: {workflow.response.workflow_state}" - logger.warning(f"Cannot restart this workflow request. : {reason}" + reason = f"Workflow request failed. {workflow.response.error}" if not workflow.response.status \ + else f"Workflow already started. The current status is {workflow.response.workflow_state}" + logger.warning(f"Cannot restart this workflow request. {reason}" f". Kindly delete this request to restart using the delete API end point", extra=d) return diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index c7a1131..753ed58 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -14,9 +14,6 @@ UserResponseSchema, State, WorkflowSearchSchema, - PFDCMInfoSchema, - PACSqueryCore, - WorkflowInfoSchema, ) from app.controllers import search from app.controllers.subprocesses.subprocess_helper import get_process_count @@ -24,13 +21,14 @@ from app.config import settings MONGO_DETAILS = str(settings.pflink_mongodb) -client = MongoClient(MONGO_DETAILS) +client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password) database = client.database workflow_collection = database.get_collection("workflows_collection") test_collection = database.get_collection("tests_collection") logger = logging.getLogger('pflink-logger') -d = {'workername': 'PFLINK' ,'log_color': "\33[32m", 'key': ""} +d = {'workername': 'PFLINK', 'log_color': "\33[32m", 'key': ""} + # DB methods @@ -39,17 +37,17 @@ def retrieve_workflows(search_params: WorkflowSearchSchema, test: bool = False): collection = test_collection if test else workflow_collection - index = collection.create_index([('$**',TEXT)], - name='search_index', default_language='english') + index = collection.create_index([('$**', TEXT)], + name='search_index', default_language='english') workflows = [] query, rank, response = search.compound_queries(search_params) workflows = collection.aggregate( - [ - { "$match": {"$text": { "$search": search_params.keywords } } }, - { "$project": {"_id": 1 , "score": { "$meta": "textScore" }} }, - {"$sort": {"score": -1}}, - ] -) + [ + {"$match": {"$text": {"$search": search_params.keywords}}}, + {"$project": {"_id": 1, "score": {"$meta": "textScore"}}}, + {"$sort": {"score": -1}}, + ] + ) search_results = [] for wrkflo in workflows: search_results.append(str(wrkflo)) @@ -130,16 +128,17 @@ 
async def post_workflow( return response workflow = create_new_workflow(db_key, fingerprint, request, test) - # 'error_type' is an optional test-only parameter that forces the workflow to error out # at a given error state if error_type: + logger.error(workflow.response.error, extra=d) return create_response_with_error(error_type, workflow.response) # debug_process(sub_updt) # run workflow manager subprocess on the workflow sub_mng = manage_workflow(str_data, mode) + logger.debug(f"Status response is {workflow.response}") return workflow.response @@ -154,7 +153,7 @@ def create_new_workflow( new_workflow = WorkflowDBSchema(key=key, fingerprint=fingerprint, request=request, response=response) workflow = add_workflow(new_workflow, test) pretty_response = pprint.pformat(workflow.response.__dict__) - logger.info(f"New workflow record created. Initial workflow response: {pretty_response}", extra=d) + logger.info(f"New workflow record created.", extra=d) return workflow @@ -199,7 +198,7 @@ def update_workflow_status(str_data: str, mode: str): """ proc_count = get_process_count("status", str_data) logger.debug(f"{proc_count} subprocess of status manager running on the system.", extra=d) - if proc_count>0: + if proc_count > 0: logger.info(f"No new status subprocess started.", extra=d) return @@ -240,8 +239,8 @@ def check_for_duplicates(request_hash: str, test: bool = False): def get_fingerprint(request: WorkflowRequestSchema) -> str: """ - Create a unique has on a request footprint. - A request footprint is a users request payload stripped down to + Create a unique has on a request fingerprint. + A request fingerprint is a users request payload stripped down to include only essential information such as pfdcm_info, workflow_info and PACS directive. """ diff --git a/app/models/workflow.py b/app/models/workflow.py index e920326..cb56e4f 100644 --- a/app/models/workflow.py +++ b/app/models/workflow.py @@ -169,4 +169,3 @@ class WorkflowDBSchema(BaseModel): class WorkflowSearchSchema(BaseModel): """A schema to search Workflow DB records""" keywords: str = "" - date: str = "" diff --git a/app/routes/pfdcm.py b/app/routes/pfdcm.py index 4a19781..7f4873f 100644 --- a/app/routes/pfdcm.py +++ b/app/routes/pfdcm.py @@ -22,7 +22,7 @@ async def add_pfdcm_data(pfdcm_data: PfdcmQuerySchema = Body(...)) -> PfdcmQuery pfdcm_data = jsonable_encoder(pfdcm_data) new_pfdcm = await pfdcm.add_pfdcm(pfdcm_data) if new_pfdcm.get("error"): - return PfdcmQueryResponseSchema(data={}, message=new_pfdcm["error"]) + raise HTTPException(status_code=400, detail=new_pfdcm["error"]) return PfdcmQueryResponseSchema(data=new_pfdcm, message="New record created.") @@ -65,7 +65,7 @@ async def get_hello_pfdcm(service_name: str) -> PfdcmQueryResponseSchema: """ response = await pfdcm.hello_pfdcm(service_name) if response.get("error"): - raise HTTPException(status_code=404, detail=response["error"]) + raise HTTPException(status_code=502, detail=response["error"]) return PfdcmQueryResponseSchema(data=response, message='') @@ -80,7 +80,7 @@ async def get_about_pfdcm(service_name: str) -> PfdcmQueryResponseSchema: """ response = await pfdcm.about_pfdcm(service_name) if response.get("error"): - raise HTTPException(status_code=404, detail=response["error"]) + raise HTTPException(status_code=502, detail=response["error"]) return PfdcmQueryResponseSchema(data=response, message='') @@ -95,7 +95,7 @@ async def cube_service_list(service_name: str) -> list[str]: """ response = await pfdcm.cube_list(service_name) if not response: - raise 
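
The reworked retrieve_workflows above leans on MongoDB's wildcard text index: index every field once, $match on the keywords, project a relevance score, and sort by it. The same pipeline in isolation (connection details are placeholders):

    from pymongo import MongoClient, TEXT

    collection = MongoClient("mongodb://localhost:27017").database.get_collection("workflows_collection")
    collection.create_index([("$**", TEXT)], name="search_index", default_language="english")


    def search_workflow_keys(keywords: str) -> list:
        cursor = collection.aggregate([
            {"$match": {"$text": {"$search": keywords}}},
            {"$project": {"_id": 1, "score": {"$meta": "textScore"}}},
            {"$sort": {"score": -1}},
        ])
        return [str(doc) for doc in cursor]

Re-creating the index on every call is a no-op once it exists, but it is still work the search endpoint repeats; moving it to startup would be a cheap follow-up.
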
HTTPException(status_code=404, detail=f"Unable to reach endpoints of {service_name}") + raise HTTPException(status_code=502, detail=f"Unable to reach endpoints of {service_name}") return response @@ -110,7 +110,7 @@ async def storage_service_list(service_name: str) -> list[str]: """ response = await pfdcm.storage_list(service_name) if not response: - raise HTTPException(status_code=404, detail=f"Unable to reach endpoints of {service_name}") + raise HTTPException(status_code=502, detail=f"Unable to reach endpoints of {service_name}") return response @@ -125,7 +125,7 @@ async def pacs_service_list(service_name: str) -> list[str]: """ response = await pfdcm.pacs_list(service_name) if not response: - raise HTTPException(status_code=404, detail=f"Unable to reach endpoints of {service_name}") + raise HTTPException(status_code=502, detail=f"Unable to reach endpoints of {service_name}") return response diff --git a/app/routes/workflow.py b/app/routes/workflow.py index 64e15f4..7dc552d 100644 --- a/app/routes/workflow.py +++ b/app/routes/workflow.py @@ -26,15 +26,13 @@ async def create_workflow(data: WorkflowRequestSchema) -> WorkflowStatusResponse @router.get("/search", response_description="All workflows retrieved") async def get_workflows( - keywords: str = "", - date: str = "", + keywords: str = "" ): """ Fetch all workflows currently present in the database matching the search criteria """ search_params = WorkflowSearchSchema( keywords = keywords, - date = date, ) workflows = workflow.retrieve_workflows(search_params) return workflows diff --git a/docker-compose.yml b/docker-compose.yml index 8805053..c800684 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,8 @@ services: - ./:/app/ environment: PFLINK_MONGODB: mongodb://pflink-db:27017 + MONGO_USERNAME: admin + MONGO_PASSWORD: admin ports: - "8050:8050" networks: @@ -18,12 +20,18 @@ services: pflink-db: image: mongo:4.4.6 environment: - - PUID=1000 - - PGID=1000 + MONGO_INITDB_ROOT_USERNAME: admin + MONGO_INITDB_ROOT_PASSWORD: admin ports: - 27017:27017 volumes: - - pflink-db-data:/data/db + - type: volume + source: MONGO_DATA + target: /data/db + - type: volume + source: MONGO_CONFIG + target: /data/configdb + restart: unless-stopped networks: pflink: @@ -33,5 +41,6 @@ networks: pflink: volumes: - pflink-db-data: + MONGO_DATA: + MONGO_CONFIG: From 34a4654a9d7345d49c5d220568eb68082f4cd221 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Fri, 1 Sep 2023 10:05:06 -0400 Subject: [PATCH 15/19] pass log level params as env variables --- app/config.py | 30 +--------------------- app/controllers/subprocesses/status.py | 2 +- app/controllers/subprocesses/wf_manager.py | 2 +- app/controllers/workflow.py | 2 +- app/log.py | 24 +++++++++++++++++ app/main.py | 4 +-- docker-compose.yml | 1 + 7 files changed, 31 insertions(+), 34 deletions(-) create mode 100644 app/log.py diff --git a/app/config.py b/app/config.py index 41f7db4..a2ab84b 100644 --- a/app/config.py +++ b/app/config.py @@ -6,6 +6,7 @@ class Settings(BaseSettings): version: str = "3.7.0" mongo_username: str = "admin" mongo_password: str = "admin" + log_level: str = "INFO" class Auth(BaseSettings): @@ -13,34 +14,5 @@ class Auth(BaseSettings): user_name: str = 'pflink' password: str = 'pflink1234' - -class LogConfig(BaseSettings): - level: str = "DEBUG" - log_config = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "default": { - "()": "uvicorn.logging.DefaultFormatter", - "fmt": '%(log_color)s {"levelname":"%(levelname)s", 
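
Patch 15 above moves the log level out of code: a LOG_LEVEL environment variable (set in docker-compose) lands in Settings.log_level, which then feeds dictConfig. A minimal sketch of that flow using the same pydantic v1 BaseSettings style as the rest of the config:

    from logging.config import dictConfig

    from pydantic import BaseSettings


    class Settings(BaseSettings):
        # BaseSettings reads environment variables case-insensitively,
        # so LOG_LEVEL=DEBUG overrides this default.
        log_level: str = "INFO"


    settings = Settings()

    dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {"default": {"class": "logging.StreamHandler"}},
        "loggers": {"pflink-logger": {"handlers": ["default"], "level": settings.log_level}},
    })
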
"worker":"%(workername)s", "timestamp":"%(' - 'asctime)s", "key":"%(key)s", "msg":"%(message)s"}\33[0m', - "datefmt": "%Y-%m-%d %H:%M:%S", - - }, - }, - "handlers": { - "default": { - "formatter": "default", - "class": "logging.StreamHandler", - "stream": "ext://sys.stderr", - }, - }, - "loggers": { - "pflink-logger": {"handlers": ["default"], "level": level}, - }, - } - - settings = Settings() auth = Auth() -log = LogConfig() diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 4dae175..856395b 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -9,7 +9,7 @@ import time import pprint from logging.config import dictConfig -from app.config import log +from app import log from app.models.workflow import ( State, WorkflowRequestSchema, diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 07fae14..8f7125c 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -11,7 +11,7 @@ from app.controllers.subprocesses.python_chris_client import PythonChrisClient from app.controllers.subprocesses.subprocess_helper import get_process_count from logging.config import dictConfig -from app.config import log +from app import log from app.models.workflow import ( Error, State, diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 753ed58..4ac6c2c 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -138,7 +138,7 @@ async def post_workflow( # run workflow manager subprocess on the workflow sub_mng = manage_workflow(str_data, mode) - logger.debug(f"Status response is {workflow.response}") + logger.debug(f"Status response is {workflow.response}", extra=d) return workflow.response diff --git a/app/log.py b/app/log.py new file mode 100644 index 0000000..016b65b --- /dev/null +++ b/app/log.py @@ -0,0 +1,24 @@ +from app.config import settings +log_config = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "default": { + "()": "uvicorn.logging.DefaultFormatter", + "fmt": '%(log_color)s {"worker":"%(workername)s", "timestamp":"%(' + 'asctime)s", "key":"%(key)s", "%(levelname)s":"%(message)s"}\33[0m', + "datefmt": "%Y-%m-%d %H:%M:%S", + + }, + }, + "handlers": { + "default": { + "formatter": "default", + "class": "logging.StreamHandler", + "stream": "ext://sys.stderr", + }, + }, + "loggers": { + "pflink-logger": {"handlers": ["default"], "level": settings.log_level}, + }, + } \ No newline at end of file diff --git a/app/main.py b/app/main.py index 4caf129..067211a 100644 --- a/app/main.py +++ b/app/main.py @@ -6,10 +6,10 @@ from app.routes.testing import router as WorkflowTestRouter from app.routes.auth import router as AuthRouter from app.routes.cube import router as CubeRouter -from app.config import settings, log +from app.config import settings from app.controllers import auth from logging.config import dictConfig - +from app import log description = """ `pflink` is an application to interact with `CUBE` and `pfdcm` 🚀 diff --git a/docker-compose.yml b/docker-compose.yml index c800684..02355d7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,7 @@ services: PFLINK_MONGODB: mongodb://pflink-db:27017 MONGO_USERNAME: admin MONGO_PASSWORD: admin + LOG_LEVEL: DEBUG ports: - "8050:8050" networks: From 682c260472bb891041d3785b43308d56950392a7 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Fri, 1 Sep 2023 13:13:49 -0400 Subject: 
[PATCH 16/19] log cleanup and additional info --- app/controllers/subprocesses/wf_manager.py | 6 +++++- app/log.py | 4 ++-- docker-compose.yml | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 8f7125c..08cb6c8 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -74,21 +74,25 @@ def manage_workflow(db_key: str, test: bool): do_pfdcm_retrieve(request, pfdcm_url) case State.RETRIEVING: + logger.info(f"Retrieving progress {workflow.response.state_progress}", extra=d) if workflow.response.state_progress == "100%" and workflow.stale: logger.info("Requesting PACS push.", extra=d) do_pfdcm_push(request, pfdcm_url) case State.PUSHING: + logger.info(f"Pushing progress {workflow.response.state_progress}", extra=d) if workflow.response.state_progress == "100%" and workflow.stale: logger.info("Requesting PACS register.", extra=d) do_pfdcm_register(request, pfdcm_url) case State.REGISTERING: + logger.info(f"Registering progress {workflow.response.state_progress}", extra=d) if workflow.response.state_progress == "100%" and workflow.stale: try: resp = do_cube_create_feed(request, cube_url) pl_inst_id = resp["pl_inst_id"] feed_id = resp["feed_id"] + logger.info(f"New feed created with feed_id {feed_id}", extra=d) workflow.response.feed_id = feed_id update_workflow(key, workflow) except Exception as ex: @@ -282,7 +286,7 @@ def __run_plugin_instance(previous_id: str, request: WorkflowRequestSchema, clie # convert CLI params from string to a JSON dictionary feed_params = str_to_param_dict(request.workflow_info.plugin_params) feed_params["previous_id"] = previous_id - logger.debug(f"Creating new analysis with plugin: {plugin_search_params} and parameters: {feed_params}", + logger.info(f"Creating new analysis with plugin: {plugin_search_params} and parameters: {feed_params}", extra=d) feed_resp = client.createFeed(plugin_id, feed_params) diff --git a/app/log.py b/app/log.py index 016b65b..b55f709 100644 --- a/app/log.py +++ b/app/log.py @@ -5,8 +5,8 @@ "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": '%(log_color)s {"worker":"%(workername)s", "timestamp":"%(' - 'asctime)s", "key":"%(key)s", "%(levelname)s":"%(message)s"}\33[0m', + "fmt": '%(log_color)s {"worker":"%(workername)16s", "timestamp":"%(' + 'asctime)s", "key":"%(key)s", "%(levelname)-8s":"%(message)s"}\33[0m', "datefmt": "%Y-%m-%d %H:%M:%S", }, diff --git a/docker-compose.yml b/docker-compose.yml index 02355d7..f688b34 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,7 @@ services: PFLINK_MONGODB: mongodb://pflink-db:27017 MONGO_USERNAME: admin MONGO_PASSWORD: admin - LOG_LEVEL: DEBUG + LOG_LEVEL: INFO ports: - "8050:8050" networks: From 1f5894a0766eabbc986258602155a708c52b983c Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Fri, 1 Sep 2023 14:17:41 -0400 Subject: [PATCH 17/19] Added new logging info --- app/controllers/subprocesses/status.py | 2 +- app/controllers/subprocesses/wf_manager.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 856395b..91686e4 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -338,7 +338,7 @@ def _parse_response( status.status = False status.error = Error.feed_deleted.value status.state_progress = "0%" - + logger.info(f"Current status is 
{status.workflow_state}", extra=d) return status diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 08cb6c8..5fa4e06 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -118,7 +118,7 @@ def manage_workflow(db_key: str, test: bool): time.sleep(SLEEP_TIME) workflow = retrieve_workflow(key) - logger.debug(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}", + logger.info(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}", extra=d) # Reset workflow status if max service_retry is not reached @@ -257,7 +257,7 @@ def do_cube_create_feed(request: WorkflowRequestSchema, cube_url: str) -> dict: plugin_search_params = {"name": "pl-dircopy"} plugin_id = client.getPluginId(plugin_search_params) - logger.info(f"Creating a new feed with feed name: {feed_name}", extra=d) + logger.info(f"Creating a new feed with feed name {feed_name}", extra=d) # create a feed feed_params = {'title': feed_name, 'dir': data_path} feed_response = client.createFeed(plugin_id, feed_params) From a4cc979cf57be4b73c180c12c70dea9ba4624d58 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Tue, 5 Sep 2023 12:56:16 -0400 Subject: [PATCH 18/19] Fixed alignment issues in logs --- app/log.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/log.py b/app/log.py index b55f709..29cbc62 100644 --- a/app/log.py +++ b/app/log.py @@ -1,4 +1,5 @@ from app.config import settings + log_config = { "version": 1, "disable_existing_loggers": False, @@ -6,7 +7,7 @@ "default": { "()": "uvicorn.logging.DefaultFormatter", "fmt": '%(log_color)s {"worker":"%(workername)16s", "timestamp":"%(' - 'asctime)s", "key":"%(key)s", "%(levelname)-8s":"%(message)s"}\33[0m', + 'asctime)s", "key":"%(key)s", "level":"%(levelname)-8s", "msg":"%(message)s"}\33[0m', "datefmt": "%Y-%m-%d %H:%M:%S", }, From 4319ee85fd9426592128c159308daa48ddd01723 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Thu, 7 Sep 2023 15:37:35 -0400 Subject: [PATCH 19/19] Logging fixes --- app/config.py | 2 +- app/controllers/subprocesses/status.py | 8 +++---- app/controllers/subprocesses/wf_manager.py | 27 ++++++++++++++-------- app/controllers/workflow.py | 1 + app/log.py | 4 ++-- 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/app/config.py b/app/config.py index a2ab84b..a0c799c 100644 --- a/app/config.py +++ b/app/config.py @@ -3,7 +3,7 @@ class Settings(BaseSettings): pflink_mongodb: MongoDsn = 'mongodb://localhost:27017' - version: str = "3.7.0" + version: str = "3.8.0" mongo_username: str = "admin" mongo_password: str = "admin" log_level: str = "INFO" diff --git a/app/controllers/subprocesses/status.py b/app/controllers/subprocesses/status.py index 91686e4..2634d11 100644 --- a/app/controllers/subprocesses/status.py +++ b/app/controllers/subprocesses/status.py @@ -51,7 +51,7 @@ def update_workflow_status(key: str, test: bool): if is_status_subprocess_running(workflow): return - logger.info(f"Working on fetching the current status, locking DB flag", extra=d) + logger.info(f"Working on fetching the current status, locking DB flag.", extra=d) update_status_flag(key, workflow, False, test) if test: @@ -61,9 +61,9 @@ def update_workflow_status(key: str, test: bool): workflow.response = update_workflow_progress(updated_status) pretty_response = pprint.pformat(workflow.response.__dict__) - logger.debug(f"Updated response: {pretty_response}", extra=d) + 
logger.debug(f"Updated response: {pretty_response}.", extra=d) update_status_flag(key, workflow, True, test) - logger.info(f"Finished writing updated status to the DB, releasing lock", extra=d) + logger.info(f"Finished writing updated status to the DB, releasing lock.", extra=d) def update_workflow_progress(response: WorkflowStatusResponseSchema): @@ -338,7 +338,7 @@ def _parse_response( status.status = False status.error = Error.feed_deleted.value status.state_progress = "0%" - logger.info(f"Current status is {status.workflow_state}", extra=d) + logger.info(f"Current status is {status.workflow_state}.", extra=d) return status diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index 5fa4e06..6d951fc 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -45,13 +45,16 @@ def manage_workflow(db_key: str, test: bool): SLEEP_TIME = 10 MAX_RETRIES = 50 pl_inst_id = 0 + workflow = retrieve_workflow(db_key) + logger.info(f"Fetching request status from the DB. Current status is {workflow.response.workflow_state}.", extra=d) + if workflow.started or not workflow.response.status or test: # Do nothing and return - reason = f"Workflow request failed. {workflow.response.error}" if not workflow.response.status \ - else f"Workflow already started. The current status is {workflow.response.workflow_state}" + reason = f"Workflow request failed. {workflow.response.error}." if not workflow.response.status \ + else f"Workflow already started." logger.warning(f"Cannot restart this workflow request. {reason}" - f". Kindly delete this request to restart using the delete API end point", extra=d) + f" Kindly delete this request to restart using the delete API end point.", extra=d) return request = workflow.request @@ -74,25 +77,25 @@ def manage_workflow(db_key: str, test: bool): do_pfdcm_retrieve(request, pfdcm_url) case State.RETRIEVING: - logger.info(f"Retrieving progress {workflow.response.state_progress}", extra=d) + logger.info(f"Retrieving progress is {workflow.response.state_progress} complete.", extra=d) if workflow.response.state_progress == "100%" and workflow.stale: logger.info("Requesting PACS push.", extra=d) do_pfdcm_push(request, pfdcm_url) case State.PUSHING: - logger.info(f"Pushing progress {workflow.response.state_progress}", extra=d) + logger.info(f"Pushing progress is {workflow.response.state_progress} complete.", extra=d) if workflow.response.state_progress == "100%" and workflow.stale: logger.info("Requesting PACS register.", extra=d) do_pfdcm_register(request, pfdcm_url) case State.REGISTERING: - logger.info(f"Registering progress {workflow.response.state_progress}", extra=d) + logger.info(f"Registering progress is {workflow.response.state_progress} complete.", extra=d) if workflow.response.state_progress == "100%" and workflow.stale: try: resp = do_cube_create_feed(request, cube_url) pl_inst_id = resp["pl_inst_id"] feed_id = resp["feed_id"] - logger.info(f"New feed created with feed_id {feed_id}", extra=d) + logger.info(f"New feed created with feed_id {feed_id}.", extra=d) workflow.response.feed_id = feed_id update_workflow(key, workflow) except Exception as ex: @@ -114,11 +117,11 @@ def manage_workflow(db_key: str, test: bool): logger.info(f"Calling status update subprocess.", extra=d) update_status(request) - logger.info(f"Sleeping for {SLEEP_TIME} seconds", extra=d) + logger.info(f"Sleeping for {SLEEP_TIME} seconds.", extra=d) time.sleep(SLEEP_TIME) workflow = retrieve_workflow(key) - 
logger.info(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}", + logger.info(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}.", extra=d) # Reset workflow status if max service_retry is not reached @@ -128,6 +131,9 @@ def manage_workflow(db_key: str, test: bool): workflow.response.error = "" workflow.response.status = True update_workflow(key, workflow) + if workflow.service_retry <= 0: logger.warning("All retries exhausted. Giving up on this workflow request.", extra=d) + + # Reset workflow if pflink reached MAX no. of retries @@ -154,6 +160,7 @@ def update_status(request: WorkflowRequestSchema): return d_cmd = ["python", "app/controllers/subprocesses/status.py", "--data", str_data] pretty_cmd = pprint.pformat(d_cmd) + time.sleep(2) logger.debug(f"New status subprocess started with command: {pretty_cmd}", extra=d) process = subprocess.Popen(d_cmd) @@ -299,6 +306,8 @@ def __run_pipeline_instance(previous_id: str, request: WorkflowRequestSchema, cl pipeline_search_params = {"name": request.workflow_info.pipeline_name} pipeline_id = client.getPipelineId(pipeline_search_params) pipeline_params = {"previous_plugin_inst_id": previous_id, "name": request.workflow_info.pipeline_name} + logger.info(f"Creating new analysis with pipeline: {pipeline_search_params}.", + extra=d) feed_resp = client.createWorkflow(str(pipeline_id), pipeline_params) diff --git a/app/controllers/workflow.py b/app/controllers/workflow.py index 4ac6c2c..ee99b3e 100644 --- a/app/controllers/workflow.py +++ b/app/controllers/workflow.py @@ -138,6 +138,7 @@ async def post_workflow( # run workflow manager subprocess on the workflow sub_mng = manage_workflow(str_data, mode) + logger.info(f"Current status response is {workflow.response.workflow_state}.", extra=d) logger.debug(f"Status response is {workflow.response}", extra=d) return workflow.response diff --git a/app/log.py b/app/log.py index 29cbc62..b5cfb36 100644 --- a/app/log.py +++ b/app/log.py @@ -6,8 +6,8 @@ "formatters": { "default": { "()": "uvicorn.logging.DefaultFormatter", - "fmt": '%(log_color)s {"worker":"%(workername)16s", "timestamp":"%(' - 'asctime)s", "key":"%(key)s", "level":"%(levelname)-8s", "msg":"%(message)s"}\33[0m', + "fmt": '%(log_color)s {"worker":"%(workername)-16s", "timestamp":"%(' + 'asctime)s", "db_key":"%(key)s", "level":"%(levelname)-8s", "msg":"%(message)s"}\33[0m', "datefmt": "%Y-%m-%d %H:%M:%S", },
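
The final format string in app/log.py uses only standard record attributes plus the extra fields, so its output can be previewed with the stdlib Formatter alone, no uvicorn needed; the key and timestamp below are example values:

    import logging

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        fmt='%(log_color)s {"worker":"%(workername)-16s", "timestamp":"%(asctime)s", '
            '"db_key":"%(key)s", "level":"%(levelname)-8s", "msg":"%(message)s"}\33[0m',
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    logger = logging.getLogger("pflink-logger")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    d = {"workername": "PFLINK", "log_color": "\33[32m", "key": "example-db-key"}
    logger.info("New workflow record created.", extra=d)
    # emits, wrapped in green:
    # {"worker":"PFLINK          ", "timestamp":"2023-09-07 15:37:35", "db_key":"example-db-key", "level":"INFO    ", "msg":"New workflow record created."}
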