Skip to content

Commit

Permalink
feature: enable automatically syncing project files with databricks (#44
Browse files Browse the repository at this point in the history
)
  • Loading branch information
steven-mi authored Aug 9, 2023
1 parent 2c317a3 commit dc87e2a
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 78 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog db-rocket

## Version 1.3.0

- Remove `rocket trigger` CLI
- Add synchronization of project files to databricks file system
- Replace `print` statements with `logger.info`
- Replace running watch in shell with python code

## Version 1.2.0

- Fix security issue with command injection, changes the behaviour of the watch command.
Expand Down Expand Up @@ -46,5 +53,4 @@

## Version 1.0.2

feature: Add support for poetry projects
test: Add test for dbrocket build process
feature: Add support for poetry projects test: Add test for dbrocket build process
Binary file removed logos/WhatsApp Image 2021-05-11 at 14.29.21 (2).xcf
Binary file not shown.
3 changes: 3 additions & 0 deletions rocket/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ def configure_logger() -> logging.Logger:
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.INFO)
return logger


logger = configure_logger()
153 changes: 78 additions & 75 deletions rocket/rocket.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import subprocess
import time
from typing import Optional

import fire
from watchdog.observers import Observer

from rocket.logger import configure_logger

logger = configure_logger()
from rocket.logger import logger
from rocket.utils import execute_shell_command, extract_project_name_from_wheel, \
extract_python_package_dirs, extract_python_files_from_folder, execute_for_each_multithreaded
from rocket.watcher import Watcher


def _add_index_urls_to_cmd(cmd, index_urls):
Expand All @@ -29,7 +31,7 @@ def setup(self):
Initialize the application.
"""
if os.path.exists("setup.py") or os.path.exists(f"pyproject.toml"):
print("Packaing file already exists so no need to create a new one")
logger.info("Packaing file already exists so no need to create a new one")
return

content = """
Expand All @@ -48,20 +50,20 @@ def setup(self):

with open("setup.py", "a") as myfile:
myfile.write(content)

print("Setup.py file created, feel free to modify it with your needs.")
logger.info("Setup.py file created, feel free to modify it with your needs.")

def launch(
self,
project_location: str = ".",
dbfs_path: Optional[str] = None,
watch=True,
disable_watch=False,
_deploy=True,
):
"""
Entrypoint of the application, triggers a build and deploy
:param project_location:
:param dbfs_folder: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
:param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
:param watch: Set to false if you don't want to automatically sync your files
:return:
"""

