Skip to content

Commit

Permalink
Merge pull request #107 from alltilla/package-indexer-s3-and-cloudflare
Browse files Browse the repository at this point in the history
package-indexer: S3 and Cloudflare
  • Loading branch information
MrAnno authored May 19, 2024
2 parents 2fb304d + ec928bd commit 6f506d1
Show file tree
Hide file tree
Showing 12 changed files with 436 additions and 171 deletions.
4 changes: 3 additions & 1 deletion packaging/package-indexer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ RUN pip install \
azure-mgmt-cdn \
azure-identity \
pyyaml \
types-PyYAML
types-PyYAML \
boto3 \
types-requests
2 changes: 2 additions & 0 deletions packaging/package-indexer/cdn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

from .cdn import CDN
from .azure_cdn import AzureCDN
from .cloudflare_cdn import CloudflareCDN

__all__ = [
"CDN",
"AzureCDN",
"CloudflareCDN",
"get_implementations",
]

Expand Down
7 changes: 1 addition & 6 deletions packaging/package-indexer/cdn/azure_cdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,8 @@ def from_config(cfg: dict) -> CDN:
client_secret=cfg["client-secret"],
)

def refresh_cache(self, path: Path) -> None:
def _refresh_cache(self, path: Path) -> None:
path_str = str(Path("/", path))

self._log_info("Refreshing CDN cache.", path=path_str)

