Skip to content

Commit

Permalink
package-indexer: move common logic to base classes
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Szakacs <[email protected]>
  • Loading branch information
alltilla committed May 18, 2024
1 parent 3b33301 commit 1ff4115
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 325 deletions.
7 changes: 1 addition & 6 deletions packaging/package-indexer/cdn/azure_cdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,8 @@ def from_config(cfg: dict) -> CDN:
client_secret=cfg["client-secret"],
)

def refresh_cache(self, path: Path) -> None:
def _refresh_cache(self, path: Path) -> None:
path_str = str(Path("/", path))

self._log_info("Refreshing CDN cache.", path=path_str)

poller: LROPoller = self.__cdn.endpoints.begin_purge_content(
resource_group_name=self.__resource_group_name,
profile_name=self.__profile_name,
Expand All @@ -107,5 +104,3 @@ def refresh_cache(self, path: Path) -> None:
status = poller.status()
if not status == "Succeeded":
raise Exception("Failed to refresh CDN cache. status: {}".format(status))

self._log_info("Successfully refreshed CDN cache.", path=path_str)
7 changes: 6 additions & 1 deletion packaging/package-indexer/cdn/cdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ def from_config(config: dict) -> CDN:
pass

@abstractmethod
def refresh_cache(self, path: Path) -> None:
def _refresh_cache(self, path: Path) -> None:
pass

# Public entry point for refreshing the CDN cache for `path`.
# Logs the start of the refresh, delegates the actual work to the
# abstract `_refresh_cache` hook, then logs success. Both log lines
# carry the path as `str(path)`.
def refresh_cache(self, path: Path) -> None:
self._log_info("Refreshing CDN cache.", path=str(path))
# If the hook raises, the exception propagates and the success line below is never logged.
self._refresh_cache(path)
self._log_info("Successfully refreshed CDN cache.", path=str(path))

@staticmethod
def __create_logger() -> logging.Logger:
logger = logging.getLogger("CDN")
Expand Down
10 changes: 3 additions & 7 deletions packaging/package-indexer/cdn/cloudflare_cdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,11 @@ def from_config(cfg: dict) -> CDN:
api_token=cfg["api-token"],
)

def refresh_cache(self, path: Path) -> None:
self._log_info("Refreshing CDN cache.")

def _refresh_cache(self, path: Path) -> None:
url = f"https://api.cloudflare.com/client/v4/zones/{self.__zone_id}/purge_cache"
headers = {"Authorization": f"Bearer {self.__api_token}"}
data = {"purge_everything": True}

response = requests.post(url, headers=headers, json=data).json()
if response.get("success", False):
self._log_info("Successfully refreshed CDN cache.")
else:
self._log_info("Failed to refresh CDN cache.", response=response)
if not response.get("success", False):
raise Exception("Failed to refresh CDN cache. response: {}".format(response))
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

from hashlib import md5
from pathlib import Path
from typing import List, Optional
from typing import Any, Dict, List

from azure.storage.blob import BlobClient, ContainerClient

from .remote_storage_synchronizer import FileSyncState, RemoteStorageSynchronizer
from .remote_storage_synchronizer import RemoteStorageSynchronizer

DEFAULT_ROOT_DIR = Path("/tmp/azure_container_synchronizer")

Expand Down Expand Up @@ -55,7 +55,6 @@ class AzureContainerSynchronizer(RemoteStorageSynchronizer):

def __init__(self, connection_string: str, storage_name: str) -> None:
self.__client = ContainerClient.from_connection_string(conn_str=connection_string, container_name=storage_name)
self.__remote_files_cache: Optional[List[dict]] = None
super().__init__(
remote_root_dir=Path(""),
local_root_dir=Path(DEFAULT_ROOT_DIR, storage_name),
Expand All @@ -72,181 +71,44 @@ def from_config(cfg: dict) -> RemoteStorageSynchronizer:
storage_name=cfg["storage-name"],
)

@property
def local_files(self) -> List[Path]:
dirs_and_files = list(self.local_dir.working_dir.rglob("*"))
return list(filter(lambda path: path.is_file(), dirs_and_files))

@property
def remote_files(self) -> List[dict]:
if self.__remote_files_cache is not None:
return self.__remote_files_cache

def _list_remote_files(self) -> List[Dict[str, Any]]:
file_name_prefix = "{}/".format(self.remote_dir.working_dir)
self.__remote_files_cache = [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)]
return [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)]

return self.__remote_files_cache

def __download_file(self, relative_file_path: str) -> None:
def _download_file(self, relative_file_path: str) -> None:
download_path = Path(self.local_dir.root_dir, relative_file_path).resolve()

self._log_info(
"Downloading file.",
remote_path=relative_file_path,
local_path=str(download_path),
)

download_path.parent.mkdir(parents=True, exist_ok=True)
with download_path.open("wb") as downloaded_blob:
blob_data = self.__client.download_blob(relative_file_path)
blob_data.readinto(downloaded_blob)

