Skip to content

Commit

Permalink
Merge pull request #35 from FNNDSC/delete-single-workflow
Browse files Browse the repository at this point in the history
Delete single workflow
  • Loading branch information
Sandip117 committed Sep 7, 2023
2 parents 37deaba + 4319ee8 commit 327d2a6
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 99 deletions.
6 changes: 4 additions & 2 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

class Settings(BaseSettings):
    """Application settings (pydantic ``BaseSettings``); every field can be
    overridden via an environment variable of the same name."""

    # Connection string for the pflink MongoDB instance.
    pflink_mongodb: MongoDsn = 'mongodb://localhost:27017'
    # NOTE(review): the pasted diff showed both "3.6.0" (removed) and
    # "3.8.0" (added) for this field; only the post-commit value is kept.
    version: str = "3.8.0"
    # NOTE(review): default admin/admin credentials — must be overridden via
    # environment variables in any real deployment.
    mongo_username: str = "admin"
    mongo_password: str = "admin"
    log_level: str = "INFO"


class Auth(BaseSettings):
    # HMAC key used to sign JWTs.
    # NOTE(review): a secret committed to source control is compromised by
    # definition — rotate it and supply via the environment instead.
    JWT_SECRET_KEY: str = "aad10a452df3f4451c975a0a2982b159c5088eb0f5da816a1fb129f473c0ddc7"  # should be kept secret
    # Default API credentials; overridable via env vars through BaseSettings.
    user_name: str = 'pflink'
    password: str = 'pflink1234'


# Module-level singletons imported by the rest of the application.
settings = Settings()
auth = Auth()
34 changes: 15 additions & 19 deletions app/controllers/pfdcm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import hashlib
import httpx
import pymongo.results

from app.config import settings
from pymongo import MongoClient

MONGO_DETAILS = str(settings.pflink_mongodb)
client = MongoClient(MONGO_DETAILS)
client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password)
database = client.database
pfdcm_collection = database.get_collection("pfdcms_collection")

Expand All @@ -14,20 +15,13 @@


def pfdcm_helper(pfdcm) -> dict:
    """Normalize a pfdcm record for storage.

    The service name doubles as the Mongo ``_id``, so the unique-``_id``
    constraint enforces unique service names.

    :param pfdcm: mapping with at least ``service_name`` and
        ``service_address`` keys
    :return: dict shaped for insertion into ``pfdcm_collection``
    """
    # NOTE(review): the pasted diff contained both the old hashed-``_id``
    # line and the new plain ``service_name`` line (duplicate dict key);
    # only the post-commit form is kept here.
    return {
        "_id": pfdcm["service_name"],
        "service_name": pfdcm["service_name"],
        "service_address": pfdcm["service_address"],
    }


def str_to_hash(str_data: str) -> str:
    """Return the hex MD5 digest of *str_data*.

    Used as a stable, deterministic record key — not for security.
    """
    digest = hashlib.md5(str_data.encode())
    return digest.hexdigest()


