From 987825254d45a97d1a4684ecefcf206a9649287e Mon Sep 17 00:00:00 2001 From: "Vammi, Vijay" Date: Sun, 2 Feb 2025 03:33:21 +0000 Subject: [PATCH 1/5] feat: catalog uses cloudpathlib with s3 added --- extensions/catalog/any_path.py | 262 ++++++++++++++++++ extensions/catalog/file_system.py | 252 +---------------- extensions/catalog/s3.py | 12 + pyproject.toml | 2 + runnable/utils.py | 39 +-- .../{test_file_system.py => test_any_path.py} | 2 +- uv.lock | 14 + 7 files changed, 317 insertions(+), 266 deletions(-) create mode 100644 extensions/catalog/any_path.py create mode 100644 extensions/catalog/s3.py rename tests/extensions/catalog/{test_file_system.py => test_any_path.py} (99%) diff --git a/extensions/catalog/any_path.py b/extensions/catalog/any_path.py new file mode 100644 index 00000000..ed9f9e17 --- /dev/null +++ b/extensions/catalog/any_path.py @@ -0,0 +1,262 @@ +import logging +import os +import shutil +from abc import abstractmethod +from pathlib import Path +from typing import Any, Dict, List, Optional, Type + +from cloudpathlib import S3Path + +from runnable import defaults, utils +from runnable.catalog import BaseCatalog, is_catalog_out_of_sync +from runnable.datastore import DataCatalog + +logger = logging.getLogger(defaults.LOGGER_NAME) + + +class AnyPathCatalog(BaseCatalog): + """ + A Catalog handler that uses the local file system for cataloging. + + Note: Do not use this if the steps of the pipeline run on different compute environments. + + Example config: + + catalog: + type: file-system + config: + catalog_location: The location to store the catalog. + compute_data_folder: The folder to source the data from. + + """ + + catalog_location: str = defaults.CATALOG_LOCATION_FOLDER + + def get_catalog_location(self): + return self.catalog_location + + def get_summary(self) -> Dict[str, Any]: + summary = { + "Catalog Location": self.get_catalog_location(), + } + + return summary + + @abstractmethod + def get_path_client(self) -> Type[S3Path] | Type[Path]: ... 
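+    # get_path_client is the factory hook for subclasses: it returns the
+    # path class to operate on (pathlib.Path for the local file system,
+    # cloudpathlib's S3Path for object stores) so that get()/put() below
+    # can treat either uniformly.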
+ + def get( + self, name: str, run_id: str, compute_data_folder: str = "", **kwargs + ) -> List[DataCatalog]: + """ + Get the file by matching glob pattern to the name + + Args: + name ([str]): A glob matching the file name + run_id ([str]): The run id + + Raises: + Exception: If the catalog location does not exist + + Returns: + List(object) : A list of catalog objects + """ + logger.info( + f"Using the {self.service_name} catalog and trying to get {name} for run_id: {run_id}" + ) + + copy_to = self.compute_data_folder + if compute_data_folder: + copy_to = compute_data_folder + + client = self.get_path_client() + copy_to = client(copy_to) # type: ignore + + catalog_location = self.get_catalog_location() + run_catalog = client(catalog_location) / run_id / copy_to + + logger.debug( + f"Copying objects to {copy_to} from the run catalog location of {run_catalog}" + ) + + if not run_catalog.is_dir(): + msg = ( + f"Expected Catalog to be present at: {run_catalog} but not found.\n" + "Note: Please make sure that some data was put in the catalog before trying to get from it.\n" + ) + raise Exception(msg) + + # Iterate through the contents of the run_catalog and copy the files that fit the name pattern + # We should also return a list of data hashes + glob_files = run_catalog.glob(name) + logger.debug( + f"Glob identified {glob_files} as matches to from the catalog location: {run_catalog}" + ) + + data_catalogs = [] + run_log_store = self._context.run_log_store + for file in glob_files: + if file.is_dir(): + # Need not add a data catalog for the folder + continue + + if str(file).endswith(".execution.log"): + continue + + relative_file_path = file.relative_to(run_catalog) # type: ignore + + data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) + data_catalog.catalog_handler_location = catalog_location + data_catalog.catalog_relative_path = str(relative_file_path) + data_catalog.data_hash = utils.get_data_hash(str(file)) + data_catalog.stage = "get" + data_catalogs.append(data_catalog) + + # Make the directory in the data folder if required + Path(copy_to / relative_file_path.parent).mkdir(parents=True, exist_ok=True) + shutil.copy(file, copy_to / relative_file_path) + + logger.info(f"Copied {file} from {run_catalog} to {copy_to}") + + if not data_catalogs: + raise Exception(f"Did not find any files matching {name} in {run_catalog}") + + return data_catalogs + + def put( + self, + name: str, + run_id: str, + compute_data_folder: str = "", + synced_catalogs: Optional[List[DataCatalog]] = None, + **kwargs, + ) -> List[DataCatalog]: + """ + Put the files matching the glob pattern into the catalog. + + If previously synced catalogs are provided, and no changes were observed, we do not sync them. + + Args: + name (str): The glob pattern of the files to catalog + run_id (str): The run id of the run + compute_data_folder (str, optional): The compute data folder to sync from. Defaults to settings default. + synced_catalogs (dict, optional): dictionary of previously synced catalogs. Defaults to None. + + Raises: + Exception: If the compute data folder does not exist. 
+ + Returns: + List(object) : A list of catalog objects + """ + logger.info( + f"Using the {self.service_name} catalog and trying to put {name} for run_id: {run_id}" + ) + + client = self.get_path_client() + + copy_from = self.compute_data_folder + if compute_data_folder: + copy_from = compute_data_folder + + copy_from = client(copy_from) # type: ignore + + catalog_location = self.get_catalog_location() + run_catalog = client(catalog_location) / run_id + run_catalog.mkdir(parents=True, exist_ok=True) + + logger.debug( + f"Copying objects from {copy_from} to the run catalog location of {run_catalog}" + ) + + if not utils.does_dir_exist(copy_from): + msg = ( + f"Expected compute data folder to be present at: {compute_data_folder} but not found. \n" + "Note: runnable does not create the compute data folder for you. Please ensure that the " + "folder exists.\n" + ) + raise Exception(msg) + + # Iterate through the contents of copy_from and if the name matches, we move them to the run_catalog + # We should also return a list of datastore.DataCatalog items + + glob_files = copy_from.glob(name) # type: ignore + logger.debug( + f"Glob identified {glob_files} as matches to from the compute data folder: {copy_from}" + ) + + data_catalogs = [] + run_log_store = self._context.run_log_store + for file in glob_files: + if file.is_dir(): + # Need not add a data catalog for the folder + continue + + relative_file_path = file.relative_to(".") + + data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) + data_catalog.catalog_handler_location = catalog_location + data_catalog.catalog_relative_path = ( + run_id + os.sep + str(relative_file_path) + ) + data_catalog.data_hash = utils.get_data_hash(str(file)) + data_catalog.stage = "put" + data_catalogs.append(data_catalog) + + if is_catalog_out_of_sync(data_catalog, synced_catalogs): + logger.info(f"{data_catalog.name} was found to be changed, syncing") + + # Make the directory in the catalog if required + client(run_catalog / relative_file_path.parent).mkdir( + parents=True, exist_ok=True + ) + shutil.copy(file, run_catalog / relative_file_path) + else: + logger.info( + f"{data_catalog.name} was found to be unchanged, ignoring syncing" + ) + + if not data_catalogs: + raise Exception(f"Did not find any files matching {name} in {copy_from}") + + return data_catalogs + + def sync_between_runs(self, previous_run_id: str, run_id: str): + """ + Given the previous run id, sync the catalogs between the current one and previous + + Args: + previous_run_id (str): The previous run id to sync the catalogs from + run_id (str): The run_id to which the data catalogs should be synced to. 
+
+        Raises:
+            Exception: If the previous run log does not exist in the catalog
+
+        """
+        logger.info(
+            f"Using the {self.service_name} catalog and syncing catalogs "
+            f"between old: {previous_run_id} to new: {run_id}"
+        )
+
+        catalog_location = Path(self.get_catalog_location())
+        run_catalog = catalog_location / run_id
+        utils.safe_make_dir(run_catalog)
+
+        if not utils.does_dir_exist(catalog_location / previous_run_id):
+            msg = (
+                f"Catalogs from previous run : {previous_run_id} are not found.\n"
+                "Note: Please provision the catalog objects generated by previous run in the same catalog location"
+                " as the current run, even if the catalog handler for the previous run was different"
+            )
+            raise Exception(msg)
+
+        cataloged_files = list((catalog_location / previous_run_id).glob("*"))
+
+        for cataloged_file in cataloged_files:
+            if str(cataloged_file).endswith("execution.log"):
+                continue
+
+            if cataloged_file.is_file():
+                shutil.copy(cataloged_file, run_catalog / cataloged_file.name)
+            else:
+                shutil.copytree(cataloged_file, run_catalog / cataloged_file.name)
+            logger.info(f"Copied file from: {cataloged_file} to {run_catalog}")
diff --git a/extensions/catalog/file_system.py b/extensions/catalog/file_system.py
index 901ea086..826fc278 100644
--- a/extensions/catalog/file_system.py
+++ b/extensions/catalog/file_system.py
@@ -1,253 +1,11 @@
-import logging
-import os
-import shutil
 from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Type
 
-from runnable import defaults, utils
-from runnable.catalog import BaseCatalog, is_catalog_out_of_sync
-from runnable.datastore import DataCatalog
+from extensions.catalog.any_path import AnyPathCatalog
 
-logger = logging.getLogger(defaults.LOGGER_NAME)
-
-
-class FileSystemCatalog(BaseCatalog):
-    """
-    A Catalog handler that uses the local file system for cataloging.
-
-    Note: Do not use this if the steps of the pipeline run on different compute environments.
-
-    Example config:
-
-    catalog:
-      type: file-system
-      config:
-        catalog_location: The location to store the catalog.
-        compute_data_folder: The folder to source the data from.
- - """ +class FileSystemCatalog(AnyPathCatalog): service_name: str = "file-system" - catalog_location: str = defaults.CATALOG_LOCATION_FOLDER - - def get_catalog_location(self): - return self.catalog_location - - def get_summary(self) -> Dict[str, Any]: - summary = { - "Catalog Location": self.get_catalog_location(), - } - - return summary - - def get( - self, name: str, run_id: str, compute_data_folder: str = "", **kwargs - ) -> List[DataCatalog]: - """ - Get the file by matching glob pattern to the name - - Args: - name ([str]): A glob matching the file name - run_id ([str]): The run id - - Raises: - Exception: If the catalog location does not exist - - Returns: - List(object) : A list of catalog objects - """ - logger.info( - f"Using the {self.service_name} catalog and trying to get {name} for run_id: {run_id}" - ) - - copy_to = self.compute_data_folder - if compute_data_folder: - copy_to = compute_data_folder - - copy_to = Path(copy_to) # type: ignore - - catalog_location = self.get_catalog_location() - run_catalog = Path(catalog_location) / run_id / copy_to - - logger.debug( - f"Copying objects to {copy_to} from the run catalog location of {run_catalog}" - ) - - if not utils.does_dir_exist(run_catalog): - msg = ( - f"Expected Catalog to be present at: {run_catalog} but not found.\n" - "Note: Please make sure that some data was put in the catalog before trying to get from it.\n" - ) - raise Exception(msg) - - # Iterate through the contents of the run_catalog and copy the files that fit the name pattern - # We should also return a list of data hashes - glob_files = run_catalog.glob(name) - logger.debug( - f"Glob identified {glob_files} as matches to from the catalog location: {run_catalog}" - ) - - data_catalogs = [] - run_log_store = self._context.run_log_store - for file in glob_files: - if file.is_dir(): - # Need not add a data catalog for the folder - continue - - if str(file).endswith(".execution.log"): - continue - - relative_file_path = file.relative_to(run_catalog) - - data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) - data_catalog.catalog_handler_location = catalog_location - data_catalog.catalog_relative_path = str(relative_file_path) - data_catalog.data_hash = utils.get_data_hash(str(file)) - data_catalog.stage = "get" - data_catalogs.append(data_catalog) - - # Make the directory in the data folder if required - Path(copy_to / relative_file_path.parent).mkdir(parents=True, exist_ok=True) - shutil.copy(file, copy_to / relative_file_path) - - logger.info(f"Copied {file} from {run_catalog} to {copy_to}") - - if not data_catalogs: - raise Exception(f"Did not find any files matching {name} in {run_catalog}") - - return data_catalogs - - def put( - self, - name: str, - run_id: str, - compute_data_folder: str = "", - synced_catalogs: Optional[List[DataCatalog]] = None, - **kwargs, - ) -> List[DataCatalog]: - """ - Put the files matching the glob pattern into the catalog. - - If previously synced catalogs are provided, and no changes were observed, we do not sync them. - - Args: - name (str): The glob pattern of the files to catalog - run_id (str): The run id of the run - compute_data_folder (str, optional): The compute data folder to sync from. Defaults to settings default. - synced_catalogs (dict, optional): dictionary of previously synced catalogs. Defaults to None. - - Raises: - Exception: If the compute data folder does not exist. 
- - Returns: - List(object) : A list of catalog objects - """ - logger.info( - f"Using the {self.service_name} catalog and trying to put {name} for run_id: {run_id}" - ) - - copy_from = self.compute_data_folder - if compute_data_folder: - copy_from = compute_data_folder - copy_from = Path(copy_from) # type: ignore - - catalog_location = self.get_catalog_location() - run_catalog = Path(catalog_location) / run_id - utils.safe_make_dir(run_catalog) - - logger.debug( - f"Copying objects from {copy_from} to the run catalog location of {run_catalog}" - ) - - if not utils.does_dir_exist(copy_from): - msg = ( - f"Expected compute data folder to be present at: {compute_data_folder} but not found. \n" - "Note: runnable does not create the compute data folder for you. Please ensure that the " - "folder exists.\n" - ) - raise Exception(msg) - - # Iterate through the contents of copy_from and if the name matches, we move them to the run_catalog - # We should also return a list of datastore.DataCatalog items - - glob_files = copy_from.glob(name) # type: ignore - logger.debug( - f"Glob identified {glob_files} as matches to from the compute data folder: {copy_from}" - ) - - data_catalogs = [] - run_log_store = self._context.run_log_store - for file in glob_files: - if file.is_dir(): - # Need not add a data catalog for the folder - continue - - relative_file_path = file.relative_to(".") - - data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) - data_catalog.catalog_handler_location = catalog_location - data_catalog.catalog_relative_path = ( - run_id + os.sep + str(relative_file_path) - ) - data_catalog.data_hash = utils.get_data_hash(str(file)) - data_catalog.stage = "put" - data_catalogs.append(data_catalog) - - if is_catalog_out_of_sync(data_catalog, synced_catalogs): - logger.info(f"{data_catalog.name} was found to be changed, syncing") - - # Make the directory in the catalog if required - Path(run_catalog / relative_file_path.parent).mkdir( - parents=True, exist_ok=True - ) - shutil.copy(file, run_catalog / relative_file_path) - else: - logger.info( - f"{data_catalog.name} was found to be unchanged, ignoring syncing" - ) - - if not data_catalogs: - raise Exception(f"Did not find any files matching {name} in {copy_from}") - - return data_catalogs - - def sync_between_runs(self, previous_run_id: str, run_id: str): - """ - Given the previous run id, sync the catalogs between the current one and previous - - Args: - previous_run_id (str): The previous run id to sync the catalogs from - run_id (str): The run_id to which the data catalogs should be synced to. 
- - Raises: - Exception: If the previous run log does not exist in the catalog - - """ - logger.info( - f"Using the {self.service_name} catalog and syncing catalogs" - "between old: {previous_run_id} to new: {run_id}" - ) - - catalog_location = Path(self.get_catalog_location()) - run_catalog = catalog_location / run_id - utils.safe_make_dir(run_catalog) - - if not utils.does_dir_exist(catalog_location / previous_run_id): - msg = ( - f"Catalogs from previous run : {previous_run_id} are not found.\n" - "Note: Please provision the catalog objects generated by previous run in the same catalog location" - " as the current run, even if the catalog handler for the previous run was different" - ) - raise Exception(msg) - - cataloged_files = list((catalog_location / previous_run_id).glob("*")) - - for cataloged_file in cataloged_files: - if str(cataloged_file).endswith("execution.log"): - continue - if cataloged_file.is_file(): - shutil.copy(cataloged_file, run_catalog / cataloged_file.name) - else: - shutil.copytree(cataloged_file, run_catalog / cataloged_file.name) - logger.info(f"Copied file from: {cataloged_file} to {run_catalog}") + def get_path_client(self) -> Type[Path]: + return Path diff --git a/extensions/catalog/s3.py b/extensions/catalog/s3.py new file mode 100644 index 00000000..758c8ab7 --- /dev/null +++ b/extensions/catalog/s3.py @@ -0,0 +1,12 @@ +from typing import Type + +from cloudpathlib import S3Path + +from extensions.catalog.any_path import AnyPathCatalog + + +class S3Catalog(AnyPathCatalog): + service_name: str = "s3" + + def get_path_client(self) -> Type[S3Path]: + return S3Path diff --git a/pyproject.toml b/pyproject.toml index 6bb16604..e3760daa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "setuptools>=75.6.0", "python-dotenv>=1.0.1", "typer>=0.15.1", + "cloudpathlib>=0.20.0", ] [project.optional-dependencies] @@ -112,6 +113,7 @@ include = [ [project.entry-points.'catalog'] "do-nothing" = "runnable.catalog:DoNothingCatalog" "file-system" = "extensions.catalog.file_system:FileSystemCatalog" +"s3" = "extensions.catalog.s3:S3Catalog" [project.entry-points.'run_log_store'] "buffered" = "runnable.datastore:BufferRunLogstore" diff --git a/runnable/utils.py b/runnable/utils.py index 0282fe87..68123066 100644 --- a/runnable/utils.py +++ b/runnable/utils.py @@ -359,26 +359,26 @@ def diff_dict(d1: Dict[str, Any], d2: Dict[str, Any]) -> Dict[str, Any]: return diff -def hash_bytestr_iter(bytesiter, hasher, ashexstr=True): # pylint: disable=C0116 - """Hashes the given bytesiter using the given hasher.""" - for block in bytesiter: # pragma: no cover - hasher.update(block) - return hasher.hexdigest() if ashexstr else hasher.digest() # pragma: no cover +# def hash_bytestr_iter(bytesiter, hasher, ashexstr=True): # pylint: disable=C0116 +# """Hashes the given bytesiter using the given hasher.""" +# for block in bytesiter: # pragma: no cover +# hasher.update(block) +# return hasher.hexdigest() if ashexstr else hasher.digest() # pragma: no cover -def file_as_blockiter(afile, blocksize=65536): # pylint: disable=C0116 - """From a StackOverflow answer: that is used to generate a MD5 hash of a large files. - # https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file. +# def file_as_blockiter(afile, blocksize=65536): # pylint: disable=C0116 +# """From a StackOverflow answer: that is used to generate a MD5 hash of a large files. +# # https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file. 
- """ - with afile: # pragma: no cover - block = afile.read(blocksize) - while len(block) > 0: - yield block - block = afile.read(blocksize) +# """ +# with afile: # pragma: no cover +# block = afile.read(blocksize) +# while len(block) > 0: +# yield block +# block = afile.read(blocksize) -def get_data_hash(file_name: str): +def get_data_hash(file_name: str) -> str: """Returns the hash of the data file. Args: @@ -389,9 +389,12 @@ def get_data_hash(file_name: str): """ # https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file # TODO: For a big file, we should only hash the first few bytes - return hash_bytestr_iter( - file_as_blockiter(open(file_name, "rb")), hashlib.sha256() - ) # pragma: no cover + with open(file_name, "rb") as f: + file_hash = hashlib.md5() + for chunk in iter(lambda: f.read(4096), b""): + file_hash.update(chunk) + + return file_hash.hexdigest() # TODO: This is not the right place for this. diff --git a/tests/extensions/catalog/test_file_system.py b/tests/extensions/catalog/test_any_path.py similarity index 99% rename from tests/extensions/catalog/test_file_system.py rename to tests/extensions/catalog/test_any_path.py index f6b8036a..473a4c62 100644 --- a/tests/extensions/catalog/test_file_system.py +++ b/tests/extensions/catalog/test_any_path.py @@ -3,7 +3,7 @@ import pytest -from extensions.catalog import file_system as module +from extensions.catalog import any_path as module from extensions.catalog.file_system import FileSystemCatalog from runnable import defaults diff --git a/uv.lock b/uv.lock index a7fb3f02..dc183794 100644 --- a/uv.lock +++ b/uv.lock @@ -256,6 +256,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e9/da/824b92d9942f4e472702488857914bdd50f73021efea15b4cad9aca8ecef/click_plugins-1.1.1-py2.py3-none-any.whl", hash = "sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8", size = 7497 }, ] +[[package]] +name = "cloudpathlib" +version = "0.20.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/71/0b/a47d78ed2816db100543b504fdbfc2070f422aac858e6bcf775713e37b8a/cloudpathlib-0.20.0.tar.gz", hash = "sha256:f6ef7ca409a510f7ba4639ba50ab3fc5b6dee82d6dff0d7f5715fd0c9ab35891", size = 45149 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1f/6e/b64600156934dab14cc8b403095a9ea8bd722aad2e775673c68346b76220/cloudpathlib-0.20.0-py3-none-any.whl", hash = "sha256:7af3bcefbf73392ae7f31c08b3660ec31607f8c01b7f6262d4d73469a845f641", size = 52547 }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -2088,6 +2100,7 @@ source = { editable = "." 
} dependencies = [ { name = "click" }, { name = "click-plugins" }, + { name = "cloudpathlib" }, { name = "dill" }, { name = "pydantic" }, { name = "python-dotenv" }, @@ -2136,6 +2149,7 @@ release = [ requires-dist = [ { name = "click", specifier = "<=8.1.3" }, { name = "click-plugins", specifier = ">=1.1.1" }, + { name = "cloudpathlib", specifier = ">=0.20.0" }, { name = "dill", specifier = ">=0.3.9" }, { name = "docker", marker = "extra == 'docker'", specifier = ">=7.1.0" }, { name = "kubernetes", marker = "extra == 'k8s'", specifier = ">=31.0.0" }, From e7850f09e814a361690dfd55b561cf1aa2c5be1f Mon Sep 17 00:00:00 2001 From: "Vammi, Vijay" Date: Sun, 2 Feb 2025 16:05:19 +0000 Subject: [PATCH 2/5] feat: generic AnyPath catalog implementation --- extensions/catalog/any_path.py | 107 ++---- extensions/catalog/file_system.py | 47 ++- extensions/catalog/minio.py | 17 + extensions/catalog/s3.py | 7 +- extensions/pipeline_executor/__init__.py | 37 +-- pyproject.toml | 4 + runnable/catalog.py | 36 +- runnable/datastore.py | 4 +- runnable/executor.py | 17 - runnable/tasks.py | 4 +- tests/extensions/catalog/test_any_path.py | 312 ------------------ .../catalog/test_catalog_extension.py | 52 --- .../test_generic_executor.py | 92 +----- tests/runnable/test_catalog.py | 8 +- uv.lock | 58 ++++ 15 files changed, 168 insertions(+), 634 deletions(-) create mode 100644 extensions/catalog/minio.py delete mode 100644 tests/extensions/catalog/test_any_path.py delete mode 100644 tests/extensions/catalog/test_catalog_extension.py diff --git a/extensions/catalog/any_path.py b/extensions/catalog/any_path.py index ed9f9e17..bbbcd6a1 100644 --- a/extensions/catalog/any_path.py +++ b/extensions/catalog/any_path.py @@ -3,12 +3,12 @@ import shutil from abc import abstractmethod from pathlib import Path -from typing import Any, Dict, List, Optional, Type +from typing import Any, Dict, List -from cloudpathlib import S3Path +from cloudpathlib import CloudPath from runnable import defaults, utils -from runnable.catalog import BaseCatalog, is_catalog_out_of_sync +from runnable.catalog import BaseCatalog from runnable.datastore import DataCatalog logger = logging.getLogger(defaults.LOGGER_NAME) @@ -30,24 +30,19 @@ class AnyPathCatalog(BaseCatalog): """ - catalog_location: str = defaults.CATALOG_LOCATION_FOLDER - - def get_catalog_location(self): - return self.catalog_location + @abstractmethod + def get_summary(self) -> Dict[str, Any]: ... - def get_summary(self) -> Dict[str, Any]: - summary = { - "Catalog Location": self.get_catalog_location(), - } + @abstractmethod + def upload_to_catalog(self, file: Path) -> None: ... - return summary + @abstractmethod + def download_from_catalog(self, file: Path | CloudPath) -> None: ... @abstractmethod - def get_path_client(self) -> Type[S3Path] | Type[Path]: ... + def get_catalog_location(self) -> Path | CloudPath: ... 
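+    # Together these four hooks form the backend contract: get()/put() below
+    # keep the glob matching and data catalog bookkeeping in one place and
+    # delegate the actual byte transfer to the concrete implementations.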
- def get( - self, name: str, run_id: str, compute_data_folder: str = "", **kwargs - ) -> List[DataCatalog]: + def get(self, name: str) -> List[DataCatalog]: """ Get the file by matching glob pattern to the name @@ -61,30 +56,7 @@ def get( Returns: List(object) : A list of catalog objects """ - logger.info( - f"Using the {self.service_name} catalog and trying to get {name} for run_id: {run_id}" - ) - - copy_to = self.compute_data_folder - if compute_data_folder: - copy_to = compute_data_folder - - client = self.get_path_client() - copy_to = client(copy_to) # type: ignore - - catalog_location = self.get_catalog_location() - run_catalog = client(catalog_location) / run_id / copy_to - - logger.debug( - f"Copying objects to {copy_to} from the run catalog location of {run_catalog}" - ) - - if not run_catalog.is_dir(): - msg = ( - f"Expected Catalog to be present at: {run_catalog} but not found.\n" - "Note: Please make sure that some data was put in the catalog before trying to get from it.\n" - ) - raise Exception(msg) + run_catalog = self.get_catalog_location() # Iterate through the contents of the run_catalog and copy the files that fit the name pattern # We should also return a list of data hashes @@ -106,31 +78,19 @@ def get( relative_file_path = file.relative_to(run_catalog) # type: ignore data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) - data_catalog.catalog_handler_location = catalog_location data_catalog.catalog_relative_path = str(relative_file_path) data_catalog.data_hash = utils.get_data_hash(str(file)) data_catalog.stage = "get" data_catalogs.append(data_catalog) - # Make the directory in the data folder if required - Path(copy_to / relative_file_path.parent).mkdir(parents=True, exist_ok=True) - shutil.copy(file, copy_to / relative_file_path) - - logger.info(f"Copied {file} from {run_catalog} to {copy_to}") + self.download_from_catalog(file) if not data_catalogs: raise Exception(f"Did not find any files matching {name} in {run_catalog}") return data_catalogs - def put( - self, - name: str, - run_id: str, - compute_data_folder: str = "", - synced_catalogs: Optional[List[DataCatalog]] = None, - **kwargs, - ) -> List[DataCatalog]: + def put(self, name: str) -> List[DataCatalog]: """ Put the files matching the glob pattern into the catalog. @@ -148,29 +108,16 @@ def put( Returns: List(object) : A list of catalog objects """ + run_id = self._context.run_id logger.info( f"Using the {self.service_name} catalog and trying to put {name} for run_id: {run_id}" ) - client = self.get_path_client() - - copy_from = self.compute_data_folder - if compute_data_folder: - copy_from = compute_data_folder - - copy_from = client(copy_from) # type: ignore - - catalog_location = self.get_catalog_location() - run_catalog = client(catalog_location) / run_id - run_catalog.mkdir(parents=True, exist_ok=True) + copy_from = Path(self.compute_data_folder) - logger.debug( - f"Copying objects from {copy_from} to the run catalog location of {run_catalog}" - ) - - if not utils.does_dir_exist(copy_from): + if not copy_from.is_dir(): msg = ( - f"Expected compute data folder to be present at: {compute_data_folder} but not found. \n" + f"Expected compute data folder to be present at: {copy_from} but not found. \n" "Note: runnable does not create the compute data folder for you. 
Please ensure that the "
                 "folder exists.\n"
             )
@@ -178,8 +125,7 @@ def put(
 
         # Iterate through the contents of copy_from and if the name matches, we move them to the run_catalog
         # We should also return a list of datastore.DataCatalog items
-
-        glob_files = copy_from.glob(name)  # type: ignore
+        glob_files = copy_from.glob(name)
         logger.debug(
             f"Glob identified {glob_files} as matches to from the compute data folder: {copy_from}"
         )
@@ -194,7 +140,6 @@ def put(
             relative_file_path = file.relative_to(".")
 
             data_catalog = run_log_store.create_data_catalog(str(relative_file_path))
-            data_catalog.catalog_handler_location = catalog_location
             data_catalog.catalog_relative_path = (
                 run_id + os.sep + str(relative_file_path)
             )
@@ -202,18 +147,8 @@ def put(
             data_catalog.stage = "put"
             data_catalogs.append(data_catalog)
 
-            if is_catalog_out_of_sync(data_catalog, synced_catalogs):
-                logger.info(f"{data_catalog.name} was found to be changed, syncing")
-
-                # Make the directory in the catalog if required
-                client(run_catalog / relative_file_path.parent).mkdir(
-                    parents=True, exist_ok=True
-                )
-                shutil.copy(file, run_catalog / relative_file_path)
-            else:
-                logger.info(
-                    f"{data_catalog.name} was found to be unchanged, ignoring syncing"
-                )
+            # TODO: Think about syncing only if the file is changed
+            self.upload_to_catalog(file)
 
         if not data_catalogs:
             raise Exception(f"Did not find any files matching {name} in {copy_from}")
diff --git a/extensions/catalog/file_system.py b/extensions/catalog/file_system.py
index 826fc278..8a4ecd41 100644
--- a/extensions/catalog/file_system.py
+++ b/extensions/catalog/file_system.py
@@ -1,11 +1,52 @@
+import logging
+import shutil
 from pathlib import Path
-from typing import Type
+from typing import Any
+
+from cloudpathlib import CloudPath
+from pydantic import Field
 
 from extensions.catalog.any_path import AnyPathCatalog
+from runnable import defaults
+
+logger = logging.getLogger(defaults.LOGGER_NAME)
 
 
 class FileSystemCatalog(AnyPathCatalog):
     service_name: str = "file-system"
 
-    def get_path_client(self) -> Type[Path]:
-        return Path
+    catalog_location: str = Field(default=defaults.CATALOG_LOCATION_FOLDER)
+
+    def get_summary(self) -> dict[str, Any]:
+        return {
+            "compute_data_folder": self.compute_data_folder,
+            "catalog_location": self.catalog_location,
+        }
+
+    def get_catalog_location(self) -> Path:
+        run_id = self._context.run_id
+        return Path(self.catalog_location) / run_id / self.compute_data_folder
+
+    def download_from_catalog(self, file: Path | CloudPath) -> None:
+        assert isinstance(file, Path)
+
+        run_catalog = self.get_catalog_location()
+        relative_file_path = file.relative_to(run_catalog)
+
+        copy_to = Path(self.compute_data_folder)
+        # Make the directory in the data folder if required
+        (copy_to / relative_file_path.parent).mkdir(parents=True, exist_ok=True)
+        shutil.copy(file, copy_to / relative_file_path)
+
+    def upload_to_catalog(self, file: Path) -> None:
+        run_catalog = self.get_catalog_location()
+        run_catalog.mkdir(parents=True, exist_ok=True)
+
+        logger.debug(
+            f"Copying objects from {self.compute_data_folder} to the run catalog location of {run_catalog}"
+        )
+
+        relative_file_path = file.relative_to(".")
+
+        (run_catalog / relative_file_path.parent).mkdir(parents=True, exist_ok=True)
+        shutil.copy(file, run_catalog / relative_file_path)
diff --git a/extensions/catalog/minio.py b/extensions/catalog/minio.py
new file mode 100644
index 00000000..3a3bced6
--- /dev/null
+++ b/extensions/catalog/minio.py
@@ -0,0 +1,17 @@
+from cloudpathlib import S3Client, S3Path
+
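+# MinIO is S3-compatible, so this catalog reuses cloudpathlib's S3 machinery
+# pointed at a local MinIO endpoint instead of AWS.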
+from extensions.catalog.any_path import AnyPathCatalog + + +class MinioCatalog(AnyPathCatalog): + service_name: str = "minio" + bucket: str = "runnable" + + def get_path(self, path: str) -> S3Path: + # TODO: Might need to assert the credentials are set + client = S3Client( + endpoint_url="http://localhost:9002", + aws_access_key_id="minioadmin", + aws_secret_access_key="minioadmin", + ) + return client.CloudPath(f"s3://{self.bucket}/{path}") diff --git a/extensions/catalog/s3.py b/extensions/catalog/s3.py index 758c8ab7..8d7f2468 100644 --- a/extensions/catalog/s3.py +++ b/extensions/catalog/s3.py @@ -1,5 +1,3 @@ -from typing import Type - from cloudpathlib import S3Path from extensions.catalog.any_path import AnyPathCatalog @@ -8,5 +6,6 @@ class S3Catalog(AnyPathCatalog): service_name: str = "s3" - def get_path_client(self) -> Type[S3Path]: - return S3Path + def get_path(self, path: str) -> S3Path: + # TODO: Might need to assert the credentials are set + return S3Path(path) diff --git a/extensions/pipeline_executor/__init__.py b/extensions/pipeline_executor/__init__.py index 7531e5a9..dbf0b05f 100644 --- a/extensions/pipeline_executor/__init__.py +++ b/extensions/pipeline_executor/__init__.py @@ -151,54 +151,25 @@ def _sync_catalog( # Nothing to get/put from the catalog return None - compute_data_folder = self.get_effective_compute_data_folder() - data_catalogs = [] for name_pattern in node_catalog_settings.get(stage) or []: if stage == "get": data_catalog = self._context.catalog_handler.get( name=name_pattern, - run_id=self._context.run_id, - compute_data_folder=compute_data_folder, ) elif stage == "put": data_catalog = self._context.catalog_handler.put( name=name_pattern, - run_id=self._context.run_id, - compute_data_folder=compute_data_folder, - synced_catalogs=synced_catalogs, ) + else: + raise Exception(f"Stage {stage} not supported") logger.debug(f"Added data catalog: {data_catalog} to step log") data_catalogs.extend(data_catalog) return data_catalogs - def get_effective_compute_data_folder(self) -> str: - """ - Get the effective compute data folder for the given stage. - If there is nothing to catalog, we return None. - - The default is the compute data folder of the catalog but this can be over-ridden by the node. 
- - Args: - stage (str): The stage we are in the process of cataloging - - - Returns: - str: The compute data folder as defined by the node defaulting to catalog handler - """ - assert isinstance(self._context_node, BaseNode) - compute_data_folder = self._context.catalog_handler.compute_data_folder - - catalog_settings = self._context_node._get_catalog_settings() - effective_compute_data_folder = ( - catalog_settings.get("compute_data_folder", "") or compute_data_folder - ) - - return effective_compute_data_folder - @property def step_attempt_number(self) -> int: """ @@ -219,9 +190,7 @@ def add_task_log_to_catalog( ) task_console.save_text(log_file_name) # Put the log file in the catalog - self._context.catalog_handler.put( - name=log_file_name, run_id=self._context.run_id - ) + self._context.catalog_handler.put(name=log_file_name) os.remove(log_file_name) def _execute_node( diff --git a/pyproject.toml b/pyproject.toml index e3760daa..855fce16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,9 @@ examples = [ k8s = [ "kubernetes>=31.0.0", ] +s3 = [ + "cloudpathlib[s3]" +] [dependency-groups] dev = [ @@ -114,6 +117,7 @@ include = [ "do-nothing" = "runnable.catalog:DoNothingCatalog" "file-system" = "extensions.catalog.file_system:FileSystemCatalog" "s3" = "extensions.catalog.s3:S3Catalog" +"minio" = "extensions.catalog.minio:MinioCatalog" [project.entry-points.'run_log_store'] "buffered" = "runnable.datastore:BufferRunLogstore" diff --git a/runnable/catalog.py b/runnable/catalog.py index f8c8f220..bca2e75e 100644 --- a/runnable/catalog.py +++ b/runnable/catalog.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field import runnable.context as context from runnable import defaults @@ -43,6 +43,9 @@ class BaseCatalog(ABC, BaseModel): service_name: str = "" service_type: str = "catalog" + + compute_data_folder: str = Field(default=defaults.COMPUTE_DATA_FOLDER) + model_config = ConfigDict(extra="forbid") @abstractmethod @@ -52,14 +55,8 @@ def get_summary(self) -> Dict[str, Any]: ... def _context(self): return context.run_context - @property - def compute_data_folder(self) -> str: - return defaults.COMPUTE_DATA_FOLDER - @abstractmethod - def get( - self, name: str, run_id: str, compute_data_folder: str = "", **kwargs - ) -> List[DataCatalog]: + def get(self, name: str) -> List[DataCatalog]: """ Get the catalog item by 'name' for the 'run id' and store it in compute data folder. @@ -79,14 +76,7 @@ def get( raise NotImplementedError @abstractmethod - def put( - self, - name: str, - run_id: str, - compute_data_folder: str = "", - synced_catalogs: Optional[List[DataCatalog]] = None, - **kwargs, - ) -> List[DataCatalog]: + def put(self, name: str) -> List[DataCatalog]: """ Put the file by 'name' from the 'compute_data_folder' in the catalog for the run_id. 
@@ -140,23 +130,14 @@ class DoNothingCatalog(BaseCatalog): def get_summary(self) -> Dict[str, Any]: return {} - def get( - self, name: str, run_id: str, compute_data_folder: str = "", **kwargs - ) -> List[DataCatalog]: + def get(self, name: str) -> List[DataCatalog]: """ Does nothing """ logger.info("Using a do-nothing catalog, doing nothing in get") return [] - def put( - self, - name: str, - run_id: str, - compute_data_folder: str = "", - synced_catalogs: Optional[List[DataCatalog]] = None, - **kwargs, - ) -> List[DataCatalog]: + def put(self, name: str) -> List[DataCatalog]: """ Does nothing """ @@ -168,4 +149,3 @@ def sync_between_runs(self, previous_run_id: str, run_id: str): Does nothing """ logger.info("Using a do-nothing catalog, doing nothing while sync between runs") - logger.info("Using a do-nothing catalog, doing nothing while sync between runs") diff --git a/runnable/datastore.py b/runnable/datastore.py index 48b4e31f..34a81ca3 100644 --- a/runnable/datastore.py +++ b/runnable/datastore.py @@ -114,7 +114,7 @@ def get_value(self) -> Any: # If the object was serialised, get it from the catalog catalog_handler = context.run_context.catalog_handler - catalog_handler.get(name=self.file_name, run_id=context.run_context.run_id) + catalog_handler.get(name=self.file_name) obj = context.run_context.pickler.load(path=self.file_name) os.remove(self.file_name) # Remove after loading return obj @@ -128,7 +128,7 @@ def put_object(self, data: Any) -> None: context.run_context.pickler.dump(data=data, path=self.file_name) catalog_handler = context.run_context.catalog_handler - catalog_handler.put(name=self.file_name, run_id=context.run_context.run_id) + catalog_handler.put(name=self.file_name) os.remove(self.file_name) # Remove after loading diff --git a/runnable/executor.py b/runnable/executor.py index ad3713a9..f142e0ad 100644 --- a/runnable/executor.py +++ b/runnable/executor.py @@ -173,23 +173,6 @@ def add_code_identities(self, node: BaseNode, step_log: StepLog, **kwargs): """ ... - @abstractmethod - def get_effective_compute_data_folder(self) -> Optional[str]: - """ - Get the effective compute data folder for the given stage. - If there is nothing to catalog, we return None. - - The default is the compute data folder of the catalog but this can be over-ridden by the node. - - Args: - stage (str): The stage we are in the process of cataloging - - - Returns: - Optional[str]: The compute data folder as defined by catalog handler or the node or None. - """ - ... 
- @abstractmethod def _sync_catalog( self, stage: str, synced_catalogs=None diff --git a/runnable/tasks.py b/runnable/tasks.py index 43a1e05c..3e62c4af 100644 --- a/runnable/tasks.py +++ b/runnable/tasks.py @@ -501,9 +501,7 @@ def execute_command( pm.execute_notebook(**kwds) task_console.print(out_file.getvalue()) - context.run_context.catalog_handler.put( - name=notebook_output_path, run_id=context.run_context.run_id - ) + context.run_context.catalog_handler.put(name=notebook_output_path) client = PloomberClient.from_path(path=notebook_output_path) namespace = client.get_namespace() diff --git a/tests/extensions/catalog/test_any_path.py b/tests/extensions/catalog/test_any_path.py deleted file mode 100644 index 473a4c62..00000000 --- a/tests/extensions/catalog/test_any_path.py +++ /dev/null @@ -1,312 +0,0 @@ -import os -import tempfile - -import pytest - -from extensions.catalog import any_path as module -from extensions.catalog.file_system import FileSystemCatalog -from runnable import defaults - - -def test_file_system_catalog_inits_default_values_if_none_config(): - catalog_handler = FileSystemCatalog() - assert catalog_handler.compute_data_folder == defaults.COMPUTE_DATA_FOLDER - assert catalog_handler.catalog_location == defaults.CATALOG_LOCATION_FOLDER - - -def test_file_system_catalog_get_catalog_location_defaults_if_location_not_provided( - monkeypatch, mocker -): - catalog_handler = FileSystemCatalog() - - assert catalog_handler.catalog_location == defaults.CATALOG_LOCATION_FOLDER - - -def test_file_system_catalog_catalog_location_returns_config_catalog_location_if_provided( - monkeypatch, mocker -): - catalog_handler = FileSystemCatalog(catalog_location="this") - - assert catalog_handler.catalog_location == "this" - - -def test_file_system_catalog_get_raises_exception_if_catalog_does_not_exist( - monkeypatch, mocker -): - def mock_does_dir_exist(dir_name): - if dir_name == "this_compute_folder": - return True - return False - - monkeypatch.setattr(module.utils, "does_dir_exist", mock_does_dir_exist) - - catalog_handler = FileSystemCatalog(catalog_location="this_location") - with pytest.raises(Exception, match="Expected Catalog to be present at"): - catalog_handler.get( - "testing", run_id="dummy_run_id", compute_data_folder="this_compute_folder" - ) - - -def test_file_system_catalog_get_copies_files_from_catalog_to_compute_folder_with_all( - mocker, monkeypatch -): - mock_run_store = mocker.MagicMock() - mock_context = mocker.MagicMock() - mock_context.run_log_store = mock_run_store - - mocker.patch( - "runnable.catalog.BaseCatalog._context", - new_callable=mocker.PropertyMock, - return_value=mock_context, - ) - - with tempfile.TemporaryDirectory() as catalog_location: - with tempfile.TemporaryDirectory(dir=".") as compute_folder: - catalog_location_path = module.Path(catalog_location) - run_id = "testing" - module.Path(catalog_location_path / run_id / compute_folder).mkdir( - parents=True - ) - with open( - module.Path(catalog_location) - / run_id - / compute_folder - / "catalog_file", - "w", - ) as fw: - fw.write("hello") - - catalog_handler = FileSystemCatalog() - catalog_handler.catalog_location = catalog_location - - catalog_handler.get(name="**/*", run_id=run_id) - - _, _, files = next(os.walk(compute_folder)) - - assert len(list(files)) == 1 - - -def test_file_system_catalog_get_copies_files_from_catalog_to_compute_folder_with_pattern( - mocker, monkeypatch -): - mock_run_store = mocker.MagicMock() - mock_context = mocker.MagicMock() - mock_context.run_log_store = 
mock_run_store - - mocker.patch( - "runnable.catalog.BaseCatalog._context", - new_callable=mocker.PropertyMock, - return_value=mock_context, - ) - - with tempfile.TemporaryDirectory() as catalog_location: - with tempfile.TemporaryDirectory(dir=".") as compute_folder: - catalog_location_path = module.Path(catalog_location) - run_id = "testing" - module.Path(catalog_location_path / run_id / compute_folder).mkdir( - parents=True - ) - with open( - module.Path(catalog_location) - / run_id - / compute_folder - / "catalog_file", - "w", - ) as fw: - fw.write("hello") - - with open( - module.Path(catalog_location) / run_id / compute_folder / "not_catalog", - "w", - ) as fw: - fw.write("hello") - - catalog_handler = FileSystemCatalog(catalog_location=catalog_location) - catalog_handler.get(name="**/catalog*", run_id=run_id) - - _, _, files = next(os.walk(compute_folder)) - - assert len(list(files)) == 1 - - -def test_file_system_catalog_put_copies_files_from_compute_folder_to_catalog_if_synced_changed_all( - mocker, monkeypatch -): - monkeypatch.setattr( - module, "is_catalog_out_of_sync", mocker.MagicMock(return_value=True) - ) - mock_run_store = mocker.MagicMock() - mock_context = mocker.MagicMock() - mock_context.run_log_store = mock_run_store - - mocker.patch( - "runnable.catalog.BaseCatalog._context", - new_callable=mocker.PropertyMock, - return_value=mock_context, - ) - - with tempfile.TemporaryDirectory() as catalog_location: - with tempfile.TemporaryDirectory(dir=".") as compute_folder: - catalog_location_path = module.Path(catalog_location) - run_id = "testing" - module.Path(catalog_location_path / run_id).mkdir(parents=True) - - with open(module.Path(compute_folder) / "catalog_file", "w") as fw: - fw.write("hello") - - catalog_handler = FileSystemCatalog(catalog_location=catalog_location) - catalog_handler.put(name=str(compute_folder) + "/*", run_id=run_id) - - _, _, files = next(os.walk(catalog_location_path / run_id / compute_folder)) - - assert len(list(files)) == 1 - - -def test_file_system_catalog_put_copies_files_from_compute_folder_to_catalog_if_synced_changed_pattern( - mocker, monkeypatch -): - monkeypatch.setattr( - module, "is_catalog_out_of_sync", mocker.MagicMock(return_value=True) - ) - mock_run_store = mocker.MagicMock() - mock_context = mocker.MagicMock() - mock_context.run_log_store = mock_run_store - - mocker.patch( - "runnable.catalog.BaseCatalog._context", - new_callable=mocker.PropertyMock, - return_value=mock_context, - ) - with tempfile.TemporaryDirectory() as catalog_location: - with tempfile.TemporaryDirectory(dir=".") as compute_folder: - catalog_location_path = module.Path(catalog_location) - run_id = "testing" - module.Path(catalog_location_path / run_id).mkdir(parents=True) - with open(module.Path(compute_folder) / "catalog_file", "w") as fw: - fw.write("hello") - - with open(module.Path(compute_folder) / "not_catalog_file", "w") as fw: - fw.write("hello") - - catalog_handler = FileSystemCatalog(catalog_location=catalog_location) - - catalog_handler.put(name=str(compute_folder) + "/catalog*", run_id=run_id) - - _, _, files = next(os.walk(catalog_location_path / run_id / compute_folder)) - - assert len(list(files)) == 1 - - -def test_file_system_catalog_put_copies_files_from_compute_folder_to_catalog_if_synced_true( - mocker, monkeypatch -): - monkeypatch.setattr( - module, "is_catalog_out_of_sync", mocker.MagicMock(return_value=False) - ) - mock_run_store = mocker.MagicMock() - mock_context = mocker.MagicMock() - mock_context.run_log_store = mock_run_store - - 
mocker.patch( - "runnable.catalog.BaseCatalog._context", - new_callable=mocker.PropertyMock, - return_value=mock_context, - ) - - with tempfile.TemporaryDirectory() as catalog_location: - with tempfile.TemporaryDirectory(dir=".") as compute_folder: - catalog_location_path = module.Path(catalog_location) - run_id = "testing" - module.Path(catalog_location_path / run_id).mkdir(parents=True) - with open(module.Path(compute_folder) / "catalog_file", "w") as fw: - fw.write("hello") - - with open(module.Path(compute_folder) / "not_catalog_file", "w") as fw: - fw.write("hello") - - catalog_handler = FileSystemCatalog(catalog_location=catalog_location) - - catalog_handler.put(name=str(compute_folder) + "/*", run_id=run_id) - - with pytest.raises(FileNotFoundError): - _ = os.listdir(catalog_location_path / run_id / compute_folder) - assert True - - -def test_file_system_catalog_put_uses_compute_folder_by_default(monkeypatch, mocker): - mock_safe_make_dir = mocker.MagicMock() - monkeypatch.setattr(module.utils, "safe_make_dir", mock_safe_make_dir) - - mock_does_dir_exist = mocker.MagicMock(side_effect=Exception()) - monkeypatch.setattr(module.utils, "does_dir_exist", mock_does_dir_exist) - - catalog_handler = FileSystemCatalog(catalog_location="this_location") - with pytest.raises(Exception): - catalog_handler.put("testing", run_id="dummy_run_id") - - mock_does_dir_exist.assert_called_once_with(module.Path(".")) - - -def test_file_system_catalog_put_uses_compute_folder_provided(monkeypatch, mocker): - mock_safe_make_dir = mocker.MagicMock() - monkeypatch.setattr(module.utils, "safe_make_dir", mock_safe_make_dir) - - mock_does_dir_exist = mocker.MagicMock(side_effect=Exception()) - monkeypatch.setattr(module.utils, "does_dir_exist", mock_does_dir_exist) - - catalog_handler = FileSystemCatalog(catalog_location="this_location") - with pytest.raises(Exception): - catalog_handler.put( - "testing", run_id="dummy_run_id", compute_data_folder="not_data" - ) - - mock_does_dir_exist.assert_called_once_with(module.Path("not_data")) - - -def test_file_system_catalog_put_raises_exception_if_compute_data_folder_does_not_exist( - monkeypatch, mocker -): - mock_safe_make_dir = mocker.MagicMock() - monkeypatch.setattr(module.utils, "safe_make_dir", mock_safe_make_dir) - - mock_does_dir_exist = mocker.MagicMock(return_value=False) - monkeypatch.setattr(module.utils, "does_dir_exist", mock_does_dir_exist) - - catalog_handler = FileSystemCatalog(catalog_location="this_location") - with pytest.raises(Exception): - catalog_handler.put( - "testing", run_id="dummy_run_id", compute_data_folder="this_compute_folder" - ) - - -def test_file_system_catalog_put_creates_catalog_location_using_run_id( - monkeypatch, mocker -): - mock_safe_make_dir = mocker.MagicMock() - monkeypatch.setattr(module.utils, "safe_make_dir", mock_safe_make_dir) - - mock_does_dir_exist = mocker.MagicMock(side_effect=Exception()) - monkeypatch.setattr(module.utils, "does_dir_exist", mock_does_dir_exist) - - catalog_handler = FileSystemCatalog(catalog_location="this_location") - - with pytest.raises(Exception): - catalog_handler.put("testing", run_id="dummy_run_id") - - mock_safe_make_dir.assert_called_once_with( - module.Path("this_location") / "dummy_run_id" - ) - - -def test_file_system_sync_between_runs_raises_exception_if_previous_catalog_does_not_exist( - monkeypatch, mocker -): - mock_safe_make_dir = mocker.MagicMock() - monkeypatch.setattr(module.utils, "safe_make_dir", mock_safe_make_dir) - - mock_does_dir_exist = 
mocker.MagicMock(return_value=False) - monkeypatch.setattr(module.utils, "does_dir_exist", mock_does_dir_exist) - - catalog_handler = FileSystemCatalog(catalog_location="this_location") - with pytest.raises(Exception): - catalog_handler.sync_between_runs("previous", "current") diff --git a/tests/extensions/catalog/test_catalog_extension.py b/tests/extensions/catalog/test_catalog_extension.py deleted file mode 100644 index 685b6dba..00000000 --- a/tests/extensions/catalog/test_catalog_extension.py +++ /dev/null @@ -1,52 +0,0 @@ -from runnable.catalog import is_catalog_out_of_sync - - -def test_is_catalog_out_of_sync_returns_true_for_empty_synced_catalogs(): - assert is_catalog_out_of_sync(1, []) is True - - -def test_is_catalog_out_of_sync_returns_false_for_same_objects(): - class MockCatalog: - catalog_relative_path = None - data_hash = None - - catalog_item = MockCatalog() - catalog_item.catalog_relative_path = "path" - catalog_item.data_hash = "hash" - - synced_catalog = [catalog_item] - assert is_catalog_out_of_sync(catalog_item, synced_catalog) is False - - -def test_is_catalog_out_of_sync_returns_true_for_different_hash(): - class MockCatalog: - catalog_relative_path = None - data_hash = None - - catalog_item1 = MockCatalog() - catalog_item1.catalog_relative_path = "path" - catalog_item1.data_hash = "hash" - - catalog_item2 = MockCatalog() - catalog_item2.catalog_relative_path = "path" - catalog_item2.data_hash = "not-hash" - - synced_catalog = [catalog_item1] - assert is_catalog_out_of_sync(catalog_item2, synced_catalog) is True - - -def test_is_catalog_out_of_sync_returns_true_for_different_paths(): - class MockCatalog: - catalog_relative_path = None - data_hash = None - - catalog_item1 = MockCatalog() - catalog_item1.catalog_relative_path = "path" - catalog_item1.data_hash = "hash" - - catalog_item2 = MockCatalog() - catalog_item2.catalog_relative_path = "path1" - catalog_item2.data_hash = "hash" - - synced_catalog = [catalog_item1] - assert is_catalog_out_of_sync(catalog_item2, synced_catalog) is True diff --git a/tests/extensions/pipeline_executor/test_generic_executor.py b/tests/extensions/pipeline_executor/test_generic_executor.py index 3389f3e0..439e1304 100644 --- a/tests/extensions/pipeline_executor/test_generic_executor.py +++ b/tests/extensions/pipeline_executor/test_generic_executor.py @@ -223,13 +223,6 @@ def test_sync_catalog_returns_empty_list_if_asked_nothing_in_stage( mock_node = mock_base_node() setattr(mock_node, "_get_catalog_settings", lambda: {"get": [], "put": []}) - mock_get_effective_compute_folder = mocker.MagicMock(return_value="compute_folder") - monkeypatch.setattr( - GenericExecutor, - "get_effective_compute_data_folder", - mock_get_effective_compute_folder, - ) - test_executor = GenericExecutor() test_executor._context_node = mock_node @@ -243,13 +236,6 @@ def test_sync_catalog_calls_get_from_catalog_handler( mock_node = mock_base_node() setattr(mock_node, "_get_catalog_settings", lambda: {"get": ["me"], "put": []}) - mock_get_effective_compute_folder = mocker.MagicMock(return_value="compute_folder") - monkeypatch.setattr( - GenericExecutor, - "get_effective_compute_data_folder", - mock_get_effective_compute_folder, - ) - mock_catalog_handler_get = mocker.MagicMock() mock_catalog_handler_get.return_value = ["data_catalog"] mock_run_context.catalog_handler.get = mock_catalog_handler_get @@ -261,9 +247,7 @@ def test_sync_catalog_calls_get_from_catalog_handler( data_catalogs = test_executor._sync_catalog(stage="get") assert data_catalogs == 
["data_catalog"] - mock_catalog_handler_get.assert_called_once_with( - name="me", run_id="run_id", compute_data_folder="compute_folder" - ) + mock_catalog_handler_get.assert_called_once_with(name="me") def test_sync_catalog_calls_get_from_catalog_handler_as_per_input( @@ -274,13 +258,6 @@ def test_sync_catalog_calls_get_from_catalog_handler_as_per_input( mock_node, "_get_catalog_settings", lambda: {"get": ["me", "you"], "put": []} ) - mock_get_effective_compute_folder = mocker.MagicMock(return_value="compute_folder") - monkeypatch.setattr( - GenericExecutor, - "get_effective_compute_data_folder", - mock_get_effective_compute_folder, - ) - mock_catalog_handler_get = mocker.MagicMock() mock_catalog_handler_get.return_value = ["data_catalog"] mock_run_context.catalog_handler.get = mock_catalog_handler_get @@ -301,13 +278,6 @@ def test_sync_catalog_calls_put_from_catalog_handler( mock_node = mock_base_node() setattr(mock_node, "_get_catalog_settings", lambda: {"get": [], "put": ["me"]}) - mock_get_effective_compute_folder = mocker.MagicMock(return_value="compute_folder") - monkeypatch.setattr( - GenericExecutor, - "get_effective_compute_data_folder", - mock_get_effective_compute_folder, - ) - mock_catalog_handler_put = mocker.MagicMock() mock_catalog_handler_put.return_value = ["data_catalog"] mock_run_context.catalog_handler.put = mock_catalog_handler_put @@ -319,12 +289,7 @@ def test_sync_catalog_calls_put_from_catalog_handler( data_catalogs = test_executor._sync_catalog(stage="put") assert data_catalogs == ["data_catalog"] - mock_catalog_handler_put.assert_called_once_with( - name="me", - run_id="run_id", - compute_data_folder="compute_folder", - synced_catalogs=None, - ) + mock_catalog_handler_put.assert_called_once_with(name="me") def test_sync_catalog_calls_put_from_catalog_handler_as_per_input( @@ -335,13 +300,6 @@ def test_sync_catalog_calls_put_from_catalog_handler_as_per_input( mock_node, "_get_catalog_settings", lambda: {"get": [], "put": ["me", "you"]} ) - mock_get_effective_compute_folder = mocker.MagicMock(return_value="compute_folder") - monkeypatch.setattr( - GenericExecutor, - "get_effective_compute_data_folder", - mock_get_effective_compute_folder, - ) - mock_catalog_handler_put = mocker.MagicMock() mock_catalog_handler_put.return_value = ["data_catalog"] mock_run_context.catalog_handler.put = mock_catalog_handler_put @@ -362,13 +320,6 @@ def test_sync_catalog_calls_put_sends_synced_catalogs_to_catalog_handler( mock_node = mock_base_node() setattr(mock_node, "_get_catalog_settings", lambda: {"get": [], "put": ["me"]}) - mock_get_effective_compute_folder = mocker.MagicMock(return_value="compute_folder") - monkeypatch.setattr( - GenericExecutor, - "get_effective_compute_data_folder", - mock_get_effective_compute_folder, - ) - mock_catalog_handler_put = mocker.MagicMock() mock_catalog_handler_put.return_value = ["data_catalog"] mock_run_context.catalog_handler.put = mock_catalog_handler_put @@ -380,44 +331,7 @@ def test_sync_catalog_calls_put_sends_synced_catalogs_to_catalog_handler( data_catalogs = test_executor._sync_catalog(stage="put", synced_catalogs="in_sync") assert data_catalogs == ["data_catalog"] - mock_catalog_handler_put.assert_called_once_with( - name="me", - run_id="run_id", - compute_data_folder="compute_folder", - synced_catalogs="in_sync", - ) - - -def test_get_effective_compute_data_folder_returns_default( - mocker, mock_run_context, mock_base_node -): - mock_run_context.catalog_handler.compute_data_folder = "default" - - mock_node = mock_base_node() - 
setattr(mock_node, "_get_catalog_settings", lambda: {}) - - test_executor = GenericExecutor() - test_executor._context_node = mock_node - - assert test_executor.get_effective_compute_data_folder() == "default" - - -def test_get_effective_compute_data_folder_returns_from_node_settings( - mocker, mock_run_context, mock_base_node -): - mock_run_context.catalog_handler.compute_data_folder = "default" - - mock_node = mock_base_node() - setattr( - mock_node, - "_get_catalog_settings", - lambda: {"compute_data_folder": "not_default"}, - ) - - test_executor = GenericExecutor() - test_executor._context_node = mock_node - - assert test_executor.get_effective_compute_data_folder() == "not_default" + mock_catalog_handler_put.assert_called_once_with(name="me") def test_step_attempt_returns_one_by_default(): diff --git a/tests/runnable/test_catalog.py b/tests/runnable/test_catalog.py index e643c209..1608c6ce 100644 --- a/tests/runnable/test_catalog.py +++ b/tests/runnable/test_catalog.py @@ -25,13 +25,13 @@ def test_base_run_log_store_context_property( def test_base_catalog_get_raises_exception(instantiable_base_class): base_catalog = catalog.BaseCatalog() with pytest.raises(NotImplementedError): - base_catalog.get(name="test", run_id="test") + base_catalog.get(name="test") def test_base_catalog_put_raises_exception(instantiable_base_class): base_catalog = catalog.BaseCatalog() with pytest.raises(NotImplementedError): - base_catalog.put(name="test", run_id="test") + base_catalog.put(name="test") def test_base_catalog_sync_between_runs_raises_exception(instantiable_base_class): @@ -48,12 +48,12 @@ def test_base_catalog_config_default_compute_folder_if_none_config( def test_do_nothing_catalog_get_returns_empty_list(monkeypatch, mocker): catalog_handler = catalog.DoNothingCatalog() - assert catalog_handler.get(name="does not matter", run_id="none") == [] + assert catalog_handler.get(name="does not matter") == [] def test_do_nothing_catalog_put_returns_empty_list(monkeypatch, mocker): catalog_handler = catalog.DoNothingCatalog() - assert catalog_handler.put(name="does not matter", run_id="none") == [] + assert catalog_handler.put(name="does not matter") == [] def test_do_nothing_catalog_sync_between_runs_does_nothing(monkeypatch, mocker): diff --git a/uv.lock b/uv.lock index dc183794..04c9640f 100644 --- a/uv.lock +++ b/uv.lock @@ -62,6 +62,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/df/73/b6e24bd22e6720ca8ee9a85a0c4a2971af8497d8f3193fa05390cbd46e09/backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8", size = 15148 }, ] +[[package]] +name = "boto3" +version = "1.36.11" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, + { name = "jmespath" }, + { name = "s3transfer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ee/ac/2b4346474bd3ae501a2fc0e2b5b4f12f412dc89c05bf321a8108d3a95b5c/boto3-1.36.11.tar.gz", hash = "sha256:b40fbf2c0f22e55b67df95475a68bb72be5169097180a875726b6b884339ac8b", size = 111010 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4e/a5/e6f90b0a768560a0c44cb9076a313ee6d669ec98fd2747a8451832403ffe/boto3-1.36.11-py3-none-any.whl", hash = "sha256:641dd772eac111d9443258f0f5491c57c2af47bddae94a8d32de19edb5bf7b1c", size = 139177 }, +] + +[[package]] +name = "botocore" +version = "1.36.11" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "jmespath" }, + { name = "python-dateutil" }, + { name = "urllib3" }, +] 
+sdist = { url = "https://files.pythonhosted.org/packages/a5/0f/6b92050154ad0e286b82ca36de5f87a466723e1cdc525df53270bcc36f60/botocore-1.36.11.tar.gz", hash = "sha256:c919be883f95b9e0c3021429a365d40cd7944b8345a07af30dc8d891ceefe07a", size = 13497505 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2c/ce/e97be00389d51a010c0680ea688a073737ca3b2de6f924800fc61bf68e41/botocore-1.36.11-py3-none-any.whl", hash = "sha256:82c5660027f696608d0e55feb08c146c11c7ebeba7615961c7765dcf6009a00d", size = 13327743 }, +] + [[package]] name = "cachetools" version = "5.5.0" @@ -268,6 +296,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/6e/b64600156934dab14cc8b403095a9ea8bd722aad2e775673c68346b76220/cloudpathlib-0.20.0-py3-none-any.whl", hash = "sha256:7af3bcefbf73392ae7f31c08b3660ec31607f8c01b7f6262d4d73469a845f641", size = 52547 }, ] +[package.optional-dependencies] +s3 = [ + { name = "boto3" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -619,6 +652,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/31/80/3a54838c3fb461f6fec263ebf3a3a41771bd05190238de3486aae8540c36/jinja2-3.1.4-py3-none-any.whl", hash = "sha256:bc5dd2abb727a5319567b7a813e6a2e7318c39f4f487cfe6c89c6f9c7d25197d", size = 133271 }, ] +[[package]] +name = "jmespath" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256 }, +] + [[package]] name = "job-executor" version = "0.0.0" @@ -2124,6 +2166,9 @@ k8s = [ notebook = [ { name = "ploomber-engine" }, ] +s3 = [ + { name = "cloudpathlib", extra = ["s3"] }, +] [package.dev-dependencies] dev = [ @@ -2150,6 +2195,7 @@ requires-dist = [ { name = "click", specifier = "<=8.1.3" }, { name = "click-plugins", specifier = ">=1.1.1" }, { name = "cloudpathlib", specifier = ">=0.20.0" }, + { name = "cloudpathlib", extras = ["s3"], marker = "extra == 's3'" }, { name = "dill", specifier = ">=0.3.9" }, { name = "docker", marker = "extra == 'docker'", specifier = ">=7.1.0" }, { name = "kubernetes", marker = "extra == 'k8s'", specifier = ">=31.0.0" }, @@ -2182,6 +2228,18 @@ docs = [ ] release = [{ name = "python-semantic-release", specifier = ">=9.15.2" }] +[[package]] +name = "s3transfer" +version = "0.11.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/62/45/2323b5928f86fd29f9afdcef4659f68fa73eaa5356912b774227f5cf46b5/s3transfer-0.11.2.tar.gz", hash = "sha256:3b39185cb72f5acc77db1a58b6e25b977f28d20496b6e58d6813d75f464d632f", size = 147885 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1b/ac/e7dc469e49048dc57f62e0c555d2ee3117fa30813d2a1a2962cce3a2a82a/s3transfer-0.11.2-py3-none-any.whl", hash = "sha256:be6ecb39fadd986ef1701097771f87e4d2f821f27f6071c872143884d2950fbc", size = 84151 }, +] + [[package]] name = "secrets" version = "0.0.0" From c6a03564279715f6995ee8cfece7ada4e3abb201 Mon Sep 17 00:00:00 2001 From: "Vammi, Vijay" Date: Sun, 2 Feb 2025 22:37:28 +0000 Subject: [PATCH 3/5] feat: Working minio client --- 
extensions/catalog/any_path.py | 14 ++++--- extensions/catalog/file_system.py | 2 +- extensions/catalog/minio.py | 67 +++++++++++++++++++++++++++---- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/extensions/catalog/any_path.py b/extensions/catalog/any_path.py index bbbcd6a1..d7585ca2 100644 --- a/extensions/catalog/any_path.py +++ b/extensions/catalog/any_path.py @@ -40,7 +40,12 @@ def upload_to_catalog(self, file: Path) -> None: ... def download_from_catalog(self, file: Path | CloudPath) -> None: ... @abstractmethod - def get_catalog_location(self) -> Path | CloudPath: ... + def get_catalog_location(self) -> Path | CloudPath: + """ + For local file systems, this is the .catalog/run_id/compute_data_folder + For cloud systems, this is s3://bucket/run_id/compute_data_folder + """ + ... def get(self, name: str) -> List[DataCatalog]: """ @@ -75,16 +80,15 @@ def get(self, name: str) -> List[DataCatalog]: if str(file).endswith(".execution.log"): continue + self.download_from_catalog(file) relative_file_path = file.relative_to(run_catalog) # type: ignore data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) data_catalog.catalog_relative_path = str(relative_file_path) - data_catalog.data_hash = utils.get_data_hash(str(file)) + data_catalog.data_hash = utils.get_data_hash(str(relative_file_path)) data_catalog.stage = "get" data_catalogs.append(data_catalog) - self.download_from_catalog(file) - if not data_catalogs: raise Exception(f"Did not find any files matching {name} in {run_catalog}") @@ -137,7 +141,7 @@ def put(self, name: str) -> List[DataCatalog]: # Need not add a data catalog for the folder continue - relative_file_path = file.relative_to(".") + relative_file_path = file.relative_to(copy_from) data_catalog = run_log_store.create_data_catalog(str(relative_file_path)) data_catalog.catalog_relative_path = ( diff --git a/extensions/catalog/file_system.py b/extensions/catalog/file_system.py index 8a4ecd41..33a0aa68 100644 --- a/extensions/catalog/file_system.py +++ b/extensions/catalog/file_system.py @@ -46,7 +46,7 @@ def upload_to_catalog(self, file: Path) -> None: f"Copying objects from {self.compute_data_folder} to the run catalog location of {run_catalog}" ) - relative_file_path = file.relative_to(".") + relative_file_path = file.relative_to(self.compute_data_folder) (run_catalog / relative_file_path.parent).mkdir(parents=True, exist_ok=True) shutil.copy(file, run_catalog / relative_file_path) diff --git a/extensions/catalog/minio.py b/extensions/catalog/minio.py index 3a3bced6..7e4e30c2 100644 --- a/extensions/catalog/minio.py +++ b/extensions/catalog/minio.py @@ -1,17 +1,68 @@ -from cloudpathlib import S3Client, S3Path +import logging +from functools import lru_cache +from pathlib import Path +from typing import Any + +from cloudpathlib import CloudPath, S3Client, S3Path from extensions.catalog.any_path import AnyPathCatalog +from runnable import defaults + +logger = logging.getLogger(defaults.LOGGER_NAME) + + +@lru_cache +def get_minio_client( + endpoint_url: str, aws_access_key_id: str, aws_secret_access_key: str +) -> S3Client: + return S3Client( + endpoint_url=endpoint_url, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) class MinioCatalog(AnyPathCatalog): service_name: str = "minio" + + endpoint_url: str = "http://localhost:9002" + aws_access_key_id: str = "minioadmin" + aws_secret_access_key: str = "minioadmin" bucket: str = "runnable" - def get_path(self, path: str) -> S3Path: - # TODO: Might need to 
assert the credentials are set
-        client = S3Client(
-            endpoint_url="http://localhost:9002",
-            aws_access_key_id="minioadmin",
-            aws_secret_access_key="minioadmin",
+    def get_summary(self) -> dict[str, Any]:
+        return {
+            "service_name": self.service_name,
+            "compute_data_folder": self.compute_data_folder,
+            "endpoint_url": self.endpoint_url,
+            "bucket": self.bucket,
+        }
+
+    def get_catalog_location(self) -> S3Path:
+        run_id = self._context.run_id
+
+        return S3Path(
+            f"s3://{self.bucket}/{run_id}/{self.compute_data_folder}".strip("."),
+            client=get_minio_client(
+                self.endpoint_url, self.aws_access_key_id, self.aws_secret_access_key
+            ),
         )
-        return client.CloudPath(f"s3://{self.bucket}/{path}")
+
+    def download_from_catalog(self, file: Path | CloudPath) -> None:
+        assert isinstance(file, S3Path)
+
+        relative_file_path = file.relative_to(self.get_catalog_location())
+
+        file_to_download = Path(self.compute_data_folder) / relative_file_path
+        file_to_download.parent.mkdir(parents=True, exist_ok=True)
+
+        file.download_to(file_to_download)
+
+    def upload_to_catalog(self, file: Path) -> None:
+        run_catalog = self.get_catalog_location()
+
+        relative_file_path = file.relative_to(self.compute_data_folder)
+        (run_catalog / relative_file_path.parent).mkdir(parents=True, exist_ok=True)
+
+        file_in_cloud = run_catalog / relative_file_path
+        file_in_cloud.upload_from(file)

From 5eef5f4a201cca8ebb2b78ca889dd7f5050c3db6 Mon Sep 17 00:00:00 2001
From: "Vammi, Vijay"
Date: Sun, 2 Feb 2025 22:41:07 +0000
Subject: [PATCH 4/5] feat: Working minio client

---
 extensions/catalog/minio.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/extensions/catalog/minio.py b/extensions/catalog/minio.py
index 7e4e30c2..1123f314 100644
--- a/extensions/catalog/minio.py
+++ b/extensions/catalog/minio.py
@@ -65,4 +65,5 @@ def upload_to_catalog(self, file: Path) -> None:
         (run_catalog / relative_file_path.parent).mkdir(parents=True, exist_ok=True)
 
         file_in_cloud = run_catalog / relative_file_path
+        assert isinstance(file_in_cloud, S3Path)
         file_in_cloud.upload_from(file)

From 02b6595225d2b0bd1bf9a0e3e8bf62b7b28ad20c Mon Sep 17 00:00:00 2001
From: "Vammi, Vijay"
Date: Sun, 2 Feb 2025 22:43:31 +0000
Subject: [PATCH 5/5] fix: adding minio config.yaml

---
 examples/configs/minio.yaml | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 examples/configs/minio.yaml

diff --git a/examples/configs/minio.yaml b/examples/configs/minio.yaml
new file mode 100644
index 00000000..5c52d760
--- /dev/null
+++ b/examples/configs/minio.yaml
@@ -0,0 +1,2 @@
+catalog:
+  type: minio
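A note for reviewers on the new example config: examples/configs/minio.yaml sets only the catalog type, so every connection detail falls back to the defaults declared on MinioCatalog in PATCH 3 (endpoint_url, aws_access_key_id, aws_secret_access_key and bucket). A fuller sketch of the same file is shown below for illustration only; it assumes the config loader maps these keys one-to-one onto the Pydantic fields, following the config nesting used by the file-system example in the AnyPathCatalog docstring:

    catalog:
      type: minio
      config:
        # values below are the MinioCatalog defaults; override them outside local development
        endpoint_url: "http://localhost:9002"
        aws_access_key_id: "minioadmin"
        aws_secret_access_key: "minioadmin"
        # the bucket is assumed to exist already; nothing in these patches creates it
        bucket: "runnable"

Worth noting on the client plumbing: because get_minio_client is wrapped in lru_cache, every S3Path returned by get_catalog_location shares a single S3Client per unique (endpoint_url, access key, secret key) triple, so the underlying boto3 session is constructed once per process rather than once per file transfer.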