initial commit for perf improvement in tasks save_stories_from_feed #23

Open
wants to merge 1 commit into base: main
Changes from all commits
183 changes: 116 additions & 67 deletions fetcher/tasks.py
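The heart of this perf change is replacing one EXISTS round-trip per story with a single batched IN query per feed. A minimal before/after sketch of the query shape, using the SQLAlchemy session and the Story model already imported in fetcher/tasks.py (the variable names here are illustrative, not part of the diff):

# Before this PR: one round-trip per entry (per-story EXISTS check)
session.query(literal(True)).filter(
    session.query(Story)
    .filter(Story.normalized_url == normalized_url)
    .exists()
).scalar()

# After this PR: one round-trip per feed (batched IN check over all candidate values)
session.query(Story.normalized_url).filter(
    Story.normalized_url.in_(normalized_urls),
    Story.sources_id == sources_id,
).all()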
@@ -179,37 +179,43 @@ def _save_rss_files(dir: str, fname: Any, feed: Dict,
f.write(response.content)


def normalized_title_exists(session: SessionType,
normalized_title_hash: Optional[str],
sources_id: Optional[int]) -> bool:
if normalized_title_hash is None or sources_id is None:
# err on the side of keeping URLs
return False
def normalized_titles_exists(session: SessionType,
entries: list,
feed: Dict,
now: dt.datetime) -> list:
sources_id = feed['sources_id']
earliest_date = dt.date.today() - dt.timedelta(days=NORMALIZED_TITLE_DAYS)
# only care if matching rows exist, so doing nested EXISTS query
data = get_title_hash_for_entries(entries, feed, now)
normalized_title_hashes = list(data.keys())
with session.begin():
return session.query(literal(True))\
.filter(session.query(Story)
.filter(Story.published_at >= earliest_date,
Story.normalized_title_hash == normalized_title_hash,
Story.sources_id == sources_id)
.exists())\
.scalar() is True


def normalized_url_exists(session: SessionType,
normalized_url: Optional[str]) -> bool:
if normalized_url is None:
return False
# only care if matching rows exist, so doing nested EXISTS query
query = session.query(Story.normalized_title_hash).filter(Story.published_at >= earliest_date,
Story.normalized_title_hash.in_(normalized_title_hashes),
Story.sources_id == sources_id)
result = query.all()
entries = []
for row in result:
rowdict = row._mapping
entries.append(data[rowdict['normalized_title_hash']])
return entries


# return entries whose normalized URL already exists
def normalized_urls_exists(session: SessionType,
entries: list,
feed: Dict,
now: dt.datetime) -> list:
sources_id = feed['sources_id']
data = get_urls_for_entries(entries, feed, now)
normalized_urls = list(data.keys())
with session.begin():
return session.query(literal(True))\
.filter(session.query(Story)
.filter(Story.normalized_url ==
normalized_url)
.exists())\
.scalar() is True

query = session.query(Story.normalized_url).filter(Story.normalized_url.in_(normalized_urls),
Story.sources_id == sources_id)
result = query.all()
entries = []
for row in result:
rowdict = row._mapping
entries.append(data[rowdict['normalized_url']])
return entries
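A hedged usage sketch of the two new helpers (session, feed, now, and the candidate entries list are assumed to be in scope, as in save_stories_from_feed further down): each helper returns the subset of entries that is already stored, which the caller then subtracts before inserting.

already_by_url = normalized_urls_exists(session, entries, feed, now)
entries = [e for e in entries if e not in already_by_url]
already_by_title = normalized_titles_exists(session, entries, feed, now)
entries = [e for e in entries if e not in already_by_title]
# whatever survives both filters is new and is inserted one story at a time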

# used by scripts/queue_feeds.py, but moved here
# because needs to be kept in sync with queuing policy
@@ -816,22 +822,12 @@ def fetch_and_process_feed(
saved=saved, dup=dup, skipped=skipped)


def save_stories_from_feed(session: SessionType, # type: ignore[no-any-unimported]
now: dt.datetime,
feed: Dict,
parsed_feed: feedparser.FeedParserDict) -> Tuple[int, int, int]:
"""
Take a parsed feed and insert all the (valid) entries.
returns (saved_count, dup_count, skipped_count)
"""
stats = Stats.get() # get singleton

def stories_incr(status: str) -> None:
"""call exactly ONCE for each story processed"""
stats.incr('stories', 1, labels=[('stat', status)])

skipped_count = dup_count = saved_count = 0
for entry in parsed_feed.entries:
def get_valid_feed_entries(entries: list,
feed: Dict,
now: dt.datetime) -> Tuple[list, int]:
# returns (valid_entries, skipped_count)
valid_entries = []
skipped_count = 0
for entry in entries:
try:
link = getattr(entry, 'link', None)
if link is None:
@@ -878,29 +874,7 @@ def stories_incr(status: str) -> None:
stories_incr('nonews')
skipped_count += 1
continue
s.sources_id = feed['sources_id']
# only save if url is unique, and title is unique recently
if not normalized_url_exists(session, s.normalized_url):
if not normalized_title_exists(
session, s.normalized_title_hash, s.sources_id):
# need to commit one by one so duplicate URL keys don't stop a larger insert from happening
# those are *expected* errors, so we can ignore them
with session.begin():
session.add(s)
session.commit()
stories_incr('ok')
saved_count += 1
else:
# raised to info 2022-10-27
logger.info(
f" * skip duplicate title URL: {link} | {s.normalized_title} | {s.sources_id}")
stories_incr('dup_title')
dup_count += 1
else:
logger.debug(
f" * skip duplicate normalized URL: {link} | {s.normalized_url}")
stories_incr('dup_url')
dup_count += 1

except (AttributeError, KeyError, ValueError, UnicodeError) as exc:
# NOTE!! **REALLY** easy for coding errors to end up here!!!
# couldn't parse the entry - skip it
@@ -915,6 +889,81 @@ def stories_incr(status: str) -> None:

stories_incr('bad2')
skipped_count += 1
# append valid entry to list
valid_entries.append(entry)
return valid_entries, skipped_count

def get_urls_for_entries(entries: list,
feed: Dict,
now: dt.datetime) -> dict:
# build a normalized_url -> entry map for the candidate entries
data = {}
for entry in entries:
s = Story.from_rss_entry(feed['id'], now, entry)
data[s.normalized_url] = entry
return data

def get_title_hash_for_entries(entries: list,
feed: Dict,
now: dt.datetime) -> dict:
# build a normalized_title_hash -> entry map for the candidate entries
data = {}
for entry in entries:
s = Story.from_rss_entry(feed['id'], now, entry)
data[s.normalized_title_hash] = entry
return data

def save_stories_from_feed(session: SessionType, # type: ignore[no-any-unimported]
now: dt.datetime,
feed: Dict,
parsed_feed: feedparser.FeedParserDict) -> Tuple[int, int, int]:
"""
Take a parsed feed and insert all the (valid) entries.
returns (saved_count, dup_count, skipped_count)
"""
stats = Stats.get() # get singleton

def stories_incr(status: str) -> None:
"""call exactly ONCE for each story processed"""
stats.incr('stories', 1, labels=[('stat', status)])

skipped_count = dup_count = saved_count = 0

# get valid entries
logger.debug(
f"number of entries to start: {len(parsed_feed.entries)}")
entries, skipped_count = get_valid_feed_entries(parsed_feed.entries, feed, now)
if len(entries) > 0:
entries_urls_existing = normalized_urls_exists(session, entries, feed, now)
# feedparser entries are dict-like and unhashable, so filter with a
# comprehension and derive the duplicate count from the dropped entries
before_url = len(entries)
entries = [e for e in entries if e not in entries_urls_existing]
logger.debug(
f"number of entries after checking url: {len(entries)}")
dup_count_url = before_url - len(entries)
if dup_count_url > 0:
dup_count += dup_count_url
logger.debug(
f"skip {dup_count_url} duplicate normalized URL")
if len(entries) > 0:
entries_titles_existing = normalized_titles_exists(session, entries, feed, now)
before_title = len(entries)
entries = [e for e in entries if e not in entries_titles_existing]
logger.debug(
f"number of entries after checking title: {len(entries)}")
dup_count_title = before_title - len(entries)
if dup_count_title > 0:
dup_count += dup_count_title
logger.debug(
f"skip {dup_count_title} duplicate normalized title")

for entry in entries:
try:
link = getattr(entry, 'link', None)
s = Story.from_rss_entry(feed['id'], now, entry)
s.sources_id = feed['sources_id']
# need to commit one by one so duplicate URL keys don't stop a larger insert from happening
# those are *expected* errors, so we can ignore them
with session.begin():
session.add(s)
session.commit()
stories_incr('ok')
saved_count += 1
except (IntegrityError, PendingRollbackError, UniqueViolation) as _:
# expected exception - log and ignore
logger.debug(