diff --git a/.env-devel b/.env-devel index 3b9979834bb..27eeadc24ab 100644 --- a/.env-devel +++ b/.env-devel @@ -32,6 +32,8 @@ AUTOSCALING_NODES_MONITORING=null AUTOSCALING_POLL_INTERVAL=10 AUTOSCALING_SSM_ACCESS=null +AWS_S3_CLI_S3='{"S3_ACCESS_KEY":"12345678", "S3_BUCKET_NAME":"simcore", "S3_ENDPOINT": "http://172.17.0.1:9001", "S3_SECRET_KEY": "12345678", "S3_REGION": "us-east-1"}' + CATALOG_BACKGROUND_TASK_REST_TIME=60 CATALOG_DEV_FEATURES_ENABLED=0 CATALOG_HOST=catalog diff --git a/packages/settings-library/src/settings_library/aws_s3_cli.py b/packages/settings-library/src/settings_library/aws_s3_cli.py new file mode 100644 index 00000000000..7fd3b271b10 --- /dev/null +++ b/packages/settings-library/src/settings_library/aws_s3_cli.py @@ -0,0 +1,11 @@ +from pydantic import Field + +from .base import BaseCustomSettings +from .s3 import S3Settings + + +class AwsS3CliSettings(BaseCustomSettings): + AWS_S3_CLI_S3: S3Settings = Field( + default=None, + description="These settings intentionally do not use auto_default_from_env=True because we might want to turn them off if RClone is enabled.", + ) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py b/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py index 398fcdcb6a1..7b8b810ba38 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py @@ -10,6 +10,7 @@ from servicelib.archiving_utils import unarchive_dir from servicelib.logging_utils import log_context from servicelib.progress_bar import ProgressBarData +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from ..node_ports_common import filemanager @@ -41,6 +42,7 @@ async def _push_directory( r_clone_settings: RCloneSettings, exclude_patterns: set[str] | None = None, progress_bar: ProgressBarData, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> None: s3_object = __create_s3_object_key(project_id, node_uuid, source_path) with log_context( @@ -56,6 +58,7 @@ async def _push_directory( io_log_redirect_cb=io_log_redirect_cb, progress_bar=progress_bar, exclude_patterns=exclude_patterns, + aws_s3_cli_settings=aws_s3_cli_settings, ) @@ -68,6 +71,7 @@ async def _pull_directory( io_log_redirect_cb: LogRedirectCB, r_clone_settings: RCloneSettings, progress_bar: ProgressBarData, + aws_s3_cli_settings: AwsS3CliSettings | None, save_to: Path | None = None, ) -> None: save_to_path = destination_path if save_to is None else save_to @@ -84,6 +88,7 @@ async def _pull_directory( io_log_redirect_cb=io_log_redirect_cb, r_clone_settings=r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=aws_s3_cli_settings, ) @@ -116,6 +121,7 @@ async def _pull_legacy_archive( io_log_redirect_cb=io_log_redirect_cb, r_clone_settings=None, progress_bar=sub_prog, + aws_s3_cli_settings=None, ) _logger.info("completed pull of %s.", destination_path) @@ -195,6 +201,7 @@ async def push( r_clone_settings: RCloneSettings, exclude_patterns: set[str] | None = None, progress_bar: ProgressBarData, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> None: """pushes and removes the legacy archive if present""" @@ -207,6 +214,7 @@ async def push( exclude_patterns=exclude_patterns, io_log_redirect_cb=io_log_redirect_cb, progress_bar=progress_bar, + aws_s3_cli_settings=aws_s3_cli_settings, ) archive_exists = await _state_metadata_entry_exists( user_id=user_id, @@ -236,6 +244,7 @@ async def pull( io_log_redirect_cb: LogRedirectCB, 
r_clone_settings: RCloneSettings, progress_bar: ProgressBarData, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> None: """restores the state folder""" @@ -274,6 +283,7 @@ async def pull( io_log_redirect_cb=io_log_redirect_cb, r_clone_settings=r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=aws_s3_cli_settings, ) return diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/_utils.py new file mode 100644 index 00000000000..56920f978f4 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/_utils.py @@ -0,0 +1,7 @@ +from abc import abstractmethod + + +class BaseLogParser: + @abstractmethod + async def __call__(self, logs: str) -> None: + ... diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/aws_s3_cli.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/aws_s3_cli.py new file mode 100644 index 00000000000..72ed5b97279 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/aws_s3_cli.py @@ -0,0 +1,345 @@ +import asyncio +import contextlib +import logging +import os +import shlex +from asyncio.streams import StreamReader +from pathlib import Path + +from aiocache import cached # type: ignore[import-untyped] +from models_library.basic_types import IDStr +from pydantic import AnyUrl, ByteSize +from pydantic.errors import PydanticErrorMixin +from servicelib.progress_bar import ProgressBarData +from servicelib.utils import logged_gather +from settings_library.aws_s3_cli import AwsS3CliSettings + +from ._utils import BaseLogParser +from .aws_s3_cli_utils import SyncAwsCliS3ProgressLogParser +from .r_clone_utils import CommandResultCaptureParser, DebugLogParser + +_logger = logging.getLogger(__name__) + + +_OSPARC_SYMLINK_EXTENSION = ".rclonelink" # named `rclonelink` to maintain backwards + + +class BaseAwsS3CliError(PydanticErrorMixin, RuntimeError): + ... + + +class AwsS3CliFailedError(BaseAwsS3CliError): + msg_template: str = ( + "Command {command} finished with exit code={returncode}:\n{command_output}" + ) + + +class AwsS3CliPathIsAFileError(BaseAwsS3CliError): + msg_template: str = ( + "Provided path '{local_directory_path}' is a file. Expects a directory!" + ) + + +class CRLFStreamReaderWrapper: + """ + A wrapper for asyncio streams that converts carriage return characters to newlines. + + When the AWS S3 CLI provides progress updates, it uses carriage return ('\r') characters + to overwrite the output. This wrapper converts '\r' to '\n' to standardize line endings, + allowing the stream to be read line by line using newlines as delimiters. 
+ """ + + def __init__(self, reader): + self.reader = reader + self.buffer = bytearray() + + async def readline(self): + while True: + # Check if there's a newline character in the buffer + if b"\n" in self.buffer: + line, self.buffer = self.buffer.split(b"\n", 1) + return line + b"\n" + # Read a chunk of data from the stream + chunk = await self.reader.read(1024) + if not chunk: + # If no more data is available, return the buffer as the final line + line = self.buffer + self.buffer = bytearray() + return line + # Replace \r with \n in the chunk + chunk = chunk.replace(b"\r", b"\n") + self.buffer.extend(chunk) + + +async def _read_stream( + stream: StreamReader, aws_s3_cli_log_parsers: list[BaseLogParser] +): + reader_wrapper = CRLFStreamReaderWrapper(stream) + while True: + line: bytes = await reader_wrapper.readline() + if line: + decoded_line = line.decode() + await logged_gather( + *[parser(decoded_line) for parser in aws_s3_cli_log_parsers] + ) + else: + break + + +@cached() +async def is_aws_s3_cli_available(aws_s3_cli_settings: AwsS3CliSettings | None) -> bool: + """returns: True if the `aws` cli is installed and a configuration is provided""" + if aws_s3_cli_settings is None: + return False + try: + await _async_aws_cli_command( + "aws", "--version", aws_s3_cli_settings=aws_s3_cli_settings + ) + return True + except AwsS3CliFailedError: + return False + + +async def _async_aws_cli_command( + *cmd: str, + aws_s3_cli_settings: AwsS3CliSettings, + aws_cli_s3_log_parsers: list[BaseLogParser] | None = None, +) -> str: + str_cmd = " ".join(cmd) + proc = await asyncio.create_subprocess_shell( + str_cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + env={ + "AWS_ACCESS_KEY_ID": aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ACCESS_KEY, + "AWS_SECRET_ACCESS_KEY": aws_s3_cli_settings.AWS_S3_CLI_S3.S3_SECRET_KEY, + "AWS_REGION": aws_s3_cli_settings.AWS_S3_CLI_S3.S3_REGION, + }, + ) + + command_result_parser = CommandResultCaptureParser() + aws_cli_s3_log_parsers = ( + [*aws_cli_s3_log_parsers, command_result_parser] + if aws_cli_s3_log_parsers + else [command_result_parser] + ) + + assert proc.stdout # nosec + await asyncio.wait([_read_stream(proc.stdout, [*aws_cli_s3_log_parsers])]) + + _stdout, _stderr = await proc.communicate() + + command_output = command_result_parser.get_output() + if proc.returncode != 0: + raise AwsS3CliFailedError( + command=str_cmd, + command_output=command_output, + returncode=proc.returncode, + ) + + _logger.debug("'%s' result:\n%s", str_cmd, command_output) + return command_output + + +def _get_exclude_filters(exclude_patterns: set[str] | None) -> list[str]: + if exclude_patterns is None: + return [] + + exclude_options: list[str] = [] + for entry in exclude_patterns: + exclude_options.append("--exclude") + exclude_options.append(entry.replace("*", "**")) + + return exclude_options + + +async def _get_s3_folder_size( + aws_s3_cli_settings: AwsS3CliSettings, + *, + s3_path: str, +) -> ByteSize: + cli_command = [ + "aws", + "s3", + "ls", + "--summarize", + "--recursive", + s3_path, + "| grep 'Total Size' | awk '{print $3}'", + ] + + if aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ENDPOINT: + cli_command.insert( + 1, f"--endpoint-url {aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ENDPOINT}" + ) + + result = await _async_aws_cli_command( + *cli_command, aws_s3_cli_settings=aws_s3_cli_settings + ) + return ByteSize(result.strip()) + + +def _get_file_size_and_manage_symlink(path: Path) -> ByteSize: + if path.is_symlink(): + # Convert 
symlink to a .rclonelink file that can be stored in the S3 + target_path = f"{path.readlink()}" + _name = path.name + _OSPARC_SYMLINK_EXTENSION + + textfile_path = path.parent / _name + textfile_path.write_text(target_path) + return ByteSize(textfile_path.stat().st_size) + return ByteSize(path.stat().st_size) + + +async def _get_local_folder_size_and_manage_symlink(local_path: Path) -> ByteSize: + total_size = 0 + for dirpath, _, filenames in os.walk(local_path): + for filename in filenames: + file_path = Path(dirpath) / filename + total_size += _get_file_size_and_manage_symlink(Path(file_path)) + return ByteSize(total_size) + + +async def _sync_sources( + aws_s3_cli_settings: AwsS3CliSettings, + progress_bar: ProgressBarData, + *, + source: str, + destination: str, + local_dir: Path, + exclude_patterns: set[str] | None, + debug_logs: bool, +) -> None: + + if source.startswith("s3://"): + folder_size: ByteSize = await _get_s3_folder_size( + aws_s3_cli_settings, s3_path=shlex.quote(source) + ) + else: + folder_size = await _get_local_folder_size_and_manage_symlink(Path(source)) + + cli_command = [ + "aws", + "s3", + "sync", + "--delete", + shlex.quote(source), + shlex.quote(destination), + # filter options + *_get_exclude_filters(exclude_patterns), + "--no-follow-symlinks", + ] + + if aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ENDPOINT: + cli_command.insert( + 1, f"--endpoint-url {aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ENDPOINT}" + ) + + async with progress_bar.sub_progress( + steps=folder_size, + progress_unit="Byte", + description=IDStr(f"transferring {local_dir.name}"), + ) as sub_progress: + aws_s3_cli_log_parsers: list[BaseLogParser] = ( + [DebugLogParser()] if debug_logs else [] + ) + aws_s3_cli_log_parsers.append(SyncAwsCliS3ProgressLogParser(sub_progress)) + + await _async_aws_cli_command( + *cli_command, + aws_s3_cli_settings=aws_s3_cli_settings, + aws_cli_s3_log_parsers=aws_s3_cli_log_parsers, + ) + + +def _raise_if_directory_is_file(local_directory_path: Path) -> None: + if local_directory_path.exists() and local_directory_path.is_file(): + raise AwsS3CliPathIsAFileError(local_directory_path=local_directory_path) + + +@contextlib.asynccontextmanager +async def remove_local_osparclinks(local_directory_path): + try: + yield + finally: + # Remove the temporary created .rclonelink files generated by `_get_local_folder_size_and_manage_symlink` + for textfile_path in local_directory_path.rglob( + f"*{_OSPARC_SYMLINK_EXTENSION}" + ): + textfile_path.unlink() + + +@contextlib.asynccontextmanager +async def convert_osparclinks_to_original_symlinks(local_directory_path): + try: + yield + finally: + # Convert .rclonelink files to real symlink files after they were downloaded from S3 + for textfile_path in local_directory_path.rglob( + f"*{_OSPARC_SYMLINK_EXTENSION}" + ): + symlink_path = textfile_path.with_suffix("") + target_path = textfile_path.read_text().strip() + os.symlink(target_path, symlink_path) + textfile_path.unlink() + + +async def sync_local_to_s3( + aws_s3_cli_settings: AwsS3CliSettings, + progress_bar: ProgressBarData, + *, + local_directory_path: Path, + upload_s3_link: AnyUrl, + exclude_patterns: set[str] | None = None, + debug_logs: bool = False, +) -> None: + """transfer the contents of a local directory to an s3 path + + :raises e: AwsS3CliFailedError + """ + _raise_if_directory_is_file(local_directory_path) + + upload_s3_path = upload_s3_link + _logger.debug(" %s; %s", f"{upload_s3_link=}", f"{upload_s3_path=}") + + async with remove_local_osparclinks(local_directory_path): + 
await _sync_sources( + aws_s3_cli_settings, + progress_bar, + source=f"{local_directory_path}", + destination=f"{upload_s3_path}", + local_dir=local_directory_path, + exclude_patterns=exclude_patterns, + debug_logs=debug_logs, + ) + + +async def sync_s3_to_local( + aws_s3_cli_settings: AwsS3CliSettings, + progress_bar: ProgressBarData, + *, + local_directory_path: Path, + download_s3_link: AnyUrl, + exclude_patterns: set[str] | None = None, + debug_logs: bool = False, +) -> None: + """transfer the contents of a path in s3 to a local directory + + :raises e: AwsS3CliFailedError + """ + _raise_if_directory_is_file(local_directory_path) + + download_s3_path = download_s3_link + _logger.debug(" %s; %s", f"{download_s3_link=}", f"{download_s3_path=}") + + async with convert_osparclinks_to_original_symlinks(local_directory_path): + await _sync_sources( + aws_s3_cli_settings, + progress_bar, + source=f"{download_s3_path}", + destination=f"{local_directory_path}", + local_dir=local_directory_path, + exclude_patterns=exclude_patterns, + debug_logs=debug_logs, + ) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/aws_s3_cli_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/aws_s3_cli_utils.py new file mode 100644 index 00000000000..5cfbb536583 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/aws_s3_cli_utils.py @@ -0,0 +1,39 @@ +import logging +import re + +from pydantic import ByteSize, parse_obj_as +from servicelib.logging_utils import log_catch +from servicelib.progress_bar import ProgressBarData + +from ._utils import BaseLogParser + +_logger = logging.getLogger(__name__) + + +def _parse_size(log_string): + match = re.search(r"^\w+ (?P[^\/]+)", log_string) + if match: + return match.group("size") + return None + + +class SyncAwsCliS3ProgressLogParser(BaseLogParser): + """ + log processor that onlyyields progress updates detected in the logs. + + + This command: + aws --endpoint-url ENDPOINT_URL s3 sync s3://BUCKET/S3_KEY . --delete --no-follow-symlinks + generates this log lines: + Completed 2.9 GiB/4.9 GiB (102.8 MiB/s) with 1 file(s) remaining + """ + + def __init__(self, progress_bar: ProgressBarData) -> None: + self.progress_bar = progress_bar + + async def __call__(self, logs: str) -> None: + _logger.debug("received logs: %s", logs) + with log_catch(_logger, reraise=False): + if _size := _parse_size(logs): + _bytes = parse_obj_as(ByteSize, _size) + await self.progress_bar.set_(_bytes) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py index 5581f801c3f..6a5609c7eb5 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py @@ -18,6 +18,7 @@ from pydantic import AnyUrl, ByteSize, parse_obj_as from servicelib.file_utils import create_sha256_checksum from servicelib.progress_bar import ProgressBarData +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.node_ports import NodePortsSettings from settings_library.r_clone import RCloneSettings from tenacity import AsyncRetrying @@ -29,7 +30,7 @@ from yarl import URL from ..node_ports_common.client_session_manager import ClientSessionContextManager -from . import exceptions, r_clone, storage_client +from . 
import aws_s3_cli, exceptions, r_clone, storage_client from ._filemanager import _abort_upload, _complete_upload, _resolve_location_id from .file_io_utils import ( LogRedirectCB, @@ -123,6 +124,7 @@ async def download_path_from_s3( client_session: ClientSession | None = None, r_clone_settings: RCloneSettings | None, progress_bar: ProgressBarData, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> Path: """Downloads a file from S3 @@ -150,11 +152,20 @@ async def download_path_from_s3( client_session=session, ) - if file_meta_data.is_directory and not await r_clone.is_r_clone_available( - r_clone_settings + if ( + file_meta_data.is_directory + and not aws_s3_cli_settings + and not await r_clone.is_r_clone_available(r_clone_settings) ): msg = f"Requested to download directory {s3_object}, but no rclone support was detected" raise exceptions.NodeportsException(msg) + if ( + file_meta_data.is_directory + and aws_s3_cli_settings + and not await aws_s3_cli.is_aws_s3_cli_available(aws_s3_cli_settings) + ): + msg = f"Requested to download directory {s3_object}, but no aws cli support was detected" + raise exceptions.NodeportsException(msg) # get the s3 link download_link = await get_download_link_from_s3( @@ -173,13 +184,23 @@ async def download_path_from_s3( raise exceptions.S3InvalidPathError(s3_object) if file_meta_data.is_directory: - assert r_clone_settings # nosec - await r_clone.sync_s3_to_local( - r_clone_settings, - progress_bar, - local_directory_path=local_path, - download_s3_link=parse_obj_as(AnyUrl, f"{download_link}"), - ) + if aws_s3_cli_settings: + await aws_s3_cli.sync_s3_to_local( + aws_s3_cli_settings, + progress_bar, + local_directory_path=local_path, + download_s3_link=parse_obj_as(AnyUrl, f"{download_link}"), + ) + elif r_clone_settings: + await r_clone.sync_s3_to_local( + r_clone_settings, + progress_bar, + local_directory_path=local_path, + download_s3_link=parse_obj_as(AnyUrl, f"{download_link}"), + ) + else: + msg = "Unexpected configuration" + raise RuntimeError(msg) return local_path return await download_file_from_link( @@ -266,7 +287,7 @@ async def _generate_checksum( return checksum -async def upload_path( +async def upload_path( # pylint: disable=too-many-arguments *, user_id: UserID, store_id: LocationID | None, @@ -278,6 +299,7 @@ async def upload_path( r_clone_settings: RCloneSettings | None = None, progress_bar: ProgressBarData | None = None, exclude_patterns: set[str] | None = None, + aws_s3_cli_settings: AwsS3CliSettings | None = None, ) -> UploadedFile | UploadedFolder: """Uploads a file (potentially in parallel) or a file object (sequential in any case) to S3 @@ -310,11 +332,12 @@ async def upload_path( r_clone_settings=r_clone_settings, progress_bar=progress_bar, exclude_patterns=exclude_patterns, + aws_s3_cli_settings=aws_s3_cli_settings, ) return result -async def _upload_path( +async def _upload_path( # pylint: disable=too-many-arguments *, user_id: UserID, store_id: LocationID | None, @@ -326,6 +349,7 @@ async def _upload_path( r_clone_settings: RCloneSettings | None, progress_bar: ProgressBarData | None, exclude_patterns: set[str] | None, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> UploadedFile | UploadedFolder: _logger.debug( "Uploading %s to %s:%s@%s", @@ -339,9 +363,21 @@ async def _upload_path( progress_bar = ProgressBarData(num_steps=1, description=IDStr("uploading")) is_directory: bool = isinstance(path_to_upload, Path) and path_to_upload.is_dir() - if is_directory and not await r_clone.is_r_clone_available(r_clone_settings): + if ( + 
is_directory + and not aws_s3_cli_settings + and not await r_clone.is_r_clone_available(r_clone_settings) + ): msg = f"Requested to upload directory {path_to_upload}, but no rclone support was detected" raise exceptions.NodeportsException(msg) + if ( + is_directory + and aws_s3_cli_settings + and not await aws_s3_cli.is_aws_s3_cli_available(aws_s3_cli_settings) + ): + msg = f"Requested to upload directory {path_to_upload}, but no aws cli support was detected" + raise exceptions.NodeportsException(msg) + checksum: SHA256Str | None = await _generate_checksum(path_to_upload, is_directory) if io_log_redirect_cb: await io_log_redirect_cb(f"uploading {path_to_upload}, please wait...") @@ -376,8 +412,13 @@ async def _upload_path( is_directory=is_directory, session=session, exclude_patterns=exclude_patterns, + aws_s3_cli_settings=aws_s3_cli_settings, ) - except (r_clone.RCloneFailedError, exceptions.S3TransferError) as exc: + except ( + r_clone.RCloneFailedError, + aws_s3_cli.AwsS3CliFailedError, + exceptions.S3TransferError, + ) as exc: _logger.exception("The upload failed with an unexpected error:") if upload_links: await _abort_upload( @@ -405,19 +446,31 @@ async def _upload_to_s3( is_directory: bool, session: ClientSession, exclude_patterns: set[str] | None, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> tuple[ETag | None, FileUploadSchema]: uploaded_parts: list[UploadedPart] = [] if is_directory: - assert r_clone_settings # nosec assert isinstance(path_to_upload, Path) # nosec assert len(upload_links.urls) > 0 # nosec - await r_clone.sync_local_to_s3( - r_clone_settings, - progress_bar, - local_directory_path=path_to_upload, - upload_s3_link=upload_links.urls[0], - exclude_patterns=exclude_patterns, - ) + if aws_s3_cli_settings: + await aws_s3_cli.sync_local_to_s3( + aws_s3_cli_settings, + progress_bar, + local_directory_path=path_to_upload, + upload_s3_link=upload_links.urls[0], + exclude_patterns=exclude_patterns, + ) + elif r_clone_settings: + await r_clone.sync_local_to_s3( + r_clone_settings, + progress_bar, + local_directory_path=path_to_upload, + upload_s3_link=upload_links.urls[0], + exclude_patterns=exclude_patterns, + ) + else: + msg = "Unexpected configuration" + raise RuntimeError(msg) else: uploaded_parts = await upload_file_to_presigned_links( session, diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone.py index 5fe5df2c4b6..b9bc3d8437a 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone.py @@ -18,8 +18,8 @@ from settings_library.r_clone import RCloneSettings from settings_library.utils_r_clone import get_r_clone_config +from ._utils import BaseLogParser from .r_clone_utils import ( - BaseRCloneLogParser, CommandResultCaptureParser, DebugLogParser, SyncProgressLogParser, @@ -56,9 +56,7 @@ async def _config_file(config: str) -> AsyncIterator[str]: yield f.name -async def _read_stream( - stream: StreamReader, r_clone_log_parsers: list[BaseRCloneLogParser] -): +async def _read_stream(stream: StreamReader, r_clone_log_parsers: list[BaseLogParser]): while True: line: bytes = await stream.readline() if line: @@ -72,7 +70,7 @@ async def _read_stream( async def _async_r_clone_command( *cmd: str, - r_clone_log_parsers: list[BaseRCloneLogParser] | None = None, + r_clone_log_parsers: list[BaseLogParser] | None = None, cwd: str | None = None, ) -> str: str_cmd = " ".join(cmd) @@ -227,7 +225,7 @@ 
async def _sync_sources( progress_unit="Byte", description=IDStr(f"transferring {local_dir.name}"), ) as sub_progress: - r_clone_log_parsers: list[BaseRCloneLogParser] = ( + r_clone_log_parsers: list[BaseLogParser] = ( [DebugLogParser()] if debug_logs else [] ) r_clone_log_parsers.append(SyncProgressLogParser(sub_progress)) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_utils.py index ed64f74137f..f539e451026 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_utils.py @@ -1,6 +1,5 @@ import datetime import logging -from abc import abstractmethod from typing import Union from models_library.utils.change_case import snake_to_camel @@ -8,13 +7,9 @@ from servicelib.logging_utils import log_catch from servicelib.progress_bar import ProgressBarData -_logger = logging.getLogger(__name__) - +from ._utils import BaseLogParser -class BaseRCloneLogParser: - @abstractmethod - async def __call__(self, logs: str) -> None: - ... +_logger = logging.getLogger(__name__) class _RCloneSyncMessageBase(BaseModel): @@ -52,7 +47,7 @@ class _RCloneSyncTransferringMessage(_RCloneSyncMessageBase): ] -class SyncProgressLogParser(BaseRCloneLogParser): +class SyncProgressLogParser(BaseLogParser): """ log processor that only yields and progress updates detected in the logs. @@ -91,12 +86,12 @@ async def __call__(self, logs: str) -> None: await self.progress_bar.set_(rclone_message.stats.bytes) -class DebugLogParser(BaseRCloneLogParser): +class DebugLogParser(BaseLogParser): async def __call__(self, logs: str) -> None: _logger.debug("|>>>| %s |", logs) -class CommandResultCaptureParser(BaseRCloneLogParser): +class CommandResultCaptureParser(BaseLogParser): def __init__(self) -> None: super().__init__() self._logs: list[str] = [] diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py index a9d1c861445..d08f095b111 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py @@ -4,6 +4,7 @@ from models_library.projects import ProjectIDStr from models_library.projects_nodes_io import NodeIDStr from models_library.users import UserID +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from ..node_ports_common import exceptions @@ -23,7 +24,8 @@ async def ports( *, db_manager: DBManager | None = None, r_clone_settings: RCloneSettings | None = None, - io_log_redirect_cb: LogRedirectCB | None = None + io_log_redirect_cb: LogRedirectCB | None = None, + aws_s3_cli_settings: AwsS3CliSettings | None = None ) -> Nodeports: log.debug("creating node_ports_v2 object using provided dbmanager: %s", db_manager) # FIXME: warning every dbmanager create a new db engine! 
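For orientation (an illustrative sketch, not part of this diff): callers of `node_ports_v2.ports()` can now forward the optional AWS S3 CLI settings alongside the existing rclone settings. The positional `user_id` / `project_id` / `node_uuid` arguments and the `AwsS3CliSettings.create_from_envs()` constructor are assumed from the surrounding codebase, so treat the names below as a sketch rather than the definitive API.

import os

from settings_library.aws_s3_cli import AwsS3CliSettings
from simcore_sdk import node_ports_v2


async def open_node_ports(user_id, project_id, node_uuid):
    # Build the settings only when AWS_S3_CLI_S3 is configured in the environment;
    # passing None keeps the previous rclone / presigned-link behaviour.
    aws_s3_cli_settings = (
        AwsS3CliSettings.create_from_envs() if os.environ.get("AWS_S3_CLI_S3") else None
    )
    return await node_ports_v2.ports(
        user_id,
        project_id,
        node_uuid,
        aws_s3_cli_settings=aws_s3_cli_settings,
    )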
@@ -39,6 +41,7 @@ async def ports( auto_update=True, r_clone_settings=r_clone_settings, io_log_redirect_cb=io_log_redirect_cb, + aws_s3_cli_settings=aws_s3_cli_settings, ) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 8418a006b42..c7dbcf41233 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -11,6 +11,7 @@ from pydantic.error_wrappers import flatten_errors from servicelib.progress_bar import ProgressBarData from servicelib.utils import logged_gather +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from ..node_ports_common.dbmanager import DBManager @@ -43,6 +44,7 @@ class Nodeports(BaseModel): auto_update: bool = False r_clone_settings: RCloneSettings | None = None io_log_redirect_cb: LogRedirectCB | None + aws_s3_cli_settings: AwsS3CliSettings | None = None class Config: arbitrary_types_allowed = True diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py index e78b5a6581f..aac381ba9e2 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py @@ -264,6 +264,7 @@ async def _evaluate() -> ItemConcreteValue | None: io_log_redirect_cb=self._node_ports.io_log_redirect_cb, r_clone_settings=self._node_ports.r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=self._node_ports.aws_s3_cli_settings, ) elif isinstance(self.value, DownloadLink): @@ -339,6 +340,7 @@ async def _set( io_log_redirect_cb=self._node_ports.io_log_redirect_cb, file_base_path=base_path, progress_bar=progress_bar, + aws_s3_cli_settings=self._node_ports.aws_s3_cli_settings, ) else: new_value = converted_value diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py index 4bc977b46f6..eb45483f98c 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py @@ -11,6 +11,7 @@ from pydantic import AnyUrl, ByteSize from pydantic.tools import parse_obj_as from servicelib.progress_bar import ProgressBarData +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from yarl import URL @@ -189,6 +190,7 @@ async def pull_file_from_store( io_log_redirect_cb: LogRedirectCB | None, r_clone_settings: RCloneSettings | None, progress_bar: ProgressBarData | None, + aws_s3_cli_settings: AwsS3CliSettings | None, ) -> Path: log.debug("pulling file from storage %s", value) # do not make any assumption about s3_path, it is a str containing stuff that can be anything depending on the store @@ -203,6 +205,7 @@ async def pull_file_from_store( r_clone_settings=r_clone_settings, progress_bar=progress_bar or ProgressBarData(num_steps=1, description=IDStr("pulling file")), + aws_s3_cli_settings=aws_s3_cli_settings, ) # if a file alias is present use it to rename the file accordingly if file_to_key_map: @@ -226,6 +229,7 @@ async def push_file_to_store( r_clone_settings: RCloneSettings | None = None, file_base_path: Path | None = None, progress_bar: ProgressBarData, + aws_s3_cli_settings: AwsS3CliSettings | None = None, ) -> FileLink: """ :raises exceptions.NodeportsException @@ -248,6 +252,7 @@ 
async def push_file_to_store( r_clone_settings=r_clone_settings, io_log_redirect_cb=io_log_redirect_cb, progress_bar=progress_bar, + aws_s3_cli_settings=aws_s3_cli_settings, ) assert isinstance(upload_result, UploadedFile) # nosec log.debug("file path %s uploaded, received ETag %s", file, upload_result.etag) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/serialization_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/serialization_v2.py index 3016f17ae2e..daa4c9aaa3e 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/serialization_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/serialization_v2.py @@ -9,6 +9,7 @@ from models_library.utils.json_serialization import json_dumps from models_library.utils.nodes import compute_node_hash from packaging import version +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from ..node_ports_common.dbmanager import DBManager @@ -38,6 +39,7 @@ async def load( io_log_redirect_cb: LogRedirectCB | None, auto_update: bool = False, r_clone_settings: RCloneSettings | None = None, + aws_s3_cli_settings: AwsS3CliSettings | None = None, ) -> Nodeports: """creates a nodeport object from a row from comp_tasks""" log.debug( @@ -99,6 +101,7 @@ async def load( auto_update=auto_update, r_clone_settings=r_clone_settings, io_log_redirect_cb=io_log_redirect_cb, + aws_s3_cli_settings=aws_s3_cli_settings, ) log.debug( "created node_ports_v2 object %s", diff --git a/packages/simcore-sdk/tests/integration/conftest.py b/packages/simcore-sdk/tests/integration/conftest.py index 8ced749bcf3..d5f6cd7227a 100644 --- a/packages/simcore-sdk/tests/integration/conftest.py +++ b/packages/simcore-sdk/tests/integration/conftest.py @@ -19,6 +19,7 @@ from models_library.users import UserID from pydantic import parse_obj_as from pytest_simcore.helpers.faker_factories import random_project, random_user +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings, S3Provider from settings_library.s3 import S3Settings from simcore_postgres_database.models.comp_pipeline import comp_pipeline @@ -26,6 +27,7 @@ from simcore_postgres_database.models.file_meta_data import file_meta_data from simcore_postgres_database.models.projects import projects from simcore_postgres_database.models.users import users +from simcore_sdk.node_ports_common.aws_s3_cli import is_aws_s3_cli_available from simcore_sdk.node_ports_common.r_clone import is_r_clone_available from yarl import URL @@ -348,6 +350,20 @@ async def _factory() -> RCloneSettings: return _factory() +@pytest.fixture +async def aws_s3_cli_settings_factory( + minio_s3_settings: S3Settings, storage_service: URL +) -> Awaitable[AwsS3CliSettings]: + async def _factory() -> AwsS3CliSettings: + settings = AwsS3CliSettings(AWS_S3_CLI_S3=minio_s3_settings) + if not await is_aws_s3_cli_available(settings): + pytest.skip("aws cli not installed") + + return settings + + return _factory() + + @pytest.fixture async def r_clone_settings( r_clone_settings_factory: Awaitable[RCloneSettings], @@ -355,6 +371,13 @@ async def r_clone_settings( return await r_clone_settings_factory +@pytest.fixture +async def aws_s3_cli_settings( + aws_s3_cli_settings_factory: Awaitable[AwsS3CliSettings], +) -> AwsS3CliSettings: + return await aws_s3_cli_settings_factory + + @pytest.fixture def cleanup_file_meta_data(postgres_db: sa.engine.Engine) -> Iterator[None]: yield None diff --git 
a/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py b/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py index 468189a85eb..ca7a81e6c17 100644 --- a/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py +++ b/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py @@ -19,6 +19,7 @@ from models_library.users import UserID from pydantic import parse_obj_as from servicelib.progress_bar import ProgressBarData +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from simcore_sdk.node_data import data_manager from simcore_sdk.node_ports_common import filemanager @@ -152,6 +153,7 @@ async def test_valid_upload_download( project_id: ProjectID, node_uuid: NodeID, r_clone_settings: RCloneSettings, + aws_s3_cli_settings: AwsS3CliSettings, mock_io_log_redirect_cb: LogRedirectCB, faker: Faker, ): @@ -164,6 +166,7 @@ async def test_valid_upload_download( io_log_redirect_cb=mock_io_log_redirect_cb, progress_bar=progress_bar, r_clone_settings=r_clone_settings, + aws_s3_cli_settings=None, ) assert progress_bar._current_steps == pytest.approx(1.0) # noqa: SLF001 @@ -179,6 +182,7 @@ async def test_valid_upload_download( io_log_redirect_cb=mock_io_log_redirect_cb, r_clone_settings=r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=None, ) assert progress_bar._current_steps == pytest.approx(2.0) # noqa: SLF001 @@ -195,6 +199,7 @@ async def test_valid_upload_download_saved_to( node_uuid: NodeID, random_tmp_dir_generator: Callable, r_clone_settings: RCloneSettings, + aws_s3_cli_settings: AwsS3CliSettings, mock_io_log_redirect_cb: LogRedirectCB, faker: Faker, ): @@ -207,6 +212,7 @@ async def test_valid_upload_download_saved_to( io_log_redirect_cb=mock_io_log_redirect_cb, progress_bar=progress_bar, r_clone_settings=r_clone_settings, + aws_s3_cli_settings=None, ) # pylint: disable=protected-access assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 @@ -226,6 +232,7 @@ async def test_valid_upload_download_saved_to( io_log_redirect_cb=mock_io_log_redirect_cb, r_clone_settings=r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=None, ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py new file mode 100644 index 00000000000..e5e77c29475 --- /dev/null +++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py @@ -0,0 +1,435 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument + +import filecmp +import os +import urllib.parse +from collections.abc import AsyncIterator, Callable +from pathlib import Path +from typing import Final +from unittest.mock import AsyncMock +from uuid import uuid4 + +import aioboto3 +import aiofiles +import pytest +from faker import Faker +from models_library.progress_bar import ProgressReport +from pydantic import AnyUrl, ByteSize, parse_obj_as +from servicelib.file_utils import remove_directory +from servicelib.progress_bar import ProgressBarData +from servicelib.utils import logged_gather +from settings_library.aws_s3_cli import AwsS3CliSettings +from simcore_sdk.node_ports_common import aws_s3_cli + +pytest_simcore_core_services_selection = [ + "migration", + "postgres", + "storage", + "redis", +] + +pytest_simcore_ops_services_selection = [ + "minio", + "adminer", +] + + 
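As background for the integration tests below (illustrative only, not part of the new test module): the module under test shells out to `aws s3 sync --delete ... --no-follow-symlinks`, optionally prefixed with `--endpoint-url`, with credentials taken from `AwsS3CliSettings`. A settings object equivalent to the `AWS_S3_CLI_S3` JSON added to `.env-devel` at the top of this diff could be built explicitly as sketched here; the values are the development defaults from that file, not requirements of the tests.

from settings_library.aws_s3_cli import AwsS3CliSettings
from settings_library.s3 import S3Settings

# Mirrors the AWS_S3_CLI_S3 JSON entry added to .env-devel (development defaults)
dev_aws_s3_cli_settings = AwsS3CliSettings(
    AWS_S3_CLI_S3=S3Settings(
        S3_ACCESS_KEY="12345678",
        S3_SECRET_KEY="12345678",
        S3_BUCKET_NAME="simcore",
        S3_ENDPOINT="http://172.17.0.1:9001",
        S3_REGION="us-east-1",
    )
)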
+WAIT_FOR_S3_BACKEND_TO_UPDATE: Final[float] = 1.0 + + +@pytest.fixture +async def cleanup_bucket_after_test( + aws_s3_cli_settings: AwsS3CliSettings, +) -> AsyncIterator[None]: + session = aioboto3.Session( + aws_access_key_id=aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ACCESS_KEY, + aws_secret_access_key=aws_s3_cli_settings.AWS_S3_CLI_S3.S3_SECRET_KEY, + ) + + yield + + async with session.client("s3", endpoint_url=aws_s3_cli_settings.AWS_S3_CLI_S3.S3_ENDPOINT) as s3_client: # type: ignore + # List all object versions + paginator = s3_client.get_paginator("list_object_versions") + async for page in paginator.paginate( + Bucket=aws_s3_cli_settings.AWS_S3_CLI_S3.S3_BUCKET_NAME + ): + # Prepare delete markers and versions for deletion + delete_markers = page.get("DeleteMarkers", []) + versions = page.get("Versions", []) + + objects_to_delete = [ + {"Key": obj["Key"], "VersionId": obj["VersionId"]} + for obj in delete_markers + versions + ] + + # Perform deletion + if objects_to_delete: + await s3_client.delete_objects( + Bucket=aws_s3_cli_settings.AWS_S3_CLI_S3.S3_BUCKET_NAME, + Delete={"Objects": objects_to_delete, "Quiet": True}, + ) + + +# put to shared config +def _fake_s3_link(aws_s3_cli_settings: AwsS3CliSettings, s3_object: str) -> AnyUrl: + return parse_obj_as( + AnyUrl, + f"s3://{aws_s3_cli_settings.AWS_S3_CLI_S3.S3_BUCKET_NAME}/{urllib.parse.quote(s3_object)}", + ) + + +# put to shared config +async def _create_random_binary_file( + file_path: Path, + file_size: ByteSize, + # NOTE: bigger files get created faster with bigger chunk_size + chunk_size: int = parse_obj_as(ByteSize, "1mib"), +): + async with aiofiles.open(file_path, mode="wb") as file: + bytes_written = 0 + while bytes_written < file_size: + remaining_bytes = file_size - bytes_written + current_chunk_size = min(chunk_size, remaining_bytes) + await file.write(os.urandom(current_chunk_size)) + bytes_written += current_chunk_size + assert bytes_written == file_size + + +# put to shared config +async def _create_file_of_size( + tmp_path: Path, *, name: str, file_size: ByteSize +) -> Path: + file: Path = tmp_path / name + if not file.parent.exists(): + file.parent.mkdir(parents=True, exist_ok=True) + + await _create_random_binary_file(file, file_size) + assert file.exists() + assert file.stat().st_size == file_size + return file + + +# put to shared config +async def _create_files_in_dir( + target_dir: Path, file_count: int, file_size: ByteSize +) -> set[str]: + results: list[Path] = await logged_gather( + *[ + _create_file_of_size(target_dir, name=f"{i}-file.bin", file_size=file_size) + for i in range(file_count) + ], + max_concurrency=10, + ) + return {x.name for x in results} + + +async def _upload_local_dir_to_s3( + aws_s3_cli_settings: AwsS3CliSettings, + s3_directory_link: AnyUrl, + source_dir: Path, + *, + check_progress: bool = False, + faker: Faker, +) -> None: + # NOTE: progress is enforced only when uploading and only when using + # total file sizes that are quite big, otherwise the test will fail + # we ant to avoid this from being flaky. + # Since using moto to mock the S3 api, downloading is way to fast. + # Progress behaves as expected with CEPH and AWS S3 backends. 
+ + progress_entries: list[ProgressReport] = [] + + async def _report_progress_upload(report: ProgressReport) -> None: + print(">>>|", report, "| ⏫") + progress_entries.append(report) + + async with ProgressBarData( + num_steps=1, + progress_report_cb=_report_progress_upload, + description=faker.pystr(), + ) as progress_bar: + await aws_s3_cli.sync_local_to_s3( + aws_s3_cli_settings, + progress_bar, + local_directory_path=source_dir, + upload_s3_link=s3_directory_link, + debug_logs=True, + ) + if check_progress: + # NOTE: a progress of 1 is always sent by the progress bar + # we want to check that aws cli also reports some progress entries + assert len(progress_entries) > 1 + + +async def _download_from_s3_to_local_dir( + aws_s3_cli_settings: AwsS3CliSettings, + s3_directory_link: AnyUrl, + destination_dir: Path, + faker: Faker, +) -> None: + async def _report_progress_download(report: ProgressReport) -> None: + print(">>>|", report, "| ⏬") + + async with ProgressBarData( + num_steps=1, + progress_report_cb=_report_progress_download, + description=faker.pystr(), + ) as progress_bar: + await aws_s3_cli.sync_s3_to_local( + aws_s3_cli_settings, + progress_bar, + local_directory_path=destination_dir, + download_s3_link=s3_directory_link, + debug_logs=True, + ) + + +def _directories_have_the_same_content(dir_1: Path, dir_2: Path) -> bool: + names_in_dir_1 = {x.name for x in dir_1.glob("*")} + names_in_dir_2 = {x.name for x in dir_2.glob("*")} + if names_in_dir_1 != names_in_dir_2: + return False + + filecmp.clear_cache() + + compare_results: list[bool] = [] + + for file_name in names_in_dir_1: + f1 = dir_1 / file_name + f2 = dir_2 / file_name + + # when there is a broken symlink, which we want to sync, filecmp does not work + is_broken_symlink = ( + not f1.exists() and f1.is_symlink() and not f2.exists() and f2.is_symlink() + ) + + if is_broken_symlink: + compare_results.append(True) + else: + compare_results.append(filecmp.cmp(f1, f2, shallow=False)) + + return all(compare_results) + + +def _ensure_dir(tmp_path: Path, faker: Faker, *, dir_prefix: str) -> Path: + generated_files_dir: Path = tmp_path / f"{dir_prefix}-{faker.uuid4()}" + generated_files_dir.mkdir(parents=True, exist_ok=True) + assert generated_files_dir.exists() + return generated_files_dir + + +@pytest.fixture +async def dir_locally_created_files( + tmp_path: Path, faker: Faker +) -> AsyncIterator[Path]: + path = _ensure_dir(tmp_path, faker, dir_prefix="source") + yield path + await remove_directory(path) + + +@pytest.fixture +async def dir_downloaded_files_1(tmp_path: Path, faker: Faker) -> AsyncIterator[Path]: + path = _ensure_dir(tmp_path, faker, dir_prefix="downloaded-1") + yield path + await remove_directory(path) + + +@pytest.fixture +async def dir_downloaded_files_2(tmp_path: Path, faker: Faker) -> AsyncIterator[Path]: + path = _ensure_dir(tmp_path, faker, dir_prefix="downloaded-2") + yield path + await remove_directory(path) + + +@pytest.mark.parametrize( + "file_count, file_size, check_progress", + [ + (0, parse_obj_as(ByteSize, "0"), False), + (1, parse_obj_as(ByteSize, "1mib"), False), + (2, parse_obj_as(ByteSize, "1mib"), False), + (1, parse_obj_as(ByteSize, "1Gib"), True), + (4, parse_obj_as(ByteSize, "500Mib"), True), + (100, parse_obj_as(ByteSize, "20mib"), True), + ], +) +async def test_local_to_remote_to_local( + aws_s3_cli_settings: AwsS3CliSettings, + create_valid_file_uuid: Callable[[str, Path], str], + dir_locally_created_files: Path, + dir_downloaded_files_1: Path, + file_count: int, + file_size: ByteSize, + 
check_progress: bool, + cleanup_bucket_after_test: None, + faker: Faker, +) -> None: + await _create_files_in_dir(dir_locally_created_files, file_count, file_size) + + # get s3 reference link + directory_uuid = create_valid_file_uuid(f"{dir_locally_created_files}", Path()) + s3_directory_link = _fake_s3_link(aws_s3_cli_settings, directory_uuid) + + # run the test + await _upload_local_dir_to_s3( + aws_s3_cli_settings, + s3_directory_link, + dir_locally_created_files, + check_progress=check_progress, + faker=faker, + ) + await _download_from_s3_to_local_dir( + aws_s3_cli_settings, s3_directory_link, dir_downloaded_files_1, faker=faker + ) + assert _directories_have_the_same_content( + dir_locally_created_files, dir_downloaded_files_1 + ) + + +def _change_content_of_one_file( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + a_generated_file = next(iter(generated_file_names)) + (dir_locally_created_files / a_generated_file).write_bytes(os.urandom(10)) + + +def _change_content_of_all_file( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + for file_name in generated_file_names: + (dir_locally_created_files / file_name).unlink() + (dir_locally_created_files / file_name).write_bytes(os.urandom(10)) + + +def _remove_one_file( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + a_generated_file = next(iter(generated_file_names)) + (dir_locally_created_files / a_generated_file).unlink() + + +def _rename_one_file( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + a_generated_file = next(iter(generated_file_names)) + (dir_locally_created_files / a_generated_file).rename( + dir_locally_created_files / f"renamed-{a_generated_file}" + ) + + +def _add_a_new_file( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + (dir_locally_created_files / "new_file.bin").write_bytes(os.urandom(10)) + + +def _remove_all_files( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + for file_name in generated_file_names: + (dir_locally_created_files / file_name).unlink() + + +def _regression_add_broken_symlink( + dir_locally_created_files: Path, generated_file_names: set[str] +) -> None: + # NOTE: if rclone tries to copy a link that does not exist an error is raised + path_does_not_exist_on_fs = Path(f"/tmp/missing-{uuid4()}") # noqa: S108 + assert not path_does_not_exist_on_fs.exists() + + broken_symlink = dir_locally_created_files / "missing.link" + assert not broken_symlink.exists() + os.symlink(f"{path_does_not_exist_on_fs}", f"{broken_symlink}") + + +@pytest.mark.parametrize( + "changes_callable", + [ + _change_content_of_one_file, + _change_content_of_all_file, + _remove_one_file, + _remove_all_files, + _rename_one_file, + _add_a_new_file, + _regression_add_broken_symlink, + ], +) +async def test_overwrite_an_existing_file_and_sync_again( + aws_s3_cli_settings: AwsS3CliSettings, + create_valid_file_uuid: Callable[[str, Path], str], + dir_locally_created_files: Path, + dir_downloaded_files_1: Path, + dir_downloaded_files_2: Path, + changes_callable: Callable[[Path, set[str]], None], + cleanup_bucket_after_test: None, + faker: Faker, +) -> None: + generated_file_names: set[str] = await _create_files_in_dir( + dir_locally_created_files, + 3, + parse_obj_as(ByteSize, "1kib"), + ) + assert len(generated_file_names) > 0 + + # get s3 reference link + directory_uuid = create_valid_file_uuid(f"{dir_locally_created_files}", Path()) + s3_directory_link = 
_fake_s3_link(aws_s3_cli_settings, directory_uuid) + + # sync local to remote and check + await _upload_local_dir_to_s3( + aws_s3_cli_settings, s3_directory_link, dir_locally_created_files, faker=faker + ) + await _download_from_s3_to_local_dir( + aws_s3_cli_settings, s3_directory_link, dir_downloaded_files_1, faker=faker + ) + assert _directories_have_the_same_content( + dir_locally_created_files, dir_downloaded_files_1 + ) + + # make some changes to local content + changes_callable(dir_locally_created_files, generated_file_names) + + # ensure local content changed form remote content + assert not _directories_have_the_same_content( + dir_locally_created_files, dir_downloaded_files_1 + ) + + # upload and check new local and new remote are in sync + await _upload_local_dir_to_s3( + aws_s3_cli_settings, s3_directory_link, dir_locally_created_files, faker=faker + ) + await _download_from_s3_to_local_dir( + aws_s3_cli_settings, s3_directory_link, dir_downloaded_files_2, faker=faker + ) + assert _directories_have_the_same_content( + dir_locally_created_files, dir_downloaded_files_2 + ) + # check that old remote and new remote are not the same + assert not _directories_have_the_same_content( + dir_downloaded_files_1, dir_downloaded_files_2 + ) + + +async def test_raises_error_if_local_directory_path_is_a_file( + tmp_path: Path, faker: Faker, cleanup_bucket_after_test: None +): + file_path = await _create_file_of_size( + tmp_path, name=f"test{faker.uuid4()}.bin", file_size=ByteSize(1) + ) + with pytest.raises(aws_s3_cli.AwsS3CliPathIsAFileError): + await aws_s3_cli.sync_local_to_s3( + aws_s3_cli_settings=AsyncMock(), + progress_bar=AsyncMock(), + local_directory_path=file_path, + upload_s3_link=AsyncMock(), + debug_logs=True, + ) + with pytest.raises(aws_s3_cli.AwsS3CliPathIsAFileError): + await aws_s3_cli.sync_s3_to_local( + aws_s3_cli_settings=AsyncMock(), + progress_bar=AsyncMock(), + local_directory_path=file_path, + download_s3_link=AsyncMock(), + debug_logs=True, + ) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py index 67e4bed3c68..9cd1ce32de4 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py @@ -19,12 +19,14 @@ SimcoreS3FileID, ) from models_library.users import UserID -from pydantic import ByteSize, parse_obj_as +from pydantic import BaseModel, ByteSize, parse_obj_as from pytest_mock import MockerFixture from pytest_simcore.helpers.parametrizations import byte_size_ids from servicelib.progress_bar import ProgressBarData +from settings_library.aws_s3_cli import AwsS3CliSettings from settings_library.r_clone import RCloneSettings from simcore_sdk.node_ports_common import exceptions, filemanager +from simcore_sdk.node_ports_common.aws_s3_cli import AwsS3CliFailedError from simcore_sdk.node_ports_common.filemanager import UploadedFile, UploadedFolder from simcore_sdk.node_ports_common.r_clone import RCloneFailedError from yarl import URL @@ -39,11 +41,32 @@ pytest_simcore_ops_services_selection = ["minio", "adminer"] -@pytest.fixture(params=[True, False], ids=["with RClone", "without RClone"]) -def optional_r_clone( - r_clone_settings: RCloneSettings, request: pytest.FixtureRequest -) -> RCloneSettings | None: - return r_clone_settings if request.param else None # type: ignore +class _SyncSettings(BaseModel): + r_clone_settings: RCloneSettings | None 
+ aws_s3_cli_settings: AwsS3CliSettings | None + + +@pytest.fixture( + params=[(True, False), (False, True), (False, False)], + ids=[ + "RClone enabled", + "AwsS3Cli enabled", + "Both RClone and AwsS3Cli disabled", + ], +) +def optional_sync_settings( + r_clone_settings: RCloneSettings, + aws_s3_cli_settings: AwsS3CliSettings, + request: pytest.FixtureRequest, +) -> _SyncSettings: + _rclone_enabled, _aws_s3_cli_enabled = request.param + + _r_clone_settings = r_clone_settings if _rclone_enabled else None + _aws_s3_cli_settings = aws_s3_cli_settings if _aws_s3_cli_enabled else None + + return _SyncSettings( + r_clone_settings=_r_clone_settings, aws_s3_cli_settings=_aws_s3_cli_settings + ) def _file_size(size_str: str, **pytest_params): @@ -68,7 +91,7 @@ async def test_valid_upload_download( s3_simcore_location: LocationID, file_size: ByteSize, create_file_of_size: Callable[[ByteSize, str], Path], - optional_r_clone: RCloneSettings | None, + optional_sync_settings: _SyncSettings, simcore_services_ready: None, storage_service: URL, faker: Faker, @@ -83,9 +106,10 @@ async def test_valid_upload_download( store_name=None, s3_object=file_id, path_to_upload=file_path, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, io_log_redirect_cb=None, progress_bar=progress_bar, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) assert isinstance(upload_result, UploadedFile) store_id, e_tag = upload_result.store_id, upload_result.etag @@ -107,8 +131,9 @@ async def test_valid_upload_download( s3_object=file_id, local_path=download_folder, io_log_redirect_cb=None, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 assert download_file_path.exists() @@ -132,7 +157,7 @@ async def test_valid_upload_download_using_file_object( s3_simcore_location: LocationID, file_size: ByteSize, create_file_of_size: Callable[[ByteSize, str], Path], - optional_r_clone: RCloneSettings | None, + optional_sync_settings: _SyncSettings, faker: Faker, ): file_path = create_file_of_size(file_size, "test.test") @@ -147,8 +172,9 @@ async def test_valid_upload_download_using_file_object( path_to_upload=filemanager.UploadableFileObject( file_object, file_path.name, file_path.stat().st_size ), - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, io_log_redirect_cb=None, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) assert isinstance(upload_result, UploadedFile) store_id, e_tag = upload_result.store_id, upload_result.etag @@ -169,8 +195,9 @@ async def test_valid_upload_download_using_file_object( s3_object=file_id, local_path=download_folder, io_log_redirect_cb=None, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 assert download_file_path.exists() @@ -190,6 +217,11 @@ def mocked_upload_file_raising_exceptions(mocker: MockerFixture) -> None: autospec=True, side_effect=ClientError, ) + mocker.patch( + "simcore_sdk.node_ports_common.filemanager.aws_s3_cli.sync_local_to_s3", + autospec=True, + side_effect=AwsS3CliFailedError, + ) @pytest.mark.parametrize( @@ -204,7 +236,7 @@ async def 
test_failed_upload_is_properly_removed_from_storage( create_file_of_size: Callable[[ByteSize], Path], create_valid_file_uuid: Callable[[str, Path], SimcoreS3FileID], s3_simcore_location: LocationID, - optional_r_clone: RCloneSettings | None, + optional_sync_settings: _SyncSettings, file_size: ByteSize, user_id: UserID, mocked_upload_file_raising_exceptions: None, @@ -218,8 +250,9 @@ async def test_failed_upload_is_properly_removed_from_storage( store_name=None, s3_object=file_id, path_to_upload=file_path, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, io_log_redirect_cb=None, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) with pytest.raises(exceptions.S3InvalidPathError): await filemanager.get_file_metadata( @@ -239,7 +272,7 @@ async def test_failed_upload_after_valid_upload_keeps_last_valid_state( create_file_of_size: Callable[[ByteSize], Path], create_valid_file_uuid: Callable[[str, Path], SimcoreS3FileID], s3_simcore_location: LocationID, - optional_r_clone: RCloneSettings | None, + optional_sync_settings: _SyncSettings, file_size: ByteSize, user_id: UserID, mocker: MockerFixture, @@ -253,8 +286,9 @@ async def test_failed_upload_after_valid_upload_keeps_last_valid_state( store_name=None, s3_object=file_id, path_to_upload=file_path, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, io_log_redirect_cb=None, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) assert isinstance(upload_result, UploadedFile) store_id, e_tag = upload_result.store_id, upload_result.etag @@ -284,8 +318,9 @@ async def test_failed_upload_after_valid_upload_keeps_last_valid_state( store_name=None, s3_object=file_id, path_to_upload=file_path, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, io_log_redirect_cb=None, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) # the file shall be back to its original state file_metadata = await filemanager.get_file_metadata( @@ -301,7 +336,7 @@ async def test_invalid_file_path( user_id: int, create_valid_file_uuid: Callable[[str, Path], SimcoreS3FileID], s3_simcore_location: LocationID, - optional_r_clone: RCloneSettings | None, + optional_sync_settings: _SyncSettings, faker: Faker, ): file_path = Path(tmpdir) / "test.test" @@ -332,8 +367,9 @@ async def test_invalid_file_path( s3_object=file_id, local_path=download_folder, io_log_redirect_cb=None, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) @@ -343,7 +379,7 @@ async def test_errors_upon_invalid_file_identifiers( user_id: UserID, project_id: str, s3_simcore_location: LocationID, - optional_r_clone: RCloneSettings | None, + optional_sync_settings: _SyncSettings, faker: Faker, ): file_path = Path(tmpdir) / "test.test" @@ -386,8 +422,9 @@ async def test_errors_upon_invalid_file_identifiers( s3_object=invalid_s3_path, local_path=download_folder, io_log_redirect_cb=None, - r_clone_settings=optional_r_clone, + r_clone_settings=optional_sync_settings.r_clone_settings, progress_bar=progress_bar, + aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings, ) with pytest.raises(exceptions.S3InvalidPathError): # noqa: PT012 @@ -401,8 +438,9 @@ async def test_errors_upon_invalid_file_identifiers( s3_object=SimcoreS3FileID(f"{project_id}/{uuid4()}/invisible.txt"), local_path=download_folder, 
             io_log_redirect_cb=None,
-            r_clone_settings=optional_r_clone,
+            r_clone_settings=optional_sync_settings.r_clone_settings,
             progress_bar=progress_bar,
+            aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings,
         )
@@ -411,7 +449,7 @@ async def test_invalid_store(
     tmpdir: Path,
     user_id: int,
     create_valid_file_uuid: Callable[[str, Path], SimcoreS3FileID],
-    optional_r_clone: RCloneSettings | None,
+    optional_sync_settings: _SyncSettings,
     faker: Faker,
 ):
     file_path = Path(tmpdir) / "test.test"
@@ -442,11 +480,29 @@ async def test_invalid_store(
             s3_object=file_id,
             local_path=download_folder,
             io_log_redirect_cb=None,
-            r_clone_settings=optional_r_clone,
+            r_clone_settings=optional_sync_settings.r_clone_settings,
             progress_bar=progress_bar,
+            aws_s3_cli_settings=optional_sync_settings.aws_s3_cli_settings,
         )


+@pytest.fixture(
+    params=[True, False],
+    ids=["with RClone", "with AwsS3Cli"],
+)
+def sync_settings(
+    r_clone_settings: RCloneSettings,
+    aws_s3_cli_settings: AwsS3CliSettings,
+    request: pytest.FixtureRequest,
+) -> _SyncSettings:
+    is_rclone_enabled = request.param
+
+    return _SyncSettings(
+        r_clone_settings=r_clone_settings if is_rclone_enabled else None,
+        aws_s3_cli_settings=aws_s3_cli_settings if not is_rclone_enabled else None,
+    )
+
+
 @pytest.mark.parametrize("is_directory", [False, True])
 async def test_valid_metadata(
@@ -454,7 +510,7 @@ async def test_valid_metadata(
     user_id: int,
     create_valid_file_uuid: Callable[[str, Path], SimcoreS3FileID],
     s3_simcore_location: LocationID,
-    r_clone_settings: RCloneSettings,
+    sync_settings: _SyncSettings,
     is_directory: bool,
 ):
     # first we go with a non-existing file
@@ -486,7 +542,8 @@ async def test_valid_metadata(
         s3_object=file_id,
         path_to_upload=path_to_upload,
         io_log_redirect_cb=None,
-        r_clone_settings=r_clone_settings,
+        r_clone_settings=sync_settings.r_clone_settings,
+        aws_s3_cli_settings=sync_settings.aws_s3_cli_settings,
     )
     if is_directory:
         assert isinstance(upload_result, UploadedFolder)
@@ -591,7 +648,7 @@ async def test_upload_path_source_is_a_folder(
     user_id: int,
     s3_simcore_location: LocationID,
     files_in_folder: int,
-    r_clone_settings: RCloneSettings,
+    sync_settings: _SyncSettings,
 ):
     source_dir = tmp_path / f"source-{faker.uuid4()}"
     source_dir.mkdir(parents=True, exist_ok=True)
@@ -614,7 +671,8 @@ async def test_upload_path_source_is_a_folder(
         s3_object=s3_object,
         path_to_upload=source_dir,
         io_log_redirect_cb=None,
-        r_clone_settings=r_clone_settings,
+        r_clone_settings=sync_settings.r_clone_settings,
+        aws_s3_cli_settings=sync_settings.aws_s3_cli_settings,
     )
     assert isinstance(upload_result, UploadedFolder)
     assert source_dir.exists()
@@ -627,8 +685,9 @@ async def test_upload_path_source_is_a_folder(
         s3_object=s3_object,
         local_path=download_dir,
         io_log_redirect_cb=None,
-        r_clone_settings=r_clone_settings,
+        r_clone_settings=sync_settings.r_clone_settings,
         progress_bar=progress_bar,
+        aws_s3_cli_settings=sync_settings.aws_s3_cli_settings,
     )

     assert download_dir.exists()
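Editor's note: the fixtures above funnel both optional backends through a single _SyncSettings value whose definition sits earlier in this test module, outside the hunks shown here. A minimal sketch of what the fixtures assume, with a hypothetical guard making the two backends mutually exclusive (the parametrizations above never enable both at once):

# --- editor's sketch, not part of the diff ----------------------------------
# Assumed shape of the _SyncSettings container used by the fixtures above.
from dataclasses import dataclass

from settings_library.aws_s3_cli import AwsS3CliSettings
from settings_library.r_clone import RCloneSettings


@dataclass(frozen=True)
class _SyncSettings:
    r_clone_settings: RCloneSettings | None
    aws_s3_cli_settings: AwsS3CliSettings | None

    def __post_init__(self) -> None:
        # hypothetical guard: at most one sync backend may be configured;
        # both may be None, which the tests also exercise
        if self.r_clone_settings is not None and self.aws_s3_cli_settings is not None:
            msg = "r_clone_settings and aws_s3_cli_settings are mutually exclusive"
            raise ValueError(msg)
# --- end of sketch -----------------------------------------------------------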
diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py
index 0f524741f9d..739dadfcf6a 100644
--- a/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py
+++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py
@@ -1,7 +1,6 @@
 # pylint: disable=redefined-outer-name
 # pylint: disable=unused-argument

-import asyncio
 import filecmp
 import os
 import re
@@ -41,18 +40,37 @@


 @pytest.fixture
-async def cleanup_bucket_after_test(r_clone_settings: RCloneSettings) -> None:
+async def cleanup_bucket_after_test(
+    r_clone_settings: RCloneSettings,
+) -> AsyncIterator[None]:
     session = aioboto3.Session(
         aws_access_key_id=r_clone_settings.R_CLONE_S3.S3_ACCESS_KEY,
         aws_secret_access_key=r_clone_settings.R_CLONE_S3.S3_SECRET_KEY,
     )
-    async with session.resource(
-        "s3",
-        endpoint_url=r_clone_settings.R_CLONE_S3.S3_ENDPOINT,
-    ) as s_3:
-        bucket = await s_3.Bucket(r_clone_settings.R_CLONE_S3.S3_BUCKET_NAME)
-        s3_objects = [_ async for _ in bucket.objects.all()]
-        await asyncio.gather(*[o.delete() for o in s3_objects])
+
+    yield
+
+    async with session.client("s3", endpoint_url=r_clone_settings.R_CLONE_S3.S3_ENDPOINT) as s3_client:  # type: ignore
+        # List all object versions
+        paginator = s3_client.get_paginator("list_object_versions")
+        async for page in paginator.paginate(
+            Bucket=r_clone_settings.R_CLONE_S3.S3_BUCKET_NAME
+        ):
+            # Prepare delete markers and versions for deletion
+            delete_markers = page.get("DeleteMarkers", [])
+            versions = page.get("Versions", [])
+
+            objects_to_delete = [
+                {"Key": obj["Key"], "VersionId": obj["VersionId"]}
+                for obj in delete_markers + versions
+            ]
+
+            # Perform deletion
+            if objects_to_delete:
+                await s3_client.delete_objects(
+                    Bucket=r_clone_settings.R_CLONE_S3.S3_BUCKET_NAME,
+                    Delete={"Objects": objects_to_delete, "Quiet": True},
+                )


 def _fake_s3_link(r_clone_settings: RCloneSettings, s3_object: str) -> AnyUrl:
diff --git a/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py b/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py
index 3dcb20fc10d..c1edb4f183c 100644
--- a/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py
+++ b/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py
@@ -103,6 +103,7 @@ async def test_push_folder(
         io_log_redirect_cb=mock_io_log_redirect_cb,
         progress_bar=progress_bar,
         r_clone_settings=r_clone_settings,
+        aws_s3_cli_settings=None,
     )

     assert progress_bar._current_steps == pytest.approx(1)  # noqa: SLF001
@@ -116,6 +117,7 @@ async def test_push_folder(
         user_id=user_id,
         progress_bar=progress_bar,
         exclude_patterns=None,
+        aws_s3_cli_settings=None,
     )
@@ -151,6 +153,7 @@ async def test_push_file(
         io_log_redirect_cb=mock_io_log_redirect_cb,
         progress_bar=progress_bar,
         r_clone_settings=r_clone_settings,
+        aws_s3_cli_settings=None,
     )
     assert progress_bar._current_steps == pytest.approx(1)  # noqa: SLF001
     mock_temporary_directory.assert_not_called()
@@ -164,6 +167,7 @@ async def test_push_file(
         user_id=user_id,
         progress_bar=progress_bar,
         exclude_patterns=None,
+        aws_s3_cli_settings=None,
     )
     mock_filemanager.reset_mock()
@@ -237,6 +241,7 @@ async def test_pull_legacy_archive(
         io_log_redirect_cb=mock_io_log_redirect_cb,
         r_clone_settings=None,
         progress_bar=progress_bar._children[0],  # noqa: SLF001
+        aws_s3_cli_settings=None,
     )

     matchs, mismatchs, errors = cmpfiles(
@@ -281,6 +286,7 @@ async def test_pull_directory(
         io_log_redirect_cb=mock_io_log_redirect_cb,
         r_clone_settings=r_clone_settings,
         progress_bar=progress_bar,
+        aws_s3_cli_settings=None,
     )
     assert progress_bar._current_steps == pytest.approx(1)  # noqa: SLF001
     mock_filemanager.download_path_from_s3.assert_called_once_with(
@@ -292,4 +298,5 @@
         io_log_redirect_cb=mock_io_log_redirect_cb,
         r_clone_settings=r_clone_settings,
         progress_bar=progress_bar,
+        aws_s3_cli_settings=None,
     )
diff --git a/packages/simcore-sdk/tests/unit/test_node_ports_v2_port.py b/packages/simcore-sdk/tests/unit/test_node_ports_v2_port.py
index d4613b39d5f..9cf9d89eb9a 100644
--- a/packages/simcore-sdk/tests/unit/test_node_ports_v2_port.py
+++ b/packages/simcore-sdk/tests/unit/test_node_ports_v2_port.py
@@ -615,6 +615,7 @@ class FakeNodePorts:
     node_uuid: str
     r_clone_settings: Any | None = None
     io_log_redirect_cb: LogRedirectCB | None = _io_log_redirect_cb
+    aws_s3_cli_settings: Any | None = None

     @staticmethod
     async def get(key: str, progress_bar: ProgressBarData | None = None):
diff --git a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py
index 35d1e9e7afb..98ce21fc6a4 100644
--- a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py
+++ b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py
@@ -10,6 +10,7 @@
     ensure_unique_list_values_validator,
 )
 from pydantic import Field, PositiveInt, validator
+from settings_library.aws_s3_cli import AwsS3CliSettings
 from settings_library.base import BaseCustomSettings
 from settings_library.efs import AwsEfsSettings
 from settings_library.r_clone import RCloneSettings as SettingsLibraryRCloneSettings
@@ -125,6 +126,9 @@ class DynamicSidecarSettings(BaseCustomSettings, MixinLoggingSettings):
     DYNAMIC_SIDECAR_R_CLONE_SETTINGS: RCloneSettings = Field(auto_default_from_env=True)
+    DYNAMIC_SIDECAR_AWS_S3_CLI_SETTINGS: AwsS3CliSettings | None = Field(
+        auto_default_from_env=True
+    )
     DYNAMIC_SIDECAR_EFS_SETTINGS: AwsEfsSettings | None = Field(
         auto_default_from_env=True
     )
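Editor's note: DYNAMIC_SIDECAR_AWS_S3_CLI_SETTINGS is optional and auto-filled from the environment; the hunks that follow show director-v2 dumping it into the sidecar's DY_SIDECAR_AWS_S3_CLI_SETTINGS env var. A sketch of that round trip, assuming pydantic v1 style APIs (.json(encoder=...) as used in the code below, parse_raw() on the consuming side; in practice the sidecar reads the value through its own ApplicationSettings):

# --- editor's sketch, not part of the diff ----------------------------------
from settings_library.aws_s3_cli import AwsS3CliSettings
from settings_library.utils_cli import create_json_encoder_wo_secrets


def dump_for_sidecar(settings: AwsS3CliSettings) -> str:
    # same call as in _get_environment_variables() below; the expected-spec test
    # further down shows the secret key serialized in clear text, so the sidecar
    # receives usable credentials
    return settings.json(encoder=create_json_encoder_wo_secrets(AwsS3CliSettings))


def load_in_sidecar(raw_env_value: str) -> AwsS3CliSettings:
    # illustrative only; the sidecar's ApplicationSettings does the parsing for real
    return AwsS3CliSettings.parse_raw(raw_env_value)
# --- end of sketch -----------------------------------------------------------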
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py
index b26576068d8..eaae9e43edb 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py
@@ -19,7 +19,9 @@
 from servicelib.rabbitmq import RabbitMQRPCClient
 from servicelib.rabbitmq.rpc_interfaces.efs_guardian import efs_manager
 from servicelib.utils import unused_port
+from settings_library.aws_s3_cli import AwsS3CliSettings
 from settings_library.node_ports import StorageAuthSettings
+from settings_library.utils_cli import create_json_encoder_wo_secrets

 from ....constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL
 from ....core.dynamic_services_settings.scheduler import (
@@ -96,6 +98,14 @@ def _get_environment_variables(
     r_clone_settings = (
         app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_R_CLONE_SETTINGS
     )
+    dy_sidecar_aws_s3_cli_settings = None
+    if (
+        app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_AWS_S3_CLI_SETTINGS
+        and app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_AWS_S3_CLI_SETTINGS.AWS_S3_CLI_S3
+    ):
+        dy_sidecar_aws_s3_cli_settings = app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_AWS_S3_CLI_SETTINGS.json(
+            encoder=create_json_encoder_wo_secrets(AwsS3CliSettings),
+        )

     state_exclude = set()
     if scheduler_data.paths_mapping.state_exclude is not None:
@@ -130,6 +140,7 @@ def _get_environment_variables(
             f"{x}" for x in scheduler_data.paths_mapping.state_paths
         ),
         "DY_SIDECAR_USER_ID": f"{scheduler_data.user_id}",
+        "DY_SIDECAR_AWS_S3_CLI_SETTINGS": dy_sidecar_aws_s3_cli_settings,
         "DYNAMIC_SIDECAR_COMPOSE_NAMESPACE": compose_namespace,
         "DYNAMIC_SIDECAR_LOG_LEVEL": app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_LOG_LEVEL,
         "DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED": f"{app_settings.DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED}",
diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
index 6d58aeb2cd7..2fe09c42286 100644
--- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
+++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
@@ -678,6 +678,7 @@ async def _fetch_data_via_data_manager(
         io_log_redirect_cb=io_log_redirect_cb,
         r_clone_settings=r_clone_settings,
         progress_bar=progress_bar,
+        aws_s3_cli_settings=None,
     )

     return save_to
diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py
index 4001403c684..d9d87920ba3 100644
--- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py
+++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py
@@ -16,6 +16,7 @@

 # PLEASE keep alphabetical to simplify debugging
 EXPECTED_DYNAMIC_SIDECAR_ENV_VAR_NAMES: Final[set[str]] = {
+    "DY_SIDECAR_AWS_S3_CLI_SETTINGS",
     "DY_SIDECAR_CALLBACKS_MAPPING",
     "DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED",
     "DY_SIDECAR_NODE_ID",
diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py
index ce475fa5555..7152afe80e1 100644
--- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py
+++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py
@@ -61,6 +61,7 @@ def mock_env(
     env_vars = mock_env.copy()
     env_vars.update(
         {
+            "AWS_S3_CLI_S3": '{"S3_ACCESS_KEY":"12345678","S3_BUCKET_NAME":"simcore","S3_ENDPOINT":"http://172.17.0.1:9001","S3_REGION":"us-east-1","S3_SECRET_KEY":"12345678"}',
             "DYNAMIC_SIDECAR_IMAGE": "local/dynamic-sidecar:MOCK",
             "LOG_LEVEL": "DEBUG",
             "POSTGRES_DB": "test",
@@ -239,6 +240,10 @@ def expected_dynamic_sidecar_spec(
                     "FORWARD_ENV_DISPLAY": ":0",
                     "NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS": "3",
                     "DYNAMIC_SIDECAR_LOG_LEVEL": "DEBUG",
+                    "DY_SIDECAR_AWS_S3_CLI_SETTINGS": (
+                        '{"AWS_S3_CLI_S3": {"S3_ACCESS_KEY": "12345678", "S3_BUCKET_NAME": "simcore", '
+                        '"S3_ENDPOINT": "http://172.17.0.1:9001", "S3_REGION": "us-east-1", "S3_SECRET_KEY": "12345678"}}'
+                    ),
                     "DY_SIDECAR_CALLBACKS_MAPPING": (
                         '{"metrics": {"service": "rt-web", "command": "ls", "timeout": 1.0}, "before_shutdown"'
                         ': [{"service": "rt-web", "command": "ls", "timeout": 1.0}, {"service": "s4l-core", '
diff --git a/services/docker-compose.yml b/services/docker-compose.yml
index 271c29e7c83..1564a1b3185 100644
--- a/services/docker-compose.yml
+++ b/services/docker-compose.yml
@@ -272,6 +272,8 @@ services:
     init: true
     hostname: "{{.Node.Hostname}}-{{.Task.Slot}}"
     environment:
+      AWS_S3_CLI_S3: ${AWS_S3_CLI_S3}
+
       CATALOG_HOST: ${CATALOG_HOST}
       CATALOG_PORT: ${CATALOG_PORT}
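Editor's note: with AWS_S3_CLI_S3 exported by docker-compose (and set in .env-devel and the mock_env fixture above), the settings object can be built straight from the environment. A sketch, assuming the settings-library's usual create_from_envs() construction and pydantic's JSON decoding of complex env values:

# --- editor's sketch, not part of the diff ----------------------------------
import os

from settings_library.aws_s3_cli import AwsS3CliSettings

# value taken verbatim from the mock_env fixture above
os.environ["AWS_S3_CLI_S3"] = (
    '{"S3_ACCESS_KEY":"12345678","S3_BUCKET_NAME":"simcore",'
    '"S3_ENDPOINT":"http://172.17.0.1:9001","S3_REGION":"us-east-1","S3_SECRET_KEY":"12345678"}'
)

settings = AwsS3CliSettings.create_from_envs()
assert settings.AWS_S3_CLI_S3.S3_BUCKET_NAME == "simcore"
# --- end of sketch -----------------------------------------------------------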
diff --git a/services/dynamic-sidecar/Dockerfile b/services/dynamic-sidecar/Dockerfile
index 2c2229fe4ad..07a1701b657 100644
--- a/services/dynamic-sidecar/Dockerfile
+++ b/services/dynamic-sidecar/Dockerfile
@@ -52,6 +52,16 @@ RUN \
   --mount=type=bind,source=scripts/install_rclone.bash,target=install_rclone.bash \
   ./install_rclone.bash

+RUN AWS_CLI_VERSION="2.11.11" \
+  && curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64-${AWS_CLI_VERSION}.zip" -o "awscliv2.zip" \
+  && apt-get update && apt-get install -y unzip \
+  && unzip awscliv2.zip \
+  && ./aws/install \
+  && apt-get remove --purge -y unzip \
+  && rm awscliv2.zip \
+  && rm -rf awscliv2 \
+  && aws --version
+
 # simcore-user uid=8004(scu) gid=8004(scu) groups=8004(scu)
 ENV SC_USER_ID=8004 \
   SC_USER_NAME=scu \
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py
index a925bb1c5ee..41af4b9f853 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py
@@ -12,6 +12,7 @@
 from models_library.services import DynamicServiceKey, RunID, ServiceVersion
 from models_library.users import UserID
 from pydantic import ByteSize, Field, PositiveInt, parse_obj_as, validator
+from settings_library.aws_s3_cli import AwsS3CliSettings
 from settings_library.base import BaseCustomSettings
 from settings_library.docker_registry import RegistrySettings
 from settings_library.node_ports import StorageAuthSettings
@@ -146,7 +147,10 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
         auto_default_from_env=True
     )
     DY_SIDECAR_R_CLONE_SETTINGS: RCloneSettings = Field(auto_default_from_env=True)
-
+    DY_SIDECAR_AWS_S3_CLI_SETTINGS: AwsS3CliSettings | None = Field(
+        None,
+        description="AWS S3 settings are used for the AWS S3 CLI. If these settings are filled, the AWS S3 CLI is used instead of RClone.",
+    )
     POSTGRES_SETTINGS: PostgresSettings = Field(auto_default_from_env=True)
     RABBIT_SETTINGS: RabbitSettings = Field(auto_default_from_env=True)
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py
index 592ce9c39c3..52754a6687a 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py
@@ -336,6 +336,7 @@ async def _restore_state_folder(
         ),
         r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS,
         progress_bar=progress_bar,
+        aws_s3_cli_settings=settings.DY_SIDECAR_AWS_S3_CLI_SETTINGS,
     )
@@ -407,6 +408,7 @@ async def _save_state_folder(
             post_sidecar_log_message, app, log_level=logging.INFO
         ),
         progress_bar=progress_bar,
+        aws_s3_cli_settings=settings.DY_SIDECAR_AWS_S3_CLI_SETTINGS,
     )
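Editor's note: the AWS CLI installed in the Dockerfile above is what node_ports_common.aws_s3_cli ultimately shells out to when DY_SIDECAR_AWS_S3_CLI_SETTINGS is set; the real module also parses progress output. A rough illustration of the command shape only, not the module's actual implementation (credential handling assumes plain-string values; unwrap SecretStr fields if the settings model uses them):

# --- editor's sketch, not part of the diff ----------------------------------
import asyncio
import os
from pathlib import Path

from settings_library.aws_s3_cli import AwsS3CliSettings


async def sync_directory_to_s3(settings: AwsS3CliSettings, local_dir: Path, s3_uri: str) -> None:
    s3 = settings.AWS_S3_CLI_S3
    # pass credentials and the custom endpoint through the standard AWS CLI knobs
    env = os.environ | {
        "AWS_ACCESS_KEY_ID": f"{s3.S3_ACCESS_KEY}",
        "AWS_SECRET_ACCESS_KEY": f"{s3.S3_SECRET_KEY}",
        "AWS_DEFAULT_REGION": f"{s3.S3_REGION}",
    }
    process = await asyncio.create_subprocess_exec(
        "aws", "--endpoint-url", f"{s3.S3_ENDPOINT}", "s3", "sync", f"{local_dir}", s3_uri,
        env=env,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    stdout, _ = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"'aws s3 sync' failed:\n{stdout.decode(errors='replace')}")
# --- end of sketch -----------------------------------------------------------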
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py
index 44f8ad9bfab..e3f56fcafdc 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py
@@ -85,6 +85,7 @@ async def upload_outputs(
         node_uuid=NodeIDStr(settings.DY_SIDECAR_NODE_ID),
         r_clone_settings=None,
         io_log_redirect_cb=io_log_redirect_cb,
+        aws_s3_cli_settings=None,
     )

     # let's gather the tasks
@@ -267,6 +268,7 @@ async def download_target_ports(
         node_uuid=NodeIDStr(settings.DY_SIDECAR_NODE_ID),
         r_clone_settings=None,
         io_log_redirect_cb=io_log_redirect_cb,
+        aws_s3_cli_settings=None,
     )

     # let's gather all the data
diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py
index 2b630b9e030..7084838eae0 100644
--- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py
+++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py
@@ -276,15 +276,19 @@ async def create_file_upload_links(
                 conn, parse_obj_as(SimcoreS3FileID, file_id)
             )

-        # ensure file is deleted first in case it already exists
-        await self.delete_file(
-            user_id=user_id,
-            file_id=file_id,
-            # NOTE: bypassing check since the project access rights don't play well
-            # with collaborators
-            # SEE https://github.com/ITISFoundation/osparc-simcore/issues/5159
-            enforce_access_rights=False,
-        )
+        if (
+            not is_directory
+        ):  # NOTE: Delete is not needed for directories that are synced via an external tool (rclone/aws s3 cli).
+            # ensure file is deleted first in case it already exists
+            # https://github.com/ITISFoundation/osparc-simcore/pull/5108
+            await self.delete_file(
+                user_id=user_id,
+                file_id=file_id,
+                # NOTE: bypassing check since the project access rights don't play well
+                # with collaborators
+                # SEE https://github.com/ITISFoundation/osparc-simcore/issues/5159
+                enforce_access_rights=False,
+            )
         async with self.engine.acquire() as conn:
             # initiate the file meta data table
             fmd = await self._create_fmd_for_upload(