Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] ncmec: store checkpoint occasionally when start, end diff is one second #1731

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

prenner
Copy link
Contributor

@prenner prenner commented Jan 9, 2025

Summary

sometimes ncmec fails to make progress after hitting a second w/ a large number of results: #1679. when that happens (diff of end and start is a second and we have lots of data), store checkpoints occasionally via a next pointer

Test Plan

confirm that next pointer is used when fetching hashes locally

@@ -39,14 +39,16 @@ class NCMECCheckpoint(

# The biggest value of "to", and the next "from"
get_entries_max_ts: int
next_fetch: str
last_fetch_time: int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unsued currently but I know we talked about invalidating checkpoints older than 24hr.. happy to add that! just wanted to get confirmation on the checkpointing piece that this is on the right track

@prenner prenner force-pushed the prenner/checkpoint-ncmec branch from 928ddce to 4f12e50 Compare January 9, 2025 16:14
@prenner prenner changed the title ncmec: store checkpoint occasionally when start, end diff is one second [WIP] ncmec: store checkpoint occasionally when start, end diff is one second Jan 9, 2025
@prenner prenner force-pushed the prenner/checkpoint-ncmec branch 4 times, most recently from 2965a46 to 5270515 Compare January 9, 2025 17:12
@prenner prenner force-pushed the prenner/checkpoint-ncmec branch from 5270515 to d7f207e Compare January 9, 2025 17:54
Copy link
Contributor

@Dcallies Dcallies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looking good, thanks for making this change, and I think it will help a lot!

I am slightly suspicious that the paging URLs can go sour (e.g. I have noticed that NCMEC API tends to throw exceptions near the very end of the paging list that make me think that they are invaliding), so I think adding the time-based invalidation logic is a requirement.

As part of your test plan, can you also attempt fetching past an extremely dense time segment in the NCMEC API and confirm the behavior works as expected?

Comment on lines +156 to +166
class NCMECCheckpointWithoutNext(FetchCheckpointBase):
"""
0.99.x => 1.0.0

get_entries_max_ts: int =>
get_entries_max_ts: int
next_fetch: str
last_fetch_time: int
"""

get_entries_max_ts: int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this test case!

@@ -39,14 +39,16 @@ class NCMECCheckpoint(

# The biggest value of "to", and the next "from"
get_entries_max_ts: int
next_fetch: t.Optional[str] = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: Hmm, not sold on this name. NCMEC calls these "paging urls" so maybe paging_url?
blocking; Add a comment explaining what this variable represents.

nit: Since the default value is empty string, suggest making this non-Optional and just using that.


updates.extend(entry.updates)

if i % 100 == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: by change this from elif to if, I think it will now print the large update warning every update, which is incorrect, no?


def get_progress_timestamp(self) -> t.Optional[int]:
return self.get_entries_max_ts

@classmethod
def from_ncmec_fetch(cls, response: api.GetEntriesResponse) -> "NCMECCheckpoint":
"""Synthesizes a checkpoint from the API response"""
return cls(response.max_timestamp)
return cls(response.max_timestamp, response.next, int(time.time()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: NCMEC's documentation asks us not to store these checkpoints long term, so let's respect that in this implementation.

One way we could handle this is not to directly access this variable, but wrap it with a helper function that will return empty string if it's too old.

def get_resume_url_if_recent(self):
   if time.time() - self.last_fetch_time < SOME_CONSTANT_MAYBE_12_HOURS:
     return self.next_fetch
   return ""

@@ -39,14 +39,16 @@ class NCMECCheckpoint(

# The biggest value of "to", and the next "from"
get_entries_max_ts: int
next_fetch: t.Optional[str] = ""
last_fetch_time: t.Optional[int] = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: Add a comment on the relationship between the paging URL for why we save this.

ignorable: We could also make this variable explicitly mention the paging_url and not set it if we are not paging

@dataclass
class NCMECCheckpointWithoutNext(FetchCheckpointBase):
"""
0.99.x => 1.0.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: This version isn't quite correct, the current version is 1.2.3

@@ -565,7 +565,7 @@ def get_entries(
)

def get_entries_iter(
self, *, start_timestamp: int = 0, end_timestamp: int = 0
self, *, start_timestamp: int = 0, end_timestamp: int = 0, next_: str = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: next_ is a bit cryptic, perhaps we can do checkpointed_paging_url and then do

next_ = checkpointed_paging_url

log(f"large fetch ({i}), up to {len(updates)}")
updates.extend(entry.updates)
# so store the checkpoint occasionally
log(f"large fetch ({i}), up to {len(updates)}. storing checkpoint")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You don't actually store the checkpoint by yielding, technically the caller can decide whether to keep calling or store.

start_timestamp=current_start, end_timestamp=current_end
start_timestamp=current_start,
end_timestamp=current_end,
next_=current_next_fetch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: Danger! It's actually very easy to mess up this argument and accidentally trigger and endless loop. It may be that you have done so in the current code, but it's hard to tell.

The only time current_next_fetch should be populated is when you are resuming from checkpoint, and you need to explicitly disable the overfetch check (L290) then.

There might be a refactoring of this code that makes this easier, or now that we are switching over to the next pointer version we can get rid of the probing behavior, which simplifies the implementation quite a bit.

start_timestamp=current_start, end_timestamp=current_end
start_timestamp=current_start,
end_timestamp=current_end,
next_=current_next_fetch,
)
):
if i == 0: # First batch, check for overfetch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a comment, it turns out my implementation for estimation of the entries in range was completely off, and so this is basically always overly cautious. Not sure what to do about it, since the alternatives that I can think of are complicated.

@@ -240,15 +242,18 @@ def fetch_iter(
the cursor
"""
start_time = 0
next_fetch = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just call this variable current_next_fetch - it's unconditionally written at 256.

last_fetch_time=int(time.time()),
),
)
current_next_fetch = entry.next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This value is never read, as it always unconditionally replaced on L359

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants