chore: setup scripts to update primary language and publication_date #15

Merged: 3 commits, Jan 31, 2025
1 change: 1 addition & 0 deletions directory_issues/scripts/__init__.py
@@ -0,0 +1 @@

49 changes: 49 additions & 0 deletions directory_issues/scripts/client.py
@@ -0,0 +1,49 @@
import os
import logging
import mediacloud.api as mc_api
from dotenv import load_dotenv
from typing import Generator, Optional, List
from mc_providers import provider_for, PLATFORM_ONLINE_NEWS, PLATFORM_SOURCE_MEDIA_CLOUD


logger = logging.getLogger(__name__)


class MediaCloudClient:
def __init__(self):
load_dotenv()
self.base_url = os.getenv("MC_ELASTICSEARCH_BASE_URL")
self.api_token = os.getenv("MC_API_TOKEN")

if not self.base_url:
raise ValueError(
"MC_ELASTICSEARCH_BASE_URL environment variable is required"
)
if not self.api_token:
raise ValueError("MC_API_TOKEN environment variable is required")

self.directory_client = mc_api.DirectoryApi(self.api_token)
self.provider = self._initialize_provider()

def _initialize_provider(self):
return provider_for(
PLATFORM_ONLINE_NEWS, PLATFORM_SOURCE_MEDIA_CLOUD, base_url=self.base_url
)

    def get_sources(
        self, platform: Optional[str] = None, batch_size: int = 100, offset: int = 0
    ) -> Generator[List[str], None, None]:
        """Yield batches of source names until the directory is exhausted."""
        while True:
            try:
                response = self.directory_client.source_list(
                    platform=platform, limit=batch_size, offset=offset
                )
                sources = [source["name"] for source in response.get("results", [])]
                if not sources:
                    return
                yield sources

                offset += batch_size
                logger.info(
                    f"Fetched batch of {len(sources)} sources. Total offset: {offset}"
                )
            except Exception as e:
                logger.error(f"Error fetching sources: {e}")
                raise
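
For reference, load_dotenv() above reads the two required variables from a .env file or the process environment. A minimal sketch with placeholder values, not real endpoints:

# .env (placeholder values; substitute real credentials)
MC_ELASTICSEARCH_BASE_URL=https://news-search.example.org
MC_API_TOKEN=your-api-token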
1 change: 1 addition & 0 deletions directory_issues/scripts/sources/__init__.py
@@ -0,0 +1 @@

81 changes: 81 additions & 0 deletions directory_issues/scripts/sources/base.py
@@ -0,0 +1,81 @@
import pandas as pd
import logging
from typing import Optional, List
from directory_issues.scripts.client import MediaCloudClient

logger = logging.getLogger(__name__)


class SourcesBase:
def __init__(self, client: MediaCloudClient):
self.client = client
self.result_column = "result"

def process_sources(
self,
platform: Optional[str] = None,
batch_size: int = 100,
file_name: Optional[str] = None,
):
sources_generator = self.client.get_sources(
platform=platform, batch_size=batch_size
)

total_sources = None
processed_sources = 0

for batch_number, sources in enumerate(sources_generator, start=1):
if total_sources is None:
response = self.client.directory_client.source_list(
platform=platform, limit=1
)
total_sources = response.get("count", 0)
logger.info(f"Total sources to process: {total_sources}")

batch_results = self._process_source_batch(sources)
self._save_batch_results_to_csv(
batch_results, batch_number, file_name or "sources"
)
processed_sources += len(sources)
logger.info(f"Processed {processed_sources}/{total_sources} sources")

if processed_sources >= total_sources:
break

def _process_source_batch(self, sources: List[str]) -> List[dict]:
"""
Process a batch of sources and collect results.
"""
batch_results = []
for source in sources:
try:
result = self.analyze_source(source)
if result is not None:
batch_results.append({"source": source, self.result_column: result})
            except Exception:
                logger.exception(f"Failed to analyze source {source}")

return batch_results

def _save_batch_results_to_csv(
self,
batch_results: List[dict],
batch_number: int,
file_name: str,
):
"""
Save batch results to a CSV file with error handling.
"""
if not batch_results:
return

try:
batch_df = pd.DataFrame(batch_results)
batch_df.to_csv(
f"{file_name}_results_batch_{batch_number}.csv", index=False
)
        except Exception:
            logger.exception(f"Failed to save batch {batch_number} results to CSV")

def analyze_source(self, domain: str):
raise NotImplementedError("Subclasses must implement this method.")
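
Each batch is written to its own file named {file_name}_results_batch_{batch_number}.csv, with a source column plus whatever column name the subclass sets as result_column. A hypothetical first batch from the language analyzer below would look like:

source,primary_language
example.com,en
example.net,es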
47 changes: 47 additions & 0 deletions directory_issues/scripts/sources/update_language.py
@@ -0,0 +1,47 @@
import logging
import datetime as dt
from typing import Optional
from directory_issues.scripts.sources.base import SourcesBase, MediaCloudClient

logger = logging.getLogger(__name__)

DAYS_BACK = 365

class SourceLanguage(SourcesBase):
def __init__(self, client):
super().__init__(client)
self.result_column = "primary_language"

def analyze_source(self, domain: str, min_story_count: int = 100) -> Optional[str]:
"""
Analyze a single source to get its primary language.

Args:
domain (str): Domain to analyze.
min_story_count (int): Minimum number of stories to consider the source valid.

Returns:
Optional[str]: The primary language of the source or None if not enough data.
"""
query = f"canonical_domain:{domain}"
start_date = dt.datetime.now() - dt.timedelta(days=DAYS_BACK)
end_date = dt.datetime.now()

results = self.client.provider._overview_query(query, start_date, end_date)

if results["total"] <= min_story_count or self.client.provider._is_no_results(
results
):
return None

languages = [match["language"] for match in results["matches"]]
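        # most frequent language in the sampled matches (ties break arbitrarily)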
return max(set(languages), key=languages.count)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    client = MediaCloudClient()
    lang_analyzer = SourceLanguage(client)
    lang_analyzer.process_sources(
        platform="online_news", batch_size=10000, file_name="language"
    )
43 changes: 43 additions & 0 deletions directory_issues/scripts/sources/update_publication_date.py
@@ -0,0 +1,43 @@
import logging
import datetime as dt
from typing import Optional
from directory_issues.scripts.sources.base import SourcesBase, MediaCloudClient

# Set earliest ingest date to 2000
START_DATE_YEAR = 2000
START_DATE_MONTH = 1
START_DATE_DAY = 1


class SourcesPublicationDate(SourcesBase):
def __init__(self, client):
super().__init__(client)
self.result_column = "first_publication_date"

def analyze_source(
self, domain: str, min_story_count: int = 100
) -> Optional[dt.datetime]:
"""Analyze a single source to get its first publication date."""
query = f"canonical_domain:{domain}"
start_date = dt.datetime(START_DATE_YEAR, START_DATE_MONTH, START_DATE_DAY)
end_date = dt.datetime.now()

results = self.client.provider._overview_query(query, start_date, end_date)

if results["total"] <= min_story_count or self.client.provider._is_no_results(
results
):
return None

publication_dates = [
dt.datetime.fromisoformat(match["publication_date"])
for match in results["matches"]
]
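        # earliest publication date seen; default=None guards an empty match list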
return min(publication_dates, default=None)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    client = MediaCloudClient()
    pub_date_analyzer = SourcesPublicationDate(client)
    pub_date_analyzer.process_sources(
        platform="online_news", batch_size=10000, file_name="publication_date"
    )
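
Note: both entry points use absolute directory_issues.scripts... imports, so they presumably need to be launched as modules from the repository root, e.g. python -m directory_issues.scripts.sources.update_language and python -m directory_issues.scripts.sources.update_publication_date.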