From 71fcc9a2a2ca44aea4c32182fd1795fbcacac461 Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 10:00:11 -0700 Subject: [PATCH 1/7] Fix param reversal when creating Authors Fix prop name in pod query --- .../dags/processing/tasks/persist_summaries_task.py | 2 +- orchestration/airflow/dags/publishing/tasks/create_pod.py | 8 ++++---- orchestration/airflow/dags/shared/models/author.py | 5 ++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/orchestration/airflow/dags/processing/tasks/persist_summaries_task.py b/orchestration/airflow/dags/processing/tasks/persist_summaries_task.py index 15301287..5899df1c 100644 --- a/orchestration/airflow/dags/processing/tasks/persist_summaries_task.py +++ b/orchestration/airflow/dags/processing/tasks/persist_summaries_task.py @@ -252,7 +252,7 @@ def store_records(records: List[Dict], bucket_name: str, key: str, config: dict, malformed_records.append(record) try: for author in record.get("authors", []): - author_node = Author(driver, author.get(FIRST_NAME), author.get(LAST_NAME)) + author_node = Author(driver, last_name=author.get(LAST_NAME), first_name=author.get(FIRST_NAME)) author_node.create() arxiv_record.relate( driver, diff --git a/orchestration/airflow/dags/publishing/tasks/create_pod.py b/orchestration/airflow/dags/publishing/tasks/create_pod.py index 8f679ce0..628d8ebf 100644 --- a/orchestration/airflow/dags/publishing/tasks/create_pod.py +++ b/orchestration/airflow/dags/publishing/tasks/create_pod.py @@ -95,7 +95,7 @@ def next_pod_dates(config: dict, arxiv_set: str, category: str) -> List[datetime f"MATCH (s:ArxivSet {{code: $arxiv_set}}) " f"-[:{CATEGORIZED_BY}]->(c:ArxivCategory {{code: $category}}) " f"-[:{CATEGORIZES}]->(p:Podcast) " - "RETURN p.date ORDER BY p.date DESC LIMIT 1" + "RETURN p.episode_date ORDER BY p.episode_date DESC LIMIT 1" ) result = session.run(query, {"arxiv_set": arxiv_set, "category": category}) data = result.data() @@ -105,7 +105,7 @@ def next_pod_dates(config: dict, arxiv_set: str, category: str) -> List[datetime if len(data) == 0: start_date = end_date - timedelta(days=5) else: - start_date = datetime.combine(data[0]["p.date"].to_native(), datetime.min.time(), tzinfo) + start_date = datetime.combine(data[0]["p.episode_date"].to_native(), datetime.min.time(), tzinfo) date_list = [start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)] return date_list except Exception as e: @@ -126,7 +126,7 @@ def get_summaries(config: dict, arxiv_set: str, category: str, episode_date: dat f"<-[:{PRIMARILY_CATEGORIZED_BY}]-(a:ArxivRecord)--(b:Abstract) " "MATCH (a)-[:AUTHORED_BY]->(author:Author)" "WHERE a.date = $date " - "RETURN {record: a, abstract: b, authors: collect(author)} AS result" + "RETURN {record: a, abstract: b, authors: collect({first_name: author.first_name, last_name: author.last_name})} AS result" ) result = session.run(query, {"arxiv_set": arxiv_set, "category": category, "date": episode_date.date()}) data = result.data() @@ -224,7 +224,7 @@ def write_pod_script( script_content += no_latex_paragraph + "\n\n" part_record_ids.append(r["record"]["arxiv_id"]) except Exception as e: - logger.error("Error writing pod script", error=e, method=write_pod_script.__name__, record=r) + logger.error("Error writing pod script", error=e, method=write_pod_script.__name__) continue script_content += outro diff --git a/orchestration/airflow/dags/shared/models/author.py b/orchestration/airflow/dags/shared/models/author.py index 028b7ccb..5e73edb0 100644 --- a/orchestration/airflow/dags/shared/models/author.py +++ b/orchestration/airflow/dags/shared/models/author.py @@ -62,17 +62,16 @@ def create(self, last_name: str = "", first_name: str = ""): ) now = get_storage_key_datetime() properties = { - "first_name": self.first_name, "uuid": str(uuid.uuid4()), - "last_name": self.last_name, "created": now, "last_modified": now, } records, summary, _ = self.driver.execute_query( """ - MERGE (a:Author {last_name: $last_name}) + MERGE (a:Author {first_name: $first_name, last_name: $last_name}) ON CREATE SET a += $props RETURN a""", + first_name=self.first_name, last_name=self.last_name, props=properties, database_=self.db, From ad56f443d8aba60a79be5ae1ad00961f3291642b Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 10:21:27 -0700 Subject: [PATCH 2/7] Remove default for required param --- orchestration/airflow/dags/shared/models/author.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/airflow/dags/shared/models/author.py b/orchestration/airflow/dags/shared/models/author.py index 5e73edb0..c786689e 100644 --- a/orchestration/airflow/dags/shared/models/author.py +++ b/orchestration/airflow/dags/shared/models/author.py @@ -117,7 +117,7 @@ def create(self, last_name: str = "", first_name: str = ""): raise e @classmethod - def find(cls, driver: Driver, last_name: str, first_name: str = ""): + def find(cls, driver: Driver, last_name: str, first_name: str): if not driver or not isinstance(driver, Driver): raise ValueError("Invalid driver") if not validate_strings(last_name, first_name): From dbe99a91903c7853da2908aa2e9f612ac78377f9 Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 12:45:54 -0700 Subject: [PATCH 3/7] Fix Cypher query returns duplicate authors --- infra/core/dev.tfvars | 6 ++-- .../{create_pod.py => create_pod_task.py} | 30 ++++++++++++++++--- 2 files changed, 29 insertions(+), 7 deletions(-) rename orchestration/airflow/dags/publishing/tasks/{create_pod.py => create_pod_task.py} (92%) diff --git a/infra/core/dev.tfvars b/infra/core/dev.tfvars index c7fc76fc..1f7c42c7 100644 --- a/infra/core/dev.tfvars +++ b/infra/core/dev.tfvars @@ -3,7 +3,7 @@ # ********************************************************** app_name = "atomiklabs" -app_version = "0.3.6-alpha" +app_version = "0.3.7-alpha" availability_zones = ["us-east-1a", "us-east-1b", "us-east-1c"] aws_region = "us-east-1" backend_dynamodb_table = "terraform-state-locks" @@ -63,9 +63,9 @@ arxiv_sets = ["cs"] default_lambda_runtime = "python3.10" pods_prefix = "pods" -create_pod_task_version = "0.1.0" +create_pod_task_version = "0.1.1" fetch_from_arxiv_task_version = "0.1.0" most_recent_research_records_version = "0.0.2" parse_summaries_task_version = "0.1.0" -persist_summaries_task_version = "0.1.0" +persist_summaries_task_version = "0.1.1" save_summaries_to_datalake_task_version = "0.0.1" diff --git a/orchestration/airflow/dags/publishing/tasks/create_pod.py b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py similarity index 92% rename from orchestration/airflow/dags/publishing/tasks/create_pod.py rename to orchestration/airflow/dags/publishing/tasks/create_pod_task.py index 628d8ebf..f16e4894 100644 --- a/orchestration/airflow/dags/publishing/tasks/create_pod.py +++ b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py @@ -30,6 +30,7 @@ PUBLISHES, RECORDS_PREFIX, RETRIEVAL_ERRORS, + SUMMARIZES, ) from shared.utils.utils import get_config @@ -73,10 +74,31 @@ def run( logger.info("No summaries for date", set=arxiv_set, category=category, date=pod_date) continue pod_summaries = get_pod_summaries(context, config, summaries) - scripts = write_pod_script(config, pod_summaries, arxiv_set, category, pod_date) + scripts = write_pod_script( + config=config, + pod_summaries=pod_summaries, + arxiv_set=arxiv_set, + category=category, + episode_date=pod_date, + ) for key, script, part_record_ids in scripts: - audio_key = create_audio(config, arxiv_set, category, pod_date, script, key) - create_pod_node(config, arxiv_set, category, pod_date, key, audio_key, part_record_ids) + audio_key = create_audio( + config=config, + arxiv_set=arxiv_set, + category=category, + episode_date=pod_date, + script_text=script, + key=key, + ) + create_pod_node( + config=config, + arxiv_set=arxiv_set, + category=category, + episode_date=pod_date, + key=key, + audio_key=audio_key, + part_record_ids=part_record_ids, + ) except Exception as e: logger.error("Error creating pod", set=arxiv_set, category=category, date=pod_date, error=e) continue @@ -123,7 +145,7 @@ def get_summaries(config: dict, arxiv_set: str, category: str, episode_date: dat query = ( f"MATCH (s:ArxivSet {{code: $arxiv_set}}) " f"-[:{CATEGORIZED_BY}]->(c:ArxivCategory {{code: $category}}) " - f"<-[:{PRIMARILY_CATEGORIZED_BY}]-(a:ArxivRecord)--(b:Abstract) " + f"<-[:{PRIMARILY_CATEGORIZED_BY}]-(a:ArxivRecord)<-[:{SUMMARIZES}]-(b:Abstract) " "MATCH (a)-[:AUTHORED_BY]->(author:Author)" "WHERE a.date = $date " "RETURN {record: a, abstract: b, authors: collect({first_name: author.first_name, last_name: author.last_name})} AS result" From 02ef58bcfae43b1c7047cf50380d80b2485004e7 Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 18:19:01 -0700 Subject: [PATCH 4/7] Change task filename to match convention Update changelog --- CHANGELOG.md | 7 +++++++ orchestration/airflow/dags/publishing/create_pods_dag.py | 2 +- .../airflow/dags/publishing/tasks/create_pod_task.py | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a70ffb1..f271c219 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.3.7-alpha] - 2024-05-24 + +### Fixed + +- Author first and last names reversed +- Podcast scripts double author names + ## [0.3.6-alpha] - 2024-05-24 ### Added diff --git a/orchestration/airflow/dags/publishing/create_pods_dag.py b/orchestration/airflow/dags/publishing/create_pods_dag.py index 05829474..387eae92 100644 --- a/orchestration/airflow/dags/publishing/create_pods_dag.py +++ b/orchestration/airflow/dags/publishing/create_pods_dag.py @@ -1,7 +1,7 @@ import os from logging.config import dictConfig -import publishing.tasks.create_pod as cpt +import publishing.tasks.create_pod_task as cpt import structlog from airflow import DAG from airflow.operators.python import PythonOperator diff --git a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py index f16e4894..2b128f64 100644 --- a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py +++ b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py @@ -148,7 +148,7 @@ def get_summaries(config: dict, arxiv_set: str, category: str, episode_date: dat f"<-[:{PRIMARILY_CATEGORIZED_BY}]-(a:ArxivRecord)<-[:{SUMMARIZES}]-(b:Abstract) " "MATCH (a)-[:AUTHORED_BY]->(author:Author)" "WHERE a.date = $date " - "RETURN {record: a, abstract: b, authors: collect({first_name: author.first_name, last_name: author.last_name})} AS result" + "RETURN {record: a, abstract: b, authors: collect(DISTINCT {first_name: author.first_name, last_name: author.last_name})} AS result" ) result = session.run(query, {"arxiv_set": arxiv_set, "category": category, "date": episode_date.date()}) data = result.data() From 1c7b1c11eb57c08e988c19fa6bc2667b11240778 Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 18:19:35 -0700 Subject: [PATCH 5/7] Remove unnecessary check --- orchestration/airflow/dags/publishing/tasks/create_pod_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py index 2b128f64..f16e4894 100644 --- a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py +++ b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py @@ -148,7 +148,7 @@ def get_summaries(config: dict, arxiv_set: str, category: str, episode_date: dat f"<-[:{PRIMARILY_CATEGORIZED_BY}]-(a:ArxivRecord)<-[:{SUMMARIZES}]-(b:Abstract) " "MATCH (a)-[:AUTHORED_BY]->(author:Author)" "WHERE a.date = $date " - "RETURN {record: a, abstract: b, authors: collect(DISTINCT {first_name: author.first_name, last_name: author.last_name})} AS result" + "RETURN {record: a, abstract: b, authors: collect({first_name: author.first_name, last_name: author.last_name})} AS result" ) result = session.run(query, {"arxiv_set": arxiv_set, "category": category, "date": episode_date.date()}) data = result.data() From 78a31c8359b343b48857f2904f6c1574688b0fe4 Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 19:47:27 -0700 Subject: [PATCH 6/7] Fix param name --- orchestration/airflow/dags/publishing/tasks/create_pod_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py index f16e4894..0bbcbdb6 100644 --- a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py +++ b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py @@ -94,7 +94,7 @@ def run( config=config, arxiv_set=arxiv_set, category=category, - episode_date=pod_date, + pod_date=pod_date, key=key, audio_key=audio_key, part_record_ids=part_record_ids, From e3b5b0b4db2ab1081b7c83f2fbbf090d8f4f15c8 Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Fri, 24 May 2024 19:57:15 -0700 Subject: [PATCH 7/7] Fix param name --- orchestration/airflow/dags/publishing/tasks/create_pod_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py index 0bbcbdb6..7d76daf7 100644 --- a/orchestration/airflow/dags/publishing/tasks/create_pod_task.py +++ b/orchestration/airflow/dags/publishing/tasks/create_pod_task.py @@ -95,7 +95,7 @@ def run( arxiv_set=arxiv_set, category=category, pod_date=pod_date, - key=key, + script_key=key, audio_key=audio_key, part_record_ids=part_record_ids, )