def __upload_file(self, relative_file_path: str) -> None:
def _upload_file(self, relative_file_path: str) -> None:
local_path = Path(self.local_dir.root_dir, relative_file_path)

self._log_info(
"Uploading file.",
local_path=str(local_path),
remote_path=relative_file_path,
)

with local_path.open("rb") as local_file_data:
self.__client.upload_blob(relative_file_path, local_file_data, overwrite=True)

def __delete_local_file(self, relative_file_path: str) -> None:
local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
self._log_info("Deleting local file.", local_path=str(local_file_path))
local_file_path.unlink()

def __delete_remote_file(self, relative_file_path: str) -> None:
self._log_info("Deleting remote file.", remote_path=relative_file_path)
def _delete_remote_file(self, relative_file_path: str) -> None:
self.__client.delete_blob(relative_file_path, delete_snapshots="include")

def sync_from_remote(self) -> None:
self._log_info(
"Syncing content from remote.",
remote_workdir=str(self.remote_dir.working_dir),
local_workdir=str(self.local_dir.working_dir),
)
for file in self.__all_files:
sync_state = self.__get_file_sync_state(file)
if sync_state == FileSyncState.IN_SYNC:
continue
if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL:
self.__download_file(file)
continue
if sync_state == FileSyncState.NOT_IN_REMOTE:
self.__delete_local_file(file)
continue
raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
self._log_info(
"Successfully synced remote content.",
remote_workdir=str(self.remote_dir.working_dir),
local_workdir=str(self.local_dir.working_dir),
)

def sync_to_remote(self) -> None:
self._log_info(
"Syncing content to remote.",
local_workdir=str(self.local_dir.working_dir),
remote_workdir=str(self.remote_dir.working_dir),
)
for file in self.__all_files:
sync_state = self.__get_file_sync_state(file)
if sync_state == FileSyncState.IN_SYNC:
continue
if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE:
self.__upload_file(file)
continue
if sync_state == FileSyncState.NOT_IN_LOCAL:
self.__delete_remote_file(file)
continue
raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
self._log_info(
"Successfully synced local content.",
remote_workdir=str(self.remote_dir.working_dir),
local_workdir=str(self.local_dir.working_dir),
)
self.__invalidate_remote_files_cache()

def create_snapshot_of_remote(self) -> None:
self._log_info("Creating snapshot of the remote container.")
for file in self.remote_files:
def _create_snapshot_of_remote(self) -> None:
for file in self._remote_files:
blob_client: BlobClient = self.__client.get_blob_client(file["name"])
snapshot_properties = blob_client.create_snapshot()
self._log_debug(
"Successfully created snapshot of remote file.",
remote_path=self.__get_relative_file_path_for_remote_file(file),
snapshot_properties=snapshot_properties,
remote_path=self._get_relative_file_path_for_remote_file(file),
snapshot_properties=str(snapshot_properties),
)

def __get_md5_of_remote_file(self, relative_file_path: str) -> bytearray:
for file in self.remote_files:
def _get_md5_of_remote_file(self, relative_file_path: str) -> bytes:
for file in self._remote_files:
if file["name"] == relative_file_path:
return file["content_settings"]["content_md5"]
raise FileNotFoundError

def __get_md5_of_local_file(self, relative_file_path: str) -> bytes:
file = Path(self.local_dir.root_dir, relative_file_path)
return md5(file.read_bytes()).digest()

def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState:
try:
remote_md5 = self.__get_md5_of_remote_file(relative_file_path)
except FileNotFoundError:
self._log_debug(
"Local file is not available remotely.",
local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
)
return FileSyncState.NOT_IN_REMOTE

try:
local_md5 = self.__get_md5_of_local_file(relative_file_path)
except FileNotFoundError:
self._log_debug(
"Remote file is not available locally.",
remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
)
return FileSyncState.NOT_IN_LOCAL

if remote_md5 != local_md5:
self._log_debug(
"File differs locally and remotely.",
remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
remote_md5sum=remote_md5.hex(),
local_md5sum=local_md5.hex(),
)
return FileSyncState.DIFFERENT

self._log_debug(
"File is in sync.",
remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
md5sum=remote_md5.hex(),
)
return FileSyncState.IN_SYNC

def __get_relative_file_path_for_local_file(self, file: Path) -> str:
return str(file.relative_to(self.local_dir.root_dir))

def __get_relative_file_path_for_remote_file(self, file: dict) -> str:
def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str:
return file["name"]

@property
def __all_files(self) -> List[str]:
files = set()
for local_file in self.local_files:
files.add(self.__get_relative_file_path_for_local_file(local_file))
for remote_file in self.remote_files:
files.add(self.__get_relative_file_path_for_remote_file(remote_file))
return sorted(files)

def __invalidate_remote_files_cache(self) -> None:
self.__remote_files_cache = None

def _prepare_log(self, message: str, **kwargs: str) -> str:
log = "[{} :: {}]\t{}".format(self.__client.container_name, str(self.remote_dir.working_dir), message)
return super()._prepare_log(log, **kwargs)
Loading

0 comments on commit 1ff4115

Please sign in to comment.