Expand All @@ -77,80 +79,85 @@ def launch(

self.dbfs_folder = dbfs_path + project_directory

if watch and not disable_watch:
# first time build and then watch so we have an immediate build
self._build_and_deploy()
return self._watch()
else:
logger.debug("Watch disabled")

return self._build_and_deploy()

def _build_and_deploy(self):
if _deploy:
self._build_and_deploy(watch)

if watch:
observer = Observer()
watcher = Watcher(observer)
observer.schedule(watcher, project_location, recursive=True)
observer.start()
try:
time.sleep(2)
finally:
observer.stop()
observer.join()
if watcher.modified_files:
self._deploy(watch=watch, modified_files=watcher.modified_files)
return self.launch(project_location=project_location, dbfs_path=dbfs_path, watch=True, _deploy=False)

def _build_and_deploy(self, watch, modified_files=None):
self._build()
result = self._deploy()
result = self._deploy(watch=watch, modified_files=modified_files)
return result

def _watch(self) -> None:
"""
Listen to filesystem changes to trigger again
"""
command = 'rocket trigger --disable_watch=True'

cmd = f"""watchmedo \
shell-command \
--patterns='*.py' \
--wait --drop \
--interval {self._interval_repeat_watch} \
--debug-force-polling \
--ignore-directories \
--ignore-pattern '*.pyc;*dist*;\..*;*egg-info' \
--recursive \
--command='{command}'
"""
logger.debug(f"watch command: {cmd}")
os.system(cmd)

def trigger(
self,
project_location: str = ".",
dbfs_path: Optional[str] = None,
watch=True,
disable_watch=False,
):
"""
Entrypoint of the application, triggers a build and deploy
:param project_location:
:param dbfs_folder: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
:return:
"""
# use launch rather than trigger
self.launch(project_location=project_location, dbfs_path=dbfs_path, watch=watch, disable_watch=disable_watch)

def _deploy(self):
def _deploy(self, watch, modified_files):
"""
Copies the built library to dbfs
"""

try:
self._shell(
f"databricks fs cp --overwrite {self.wheel_path} {self.dbfs_folder}/{self.wheel_file}"
)
if modified_files:
logger.info(f"Found changes in {modified_files}. Overwriting them.")
for file in modified_files:
logger.info(f"Sync {file}")
execute_shell_command(
f"databricks fs cp --recursive --overwrite {file} {self.dbfs_folder}/{os.path.relpath(file, self.project_location)}"
)
else:
execute_shell_command(
f"databricks fs cp --overwrite {self.wheel_path} {self.dbfs_folder}/{self.wheel_file}"
)
if watch:
package_dirs = extract_python_package_dirs(self.project_location)
for package_dir in package_dirs:
python_files = extract_python_files_from_folder(package_dir)

def helper(file):
execute_shell_command(
f"databricks fs cp --recursive --overwrite {file} {self.dbfs_folder}/{os.path.relpath(file, self.project_location)}"
)
logger.info(f"Sync {file}")
execute_for_each_multithreaded(python_files, helper)
except Exception as e:
raise Exception(
f"Error while copying files to databricks, is your databricks token set and valid? Try to generate a new token and update existing one with `databricks configure --token`. Error details: {e}"
)

install_cmd = f'{self.dbfs_folder.replace("dbfs:/", "/dbfs/")}/{self.wheel_file}'
base_path = self.dbfs_folder.replace("dbfs:/", "/dbfs/")
install_cmd = f'{base_path}/{self.wheel_file}'
install_cmd = _add_index_urls_to_cmd(install_cmd, self.index_urls)
project_name = extract_project_name_from_wheel(self.wheel_file)

print(
f"""Done! in your notebook install the library by running:
if modified_files:
logger.info("Changes are applied")
elif watch:
logger.info(
f"""You have watch activated. Your project will be automatically synchronised with databricks. Add following in one cell:
%pip install --upgrade pip
%pip install {install_cmd} --force-reinstall
"""
)
%pip uninstall -y {project_name}
and then in new Python cell:
%load_ext autoreload
%autoreload 2
import sys
import os
sys.path.append(os.path.abspath('{base_path}')""")
else:
logger.info(f"""Install your library in your databricks notebook by running:
%pip install --upgrade pip
%pip install {install_cmd} --force-reinstall""")

def _build(self):
"""
Expand All @@ -160,11 +167,11 @@ def _build(self):

# cleans up dist folder from previous build
dist_location = f"{self.project_location}/dist"
self._shell(f"rm {dist_location}/* 2>/dev/null || true")
execute_shell_command(f"rm {dist_location}/* 2>/dev/null || true")

if os.path.exists(f"{self.project_location}/setup.py"):
logger.info("Found setup.py. Building python library")
self._shell(
execute_shell_command(
f"cd {self.project_location} ; {self._python_executable} -m build --outdir {dist_location} 2>/dev/null"
)
self.index_urls = []
Expand All @@ -174,25 +181,21 @@ def _build(self):

elif os.path.exists(f"{self.project_location}/pyproject.toml"):
logger.info("Found pyproject.toml. Building python library with poetry")
self._shell(f"cd {self.project_location} ; poetry build --format wheel")
requirements = self._shell(
execute_shell_command(f"cd {self.project_location} ; poetry build --format wheel")
requirements = execute_shell_command(
f"cd {self.project_location} ; poetry export --with-credentials --without-hashes")
self.index_urls = [line.strip() for line in requirements.split("\n") if "index-url" in line]
else:
raise Exception(
"To be turned into a library your project has to contain a setup.py or pyproject.toml file"
)

self.wheel_file = self._shell(
self.wheel_file = execute_shell_command(
f"cd {dist_location}; ls *.whl 2>/dev/null | head -n 1"
).replace("\n", "")
self.wheel_path = f"{dist_location}/{self.wheel_file}"
logger.debug(f"Build Successful. Wheel: '{self.wheel_path}' ")

@staticmethod
def _shell(cmd) -> str:
logger.debug(f"Running shell command: {cmd} ")
return subprocess.check_output(cmd, shell=True).decode("utf-8")

def main():
fire.Fire(Rocket)
55 changes: 55 additions & 0 deletions rocket/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import concurrent.futures
import os
import subprocess

from rocket.logger import logger


def execute_for_each_multithreaded(lst, func, max_threads=None):
"""
Execute a given function for each entry in the list using multiple threads.
Parameters:
- lst: List of items to process
- func: Function to apply to each item
- max_threads: Maximum number of threads to use (default is None, which means as many as items in the list)
Returns:
- List of results after applying the function
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
return list(executor.map(func, lst))


def extract_package_name_from_wheel(wheel_filename):
# Split the filename on '-' and take the first part
return wheel_filename.split('-')[0]


def extract_project_name_from_wheel(wheel_filename):
return extract_package_name_from_wheel(wheel_filename).replace("_", "-")


def extract_python_package_dirs(root_dir):
packages = []
for item in os.listdir(root_dir):
item_path = os.path.join(root_dir, item)
if os.path.isdir(item_path) and '__init__.py' in os.listdir(item_path):
packages.append(item_path)
return packages


def execute_shell_command(cmd) -> str:
logger.debug(f"Running shell command: {cmd} ")
return subprocess.check_output(cmd, shell=True).decode("utf-8")


def extract_python_files_from_folder(path):
py_files = []

for root, dirs, files in os.walk(path):
for file in files:
if file.endswith('.py'):
py_files.append(os.path.join(root, file))

return py_files
17 changes: 17 additions & 0 deletions rocket/watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os

from watchdog.events import FileSystemEventHandler


class Watcher(FileSystemEventHandler):

def __init__(self, observer):
self.modified_files = []
self.observer = observer

def on_modified(self, event):
if event.is_directory:
return
if os.path.splitext(event.src_path)[1] == '.py':
self.modified_files.append(event.src_path)
self.observer.stop()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setuptools.setup(
name="databricks-rocket",
version="1.2.0",
version="1.3.0",
author="GetYourGuide",
author_email="[email protected]",
description="Keep your local python scripts installed and in sync with a databricks notebook. Shortens the feedback loop to develop projects using a hybrid enviroment",
Expand Down

0 comments on commit dc87e2a

Please sign in to comment.