# Retrieve all pfdcm records present in the database
async def retrieve_pfdcms():
pfdcms = [pfdcm["service_name"] for pfdcm in pfdcm_collection.find()]
Expand All @@ -40,11 +34,14 @@ async def add_pfdcm(pfdcm_data: dict) -> dict:
DB constraint: Only unique names allowed
"""
try:
pfdcm = pfdcm_collection.insert_one(pfdcm_helper(pfdcm_data))
new_pfdcm = pfdcm_collection.find_one({"_id": pfdcm.inserted_id})
return pfdcm_helper(new_pfdcm)
except:
return {}
pfdcm: pymongo.results.InsertOneResult = pfdcm_collection.insert_one(pfdcm_helper(pfdcm_data))
if pfdcm.acknowledged:
inserted_pfdcm: dict = pfdcm_collection.find_one({"_id": pfdcm.inserted_id})
return inserted_pfdcm
else:
raise Exception("Could not store new record.")
except Exception as ex:
return {"error": str(ex)}


# Retrieve a pfdcm record with a matching service name
Expand All @@ -66,8 +63,8 @@ async def hello_pfdcm(pfdcm_name: str) -> dict:
response = await client.get(pfdcm_hello_api)
d_results = json.loads(response.text)
return d_results
except:
return {"error": f"Unable to reach {pfdcm_url}."}
except Exception as ex:
return {"error": f"Unable to reach {pfdcm_url}.{ex}"}


async def about_pfdcm(service_name: str) -> dict:
Expand Down Expand Up @@ -141,9 +138,8 @@ async def delete_pfdcm(service_name: str):
Delete a pfdcm record from the DB
"""
delete_count = 0
key = str_to_hash(service_name)
for pfdcm in pfdcm_collection.find():
if pfdcm["_id"] == key:
if pfdcm["_id"] == service_name:
pfdcm_collection.delete_one({"_id": pfdcm["_id"]})
delete_count += 1
return {"Message": f"{delete_count} record(s) deleted!"}
Expand Down
33 changes: 33 additions & 0 deletions app/controllers/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from app.models.workflow import WorkflowSearchSchema


def compound_queries(query_params: WorkflowSearchSchema):
    """Build the three aggregation-pipeline stages for a keyword search:
    a ``$match`` on the user's keywords via a ``$text`` search, a ``$sort``
    by text-relevance score, and a ``$project`` returning only ``_id``.
    """
    match_stage = {
        "$match": {
            "$text": {"$search": query_params.keywords},
        }
    }
    sort_stage = {"$sort": {"score": {"$meta": "textScore"}}}
    project_stage = {"$project": {"_id": 1}}
    return match_stage, sort_stage, project_stage


def index_search(query_params: dict):
    """Return a hard-coded ``$match`` stage for an index-backed text search.

    NOTE(review): ``query_params`` is currently unused and the index name,
    query text, and path are fixed constants — presumably a placeholder to
    be parameterized later; verify against callers.
    """
    text_clause = {
        "query": "chris",
        "path": "request.cube_user_info.username",
    }
    return {"$match": {"index": "some_index", "text": text_clause}}
43 changes: 31 additions & 12 deletions app/controllers/subprocesses/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import logging
import random
import requests

import time
import pprint
from logging.config import dictConfig
from app import log
from app.models.workflow import (
State,
WorkflowRequestSchema,
Expand All @@ -26,12 +29,10 @@
)

from app.controllers.subprocesses.subprocess_helper import get_process_count
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(
format=log_format,
level=logging.INFO,
datefmt="%H:%M:%S"
)
dictConfig(log.log_config)
logger = logging.getLogger('pflink-logger')
d = {'workername': 'STATUS_MGR', 'log_color': "\33[36m", 'key': ""}


parser = argparse.ArgumentParser(description='Process arguments passed through CLI')
parser.add_argument('--data', type=str)
Expand All @@ -50,7 +51,7 @@ def update_workflow_status(key: str, test: bool):
if is_status_subprocess_running(workflow):
return

logging.info(f"WORKING on updating the status for {key}, locking--")
logger.info(f"Working on fetching the current status, locking DB flag.", extra=d)
update_status_flag(key, workflow, False, test)

if test:
Expand All @@ -59,8 +60,10 @@ def update_workflow_status(key: str, test: bool):
updated_status = get_current_status(workflow.request, workflow.response)

workflow.response = update_workflow_progress(updated_status)
pretty_response = pprint.pformat(workflow.response.__dict__)
logger.debug(f"Updated response: {pretty_response}.", extra=d)
update_status_flag(key, workflow, True, test)
logging.info(f"UPDATED status for {key}, releasing lock")
logger.info(f"Finished writing updated status to the DB, releasing lock.", extra=d)


