diff --git a/directory_issues/scripts/__init__.py b/directory_issues/scripts/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/directory_issues/scripts/__init__.py
@@ -0,0 +1 @@
+
diff --git a/directory_issues/scripts/client.py b/directory_issues/scripts/client.py
new file mode 100644
index 0000000..bb599c5
--- /dev/null
+++ b/directory_issues/scripts/client.py
@@ -0,0 +1,52 @@
+import os
+import logging
+import mediacloud.api as mc_api
+from dotenv import load_dotenv
+from typing import Generator, Optional, List
+from mc_providers import provider_for, PLATFORM_ONLINE_NEWS, PLATFORM_SOURCE_MEDIA_CLOUD
+
+
+logger = logging.getLogger(__name__)
+
+
+class MediaCloudClient:
+    def __init__(self):
+        load_dotenv()
+        self.base_url = os.getenv("MC_ELASTICSEARCH_BASE_URL")
+        self.api_token = os.getenv("MC_API_TOKEN")
+
+        if not self.base_url:
+            raise ValueError(
+                "MC_ELASTICSEARCH_BASE_URL environment variable is required"
+            )
+        if not self.api_token:
+            raise ValueError("MC_API_TOKEN environment variable is required")
+
+        self.directory_client = mc_api.DirectoryApi(self.api_token)
+        self.provider = self._initialize_provider()
+
+    def _initialize_provider(self):
+        return provider_for(
+            PLATFORM_ONLINE_NEWS, PLATFORM_SOURCE_MEDIA_CLOUD, base_url=self.base_url
+        )
+
+    def get_sources(
+        self, platform: Optional[str] = None, batch_size: int = 100, offset: int = 0
+    ) -> Generator[List[str], None, None]:
+        """Yield batches of source names until the directory is exhausted."""
+        while True:
+            try:
+                response = self.directory_client.source_list(
+                    platform=platform, limit=batch_size, offset=offset
+                )
+            except Exception as e:
+                logger.error(f"Error fetching sources: {e}")
+                raise
+
+            sources = [source["name"] for source in response.get("results", [])]
+            if not sources:
+                break
+
+            yield sources
+            offset += batch_size
+            logger.info(f"Fetched batch of {len(sources)} sources. Total offset: {offset}")
diff --git a/directory_issues/scripts/sources/__init__.py b/directory_issues/scripts/sources/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/directory_issues/scripts/sources/__init__.py
@@ -0,0 +1 @@
+
diff --git a/directory_issues/scripts/sources/base.py b/directory_issues/scripts/sources/base.py
new file mode 100644
index 0000000..60b8bee
--- /dev/null
+++ b/directory_issues/scripts/sources/base.py
@@ -0,0 +1,83 @@
+import logging
+import pandas as pd
+from typing import Optional, List
+from directory_issues.scripts.client import MediaCloudClient
+
+logger = logging.getLogger(__name__)
+
+
+class SourcesBase:
+    def __init__(self, client: MediaCloudClient):
+        self.client = client
+        self.result_column = "result"
+
+    def process_sources(
+        self,
+        platform: Optional[str] = None,
+        batch_size: int = 100,
+        file_name: Optional[str] = None,
+    ):
+        """Fetch sources in batches, analyze each one, and save results to CSV."""
+        sources_generator = self.client.get_sources(
+            platform=platform, batch_size=batch_size
+        )
+
+        total_sources = None
+        processed_sources = 0
+
+        for batch_number, sources in enumerate(sources_generator, start=1):
+            if total_sources is None:
+                # A single-item query is enough to read the total source count.
+                response = self.client.directory_client.source_list(
+                    platform=platform, limit=1
+                )
+                total_sources = response.get("count", 0)
+                logger.info(f"Total sources to process: {total_sources}")
+
+            batch_results = self._process_source_batch(sources)
+            self._save_batch_results_to_csv(
+                batch_results, batch_number, file_name or "sources"
+            )
+            processed_sources += len(sources)
+            logger.info(f"Processed {processed_sources}/{total_sources} sources")
+
+            if processed_sources >= total_sources:
+                break
+
+    def _process_source_batch(self, sources: List[str]) -> List[dict]:
+        """
+        Process a batch of sources and collect non-empty results.
+        """
+        batch_results = []
+        for source in sources:
+            try:
+                result = self.analyze_source(source)
+                if result is not None:
+                    batch_results.append({"source": source, self.result_column: result})
+            except Exception:
+                logger.exception(f"Error analyzing source {source}")
+
+        return batch_results
+
+    def _save_batch_results_to_csv(
+        self,
+        batch_results: List[dict],
+        batch_number: int,
+        file_name: str,
+    ):
+        """
+        Save batch results to a CSV file with error handling.
+        """
+        if not batch_results:
+            return
+
+        try:
+            batch_df = pd.DataFrame(batch_results)
+            batch_df.to_csv(
+                f"{file_name}_results_batch_{batch_number}.csv", index=False
+            )
+        except Exception:
+            logger.exception(f"Error saving batch {batch_number} to CSV")
+
+    def analyze_source(self, domain: str):
+        raise NotImplementedError("Subclasses must implement this method.")
diff --git a/directory_issues/scripts/sources/update_language.py b/directory_issues/scripts/sources/update_language.py
new file mode 100644
index 0000000..d2dc94b
--- /dev/null
+++ b/directory_issues/scripts/sources/update_language.py
@@ -0,0 +1,46 @@
+import logging
+import datetime as dt
+from typing import Optional
+from directory_issues.scripts.sources.base import SourcesBase, MediaCloudClient
+
+logger = logging.getLogger(__name__)
+
+DAYS_BACK = 365
+
+
+class SourceLanguage(SourcesBase):
+    def __init__(self, client):
+        super().__init__(client)
+        self.result_column = "primary_language"
+
+    def analyze_source(self, domain: str, min_story_count: int = 100) -> Optional[str]:
+        """
+        Analyze a single source to get its primary language.
+
+        Args:
+            domain (str): Domain to analyze.
+            min_story_count (int): Minimum number of stories for the source to be considered valid.
+
+        Returns:
+            Optional[str]: The primary language of the source, or None if there is not enough data.
+        """
+        query = f"canonical_domain:{domain}"
+        start_date = dt.datetime.now() - dt.timedelta(days=DAYS_BACK)
+        end_date = dt.datetime.now()
+
+        results = self.client.provider._overview_query(query, start_date, end_date)
+
+        if self.client.provider._is_no_results(results) or results["total"] <= min_story_count:
+            return None
+
+        # Most common language across the sampled matches.
+        languages = [match["language"] for match in results["matches"]]
+        return max(set(languages), key=languages.count)
+
+
+if __name__ == "__main__":
+    client = MediaCloudClient()
+    lang_analyzer = SourceLanguage(client)
+    lang_analyzer.process_sources(
+        platform="online_news", batch_size=10000, file_name="language"
+    )
diff --git a/directory_issues/scripts/sources/update_publication_date.py b/directory_issues/scripts/sources/update_publication_date.py
new file mode 100644
index 0000000..7b1473a
--- /dev/null
+++ b/directory_issues/scripts/sources/update_publication_date.py
@@ -0,0 +1,42 @@
+import datetime as dt
+from typing import Optional
+from directory_issues.scripts.sources.base import SourcesBase, MediaCloudClient
+
+# Earliest ingest date to consider: January 1, 2000
+START_DATE_YEAR = 2000
+START_DATE_MONTH = 1
+START_DATE_DAY = 1
+
+
+class SourcesPublicationDate(SourcesBase):
+    def __init__(self, client):
+        super().__init__(client)
+        self.result_column = "first_publication_date"
+
+    def analyze_source(
+        self, domain: str, min_story_count: int = 100
+    ) -> Optional[dt.datetime]:
+        """Analyze a single source to get its first publication date."""
+        query = f"canonical_domain:{domain}"
+        start_date = dt.datetime(START_DATE_YEAR, START_DATE_MONTH, START_DATE_DAY)
+        end_date = dt.datetime.now()
+
+        results = self.client.provider._overview_query(query, start_date, end_date)
+
+        if self.client.provider._is_no_results(results) or results["total"] <= min_story_count:
+            return None
+
+        # Earliest publication date among the sampled matches.
+        publication_dates = [
+            dt.datetime.fromisoformat(match["publication_date"])
+            for match in results["matches"]
+        ]
+        return min(publication_dates, default=None)
+
+
+if __name__ == "__main__":
+    client = MediaCloudClient()
+    pub_date_analyzer = SourcesPublicationDate(client)
+    pub_date_analyzer.process_sources(
+        platform="online_news", batch_size=10000, file_name="publication_date"
+    )