From 1ff41152a032fed973b2d9e13f37d79b0f42a377 Mon Sep 17 00:00:00 2001
From: Attila Szakacs
Date: Sat, 18 May 2024 21:59:52 +0200
Subject: [PATCH] package-indexer: move common logic to base classes

Signed-off-by: Attila Szakacs
---
 packaging/package-indexer/cdn/azure_cdn.py    |   7 +-
 packaging/package-indexer/cdn/cdn.py          |   7 +-
 .../package-indexer/cdn/cloudflare_cdn.py     |  10 +-
 .../azure_container_synchronizer.py           | 166 ++--------------
 .../remote_storage_synchronizer.py            | 182 +++++++++++++++++-
 .../s3_bucket_synchronizer.py                 | 164 ++--------------
 6 files changed, 211 insertions(+), 325 deletions(-)

diff --git a/packaging/package-indexer/cdn/azure_cdn.py b/packaging/package-indexer/cdn/azure_cdn.py
index 9a6d73ff5a..b43e23a034 100644
--- a/packaging/package-indexer/cdn/azure_cdn.py
+++ b/packaging/package-indexer/cdn/azure_cdn.py
@@ -91,11 +91,8 @@ def from_config(cfg: dict) -> CDN:
             client_secret=cfg["client-secret"],
         )
 
-    def refresh_cache(self, path: Path) -> None:
+    def _refresh_cache(self, path: Path) -> None:
         path_str = str(Path("/", path))
-
-        self._log_info("Refreshing CDN cache.", path=path_str)
-
         poller: LROPoller = self.__cdn.endpoints.begin_purge_content(
             resource_group_name=self.__resource_group_name,
             profile_name=self.__profile_name,
@@ -107,5 +104,3 @@ def refresh_cache(self, path: Path) -> None:
         status = poller.status()
         if not status == "Succeeded":
             raise Exception("Failed to refresh CDN cache. status: {}".format(status))
-
-        self._log_info("Successfully refreshed CDN cache.", path=path_str)
diff --git a/packaging/package-indexer/cdn/cdn.py b/packaging/package-indexer/cdn/cdn.py
index cc003cc45a..12a51a7171 100644
--- a/packaging/package-indexer/cdn/cdn.py
+++ b/packaging/package-indexer/cdn/cdn.py
@@ -42,9 +42,14 @@ def from_config(config: dict) -> CDN:
         pass
 
     @abstractmethod
-    def refresh_cache(self, path: Path) -> None:
+    def _refresh_cache(self, path: Path) -> None:
         pass
 
+    def refresh_cache(self, path: Path) -> None:
+        self._log_info("Refreshing CDN cache.", path=str(path))
+        self._refresh_cache(path)
+        self._log_info("Successfully refreshed CDN cache.", path=str(path))
+
     @staticmethod
     def __create_logger() -> logging.Logger:
         logger = logging.getLogger("CDN")
diff --git a/packaging/package-indexer/cdn/cloudflare_cdn.py b/packaging/package-indexer/cdn/cloudflare_cdn.py
index f221722919..f2489402cd 100644
--- a/packaging/package-indexer/cdn/cloudflare_cdn.py
+++ b/packaging/package-indexer/cdn/cloudflare_cdn.py
@@ -57,15 +57,11 @@ def from_config(cfg: dict) -> CDN:
             api_token=cfg["api-token"],
         )
 
-    def refresh_cache(self, path: Path) -> None:
-        self._log_info("Refreshing CDN cache.")
-
+    def _refresh_cache(self, path: Path) -> None:
         url = f"https://api.cloudflare.com/client/v4/zones/{self.__zone_id}/purge_cache"
         headers = {"Authorization": f"Bearer {self.__api_token}"}
         data = {"purge_everything": True}
 
         response = requests.post(url, headers=headers, json=data).json()
-        if response.get("success", False):
-            self._log_info("Successfully refreshed CDN cache.")
-        else:
-            self._log_info("Failed to refresh CDN cache.", response=response)
+        if not response.get("success", False):
+            raise Exception("Failed to refresh CDN cache. response: {}".format(response))
diff --git a/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py b/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py
index 13303b0a42..9b8b29c23f 100644
--- a/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py
+++ b/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py
@@ -22,11 +22,11 @@
 
 from hashlib import md5
 from pathlib import Path
-from typing import List, Optional
+from typing import Any, Dict, List
 
 from azure.storage.blob import BlobClient, ContainerClient
 
-from .remote_storage_synchronizer import FileSyncState, RemoteStorageSynchronizer
+from .remote_storage_synchronizer import RemoteStorageSynchronizer
 
 DEFAULT_ROOT_DIR = Path("/tmp/azure_container_synchronizer")
 
@@ -55,7 +55,6 @@ class AzureContainerSynchronizer(RemoteStorageSynchronizer):
 
     def __init__(self, connection_string: str, storage_name: str) -> None:
         self.__client = ContainerClient.from_connection_string(conn_str=connection_string, container_name=storage_name)
-        self.__remote_files_cache: Optional[List[dict]] = None
         super().__init__(
             remote_root_dir=Path(""),
             local_root_dir=Path(DEFAULT_ROOT_DIR, storage_name),
@@ -72,181 +71,44 @@ def from_config(cfg: dict) -> RemoteStorageSynchronizer:
             storage_name=cfg["storage-name"],
         )
 
-    @property
-    def local_files(self) -> List[Path]:
-        dirs_and_files = list(self.local_dir.working_dir.rglob("*"))
-        return list(filter(lambda path: path.is_file(), dirs_and_files))
-
-    @property
-    def remote_files(self) -> List[dict]:
-        if self.__remote_files_cache is not None:
-            return self.__remote_files_cache
-
+    def _list_remote_files(self) -> List[Dict[str, Any]]:
         file_name_prefix = "{}/".format(self.remote_dir.working_dir)
-        self.__remote_files_cache = [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)]
+        return [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)]
 
-        return self.__remote_files_cache
-
-    def __download_file(self, relative_file_path: str) -> None:
+    def _download_file(self, relative_file_path: str) -> None:
         download_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
-
-        self._log_info(
-            "Downloading file.",
-            remote_path=relative_file_path,
-            local_path=str(download_path),
-        )
-
         download_path.parent.mkdir(parents=True, exist_ok=True)
         with download_path.open("wb") as downloaded_blob:
             blob_data = self.__client.download_blob(relative_file_path)
             blob_data.readinto(downloaded_blob)
 
-    def __upload_file(self, relative_file_path: str) -> None:
+    def _upload_file(self, relative_file_path: str) -> None:
         local_path = Path(self.local_dir.root_dir, relative_file_path)
-
-        self._log_info(
-            "Uploading file.",
-            local_path=str(local_path),
-            remote_path=relative_file_path,
-        )
-
         with local_path.open("rb") as local_file_data:
             self.__client.upload_blob(relative_file_path, local_file_data, overwrite=True)
 
-    def __delete_local_file(self, relative_file_path: str) -> None:
-        local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
-        self._log_info("Deleting local file.", local_path=str(local_file_path))
-        local_file_path.unlink()
-
-    def __delete_remote_file(self, relative_file_path: str) -> None:
-        self._log_info("Deleting remote file.", remote_path=relative_file_path)
+    def _delete_remote_file(self, relative_file_path: str) -> None:
         self.__client.delete_blob(relative_file_path, delete_snapshots="include")
 
-    def sync_from_remote(self) -> None:
-        self._log_info(
-            "Syncing content from remote.",
-            remote_workdir=str(self.remote_dir.working_dir),
-            local_workdir=str(self.local_dir.working_dir),
-        )
-        for file in self.__all_files:
-            sync_state = self.__get_file_sync_state(file)
-            if sync_state == FileSyncState.IN_SYNC:
-                continue
-            if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL:
-                self.__download_file(file)
-                continue
-            if sync_state == FileSyncState.NOT_IN_REMOTE:
-                self.__delete_local_file(file)
-                continue
-            raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
-        self._log_info(
-            "Successfully synced remote content.",
-            remote_workdir=str(self.remote_dir.working_dir),
-            local_workdir=str(self.local_dir.working_dir),
-        )
-
-    def sync_to_remote(self) -> None:
-        self._log_info(
-            "Syncing content to remote.",
-            local_workdir=str(self.local_dir.working_dir),
-            remote_workdir=str(self.remote_dir.working_dir),
-        )
-        for file in self.__all_files:
-            sync_state = self.__get_file_sync_state(file)
-            if sync_state == FileSyncState.IN_SYNC:
-                continue
-            if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE:
-                self.__upload_file(file)
-                continue
-            if sync_state == FileSyncState.NOT_IN_LOCAL:
-                self.__delete_remote_file(file)
-                continue
-            raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
-        self._log_info(
-            "Successfully synced local content.",
-            remote_workdir=str(self.remote_dir.working_dir),
-            local_workdir=str(self.local_dir.working_dir),
-        )
-        self.__invalidate_remote_files_cache()
-
-    def create_snapshot_of_remote(self) -> None:
-        self._log_info("Creating snapshot of the remote container.")
-        for file in self.remote_files:
+    def _create_snapshot_of_remote(self) -> None:
+        for file in self._remote_files:
             blob_client: BlobClient = self.__client.get_blob_client(file["name"])
             snapshot_properties = blob_client.create_snapshot()
             self._log_debug(
                 "Successfully created snapshot of remote file.",
-                remote_path=self.__get_relative_file_path_for_remote_file(file),
-                snapshot_properties=snapshot_properties,
+                remote_path=self._get_relative_file_path_for_remote_file(file),
+                snapshot_properties=str(snapshot_properties),
             )
 
-    def __get_md5_of_remote_file(self, relative_file_path: str) -> bytearray:
-        for file in self.remote_files:
+    def _get_md5_of_remote_file(self, relative_file_path: str) -> bytes:
+        for file in self._remote_files:
             if file["name"] == relative_file_path:
                 return file["content_settings"]["content_md5"]
         raise FileNotFoundError
 
-    def __get_md5_of_local_file(self, relative_file_path: str) -> bytes:
-        file = Path(self.local_dir.root_dir, relative_file_path)
-        return md5(file.read_bytes()).digest()
-
-    def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState:
-        try:
-            remote_md5 = self.__get_md5_of_remote_file(relative_file_path)
-        except FileNotFoundError:
-            self._log_debug(
-                "Local file is not available remotely.",
-                local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-                unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-            )
-            return FileSyncState.NOT_IN_REMOTE
-
-        try:
-            local_md5 = self.__get_md5_of_local_file(relative_file_path)
-        except FileNotFoundError:
-            self._log_debug(
-                "Remote file is not available locally.",
-                remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-                unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-            )
-            return FileSyncState.NOT_IN_LOCAL
-
-        if remote_md5 != local_md5:
-            self._log_debug(
-                "File differs locally and remotely.",
-                remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-                local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-                remote_md5sum=remote_md5.hex(),
-                local_md5sum=local_md5.hex(),
-            )
-            return FileSyncState.DIFFERENT
-
-        self._log_debug(
-            "File is in sync.",
-            remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-            local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-            md5sum=remote_md5.hex(),
-        )
-        return FileSyncState.IN_SYNC
-
-    def __get_relative_file_path_for_local_file(self, file: Path) -> str:
-        return str(file.relative_to(self.local_dir.root_dir))
-
-    def __get_relative_file_path_for_remote_file(self, file: dict) -> str:
+    def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str:
         return file["name"]
 
-    @property
-    def __all_files(self) -> List[str]:
-        files = set()
-        for local_file in self.local_files:
-            files.add(self.__get_relative_file_path_for_local_file(local_file))
-        for remote_file in self.remote_files:
-            files.add(self.__get_relative_file_path_for_remote_file(remote_file))
-        return sorted(files)
-
-    def __invalidate_remote_files_cache(self) -> None:
-        self.__remote_files_cache = None
-
     def _prepare_log(self, message: str, **kwargs: str) -> str:
         log = "[{} :: {}]\t{}".format(self.__client.container_name, str(self.remote_dir.working_dir), message)
         return super()._prepare_log(log, **kwargs)
diff --git a/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py b/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py
index 3128a0d62d..5b69943c26 100644
--- a/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py
+++ b/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py
@@ -1,5 +1,6 @@
 #############################################################################
 # Copyright (c) 2022 One Identity
+# Copyright (c) 2024 Attila Szakacs
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License version 2 as published
@@ -22,10 +23,13 @@
 
 from __future__ import annotations
+
 import logging
 from abc import ABC, abstractmethod
 from enum import Enum, auto
+from hashlib import md5
 from pathlib import Path
+from typing import Any, Dict, List, Optional
 
 
 class WorkingDir:
@@ -57,8 +61,13 @@ def __init__(self, remote_root_dir: Path, local_root_dir: Path) -> None:
         self.remote_dir = WorkingDir(remote_root_dir)
         self.local_dir = WorkingDir(local_root_dir)
 
+        self.__remote_files_cache: Optional[List[dict]] = None
         self.__logger = RemoteStorageSynchronizer.__create_logger()
 
+    def set_sub_dir(self, sub_dir: Path) -> None:
+        self.remote_dir.set_sub_dir(sub_dir)
+        self.local_dir.set_sub_dir(sub_dir)
+
     @staticmethod
     @abstractmethod
     def get_config_keyword() -> str:
@@ -69,21 +78,182 @@ def get_config_keyword() -> str:
         pass
 
     @staticmethod
     @abstractmethod
     def from_config(cfg: dict) -> RemoteStorageSynchronizer:
         pass
 
-    @abstractmethod
     def sync_from_remote(self) -> None:
+        self._log_info(
+            "Syncing content from remote.",
+            remote_workdir=str(self.remote_dir.working_dir),
+            local_workdir=str(self.local_dir.working_dir),
+        )
+        for file in self._all_files:
+            sync_state = self.__get_file_sync_state(file)
+            if sync_state == FileSyncState.IN_SYNC:
+                continue
+            if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL:
+                self.__download_file(file)
+                continue
+            if sync_state == FileSyncState.NOT_IN_REMOTE:
+                self.__delete_local_file(file)
+                continue
+            raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
+        self._log_info(
+            "Successfully synced remote content.",
+            remote_workdir=str(self.remote_dir.working_dir),
+            local_workdir=str(self.local_dir.working_dir),
+        )
+
+    def sync_to_remote(self) -> None:
+        self._log_info(
+            "Syncing content to remote.",
+            local_workdir=str(self.local_dir.working_dir),
+            remote_workdir=str(self.remote_dir.working_dir),
+        )
+        for file in self._all_files:
+            sync_state = self.__get_file_sync_state(file)
+            if sync_state == FileSyncState.IN_SYNC:
+                continue
+            if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE:
+                self.__upload_file(file)
+                continue
+            if sync_state == FileSyncState.NOT_IN_LOCAL:
+                self.__delete_remote_file(file)
+                continue
+            raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
+        self._log_info(
+            "Successfully synced local content.",
+            remote_workdir=str(self.remote_dir.working_dir),
+            local_workdir=str(self.local_dir.working_dir),
+        )
+        self.__invalidate_remote_files_cache()
+
+    @abstractmethod
+    def _create_snapshot_of_remote(self) -> None:
         pass
 
+    def create_snapshot_of_remote(self) -> None:
+        self._log_info("Creating snapshot of remote")
+        self._create_snapshot_of_remote()
+
+    @property
+    def _local_files(self) -> List[Path]:
+        dirs_and_files = list(self.local_dir.working_dir.rglob("*"))
+        return list(filter(lambda path: path.is_file(), dirs_and_files))
+
     @abstractmethod
-    def sync_to_remote(self) -> None:
+    def _list_remote_files(self) -> List[Dict[str, Any]]:
         pass
 
+    @property
+    def _remote_files(self) -> List[Dict[str, Any]]:
+        if self.__remote_files_cache is not None:
+            return self.__remote_files_cache
+
+        self.__remote_files_cache = self._list_remote_files()
+        return self.__remote_files_cache
+
+    def __invalidate_remote_files_cache(self) -> None:
+        self.__remote_files_cache = None
+
+    @property
+    def _all_files(self) -> List[str]:
+        files = set()
+        for local_file in self._local_files:
+            files.add(self.__get_relative_file_path_for_local_file(local_file))
+        for remote_file in self._remote_files:
+            files.add(self._get_relative_file_path_for_remote_file(remote_file))
+        return sorted(files)
+
     @abstractmethod
-    def create_snapshot_of_remote(self) -> None:
+    def _download_file(self, relative_file_path: str) -> None:
         pass
 
-    def set_sub_dir(self, sub_dir: Path) -> None:
-        self.remote_dir.set_sub_dir(sub_dir)
-        self.local_dir.set_sub_dir(sub_dir)
+    def __download_file(self, relative_file_path: str) -> None:
+        download_path = self._get_local_file_path_for_relative_file(relative_file_path)
+        self._log_info("Downloading file.", remote_path=relative_file_path, local_path=str(download_path))
+        self._download_file(relative_file_path)
+        self._log_info("Successfully downloaded file.", remote_path=relative_file_path, local_path=str(download_path))
+
+    @abstractmethod
+    def _upload_file(self, relative_file_path: str) -> None:
+        pass
+
+    def __upload_file(self, relative_file_path: str) -> None:
+        local_path = self._get_local_file_path_for_relative_file(relative_file_path)
+
+        self._log_info("Uploading file.", local_path=str(local_path), remote_path=relative_file_path)
+        self._upload_file(relative_file_path)
+        self._log_info("Successfully uploaded file.", remote_path=relative_file_path, local_path=str(local_path))
+
+    def __delete_local_file(self, relative_file_path: str) -> None:
+        local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
+        self._log_info("Deleting local file.", local_path=str(local_file_path))
+        local_file_path.unlink()
+        self._log_info("Successfully deleted local file.", local_path=str(local_file_path))
+
+    @abstractmethod
+    def _delete_remote_file(self, relative_file_path: str) -> None:
+        pass
+
+    def __delete_remote_file(self, relative_file_path: str) -> None:
+        self._log_info("Deleting remote file.", remote_path=relative_file_path)
+        self._delete_remote_file(relative_file_path)
+        self._log_info("Successfully deleted remote file.", remote_path=relative_file_path)
+
+    def __get_relative_file_path_for_local_file(self, file: Path) -> str:
+        return str(file.relative_to(self.local_dir.root_dir))
+
+    @abstractmethod
+    def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str:
+        pass
+
+    def _get_local_file_path_for_relative_file(self, relative_file_path: str) -> Path:
+        return Path(self.local_dir.root_dir, relative_file_path).resolve()
+
+    def __get_md5_of_local_file(self, relative_file_path: str) -> bytes:
+        file = Path(self.local_dir.root_dir, relative_file_path)
+        return md5(file.read_bytes()).digest()
+
+    @abstractmethod
+    def _get_md5_of_remote_file(self, relative_file_path: str) -> bytes:
+        pass
+
+    def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState:
+        try:
+            local_md5 = self.__get_md5_of_local_file(relative_file_path)
+        except FileNotFoundError:
+            self._log_debug(
+                "Remote file is not available locally.",
+                remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
+                unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
+            )
+            return FileSyncState.NOT_IN_LOCAL
+
+        try:
+            remote_md5 = self._get_md5_of_remote_file(relative_file_path)
+        except FileNotFoundError:
+            self._log_debug(
+                "Local file is not available remotely.",
+                local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
+                unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
+            )
+            return FileSyncState.NOT_IN_REMOTE
+
+        if remote_md5 != local_md5:
+            self._log_debug(
+                "File differs locally and remotely.",
+                remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
+                local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
+                remote_md5sum=remote_md5.hex(),
+                local_md5sum=local_md5.hex(),
+            )
+            return FileSyncState.DIFFERENT
+
+        self._log_debug(
+            "File is in sync.",
+            remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
+            local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
+            md5sum=remote_md5.hex(),
+        )
+        return FileSyncState.IN_SYNC
 
     @staticmethod
     def __create_logger() -> logging.Logger:
diff --git a/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py b/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py
index cf584a021b..0c05ed0ef8 100644
--- a/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py
+++ b/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py
@@ -22,12 +22,12 @@
 
 from hashlib import md5
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Set
+from typing import Any, Dict, List
 
 from boto3 import Session
 from botocore.exceptions import ClientError, EndpointConnectionError
 
-from .remote_storage_synchronizer import FileSyncState, RemoteStorageSynchronizer
+from .remote_storage_synchronizer import RemoteStorageSynchronizer
 
 DEFAULT_ROOT_DIR = Path("/tmp/s3_bucket_synchronizer")
 
@@ -69,7 +69,6 @@ def __init__(self, access_key: str, secret_key: str, endpoint: str, bucket: str) -> None:
             service_name="s3",
             endpoint_url=endpoint,
         )
-        self.__remote_files_cache: Optional[List[dict]] = None
         self.__bucket = bucket
         super().__init__(
             remote_root_dir=Path(""),
@@ -89,14 +88,7 @@ def from_config(cfg: dict) -> RemoteStorageSynchronizer:
             bucket=cfg["bucket"],
         )
 
-    @property
-    def local_files(self) -> List[Path]:
-        dirs_and_files = list(self.local_dir.working_dir.rglob("*"))
-        return list(
-            filter(lambda path: path.is_file() and not path.name.endswith(".package-indexer-md5sum"), dirs_and_files)
-        )
-
-    def __list_existing_objects(self) -> List[Dict[str, Any]]:
+    def _list_remote_files(self) -> List[Dict[str, Any]]:
         objects: List[Dict[str, Any]] = []
         pagination_options: Dict[str, str] = {}
 
@@ -124,27 +116,12 @@ def __list_existing_objects(self) -> List[Dict[str, Any]]:
 
         return objects
 
-    @property
-    def remote_files(self) -> List[Dict[str, Any]]:
-        if self.__remote_files_cache is not None:
-            return self.__remote_files_cache
-
-        self.__remote_files_cache = self.__list_existing_objects()
-        return self.__remote_files_cache
-
     def __get_remote_md5sum_file_path(self, relative_file_path: str) -> Path:
         path = Path(self.local_dir.root_dir, "package-indexer-md5sums", relative_file_path).resolve()
         return Path(path.parent, path.name + ".package-indexer-md5sum")
 
-    def __download_file(self, relative_file_path: str) -> None:
-        download_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
-
-        self._log_info(
-            "Downloading file.",
-            remote_path=relative_file_path,
-            local_path=str(download_path),
-        )
-
+    def _download_file(self, relative_file_path: str) -> None:
+        download_path = self._get_local_file_path_for_relative_file(relative_file_path)
         download_path.parent.mkdir(parents=True, exist_ok=True)
         with download_path.open("wb") as downloaded_object:
             self.__client.download_fileobj(self.__bucket, relative_file_path, downloaded_object)
@@ -154,143 +131,24 @@ def __download_file(self, relative_file_path: str) -> None:
         md5sum_file_path.parent.mkdir(exist_ok=True, parents=True)
         md5sum_file_path.write_bytes(md5sum)
 
-    def __upload_file(self, relative_file_path: str) -> None:
-        local_path = Path(self.local_dir.root_dir, relative_file_path)
-
-        self._log_info(
-            "Uploading file.",
-            local_path=str(local_path),
-            remote_path=relative_file_path,
-        )
-
+    def _upload_file(self, relative_file_path: str) -> None:
+        local_path = self._get_local_file_path_for_relative_file(relative_file_path)
         with local_path.open("rb") as local_file_data:
             self.__client.upload_fileobj(local_file_data, self.__bucket, relative_file_path)
 
-    def __delete_local_file(self, relative_file_path: str) -> None:
-        local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
-        self._log_info("Deleting local file.", local_path=str(local_file_path))
-        local_file_path.unlink()
-
-    def __delete_remote_file(self, relative_file_path: str) -> None:
-        self._log_info("Deleting remote file.", remote_path=relative_file_path)
+    def _delete_remote_file(self, relative_file_path: str) -> None:
         self.__client.delete_object(Bucket=self.__bucket, Key=relative_file_path)
 
-    def sync_from_remote(self) -> None:
-        self._log_info(
-            "Syncing content from remote.",
-            remote_workdir=str(self.remote_dir.working_dir),
-            local_workdir=str(self.local_dir.working_dir),
-        )
-        for file in self.__all_files:
-
-            sync_state = self.__get_file_sync_state(file)
-            if sync_state == FileSyncState.IN_SYNC:
-                continue
-            if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL:
-                self.__download_file(file)
-                continue
-            if sync_state == FileSyncState.NOT_IN_REMOTE:
-                self.__delete_local_file(file)
-                continue
-            raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
-        self._log_info(
-            "Successfully synced remote content.",
-            remote_workdir=str(self.remote_dir.working_dir),
-            local_workdir=str(self.local_dir.working_dir),
-        )
-
-    def sync_to_remote(self) -> None:
-        self._log_info(
-            "Syncing content to remote.",
-            local_workdir=str(self.local_dir.working_dir),
-            remote_workdir=str(self.remote_dir.working_dir),
-        )
-        for file in self.__all_files:
-            sync_state = self.__get_file_sync_state(file)
-            if sync_state == FileSyncState.IN_SYNC:
-                continue
-            if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE:
-                self.__upload_file(file)
-                continue
-            if sync_state == FileSyncState.NOT_IN_LOCAL:
-                self.__delete_remote_file(file)
-                continue
-            raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
-        self._log_info(
-            "Successfully synced local content.",
-            remote_workdir=str(self.remote_dir.working_dir),
-            local_workdir=str(self.local_dir.working_dir),
-        )
-        self.__invalidate_remote_files_cache()
-
-    def create_snapshot_of_remote(self) -> None:
+    def _create_snapshot_of_remote(self) -> None:
         self._log_info("Cannot create snapshot, not implemented, skipping...")
 
-    def __get_md5_of_remote_file(self, relative_file_path: str) -> bytes:
+    def _get_md5_of_remote_file(self, relative_file_path: str) -> bytes:
         md5sum_file_path = self.__get_remote_md5sum_file_path(relative_file_path)
         return md5sum_file_path.read_bytes()
 
-    def __get_md5_of_local_file(self, relative_file_path: str) -> bytes:
-        file = Path(self.local_dir.root_dir, relative_file_path)
-        return md5(file.read_bytes()).digest()
-
-    def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState:
-        try:
-            local_md5 = self.__get_md5_of_local_file(relative_file_path)
-        except FileNotFoundError:
-            self._log_debug(
-                "Remote file is not available locally.",
-                remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-                unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-            )
-            return FileSyncState.NOT_IN_LOCAL
-
-        try:
-            remote_md5 = self.__get_md5_of_remote_file(relative_file_path)
-        except FileNotFoundError:
-            self._log_debug(
-                "Local file is not available remotely.",
-                local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-                unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-            )
-            return FileSyncState.NOT_IN_REMOTE
-
-        if remote_md5 != local_md5:
-            self._log_debug(
-                "File differs locally and remotely.",
-                remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-                local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-                remote_md5sum=remote_md5.hex(),
-                local_md5sum=local_md5.hex(),
-            )
-            return FileSyncState.DIFFERENT
-
-        self._log_debug(
-            "File is in sync.",
-            remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
-            local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
-            md5sum=remote_md5.hex(),
-        )
-        return FileSyncState.IN_SYNC
-
-    def __get_relative_file_path_for_local_file(self, file: Path) -> str:
-        return str(file.relative_to(self.local_dir.root_dir))
-
-    def __get_relative_file_path_for_remote_file(self, file: dict) -> str:
+    def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str:
         return file["Key"]
 
-    @property
-    def __all_files(self) -> List[str]:
-        files = set()
-        for local_file in self.local_files:
-            files.add(self.__get_relative_file_path_for_local_file(local_file))
-        for remote_file in self.remote_files:
-            files.add(self.__get_relative_file_path_for_remote_file(remote_file))
-        return sorted(files)
-
-    def __invalidate_remote_files_cache(self) -> None:
-        self.__remote_files_cache = None
-
     def _prepare_log(self, message: str, **kwargs: str) -> str:
         log = "[{} :: {}]\t{}".format(self.__bucket, str(self.remote_dir.working_dir), message)
         return super()._prepare_log(log, **kwargs)
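
After this patch, both CDN and RemoteStorageSynchronizer follow a template-method layout: the public entry points (refresh_cache(), sync_from_remote(), sync_to_remote(), create_snapshot_of_remote()) keep the logging, caching and MD5-based sync-state decisions, while concrete providers only implement the protected hooks. The two sketches below are illustrations only, not part of the patch; every class name, config keyword and path in them is invented.

A hypothetical CDN backend now only has to provide _refresh_cache(); the base class wraps it with the "Refreshing"/"Successfully refreshed" log lines. The sketch assumes the CDN base class can be constructed without extra arguments and that from_config() is the only other member a backend must supply; any abstract members of CDN not visible in this diff would also need to be implemented.

# cdn/null_cdn.py -- hypothetical example, not shipped by the package indexer
from pathlib import Path

from .cdn import CDN


class NullCDN(CDN):
    """A stub backend that accepts refresh requests without purging anything."""

    @staticmethod
    def from_config(cfg: dict) -> CDN:
        # Assumes the CDN base class needs no constructor arguments here.
        return NullCDN()

    def _refresh_cache(self, path: Path) -> None:
        # Nothing to purge. Raising an exception from this hook is how the real
        # backends above (AzureCDN, CloudflareCDN) report a failed refresh; the
        # base class logs success only if this hook returns normally.
        pass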
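
Similarly, a storage backend now subclasses RemoteStorageSynchronizer and fills in the protected hooks; the base class owns the remote-file cache, the relative-path bookkeeping and the MD5 comparison that drives sync_from_remote() and sync_to_remote(). The following minimal sketch is a hypothetical backend that treats another directory on disk as the "remote" side (useful for local testing); it is not part of the patch, and the module name, class name and /tmp path are made up.

# remote_storage_synchronizer/local_dir_synchronizer.py -- hypothetical example
from hashlib import md5
from pathlib import Path
from shutil import copyfile
from typing import Any, Dict, List

from .remote_storage_synchronizer import RemoteStorageSynchronizer

DEFAULT_ROOT_DIR = Path("/tmp/local_dir_synchronizer")


class LocalDirSynchronizer(RemoteStorageSynchronizer):
    """Uses a plain directory on disk as the 'remote' storage."""

    def __init__(self, remote_dir: Path) -> None:
        self.__remote_root = remote_dir
        super().__init__(
            remote_root_dir=Path(""),
            local_root_dir=Path(DEFAULT_ROOT_DIR, remote_dir.name),
        )

    @staticmethod
    def get_config_keyword() -> str:
        return "local-dir"

    @staticmethod
    def from_config(cfg: dict) -> RemoteStorageSynchronizer:
        return LocalDirSynchronizer(remote_dir=Path(cfg["remote-dir"]))

    def _list_remote_files(self) -> List[Dict[str, Any]]:
        # The base class only needs enough metadata to derive a relative path.
        files = filter(lambda path: path.is_file(), self.__remote_root.rglob("*"))
        return [{"name": str(f.relative_to(self.__remote_root))} for f in files]

    def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str:
        return file["name"]

    def _get_md5_of_remote_file(self, relative_file_path: str) -> bytes:
        # Raising FileNotFoundError tells the base class the file is NOT_IN_REMOTE.
        remote_path = Path(self.__remote_root, relative_file_path)
        if not remote_path.is_file():
            raise FileNotFoundError(relative_file_path)
        return md5(remote_path.read_bytes()).digest()

    def _download_file(self, relative_file_path: str) -> None:
        download_path = self._get_local_file_path_for_relative_file(relative_file_path)
        download_path.parent.mkdir(parents=True, exist_ok=True)
        copyfile(Path(self.__remote_root, relative_file_path), download_path)

    def _upload_file(self, relative_file_path: str) -> None:
        upload_path = Path(self.__remote_root, relative_file_path)
        upload_path.parent.mkdir(parents=True, exist_ok=True)
        copyfile(self._get_local_file_path_for_relative_file(relative_file_path), upload_path)

    def _delete_remote_file(self, relative_file_path: str) -> None:
        Path(self.__remote_root, relative_file_path).unlink()

    def _create_snapshot_of_remote(self) -> None:
        self._log_info("Snapshots are not supported for a local directory, skipping.")

With these hooks in place, the inherited sync_from_remote(), sync_to_remote() and create_snapshot_of_remote() calls work unchanged, since all of the state comparison and logging now happens in the base class.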