Skip to content

Commit

Permalink
Logging fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandip117 committed Sep 7, 2023
1 parent a4cc979 commit 4319ee8
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 16 deletions.
2 changes: 1 addition & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class Settings(BaseSettings):
pflink_mongodb: MongoDsn = 'mongodb://localhost:27017'
version: str = "3.7.0"
version: str = "3.8.0"
mongo_username: str = "admin"
mongo_password: str = "admin"
log_level: str = "INFO"
Expand Down
8 changes: 4 additions & 4 deletions app/controllers/subprocesses/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def update_workflow_status(key: str, test: bool):
if is_status_subprocess_running(workflow):
return

logger.info(f"Working on fetching the current status, locking DB flag", extra=d)
logger.info(f"Working on fetching the current status, locking DB flag.", extra=d)
update_status_flag(key, workflow, False, test)

if test:
Expand All @@ -61,9 +61,9 @@ def update_workflow_status(key: str, test: bool):

workflow.response = update_workflow_progress(updated_status)
pretty_response = pprint.pformat(workflow.response.__dict__)
logger.debug(f"Updated response: {pretty_response}", extra=d)
logger.debug(f"Updated response: {pretty_response}.", extra=d)
update_status_flag(key, workflow, True, test)
logger.info(f"Finished writing updated status to the DB, releasing lock", extra=d)
logger.info(f"Finished writing updated status to the DB, releasing lock.", extra=d)


def update_workflow_progress(response: WorkflowStatusResponseSchema):
Expand Down Expand Up @@ -338,7 +338,7 @@ def _parse_response(
status.status = False
status.error = Error.feed_deleted.value
status.state_progress = "0%"
logger.info(f"Current status is {status.workflow_state}", extra=d)
logger.info(f"Current status is {status.workflow_state}.", extra=d)
return status


Expand Down
27 changes: 18 additions & 9 deletions app/controllers/subprocesses/wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ def manage_workflow(db_key: str, test: bool):
SLEEP_TIME = 10
MAX_RETRIES = 50
pl_inst_id = 0

workflow = retrieve_workflow(db_key)
logger.info(f"Fetching request status from the DB. Current status is {workflow.response.workflow_state}.", extra=d)

if workflow.started or not workflow.response.status or test:
# Do nothing and return
reason = f"Workflow request failed. {workflow.response.error}" if not workflow.response.status \
else f"Workflow already started. The current status is {workflow.response.workflow_state}"
reason = f"Workflow request failed. {workflow.response.error}." if not workflow.response.status \
else f"Workflow already started."
logger.warning(f"Cannot restart this workflow request. {reason}"
f". Kindly delete this request to restart using the delete API end point", extra=d)
f" Kindly delete this request to restart using the delete API end point.", extra=d)
return

request = workflow.request
Expand All @@ -74,25 +77,25 @@ def manage_workflow(db_key: str, test: bool):
do_pfdcm_retrieve(request, pfdcm_url)

case State.RETRIEVING:
logger.info(f"Retrieving progress {workflow.response.state_progress}", extra=d)
logger.info(f"Retrieving progress is {workflow.response.state_progress} complete.", extra=d)
if workflow.response.state_progress == "100%" and workflow.stale:
logger.info("Requesting PACS push.", extra=d)
do_pfdcm_push(request, pfdcm_url)

case State.PUSHING:
logger.info(f"Pushing progress {workflow.response.state_progress}", extra=d)
logger.info(f"Pushing progress is {workflow.response.state_progress} complete.", extra=d)
if workflow.response.state_progress == "100%" and workflow.stale:
logger.info("Requesting PACS register.", extra=d)
do_pfdcm_register(request, pfdcm_url)

case State.REGISTERING:
logger.info(f"Registering progress {workflow.response.state_progress}", extra=d)
logger.info(f"Registering progress is {workflow.response.state_progress} complete.", extra=d)
if workflow.response.state_progress == "100%" and workflow.stale:
try:
resp = do_cube_create_feed(request, cube_url)
pl_inst_id = resp["pl_inst_id"]
feed_id = resp["feed_id"]
logger.info(f"New feed created with feed_id {feed_id}", extra=d)
logger.info(f"New feed created with feed_id {feed_id}.", extra=d)
workflow.response.feed_id = feed_id
update_workflow(key, workflow)
except Exception as ex:
Expand All @@ -114,11 +117,11 @@ def manage_workflow(db_key: str, test: bool):
logger.info(f"Calling status update subprocess.", extra=d)
update_status(request)

logger.info(f"Sleeping for {SLEEP_TIME} seconds", extra=d)
logger.info(f"Sleeping for {SLEEP_TIME} seconds.", extra=d)
time.sleep(SLEEP_TIME)

workflow = retrieve_workflow(key)
logger.info(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}",
logger.info(f"Fetching request status from DB. Current status is {workflow.response.workflow_state}.",
extra=d)

# Reset workflow status if max service_retry is not reached
Expand All @@ -128,6 +131,9 @@ def manage_workflow(db_key: str, test: bool):
workflow.response.error = ""
workflow.response.status = True
update_workflow(key, workflow)
if workflow.service_retry <= 0: logger.warning("All retries exhausted. Giving up on this workflow request.", extra=d)




# Reset workflow if pflink reached MAX no. of retries
Expand All @@ -154,6 +160,7 @@ def update_status(request: WorkflowRequestSchema):
return
d_cmd = ["python", "app/controllers/subprocesses/status.py", "--data", str_data]
pretty_cmd = pprint.pformat(d_cmd)
time.sleep(2)
logger.debug(f"New status subprocess started with command: {pretty_cmd}", extra=d)
process = subprocess.Popen(d_cmd)

Expand Down Expand Up @@ -299,6 +306,8 @@ def __run_pipeline_instance(previous_id: str, request: WorkflowRequestSchema, cl
pipeline_search_params = {"name": request.workflow_info.pipeline_name}
pipeline_id = client.getPipelineId(pipeline_search_params)
pipeline_params = {"previous_plugin_inst_id": previous_id, "name": request.workflow_info.pipeline_name}
logger.info(f"Creating new analysis with pipeline: {pipeline_search_params}.",
extra=d)
feed_resp = client.createWorkflow(str(pipeline_id), pipeline_params)


Expand Down
1 change: 1 addition & 0 deletions app/controllers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ async def post_workflow(

# run workflow manager subprocess on the workflow
sub_mng = manage_workflow(str_data, mode)
logger.info(f"Current status response is {workflow.response.workflow_state}.", extra=d)
logger.debug(f"Status response is {workflow.response}", extra=d)
return workflow.response

Expand Down
4 changes: 2 additions & 2 deletions app/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"formatters": {
"default": {
"()": "uvicorn.logging.DefaultFormatter",
"fmt": '%(log_color)s {"worker":"%(workername)16s", "timestamp":"%('
'asctime)s", "key":"%(key)s", "level":"%(levelname)-8s", "msg":"%(message)s"}\33[0m',
"fmt": '%(log_color)s {"worker":"%(workername)-16s", "timestamp":"%('
'asctime)s", "db_key":"%(key)s", "level":"%(levelname)-8s", "msg":"%(message)s"}\33[0m',
"datefmt": "%Y-%m-%d %H:%M:%S",

},
Expand Down

0 comments on commit 4319ee8

Please sign in to comment.