Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Asynchronous directory synchronization #701

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/hub-prod/test-cloud-sync.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"outputs": [],
"source": [
"from lamindb_setup import init, settings\n",
"from lamindb_setup.core.upath import synchronize_sync\n",
"import time\n",
"import os"
]
Expand Down Expand Up @@ -261,6 +262,47 @@
" ).modified.timestamp()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "44b632bd",
"metadata": {},
"outputs": [],
"source": [
"(dir_sync_local / \"file2\").unlink()\n",
"local_file_new_parent.mkdir()\n",
"local_file_new.touch()\n",
"\n",
"assert num_files(dir_sync_local) == 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "860938e8",
"metadata": {},
"outputs": [],
"source": [
"time.sleep(1)\n",
"\n",
"cloud_file = dir_sync / \"file2\"\n",
"cloud_file.touch()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ab4799f2",
"metadata": {},
"outputs": [],
"source": [
"synchronize_sync(dir_sync, dir_sync_local)\n",
"\n",
"assert num_files(dir_sync_local) == 2\n",
"assert not local_file_new.exists()\n",
"assert not local_file_new_parent.exists()"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
211 changes: 211 additions & 0 deletions lamindb_setup/core/_synchronize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
from fsspec.asyn import _run_coros_in_chunks
from fsspec.callbacks import _DEFAULT_CALLBACK
from pathlib import Path, PurePosixPath
from lamin_utils import logger
from upath import UPath
import os


# Per-protocol key under which fsspec's file info reports last-modified time
# (used to compare cloud vs. local timestamps); protocols not listed here
# cannot be synchronized as directories.
PROTOCOL_MODIFIED = {"s3": "LastModified", "gs": "mtime"}


# Synchronous version.
def synchronize_sync(
    upath: UPath, objectpath: Path, error_no_origin: bool = True, **kwargs
):
    """Sync to a local destination path.

    Handles both single files and directories. Directories are compared by
    file modification timestamps and synchronized file-by-file via recursive
    calls; local files no longer present in the origin are removed.

    Args:
        upath: Cloud origin path (file or directory).
        objectpath: Local destination path.
        error_no_origin: If True, raise FileNotFoundError when the origin
            does not exist and no local copy exists either; otherwise only warn.
        **kwargs: ``timestamp`` (internal — passed by the recursive directory
            branch to avoid an extra network request), ``callback`` (fsspec
            progress callback), and options forwarded to ``download_to``.

    Raises:
        FileNotFoundError: Origin missing, no local copy, and
            ``error_no_origin`` is True.
        ValueError: Directory synchronization requested for a protocol
            without a known modified-time key.
    """
    # optimize the number of network requests
    if "timestamp" in kwargs:
        # recursive call from the directory branch below: existence, type
        # and timestamp are already known
        is_dir = False
        exists = True
        cloud_mts = kwargs.pop("timestamp")
    else:
        # perform only one network request to check existence, type and timestamp
        try:
            cloud_mts = upath.modified.timestamp()
            is_dir = False
            exists = True
        except FileNotFoundError:
            exists = False
        except IsADirectoryError:
            is_dir = True
            exists = True
    callback = kwargs.pop("callback", _DEFAULT_CALLBACK)

    if not exists:
        warn_or_error = f"The original path {upath} does not exist anymore."
        if objectpath.exists():
            warn_or_error += (
                f"\nHowever, the local path {objectpath} still exists, you might want"
                " to reupload the object back."
            )
            logger.warning(warn_or_error)
        elif error_no_origin:
            warn_or_error += "\nIt is not possible to synchronize."
            raise FileNotFoundError(warn_or_error)
        return None

    # synchronization logic for directories
    if is_dir:
        files = upath.fs.find(str(upath), detail=True)
        modified_key = PROTOCOL_MODIFIED.get(upath.protocol, None)
        if modified_key is None:
            raise ValueError(f"Can't synchronize a directory for {upath.protocol}.")
        if objectpath.exists():
            destination_exists = True
            cloud_mts_max = max(
                file[modified_key] for file in files.values()
            ).timestamp()
            local_mts = [
                file.stat().st_mtime for file in objectpath.rglob("*") if file.is_file()
            ]
            if local_mts:
                n_local_files = len(local_mts)
                local_mts_max = max(local_mts)
                if local_mts_max == cloud_mts_max:
                    # newest timestamps agree: only sync if file counts diverge
                    need_synchronize = n_local_files != len(files)
                elif local_mts_max > cloud_mts_max:
                    need_synchronize = False
                else:
                    need_synchronize = True
            else:
                # destination directory exists but contains no files;
                # max() on the empty list would raise ValueError
                need_synchronize = True
        else:
            destination_exists = False
            need_synchronize = True
        if need_synchronize:
            origin_file_keys = []
            callback.set_size(len(files))
            for file, stat in callback.wrap(files.items()):
                destination = PurePosixPath(file).relative_to(upath.path)
                origin_file_keys.append(destination.as_posix())
                timestamp = stat[modified_key].timestamp()
                origin = UPath(f"{upath.protocol}://{file}", **upath._kwargs)
                # recurse per file, forwarding the already-known timestamp
                # to skip the per-file existence request
                synchronize_sync(
                    origin, objectpath / destination, timestamp=timestamp, **kwargs
                )
            if destination_exists:
                # delete local files (and then-empty parents) that are
                # no longer present in the origin
                local_files = [file for file in objectpath.rglob("*") if file.is_file()]
                if len(local_files) > len(files):
                    for file in local_files:
                        if (
                            file.relative_to(objectpath).as_posix()
                            not in origin_file_keys
                        ):
                            file.unlink()
                            parent = file.parent
                            if next(parent.iterdir(), None) is None:
                                parent.rmdir()
        return None

    # synchronization logic for files
    if objectpath.exists():
        local_mts = objectpath.stat().st_mtime  # type: ignore
        need_synchronize = cloud_mts > local_mts
    else:
        objectpath.parent.mkdir(parents=True, exist_ok=True)
        need_synchronize = True
    if need_synchronize:
        upath.download_to(objectpath, **kwargs)
        # mirror the cloud modification time on the local copy so future
        # timestamp comparisons are meaningful
        os.utime(objectpath, times=(cloud_mts, cloud_mts))


# Asynchronous version.
async def synchronize_async(
    upath: UPath, objectpath: Path, error_no_origin: bool = True, **kwargs
):
    """Sync to a local destination path (asynchronous variant).

    Mirrors ``synchronize_sync`` but uses the filesystem's async API and
    downloads directory contents concurrently in chunks.

    Args:
        upath: Cloud origin path (file or directory).
        objectpath: Local destination path.
        error_no_origin: If True, raise FileNotFoundError when the origin
            does not exist and no local copy exists either; otherwise only warn.
        **kwargs: ``timestamp`` (internal — passed by the recursive directory
            branch), ``batch_size``, ``callback``, and options forwarded to
            the filesystem's ``_get``; ``print_progress`` is discarded.

    Raises:
        FileNotFoundError: Origin missing, no local copy, and
            ``error_no_origin`` is True.
        ValueError: The protocol has no known modified-time key.
    """
    modified_key = PROTOCOL_MODIFIED.get(upath.protocol, None)
    if modified_key is None:
        raise ValueError(f"Can't synchronize for {upath.protocol}.")
    # progress is reported through the fsspec callback, not print_progress
    kwargs.pop("print_progress", None)
    # optimize the number of network requests
    if "timestamp" in kwargs:
        # recursive call from the directory branch below: existence, type
        # and timestamp are already known
        is_dir = False
        exists = True
        cloud_mts = kwargs.pop("timestamp")
    else:
        try:
            info = await upath.fs._info(str(upath))
            exists = True
            if info["type"] == "directory":
                is_dir = True
                batch_size = kwargs.pop("batch_size", upath.fs.batch_size)
                callback = kwargs.pop("callback", _DEFAULT_CALLBACK)
            else:
                is_dir = False
                cloud_mts = info[modified_key].timestamp()
        except FileNotFoundError:
            exists = False

    if not exists:
        warn_or_error = f"The original path {upath} does not exist anymore."
        if objectpath.exists():
            warn_or_error += (
                f"\nHowever, the local path {objectpath} still exists, you might want"
                " to reupload the object back."
            )
            logger.warning(warn_or_error)
        elif error_no_origin:
            warn_or_error += "\nIt is not possible to synchronize."
            raise FileNotFoundError(warn_or_error)
        return None

    # synchronization logic for directories
    if is_dir:
        files = await upath.fs._find(str(upath), detail=True)
        if objectpath.exists():
            destination_exists = True
            cloud_mts_max = max(
                file[modified_key] for file in files.values()
            ).timestamp()
            local_mts = [
                file.stat().st_mtime for file in objectpath.rglob("*") if file.is_file()
            ]
            if local_mts:
                n_local_files = len(local_mts)
                local_mts_max = max(local_mts)
                if local_mts_max == cloud_mts_max:
                    # newest timestamps agree: only sync if file counts diverge
                    need_synchronize = n_local_files != len(files)
                elif local_mts_max > cloud_mts_max:
                    need_synchronize = False
                else:
                    need_synchronize = True
            else:
                # destination directory exists but contains no files;
                # max() on the empty list would raise ValueError
                need_synchronize = True
        else:
            destination_exists = False
            need_synchronize = True
        if need_synchronize:
            origin_file_keys = []
            coros = []
            for file, stat in files.items():
                destination = PurePosixPath(file).relative_to(upath.path)
                origin_file_keys.append(destination.as_posix())
                timestamp = stat[modified_key].timestamp()
                origin = UPath(f"{upath.protocol}://{file}", **upath._kwargs)
                # recurse per file, forwarding the already-known timestamp
                coros.append(
                    synchronize_async(
                        origin, objectpath / destination, timestamp=timestamp, **kwargs
                    )
                )
            callback.set_size(len(files))
            # download concurrently, at most batch_size coroutines at a time
            await _run_coros_in_chunks(coros, batch_size=batch_size, callback=callback)
            if destination_exists:
                # delete local files (and then-empty parents) that are
                # no longer present in the origin
                local_files = [file for file in objectpath.rglob("*") if file.is_file()]
                if len(local_files) > len(files):
                    for file in local_files:
                        if (
                            file.relative_to(objectpath).as_posix()
                            not in origin_file_keys
                        ):
                            file.unlink()
                            parent = file.parent
                            if next(parent.iterdir(), None) is None:
                                parent.rmdir()
        return None

    # synchronization logic for files
    if objectpath.exists():
        local_mts = objectpath.stat().st_mtime  # type: ignore
        need_synchronize = cloud_mts > local_mts
    else:
        objectpath.parent.mkdir(parents=True, exist_ok=True)
        need_synchronize = True
    if need_synchronize:
        await upath.fs._get(str(upath), str(objectpath), **kwargs)
        # mirror the cloud modification time on the local copy so future
        # timestamp comparisons are meaningful
        os.utime(objectpath, times=(cloud_mts, cloud_mts))
98 changes: 8 additions & 90 deletions lamindb_setup/core/upath.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import os
from datetime import datetime, timezone
import botocore.session
from pathlib import Path, PurePosixPath
from pathlib import Path
from typing import Literal, Dict
import fsspec
from fsspec.asyn import AsyncFileSystem, sync
from itertools import islice
from typing import Optional, Set, Any, Tuple, List
from collections import defaultdict
Expand All @@ -16,6 +17,7 @@
from upath.implementations.local import LocalPath, PosixUPath, WindowsUPath
from .types import UPathStr
from .hashing import b16_to_b64, hash_md5s_from_dir
from ._synchronize import synchronize_sync, synchronize_async

LocalPathClasses = (PosixUPath, WindowsUPath, LocalPath)

Expand Down Expand Up @@ -183,96 +185,12 @@

def synchronize(self, objectpath: Path, error_no_origin: bool = True, **kwargs):
    """Sync to a local destination path.

    Dispatches to the asynchronous implementation when the underlying
    filesystem supports it, otherwise falls back to the synchronous one.
    """
    fs_is_async = isinstance(self.fs, AsyncFileSystem)
    if not fs_is_async:
        synchronize_sync(self, objectpath, error_no_origin, **kwargs)
    else:
        # run the coroutine on the filesystem's own event loop
        sync(
            self.fs.loop,
            synchronize_async,
            self,
            objectpath,
            error_no_origin,
            **kwargs,
        )

Check warning on line 193 in lamindb_setup/core/upath.py

View check run for this annotation

Codecov / codecov/patch

lamindb_setup/core/upath.py#L193

Added line #L193 was not covered by tests


def modified(self) -> Optional[datetime]:
Expand Down
Loading