def update_workflow_progress(response: WorkflowStatusResponseSchema):
Expand Down Expand Up @@ -170,11 +173,20 @@ def _get_pfdcm_status(request: WorkflowRequestSchema):
"json_response": True
}
}
pretty_json = pprint.pformat(pfdcm_body)
logger.debug(f"POSTing the below request at {pfdcm_status_url} to get status: {pretty_json}", extra=d)
st = time.time()
response = requests.post(pfdcm_status_url, json=pfdcm_body, headers=headers)
et = time.time()
elapsed_time = et - st
logger.debug(f'Execution time to get status:{elapsed_time} seconds', extra=d)
d_response = json.loads(response.text)
#logger.debug(f"Response from pfdcm: {d_response}")
d_response["service_name"] = request.pfdcm_info.pfdcm_service
return d_response
except Exception as ex:
logger.error(f"{Error.pfdcm.value} {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}",
extra=d)
return {"error": Error.pfdcm.value + f" {str(ex)} for pfdcm_service {request.pfdcm_info.pfdcm_service}"}


Expand All @@ -201,13 +213,18 @@ def _get_feed_status(request: WorkflowRequestSchema, feed_id: str) -> dict:
feed_name = substitute_dicom_tags(requested_feed_name, pacs_details)

# search for feed
logger.debug(f"Request CUBE at {cube_url} for feed id: {feed_id} and feed name: {feed_name}", extra=d)
resp = cl.getFeed({"id": feed_id, "name_exact": feed_name})
if resp["errored_jobs"] or resp["cancelled_jobs"]:
pretty_response = pprint.pformat(resp)
logger.debug(f"Response from CUBE : {pretty_response}", extra=d)
if resp.get("errored_jobs") or resp.get("cancelled_jobs"):
l_inst_resp = cl.getPluginInstances({"feed_id": feed_id})
l_error = [d_instance['plugin_name'] for d_instance in l_inst_resp['data'] if d_instance['status']=='finishedWithError' or d_instance['status'] == 'cancelled']
l_error = [d_instance['plugin_name'] for d_instance in l_inst_resp['data']
if d_instance['status']=='finishedWithError' or d_instance['status'] == 'cancelled']
resp["errored_plugins"] = str(l_error)
return resp
except Exception as ex:
logger.error(f"{Error.cube.value} {str(ex)}", extra=d)
return {"error": Error.cube.value + str(ex)}


Expand Down Expand Up @@ -320,7 +337,8 @@ def _parse_response(
status.workflow_state = State.FEED_DELETED
status.status = False
status.error = Error.feed_deleted.value

status.state_progress = "0%"
logger.info(f"Current status is {status.workflow_state}.", extra=d)
return status


Expand Down Expand Up @@ -397,4 +415,5 @@ def __get_progress_from_text(progress: str):
"""
dict_data = json.loads(args.data)
wf_key = dict_to_hash(dict_data)
d['key'] = wf_key
update_workflow_status(wf_key, args.test)
7 changes: 6 additions & 1 deletion app/controllers/subprocesses/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import hashlib
import json
import requests
Expand All @@ -13,7 +14,7 @@
)
MONGO_DETAILS = str(settings.pflink_mongodb)

client = MongoClient(MONGO_DETAILS)
client = MongoClient(MONGO_DETAILS, username=settings.mongo_username, password=settings.mongo_password)

database = client.database

Expand Down Expand Up @@ -41,8 +42,10 @@ def workflow_retrieve_helper(workflow: dict) -> WorkflowDBSchema:
return WorkflowDBSchema(
key=workflow["_id"],
fingerprint=workflow["fingerprint"],
creation_time=datetime.datetime.min if not workflow.get("creation_time") else workflow["creation_time"],
request=request,
response=workflow["response"],
service_retry=workflow["service_retry"],
stale=workflow["stale"],
started=workflow["started"],
)
Expand All @@ -59,8 +62,10 @@ def workflow_add_helper(workflow: WorkflowDBSchema) -> dict:
return {
"_id": workflow.key,
"fingerprint": workflow.fingerprint,
"creation_time": workflow.creation_time,
"request": d_request,
"response": workflow.response.__dict__,
"service_retry": workflow.service_retry,
"stale": workflow.stale,
"started": workflow.started,
}
Expand Down
Loading

0 comments on commit 327d2a6

Please sign in to comment.