Skip to content

Commit

Permalink
No calls found from FSM shouldn't fail pipeline (#131)
Browse files Browse the repository at this point in the history
* No calls found from FSM shouldn't fail pipeline
  • Loading branch information
harshithere authored Dec 7, 2023
1 parent 4e75874 commit 4846b31
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 55 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

1.1.5
- [x] fix: No calls found from FSM shouldn't fail data_fetch pipelines

1.1.4
- [x] fix: Intent list was not appearing in the comparison file while training or evaluating SLU.

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "skit-pipelines"
version = "1.1.4"
version = "1.1.5"
description = "Kubeflow components for ml workflows at skit.ai."
authors = ["ltbringer <[email protected]>"]
license = "MIT"
Expand Down
10 changes: 6 additions & 4 deletions skit_pipelines/components/fetch_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ def fetch_calls(
flow_ids=flow_ids
)
logger.info(f"Finished in {time.time() - start:.2f} seconds")
logger.info(f"Obtained {maybe_df.shape[0]} calls from FSM Db before removing empty audios")
if not maybe_df.size:
raise ValueError("No calls found for the above parameters")
return ""

_, file_path = tempfile.mkstemp(suffix=const.CSV_FILE)
maybe_df.to_csv(file_path, index=False)
logger.info(f"Obtained {maybe_df.shape[0]} calls from FSM Db before removing empty audios")

def empty_audios_remover(df: pd.DataFrame, df_path: str):
audios_dir_path = tempfile.mkdtemp()
Expand All @@ -133,12 +133,14 @@ def empty_audios_remover(df: pd.DataFrame, df_path: str):
)
].drop("audio_filename", axis=1)
if not df_final.size:
raise ValueError("No calls found for the above parameters")
return False
logger.info(f"Obtained {df_final.shape[0]} calls after removing empty audios")
df_final.to_csv(df_path, index=False)
return True

if remove_empty_audios:
empty_audios_remover(df=maybe_df, df_path=file_path)
if not empty_audios_remover(df=maybe_df, df_path=file_path):
return ""
client_id_string = "-".join(client_id) if isinstance(client_id, list) else client_id
s3_path = upload2s3(
file_path,
Expand Down
113 changes: 63 additions & 50 deletions skit_pipelines/pipelines/fetch_n_tag_turns_and_calls/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,61 +220,74 @@ def fetch_n_tag_turns_and_calls(
"P0D" # disables caching
)

# Get intent response from GPT for qualifying turns
gpt_response_path = fetch_gpt_intent_prediction_op(
s3_file_path=calls.output, use_assisted_annotation=use_assisted_annotation
)

# uploads data for turn level intent, entity & transcription tagging
tag_turns_output = tag_calls_op(
input_file=gpt_response_path.output,
project_id=labelstudio_project_id,
data_label=data_label,
)

fetch_slot_and_calls_output = fetch_calls_for_slots_op(
untagged_records_path=calls.output,
org_id=org_id,
language_code=lang,
start_date=start_date,
end_date=end_date,
)

# uploads data for call & slot level tagging to labelstudio
tag_calls_output = tag_calls_op(
input_file=fetch_slot_and_calls_output.output,
call_project_id=call_project_id,
data_label=data_label,
)

with kfp.dsl.Condition(notify != "", "notify").after(tag_turns_output) as check1:
df_sizes = tag_turns_output.outputs["df_sizes"]
errors = tag_turns_output.outputs["errors"]

notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}.
Uploaded {getattr(calls, 'output')} ({df_sizes}, {org_id=}) for tagging to {labelstudio_project_id=}."""
notification_text += f"\nErrors: {errors}" if errors else ""

task_no_cache = slack_notification_op(
notification_text, channel=channel, cc=notify, thread_id=slack_thread
)
task_no_cache.execution_options.caching_strategy.max_cache_staleness = (
"P0D" # disables caching
with kfp.dsl.Condition(calls.output != "", "calls_found").after(calls):
# Get intent response from GPT for qualifying turns
gpt_response_path = fetch_gpt_intent_prediction_op(
s3_file_path=calls.output, use_assisted_annotation=use_assisted_annotation
)

df_sizes2 = tag_calls_output.outputs["df_sizes"]
errors2 = tag_calls_output.outputs["errors"]

notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}.
Uploaded {getattr(fetch_slot_and_calls_output, 'output')} ({df_sizes2}, {org_id=}) for call & slot tagging to {call_project_id=}."""
notification_text += f"\nErrors: {errors2}" if errors else ""
# uploads data for turn level intent, entity & transcription tagging
tag_turns_output = tag_calls_op(
input_file=gpt_response_path.output,
project_id=labelstudio_project_id,
data_label=data_label,
)

task_no_cache2 = slack_notification_op(
notification_text, channel=channel, cc=notify, thread_id=slack_thread
fetch_slot_and_calls_output = fetch_calls_for_slots_op(
untagged_records_path=calls.output,
org_id=org_id,
language_code=lang,
start_date=start_date,
end_date=end_date,
)
task_no_cache2.execution_options.caching_strategy.max_cache_staleness = (
"P0D" # disables caching

# uploads data for call & slot level tagging to labelstudio
tag_calls_output = tag_calls_op(
input_file=fetch_slot_and_calls_output.output,
call_project_id=call_project_id,
data_label=data_label,
)

with kfp.dsl.Condition(notify != "", "notify").after(tag_turns_output):
df_sizes = tag_turns_output.outputs["df_sizes"]
errors = tag_turns_output.outputs["errors"]

notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}.
Uploaded {getattr(calls, 'output')} ({df_sizes}, {org_id=}) for tagging to {labelstudio_project_id=}."""
notification_text += f"\nErrors: {errors}" if errors else ""

task_no_cache = slack_notification_op(
notification_text, channel=channel, cc=notify, thread_id=slack_thread
)
task_no_cache.execution_options.caching_strategy.max_cache_staleness = (
"P0D" # disables caching
)

df_sizes2 = tag_calls_output.outputs["df_sizes"]
errors2 = tag_calls_output.outputs["errors"]

notification_text = f"""Finished a request for {call_quantity} calls. Fetched from {start_date} to {end_date} for {client_id=}.
Uploaded {getattr(fetch_slot_and_calls_output, 'output')} ({df_sizes2}, {org_id=}) for call & slot tagging to {call_project_id=}."""
notification_text += f"\nErrors: {errors2}" if errors else ""

task_no_cache2 = slack_notification_op(
notification_text, channel=channel, cc=notify, thread_id=slack_thread
)
task_no_cache2.execution_options.caching_strategy.max_cache_staleness = (
"P0D" # disables caching
)

with kfp.dsl.Condition(calls.output == "", "no_calls").after(calls):
with kfp.dsl.Condition(notify != "", "notify").after(calls):
notification_text = f"""No calls could be found from {start_date} to {end_date} for {client_id=}.
Please verify the parameters you have used or refer to the debugging guide on Notion."""

task_no_cache2 = slack_notification_op(
notification_text, channel=channel, cc=notify, thread_id=slack_thread
)
task_no_cache2.execution_options.caching_strategy.max_cache_staleness = (
"P0D" # disables caching
)


__all__ = ["fetch_n_tag_turns_and_calls"]

0 comments on commit 4846b31

Please sign in to comment.