poller: LROPoller = self.__cdn.endpoints.begin_purge_content(
resource_group_name=self.__resource_group_name,
profile_name=self.__profile_name,
Expand All @@ -107,5 +104,3 @@ def refresh_cache(self, path: Path) -> None:
status = poller.status()
if not status == "Succeeded":
raise Exception("Failed to refresh CDN cache. status: {}".format(status))

self._log_info("Successfully refreshed CDN cache.", path=path_str)
7 changes: 6 additions & 1 deletion packaging/package-indexer/cdn/cdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ def from_config(config: dict) -> CDN:
pass

@abstractmethod
def refresh_cache(self, path: Path) -> None:
def _refresh_cache(self, path: Path) -> None:
pass

def refresh_cache(self, path: Path) -> None:
    """Refresh the CDN cache for `path`, logging start and completion.

    Template method: the vendor-specific purge is delegated to the
    abstract `_refresh_cache()` implemented by each `CDN` subclass.
    """
    self._log_info("Refreshing CDN cache.", path=str(path))
    self._refresh_cache(path)
    self._log_info("Successfully refreshed CDN cache.", path=str(path))

@staticmethod
def __create_logger() -> logging.Logger:
logger = logging.getLogger("CDN")
Expand Down
67 changes: 67 additions & 0 deletions packaging/package-indexer/cdn/cloudflare_cdn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#############################################################################
# Copyright (c) 2024 Attila Szakacs
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 as published
# by the Free Software Foundation, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# As an additional exemption you are allowed to compile & link against the
# OpenSSL libraries as published by the OpenSSL project. See the file
# COPYING for details.
#
#############################################################################

import requests
from pathlib import Path

from .cdn import CDN


class CloudflareCDN(CDN):
    """
    A `CDN` implementation that can connect to a Cloudflare CDN instance.

    Example config:

    ```yaml
    vendor: "cloudflare"
    all:
      zone-id: "secret1"
      api-token: "secret2"
    ```
    """

    # Cache purges are small requests; fail fast instead of hanging forever
    # if the Cloudflare API becomes unreachable.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, zone_id: str, api_token: str) -> None:
        self.__zone_id = zone_id
        self.__api_token = api_token

        super().__init__()

    @staticmethod
    def get_config_keyword() -> str:
        return "cloudflare"

    @staticmethod
    def from_config(cfg: dict) -> CDN:
        return CloudflareCDN(
            zone_id=cfg["zone-id"],
            api_token=cfg["api-token"],
        )

    def _refresh_cache(self, path: Path) -> None:
        """Purge the zone's cache via the Cloudflare v4 API.

        NOTE: `path` is intentionally ignored — the request body uses
        `purge_everything`, so Cloudflare invalidates the whole zone
        regardless of the given path.

        Raises:
            Exception: if the API response does not report `success: true`.
            requests.RequestException: on network errors or timeout.
        """
        url = f"https://api.cloudflare.com/client/v4/zones/{self.__zone_id}/purge_cache"
        headers = {"Authorization": f"Bearer {self.__api_token}"}
        data = {"purge_everything": True}

        # A timeout is mandatory here: requests.post() without one can block
        # indefinitely if the endpoint accepts the connection but never replies.
        response = requests.post(
            url,
            headers=headers,
            json=data,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        ).json()
        if not response.get("success", False):
            raise Exception("Failed to refresh CDN cache. response: {}".format(response))
2 changes: 2 additions & 0 deletions packaging/package-indexer/indexer/deb_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __create_packages_files(self, indexed_dir: Path) -> None:
command = base_command + [str(relative_pkg_dir)]

packages_file_path = Path(pkg_dir, "Packages")
packages_file_path.parent.mkdir(parents=True, exist_ok=True)
with packages_file_path.open("w") as packages_file:
self._log_info("Creating `Packages` file.", packages_file_path=str(packages_file_path))
utils.execute_command(command, dir=dir, stdout=packages_file)
Expand All @@ -111,6 +112,7 @@ def __create_release_file(self, indexed_dir: Path) -> None:
command = ["apt-ftparchive", "release", "."]

release_file_path = Path(indexed_dir, "Release")
release_file_path.parent.mkdir(parents=True, exist_ok=True)
with release_file_path.open("w") as release_file:
self._log_info("Creating `Release` file.", release_file_path=str(release_file_path))
utils.execute_command(
Expand Down
10 changes: 10 additions & 0 deletions packaging/package-indexer/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ disallow_untyped_defs = true
module = "azure.*"
follow_imports = "skip"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "boto3.*"
follow_imports = "skip"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "botocore.*"
follow_imports = "skip"
ignore_missing_imports = true
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

from .remote_storage_synchronizer import RemoteStorageSynchronizer
from .azure_container_synchronizer import AzureContainerSynchronizer
from .s3_bucket_synchronizer import S3BucketSynchronizer

__all__ = [
"RemoteStorageSynchronizer",
"AzureContainerSynchronizer",
"S3BucketSynchronizer",
"get_implementations",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
#
#############################################################################

from hashlib import md5
from pathlib import Path
from typing import List, Optional
from typing import Any, Dict, List

from azure.storage.blob import BlobClient, ContainerClient

from .remote_storage_synchronizer import FileSyncState, RemoteStorageSynchronizer
from .remote_storage_synchronizer import RemoteStorageSynchronizer

DEFAULT_ROOT_DIR = Path("/tmp/azure_container_synchronizer")

Expand Down Expand Up @@ -55,7 +54,6 @@ class AzureContainerSynchronizer(RemoteStorageSynchronizer):

def __init__(self, connection_string: str, storage_name: str) -> None:
self.__client = ContainerClient.from_connection_string(conn_str=connection_string, container_name=storage_name)
self.__remote_files_cache: Optional[List[dict]] = None
super().__init__(
remote_root_dir=Path(""),
local_root_dir=Path(DEFAULT_ROOT_DIR, storage_name),
Expand All @@ -72,181 +70,38 @@ def from_config(cfg: dict) -> RemoteStorageSynchronizer:
storage_name=cfg["storage-name"],
)

@property
def local_files(self) -> List[Path]:
dirs_and_files = list(self.local_dir.working_dir.rglob("*"))
return list(filter(lambda path: path.is_file(), dirs_and_files))

@property
def remote_files(self) -> List[dict]:
if self.__remote_files_cache is not None:
return self.__remote_files_cache

def _list_remote_files(self) -> List[Dict[str, Any]]:
file_name_prefix = "{}/".format(self.remote_dir.working_dir)
self.__remote_files_cache = [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)]
return [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)]

return self.__remote_files_cache

def __download_file(self, relative_file_path: str) -> None:
def _download_file(self, relative_file_path: str) -> None:
download_path = Path(self.local_dir.root_dir, relative_file_path).resolve()

self._log_info(
"Downloading file.",
remote_path=relative_file_path,
local_path=str(download_path),
)

download_path.parent.mkdir(parents=True, exist_ok=True)
with download_path.open("wb") as downloaded_blob:
blob_data = self.__client.download_blob(relative_file_path)
blob_data.readinto(downloaded_blob)

def __upload_file(self, relative_file_path: str) -> None:
def _upload_file(self, relative_file_path: str) -> None:
local_path = Path(self.local_dir.root_dir, relative_file_path)

self._log_info(
"Uploading file.",
local_path=str(local_path),
remote_path=relative_file_path,
)

with local_path.open("rb") as local_file_data:
self.__client.upload_blob(relative_file_path, local_file_data, overwrite=True)

def __delete_local_file(self, relative_file_path: str) -> None:
local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve()
self._log_info("Deleting local file.", local_path=str(local_file_path))
local_file_path.unlink()

def __delete_remote_file(self, relative_file_path: str) -> None:
self._log_info("Deleting remote file.", remote_path=relative_file_path)
def _delete_remote_file(self, relative_file_path: str) -> None:
self.__client.delete_blob(relative_file_path, delete_snapshots="include")

def sync_from_remote(self) -> None:
self._log_info(
"Syncing content from remote.",
remote_workdir=str(self.remote_dir.working_dir),
local_workdir=str(self.local_dir.working_dir),
)
for file in self.__all_files:
sync_state = self.__get_file_sync_state(file)
if sync_state == FileSyncState.IN_SYNC:
continue
if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL:
self.__download_file(file)
continue
if sync_state == FileSyncState.NOT_IN_REMOTE:
self.__delete_local_file(file)
continue
raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
self._log_info(
"Successfully synced remote content.",
remote_workdir=str(self.remote_dir.working_dir),
local_workdir=str(self.local_dir.working_dir),
)

def sync_to_remote(self) -> None:
self._log_info(
"Syncing content to remote.",
local_workdir=str(self.local_dir.working_dir),
remote_workdir=str(self.remote_dir.working_dir),
)
for file in self.__all_files:
sync_state = self.__get_file_sync_state(file)
if sync_state == FileSyncState.IN_SYNC:
continue
if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE:
self.__upload_file(file)
continue
if sync_state == FileSyncState.NOT_IN_LOCAL:
self.__delete_remote_file(file)
continue
raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state))
self._log_info(
"Successfully synced local content.",
remote_workdir=str(self.remote_dir.working_dir),
local_workdir=str(self.local_dir.working_dir),
)
self.__invalidate_remote_files_cache()

