From 4846b3185a3c20780485a0df65da54d1e6a8e264 Mon Sep 17 00:00:00 2001 From: Harshit Chopra Date: Thu, 7 Dec 2023 13:42:46 +0530 Subject: [PATCH] No calls found from FSM shouldn't fail pipeline (#131) * No calls found from FSM shouldn't fail pipeline --- CHANGELOG.md | 3 + pyproject.toml | 2 +- skit_pipelines/components/fetch_calls.py | 10 +- .../fetch_n_tag_turns_and_calls/__init__.py | 113 ++++++++++-------- 4 files changed, 73 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a81e46ed..f6075660 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +1.1.5 +- [x] fix: No calls found from FSM shouldn't fail data_fetch pipelines + 1.1.4 - [x] fix: Intent list was not coming in comprision file while training or evaluate slu. diff --git a/pyproject.toml b/pyproject.toml index 9e448501..d21ca7de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "skit-pipelines" -version = "1.1.4" +version = "1.1.5" description = "Kubeflow components for ml workflows at skit.ai." 
authors = ["ltbringer "] license = "MIT" diff --git a/skit_pipelines/components/fetch_calls.py b/skit_pipelines/components/fetch_calls.py index 2938809e..dd902f52 100644 --- a/skit_pipelines/components/fetch_calls.py +++ b/skit_pipelines/components/fetch_calls.py @@ -104,12 +104,12 @@ def fetch_calls( flow_ids=flow_ids ) logger.info(f"Finished in {time.time() - start:.2f} seconds") + logger.info(f"Obtained {maybe_df.shape[0]} calls from FSM Db before removing empty audios") if not maybe_df.size: - raise ValueError("No calls found for the above parameters") + return "" _, file_path = tempfile.mkstemp(suffix=const.CSV_FILE) maybe_df.to_csv(file_path, index=False) - logger.info(f"Obtained {maybe_df.shape[0]} calls from FSM Db before removing empty audios") def empty_audios_remover(df: pd.DataFrame, df_path: str): audios_dir_path = tempfile.mkdtemp() @@ -133,12 +133,14 @@ def empty_audios_remover(df: pd.DataFrame, df_path: str): ) ].drop("audio_filename", axis=1) if not df_final.size: - raise ValueError("No calls found for the above parameters") + return False logger.info(f"Obtained {df_final.shape[0]} calls after removing empty audios") df_final.to_csv(df_path, index=False) + return True if remove_empty_audios: - empty_audios_remover(df=maybe_df, df_path=file_path) + if not empty_audios_remover(df=maybe_df, df_path=file_path): + return "" client_id_string = "-".join(client_id) if isinstance(client_id, list) else client_id s3_path = upload2s3( file_path, diff --git a/skit_pipelines/pipelines/fetch_n_tag_turns_and_calls/__init__.py b/skit_pipelines/pipelines/fetch_n_tag_turns_and_calls/__init__.py index 8287c057..9198e9e4 100644 --- a/skit_pipelines/pipelines/fetch_n_tag_turns_and_calls/__init__.py +++ b/skit_pipelines/pipelines/fetch_n_tag_turns_and_calls/__init__.py @@ -220,61 +220,74 @@ def fetch_n_tag_turns_and_calls( "P0D" # disables caching ) - # Get intent response from GPT for qualifying turns - gpt_response_path = fetch_gpt_intent_prediction_op( - 
s3_file_path=calls.output, use_assisted_annotation=use_assisted_annotation - ) - - # uploads data for turn level intent, entity & transcription tagging - tag_turns_output = tag_calls_op( - input_file=gpt_response_path.output, - project_id=labelstudio_project_id, - data_label=data_label, - ) - - fetch_slot_and_calls_output = fetch_calls_for_slots_op( - untagged_records_path=calls.output, - org_id=org_id, - language_code=lang, - start_date=start_date, - end_date=end_date, - ) - - # uploads data for call & slot level tagging to labelstudio - tag_calls_output = tag_calls_op( - input_file=fetch_slot_and_calls_output.output, - call_project_id=call_project_id, - data_label=data_label, - ) - - with kfp.dsl.Condition(notify != "", "notify").after(tag_turns_output) as check1: - df_sizes = tag_turns_output.outputs["df_sizes"] - errors = tag_turns_output.outputs["errors"] - - notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}. - Uploaded {getattr(calls, 'output')} ({df_sizes}, {org_id=}) for tagging to {labelstudio_project_id=}.""" - notification_text += f"\nErrors: {errors}" if errors else "" - - task_no_cache = slack_notification_op( - notification_text, channel=channel, cc=notify, thread_id=slack_thread - ) - task_no_cache.execution_options.caching_strategy.max_cache_staleness = ( - "P0D" # disables caching + with kfp.dsl.Condition(calls.output != "", "calls_found").after(calls): + # Get intent response from GPT for qualifying turns + gpt_response_path = fetch_gpt_intent_prediction_op( + s3_file_path=calls.output, use_assisted_annotation=use_assisted_annotation ) - df_sizes2 = tag_calls_output.outputs["df_sizes"] - errors2 = tag_calls_output.outputs["errors"] - - notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}. 
- Uploaded {getattr(fetch_slot_and_calls_output, 'output')} ({df_sizes2}, {org_id=}) for call & slot tagging to {call_project_id=}.""" - notification_text += f"\nErrors: {errors2}" if errors else "" + # uploads data for turn level intent, entity & transcription tagging + tag_turns_output = tag_calls_op( + input_file=gpt_response_path.output, + project_id=labelstudio_project_id, + data_label=data_label, + ) - task_no_cache2 = slack_notification_op( - notification_text, channel=channel, cc=notify, thread_id=slack_thread + fetch_slot_and_calls_output = fetch_calls_for_slots_op( + untagged_records_path=calls.output, + org_id=org_id, + language_code=lang, + start_date=start_date, + end_date=end_date, ) - task_no_cache2.execution_options.caching_strategy.max_cache_staleness = ( - "P0D" # disables caching + + # uploads data for call & slot level tagging to labelstudio + tag_calls_output = tag_calls_op( + input_file=fetch_slot_and_calls_output.output, + call_project_id=call_project_id, + data_label=data_label, ) + with kfp.dsl.Condition(notify != "", "notify").after(tag_turns_output): + df_sizes = tag_turns_output.outputs["df_sizes"] + errors = tag_turns_output.outputs["errors"] + + notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}. + Uploaded {getattr(calls, 'output')} ({df_sizes}, {org_id=}) for tagging to {labelstudio_project_id=}.""" + notification_text += f"\nErrors: {errors}" if errors else "" + + task_no_cache = slack_notification_op( + notification_text, channel=channel, cc=notify, thread_id=slack_thread + ) + task_no_cache.execution_options.caching_strategy.max_cache_staleness = ( + "P0D" # disables caching + ) + + df_sizes2 = tag_calls_output.outputs["df_sizes"] + errors2 = tag_calls_output.outputs["errors"] + + notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}. 
+ Uploaded {getattr(fetch_slot_and_calls_output, 'output')} ({df_sizes2}, {org_id=}) for call & slot tagging to {call_project_id=}.""" + notification_text += f"\nErrors: {errors2}" if errors2 else "" + + task_no_cache2 = slack_notification_op( + notification_text, channel=channel, cc=notify, thread_id=slack_thread + ) + task_no_cache2.execution_options.caching_strategy.max_cache_staleness = ( + "P0D" # disables caching + ) + + with kfp.dsl.Condition(calls.output == "", "no_calls").after(calls): + with kfp.dsl.Condition(notify != "", "notify").after(calls): + notification_text = f"""No calls could be found from {start_date} to {end_date} for {client_id=}. + Please verify the parameters you have used or refer to the debugging guide on Notion.""" + + task_no_cache2 = slack_notification_op( + notification_text, channel=channel, cc=notify, thread_id=slack_thread + ) + task_no_cache2.execution_options.caching_strategy.max_cache_staleness = ( + "P0D" # disables caching + ) + __all__ = ["fetch_n_tag_turns_and_calls"]