diff --git a/CHANGELOG.md b/CHANGELOG.md
index cbc8bae..9474c20 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog db-rocket
 
+## Version 3.0.0
+- Add `use_volumes` and `dst_path` arguments to support uploading to Unity Catalog Volumes.
+
 ## Version 2.1.0
 - New paramter for ``rocket launch --glob_path=<...>``, which allows to specify a list of globs for files to deploy during launch.
 
diff --git a/README.md b/README.md
index 680c3d9..10358f6 100644
--- a/README.md
+++ b/README.md
@@ -111,6 +111,7 @@ stevenmi@MacBook db-rocket % rocket launch --watch=False
 - Databricks: >=7
 - Python: >=3.7
 - Tested on Platform: Linux, MacOs. Windows will probably not work but contributions are welcomed!
+- Supports uploading to Unity Catalog Volumes starting from version 3.0.0. Note that the underlying dependency, `databricks-sdk`, is still in beta. We do not recommend using UC Volumes in production.
 
 ## Acknowledgments
diff --git a/requirements.txt b/requirements.txt
index e05293c..4c6ff7f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,4 +9,5 @@ poetry
 mypy
 SecretStorage
 readme-renderer
-twine
\ No newline at end of file
+twine
+databricks-sdk==0.33.0
diff --git a/rocket/rocket.py b/rocket/rocket.py
index 7f2edd4..6a768fd 100644
--- a/rocket/rocket.py
+++ b/rocket/rocket.py
@@ -1,9 +1,9 @@
 import os
-import glob
 from typing import Optional, List, Union
 
 import fire
+from databricks.sdk import WorkspaceClient
 
 from rocket.file_watcher import FileWatcher
 from rocket.logger import logger
 from rocket.utils import (
@@ -54,34 +54,48 @@ def launch(
         project_location: str = ".",
         dbfs_path: Optional[str] = None,
         watch: bool = True,
-        glob_path: Optional[Union[str, List[str]]] = None
+        glob_path: Optional[Union[str, List[str]]] = None,
+        use_volumes: Optional[bool] = False,
+        dst_path: Optional[str] = None,
     ) -> None:
         """
         Entrypoint of the application, triggers a build and deploy
         :param project_location: path to project code, default: `"."`
-        :param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
+        :param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject. Only supports DBFS paths.
         :param watch: Set to false if you don't want to automatically sync your files
         :param glob_path: glob string or list of strings for additional files to deploy, e.g. "*.json"
+        :param use_volumes: upload files to Unity Catalog Volumes.
+        :param dst_path: destination path to store the files. Supports both dbfs:/ and /Volumes paths. Ideally, `dst_path` should replace `dbfs_path`, which is planned for deprecation.
         :return:
         """
         if os.getenv("DATABRICKS_TOKEN") is None:
             raise Exception("DATABRICKS_TOKEN must be set for db-rocket to work")
 
-        if dbfs_path is not None and not dbfs_path.startswith("dbfs:/"):
-            raise Exception("`dbfs_path` must start with dbfs:/")
-
-        try:
-            execute_shell_command(f"databricks fs ls dbfs:/")
-        except Exception as e:
-            raise Exception(
-                f"Error accessing DBFS via databricks-cli. Please check if your databricks token is set and valid? Try to generate a new token and update existing one with `databricks configure --token`. Error details: {e}"
-            )
+        base_dbfs_access_error_message = ("Please check that your Databricks token is set and valid. "
+                                          "Try generating a new token and updating the existing one with "
+                                          "`databricks configure --token`.")
+        if use_volumes:
+            try:
+                workspace_client = WorkspaceClient()
+                workspace_client.dbutils.fs.ls("dbfs:/")
+            except Exception as e:
+                raise Exception(
+                    f"Could not access DBFS using the Databricks SDK. {base_dbfs_access_error_message} Error details: {e}"
+                )
+            db_path = self.get_volumes_path(dst_path)
+        else:
+            try:
+                execute_shell_command(f"databricks fs ls dbfs:/")
+            except Exception as e:
+                raise Exception(
+                    f"Error accessing DBFS via databricks-cli. {base_dbfs_access_error_message} Error details: {e}"
+                )
+            path_to_use = dst_path if dst_path else dbfs_path
+            db_path = self.get_dbfs_path(path_to_use)
 
-        if not dbfs_path:
-            dbfs_path = f"dbfs:/temp/{os.environ['USER']}"
         if watch:
             project_name = os.path.abspath(project_location).split("/")[-1]
-            dbfs_path = f"{dbfs_path}/{project_name}"
+            db_path = f"{db_path}/{project_name}"
 
         glob_paths = []
         if isinstance(glob_path, str):
@@ -89,14 +103,14 @@ def launch(
         elif isinstance(glob_path, list):
             glob_paths = [os.path.join(project_location, path) for path in glob_path]
 
-        self._build_and_deploy(watch=watch, project_location=project_location, dbfs_path=dbfs_path, glob_paths=glob_paths)
+        self._build_and_deploy(watch=watch, project_location=project_location, db_path=db_path, glob_paths=glob_paths)
         if watch:
             watcher = FileWatcher(
                 project_location,
                 lambda x: self._build_and_deploy(
                     watch=watch,
                     modified_files=watcher.modified_files,
-                    dbfs_path=dbfs_path,
+                    db_path=db_path,
                     project_location=project_location,
                     glob_paths=glob_path
                 ),
@@ -108,7 +122,7 @@ def _build_and_deploy(
         self,
         watch: bool,
         project_location: str,
-        dbfs_path: str,
+        db_path: str,
         modified_files: Optional[List[str]] = None,
         glob_paths: Optional[List[str]] = None
     ) -> None:
@@ -116,7 +130,7 @@ def _build_and_deploy(
             logger.info(f"Found changes in {modified_files}. Overwriting them.")
             self._deploy(
                 file_paths=modified_files,
-                dbfs_path=dbfs_path,
+                db_path=db_path,
                 project_location=project_location,
             )
             return
@@ -128,10 +142,10 @@ def _build_and_deploy(
         wheel_path, wheel_file = self._create_python_project_wheel(project_location)
         self._deploy(
             file_paths=[wheel_path],
-            dbfs_path=dbfs_path,
+            db_path=db_path,
             project_location=os.path.dirname(wheel_path),
         )
-        install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}/{wheel_file}'
+        install_path = f"{self.get_install_path(db_path)}/{wheel_file}"
 
         dependency_files = ["requirements.in", "requirements.txt"]
         index_urls = []
@@ -183,10 +197,10 @@ def _build_and_deploy(
                     line.strip() for line in f.readlines() if "index-url" in line
                 ]
         self._deploy(
-            file_paths=list(files), dbfs_path=dbfs_path, project_location=project_location
+            file_paths=list(files), db_path=db_path, project_location=project_location
         )
 
-        install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}'
+        install_path = self.get_install_path(db_path)
         index_urls_options = " ".join(index_urls)
 
         if dependency_file_exist:
@@ -215,16 +229,54 @@ def _deploy(
         self,
         file_paths: List[str],
-        dbfs_path: str,
+        db_path: str,
         project_location: str
     ) -> None:
+        if self.is_dbfs(db_path):
+            self._deploy_dbfs(file_paths, db_path, project_location)
+        else:
+            w = WorkspaceClient()
+            self._deploy_volumes(file_paths, db_path, project_location, w)
+
+    def _deploy_dbfs(
+        self,
+        file_paths: List[str],
+        db_path: str,
+        project_location: str
+    ):
         def helper(file: str) -> None:
-            target_path = f"{dbfs_path}/{os.path.relpath(file, project_location)}"
+            target_path = f"{db_path}/{os.path.relpath(file, project_location)}"
             execute_shell_command(f"databricks fs cp --recursive --overwrite {file} {target_path}")
             logger.info(f"Uploaded {file} to {target_path}")
 
         execute_for_each_multithreaded(file_paths, lambda x: helper(x))
 
+    def _deploy_volumes(
+        self,
+        file_paths: List[str],
+        db_path: str,
+        project_location: str,
+        workspace_client
+    ):
+        def helper(wc, file: str) -> None:
+            # the SDK requires an absolute local path
+            if not os.path.isabs(file):
+                cwd = os.getcwd()
+                file = f"{cwd}/{file}"
+            target_path = f"{db_path}/{os.path.relpath(file, project_location)}"
+            # if the file already exists, the SDK returns the error message "The file being created already exists."
+            # a feature request for overwrite support is already open: https://github.com/databricks/databricks-sdk-py/issues/548
+            try:
+                wc.dbutils.fs.rm(target_path)
+            except Exception:
+                pass
+            # the SDK uses urllib3 to parse paths.
+            # The source needs a file:// scheme to be recognized as a local file; otherwise a file-not-found error is raised.
+            wc.dbutils.fs.cp(f"file://{file}", target_path)
+            logger.info(f"Uploaded {file} to {target_path}")
+
+        execute_for_each_multithreaded(file_paths, lambda x: helper(workspace_client, x))
+
     def _create_python_project_wheel(self, project_location: str) -> (str, str):
         dist_location = f"{project_location}/dist"
         execute_shell_command(f"rm {dist_location}/* 2>/dev/null || true")
@@ -250,6 +302,26 @@ def _create_python_project_wheel(self, project_location: str) -> (str, str):
         wheel_path = f"{dist_location}/{wheel_file}"
         return wheel_path, wheel_file
 
+    def get_dbfs_path(self, path: Optional[str]) -> str:
+        if path:
+            logger.warning("The `dbfs_path` parameter is planned for deprecation. Please use the `dst_path` parameter instead.")
+            if not self.is_dbfs(path):
+                raise Exception("`dbfs_path` must start with dbfs:/")
+        return path or f"dbfs:/temp/{os.environ['USER']}"
+
+    def get_volumes_path(self, path: Optional[str]) -> str:
+        if path and not path.startswith("/Volumes"):
+            raise Exception("`use_volumes` is set. `dst_path` must start with /Volumes")
+        return path or f"/Volumes/main/data_products/volume/db_rocket/{os.environ['USER']}"
+
+    def get_install_path(self, db_path: str) -> str:
+        if self.is_dbfs(db_path):
+            return db_path.replace("dbfs:/", "/dbfs/")
+        return db_path
+
+    def is_dbfs(self, db_path: str) -> bool:
+        return db_path.startswith("dbfs:/")
+
 
 def main():
     fire.Fire(Rocket)
diff --git a/setup.py b/setup.py
index 9edb298..91f8c62 100644
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@ setuptools.setup(
     name="databricks-rocket",
-    version="2.1.0",
+    version="3.0.0",
     author="GetYourGuide",
     author_email="engineering.data-products@getyourguide.com",
     description="Keep your local python scripts installed and in sync with a databricks notebook. Shortens the feedback loop to develop projects using a hybrid enviroment",
@@ -17,7 +17,7 @@ setuptools.setup(
     long_description_content_type="text/markdown",
     url="https://github.com/getyourguide/db-rocket",
     packages=setuptools.find_packages(),
-    install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli"],
+    install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli", "databricks-sdk==0.33.0"],
     entry_points={
         "console_scripts": ["rocket=rocket.rocket:main", "dbrocket=rocket.rocket:main"]
     },
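
Reviewer note: a minimal usage sketch of the new flags, assuming an already-provisioned UC Volume (the `/Volumes/main/my_schema/my_volume` path below is hypothetical; omitting `--dst_path` falls back to the defaults in `get_volumes_path` / `get_dbfs_path`):

```sh
# Upload through the Databricks SDK to a Unity Catalog Volume; dst_path must start with /Volumes
rocket launch --use_volumes=True --dst_path=/Volumes/main/my_schema/my_volume/db_rocket

# Classic upload through databricks-cli; dst_path (or the legacy dbfs_path) must start with dbfs:/
rocket launch --dst_path=dbfs:/tmp/myteam/myproject
```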