def create_snapshot_of_remote(self) -> None:
self._log_info("Creating snapshot of the remote container.")
for file in self.remote_files:
def _create_snapshot_of_remote(self) -> None:
for file in self._remote_files:
blob_client: BlobClient = self.__client.get_blob_client(file["name"])
snapshot_properties = blob_client.create_snapshot()
self._log_debug(
"Successfully created snapshot of remote file.",
remote_path=self.__get_relative_file_path_for_remote_file(file),
snapshot_properties=snapshot_properties,
remote_path=self._get_relative_file_path_for_remote_file(file),
snapshot_properties=str(snapshot_properties),
)

def __get_md5_of_remote_file(self, relative_file_path: str) -> bytearray:
for file in self.remote_files:
if file["name"] == relative_file_path:
return file["content_settings"]["content_md5"]
raise FileNotFoundError

def __get_md5_of_local_file(self, relative_file_path: str) -> bytes:
file = Path(self.local_dir.root_dir, relative_file_path)
return md5(file.read_bytes()).digest()

def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState:
try:
remote_md5 = self.__get_md5_of_remote_file(relative_file_path)
except FileNotFoundError:
self._log_debug(
"Local file is not available remotely.",
local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
)
return FileSyncState.NOT_IN_REMOTE

try:
local_md5 = self.__get_md5_of_local_file(relative_file_path)
except FileNotFoundError:
self._log_debug(
"Remote file is not available locally.",
remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
)
return FileSyncState.NOT_IN_LOCAL

if remote_md5 != local_md5:
self._log_debug(
"File differs locally and remotely.",
remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
remote_md5sum=remote_md5.hex(),
local_md5sum=local_md5.hex(),
)
return FileSyncState.DIFFERENT

self._log_debug(
"File is in sync.",
remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)),
local_path=str(Path(self.local_dir.root_dir, relative_file_path)),
md5sum=remote_md5.hex(),
)
return FileSyncState.IN_SYNC

def __get_relative_file_path_for_local_file(self, file: Path) -> str:
return str(file.relative_to(self.local_dir.root_dir))

def __get_relative_file_path_for_remote_file(self, file: dict) -> str:
def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str:
return file["name"]

@property
def __all_files(self) -> List[str]:
files = set()
for local_file in self.local_files:
files.add(self.__get_relative_file_path_for_local_file(local_file))
for remote_file in self.remote_files:
files.add(self.__get_relative_file_path_for_remote_file(remote_file))
return sorted(files)

def __invalidate_remote_files_cache(self) -> None:
self.__remote_files_cache = None

def _prepare_log(self, message: str, **kwargs: str) -> str:
log = "[{} :: {}]\t{}".format(self.__client.container_name, str(self.remote_dir.working_dir), message)
return super()._prepare_log(log, **kwargs)
Loading

0 comments on commit 6f506d1

Please sign in to comment.