
[hailtop/fs] Make sync / copy tools take advantage of cloud specific apis as much as possible #14601

Open
chrisvittal opened this issue Jul 2, 2024 · 2 comments
@chrisvittal
Collaborator

As part of our work generating All of Us datasets, we needed to copy around a million GCS objects. Our Copier infrastructure 'should' be able to handle that, but it kept failing with robustness issues. What finally worked was using GCS's rewrite API, which copies objects server-side without reading the data, so the copies completed in a fraction of the time while also reducing bandwidth needs.
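For context, this is the JSON API's objects.rewrite operation; the official google-cloud-storage client exposes the same thing as Blob.rewrite. A minimal sketch of a single server-side copy with that client (bucket and object names are placeholders, not from our pipeline):

from google.cloud import storage

client = storage.Client()
src_blob = client.bucket('src-bucket').blob('path/to/object')
dst_blob = client.bucket('dst-bucket').blob('path/to/object')

# rewrite copies server-side; large objects may need several calls, each
# returning a continuation token until the copy completes
token, bytes_rewritten, total_bytes = dst_blob.rewrite(src_blob)
while token is not None:
    token, bytes_rewritten, total_bytes = dst_blob.rewrite(src_blob, token=token)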

There are two components to this:

  1. Research what specific APIs we can take advantage of
  2. Update our code to use them where we can, in both the Copier and the new sync tool ([fs] basic sync tool #14248)

Here's the code I used to make the rewrite requests while merging a set of matrix tables together; the progress-bar code was just for visibility.

import urllib.parse
from typing import Optional

import rich.progress

# Module paths below are hailtop internals and may shift between versions.
from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS
from hailtop.aiotools.fs.exceptions import IsABucketError
from hailtop.utils import retry_transient_errors


async def rewrite(
    gfs: GoogleStorageAsyncFS,
    src: str,
    dst: str,
    progress: Optional[rich.progress.Progress] = None,
    file_tid: Optional[rich.progress.TaskID] = None,
    requests_tid: Optional[rich.progress.TaskID] = None,
):
    # Either pass all three progress arguments or none of them.
    assert (progress is None) == (file_tid is None) == (requests_tid is None)
    src_bkt, src_name = gfs.get_bucket_and_name(src)
    dst_bkt, dst_name = gfs.get_bucket_and_name(dst)
    if not src_name:
        raise IsABucketError(src)
    if not dst_name:
        raise IsABucketError(dst)
    client = gfs._storage_client
    # JSON API rewrite endpoint: the copy happens server-side, no object data
    # flows through the client.
    path = (
        f'/b/{src_bkt}/o/{urllib.parse.quote(src_name, safe="")}/rewriteTo'
        f'/b/{dst_bkt}/o/{urllib.parse.quote(dst_name, safe="")}'
    )
    kwargs = {'json': '', 'params': {}}
    client._update_params_with_user_project(kwargs, src_bkt)
    response = await retry_transient_errors(client.post, path, **kwargs)
    if progress is not None:
        progress.update(requests_tid, advance=1)
    # Large objects may require several rewrite calls; keep resubmitting with
    # the returned rewriteToken until the service reports the copy is done.
    while not response['done']:
        kwargs['params']['rewriteToken'] = response['rewriteToken']
        response = await retry_transient_errors(client.post, path, **kwargs)
        if progress is not None:
            progress.update(requests_tid, advance=1)
    if progress is not None:
        progress.update(file_tid, advance=1)
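A sketch of how a coroutine like this can be driven over many objects, with rich progress bars and a semaphore to bound concurrency; the pair list and the limit of 50 are made up for illustration, and it assumes GoogleStorageAsyncFS works as an async context manager with default credentials:

import asyncio
import rich.progress
from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS

async def rewrite_all(pairs):  # pairs: iterable of (src, dst) gs:// URLs
    pairs = list(pairs)
    async with GoogleStorageAsyncFS() as gfs:
        with rich.progress.Progress() as progress:
            file_tid = progress.add_task('files', total=len(pairs))
            requests_tid = progress.add_task('rewrite requests', total=None)
            sema = asyncio.Semaphore(50)  # bound the number of in-flight rewrites

            async def one(src, dst):
                async with sema:
                    await rewrite(gfs, src, dst, progress, file_tid, requests_tid)

            await asyncio.gather(*(one(src, dst) for src, dst in pairs))

# e.g. asyncio.run(rewrite_all([('gs://src-bucket/part-0', 'gs://dst-bucket/part-0')]))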
@danking
Contributor

danking commented Jul 8, 2024

Were you using the old copier or the new (not yet merged) hailctl fs sync? I had hoped the latter was finally robust enough for real use. hailtop.aiotools.copy is indeed not very reliable. Regardless, using the rewrite action when the source and destination agree is the correct move.
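Concretely, I'm imagining a dispatch along these lines in the copy path, purely as a sketch; the helper names are hypothetical, not the current Copier interface:

from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS

async def copy_one(src_fs, dst_fs, src: str, dst: str):
    # Prefer a server-side rewrite when both endpoints are GCS; otherwise fall
    # back to the existing streamed copy path. Names here are illustrative.
    if isinstance(src_fs, GoogleStorageAsyncFS) and isinstance(dst_fs, GoogleStorageAsyncFS):
        await rewrite(src_fs, src, dst)  # server-side, no data read by the client
    else:
        await streamed_copy(src_fs, dst_fs, src, dst)  # hypothetical existing fallback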

@chrisvittal
Collaborator Author

We used a one-off script. We tried Copier.copy first, but it wasn't reliable enough. We also needed to rename destination files beyond what the sync (or copy) tool is capable of.
