
Adds Databricks backend #325


Merged · 67 commits · Jun 4, 2025
Commits
5bcb0a4
Initial DB framework, ls works (no details)
edgararuiz Mar 29, 2025
3e8768c
Switches to using folder_url instead of path
edgararuiz Mar 29, 2025
e82efd4
pin_exists() works now
edgararuiz Mar 29, 2025
38238eb
pin_versions() works
edgararuiz Mar 29, 2025
e6e950e
pin_open() works
edgararuiz Mar 31, 2025
477e6d0
Makes _list_folders into _list_items
edgararuiz Mar 31, 2025
80570d9
Adds mkdir & put, gets pin_write() working
edgararuiz Mar 31, 2025
e3e9f54
Formatting improvements
edgararuiz Mar 31, 2025
4a0dac5
Removes conditional to use a custom board object
edgararuiz Mar 31, 2025
0982eb4
Improvements to file reading, and adds initial rm
edgararuiz Mar 31, 2025
cd8e032
pin_delete() works, new 'exists' aproach
edgararuiz Mar 31, 2025
7092718
Adds some test pieces
edgararuiz Apr 1, 2025
4e7549a
Partially runs tests
edgararuiz Apr 2, 2025
409d89c
Adds support for `detail` in ls
edgararuiz Apr 2, 2025
b1e629a
Figures out how to properly cache the fs
edgararuiz Apr 2, 2025
66bd1c4
Fixes teardown for tests
edgararuiz Apr 2, 2025
263dded
extends `rm` one more level (fix later)
edgararuiz Apr 2, 2025
788ec93
No errors in tests, moving on to addressing failures
edgararuiz Apr 3, 2025
794b22a
Adds recursive file/folder mapper
edgararuiz Apr 3, 2025
20e481d
fully recursive put (clean later)
edgararuiz Apr 3, 2025
0209d28
Improvements to _map_put
edgararuiz Apr 3, 2025
80cd4da
Starts moving to discrete functions
edgararuiz Apr 4, 2025
3e96d19
Finishes moving everything to discrete functions
edgararuiz Apr 4, 2025
00ebe78
Fixes typo
edgararuiz Apr 4, 2025
6f65e9d
Creates custom cache class, fixes issue with reading wrong pin
edgararuiz Apr 4, 2025
2f400d1
Removes _open from PinsDBCache after confirming that it's not needed
edgararuiz Apr 6, 2025
7171510
Removes data.csv file
edgararuiz Apr 6, 2025
50da1c9
Adds get() and _databricks_get()
edgararuiz Apr 7, 2025
228d6f1
Removes test data
edgararuiz Apr 7, 2025
bf317f0
Fixes issue with not handling board versioning for test boards
edgararuiz Apr 7, 2025
9c3d7dd
Passes constructor test
edgararuiz Apr 8, 2025
df63ef5
Removes PinsDBCache and manual registration of dbc
edgararuiz Apr 8, 2025
9e286ea
Emulates the structure of the other test boards
edgararuiz Apr 10, 2025
98dd383
Adds notes to the function, cleans up arguments
edgararuiz Apr 10, 2025
9c96fb2
Restores BoardRsConnect to helpers
edgararuiz Apr 10, 2025
b672620
Adds full BoardRSconnect call
edgararuiz Apr 10, 2025
b5f375b
Gets fs via calling the function for the tests
edgararuiz Apr 10, 2025
db8cb82
Removes Databricks from CI tests
edgararuiz Apr 10, 2025
2bc220a
Adds databricks-sdk to requirements dev file
edgararuiz Apr 10, 2025
c59e074
Updates rest of dev reqs versions
edgararuiz Apr 10, 2025
9ca444d
Attempts to avoid double forward slash in Linux
edgararuiz Apr 11, 2025
06fb022
Checks if path_to_version is a str before removing trailing slash
edgararuiz Apr 11, 2025
b5a817a
Fixes typo on isinstance call
edgararuiz Apr 11, 2025
2dbe9be
Removes protocol assignment for DatabricksFs
edgararuiz Apr 11, 2025
7e70f16
Passes pre-commit
edgararuiz Apr 11, 2025
1361d27
Adds databricks-sdk to minimum reqs
edgararuiz Apr 11, 2025
7a21bd6
Addresses additional issue from precommit and attempts to solve pyrig…
edgararuiz Apr 11, 2025
9d7f9e4
Update pins/databricks/fs.py
edgararuiz Apr 28, 2025
f70b0d9
Update requirements/minimum.txt
edgararuiz Apr 28, 2025
4a27260
Update pins/databricks/fs.py
edgararuiz Apr 28, 2025
b1f9aec
Update pins/databricks/fs.py
edgararuiz Apr 28, 2025
400a743
Switches from os to pathlib
edgararuiz Apr 28, 2025
641c3ce
Converts functions to staticmethods
edgararuiz Apr 28, 2025
7748204
Merge branch 'main' into databricks
isabelizimm Jun 2, 2025
12310e0
add docs for board_databricks
isabelizimm Jun 2, 2025
6271cb8
load in quartodoc
isabelizimm Jun 2, 2025
d93a77d
update all tests
isabelizimm Jun 3, 2025
b688bf4
clean up class, make true optional import
isabelizimm Jun 3, 2025
564f9ed
run dbc tests
isabelizimm Jun 3, 2025
f7e1aaf
add databricks into pyright deps
isabelizimm Jun 3, 2025
6e766a7
load databricks creds
isabelizimm Jun 3, 2025
4f559a5
error earlier
isabelizimm Jun 3, 2025
0c45eea
pass for windows cache
isabelizimm Jun 3, 2025
5d20577
Merge branch 'main' into databricks
isabelizimm Jun 4, 2025
6d07e6a
resolve dependencies
isabelizimm Jun 4, 2025
220f86d
resolve dependencies again
isabelizimm Jun 4, 2025
95e7873
cannot clean up constructors, skip
isabelizimm Jun 4, 2025
5 changes: 5 additions & 0 deletions .env.dev
@@ -30,3 +30,8 @@ RSC_LICENSE=
# (Note that the local file backend always uses a temporary directory.)
#
# PINS_TEST_S3__PATH="ci-pins"

# Databricks backend ----
DATABRICKS_HOST=
DATABRICKS_TOKEN=
DATABRICKS_VOLUME=
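
For local development, these variables can be loaded from the env file before a board is created. A minimal sketch, assuming python-dotenv is installed and that DATABRICKS_VOLUME holds a "/Volumes/<catalog>/<schema>/<volume>" path (placeholder usage, not part of the PR):

import os
from dotenv import load_dotenv  # python-dotenv
import pins

load_dotenv(".env.dev")  # populates DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_VOLUME
board = pins.board_databricks(os.environ["DATABRICKS_VOLUME"])
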
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -71,6 +71,8 @@ jobs:
AWS_REGION: "us-east-1"
AZURE_STORAGE_ACCOUNT_NAME: ${{ secrets.AZURE_STORAGE_ACCOUNT_NAME }}
AZURE_STORAGE_ACCOUNT_KEY: ${{ secrets.AZURE_STORAGE_ACCOUNT_KEY }}
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
PYTEST_OPTS: ${{ matrix.pytest_opts }}
REQUIREMENTS: ${{ matrix.requirements }}
ACTION_OS: ${{ matrix.os }}
3 changes: 3 additions & 0 deletions docs/_quarto.yml
@@ -49,6 +49,8 @@ website:
href: reference/board_gcs.qmd
- text: "`board_azure`"
href: reference/board_azure.qmd
- text: "`board_databricks`"
href: reference/board_databricks.qmd
- text: "`board_connect`"
href: reference/board_connect.qmd
- text: "`board_url`"
@@ -99,6 +101,7 @@ quartodoc:
- board_s3
- board_gcs
- board_azure
- board_databricks
- board_connect
- board_url
- board
1 change: 1 addition & 0 deletions pins/__init__.py
@@ -22,6 +22,7 @@
board_azure,
board_s3,
board_gcs,
board_databricks,
board,
)
from .boards import board_deparse
2 changes: 2 additions & 0 deletions pins/boards.py
@@ -870,6 +870,8 @@ def board_deparse(board: BaseBoard):
return f"board_gcs({repr(board.board)}{allow_pickle})"
elif prot == "http":
return f"board_url({repr(board.board)}, {board.pin_paths}{allow_pickle})"
elif prot == "dbc":
return f"board_databricks({repr(board.board)}{allow_pickle})"
else:
raise NotImplementedError(
f"board deparsing currently not supported for protocol: {prot}"
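
A sketch of what the new `dbc` branch of `board_deparse` returns, using a hypothetical volume path (the quoting follows the `repr()` call shown above):

import pins
from pins.boards import board_deparse

board = pins.board_databricks("/Volumes/my-catalog/my-schema/my-volume")
board_deparse(board)
# -> "board_databricks('/Volumes/my-catalog/my-schema/my-volume')"
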
64 changes: 64 additions & 0 deletions pins/constructors.py
@@ -10,6 +10,7 @@
from .boards import BaseBoard, BoardManual, BoardRsConnect, board_deparse
from .cache import PinsAccessTimeCache, PinsCache, PinsRscCacheMapper, prefix_cache
from .config import get_cache_dir, get_data_dir
from .errors import PinsError

# Kept here for backward-compatibility reasons
# Note that this is not a constructor, but a function to represent them.
@@ -87,6 +88,11 @@ def board(

fs = RsConnectFs(**storage_options)

elif protocol == "dbc":
from pins.databricks.fs import DatabricksFs

fs = DatabricksFs(**storage_options)

else:
fs = fsspec.filesystem(protocol, **storage_options)

@@ -569,3 +575,61 @@ def board_azure(path, versioned=True, cache=DEFAULT, allow_pickle_read=None):

opts = {"use_listings_cache": False}
return board("abfs", path, versioned, cache, allow_pickle_read, storage_options=opts)


def board_databricks(path, versioned=True, cache=DEFAULT, allow_pickle_read=None):
"""Create a board to read and write pins from an Databricks Volume folder.

Parameters
----------
path:
The path to the target folder inside Unity Catalog. The path must include the
catalog, schema, and volume names, preceded by '/Volumes/', for example:
"/Volumes/my-catalog/my-schema/my-volume".
versioned:
Whether or not pins should be versioned.
cache:
Whether to use a cache. By default, pins attempts to select the right cache
directory, given your filesystem. If `None` is passed, then no cache will be
used. You can set the cache using the `PINS_CACHE_DIR` environment variable.
allow_pickle_read: optional, bool
Whether to allow reading pins that use the pickle protocol. Pickles are unsafe,
and can execute arbitrary code. Only allow reading pickles if you trust the
board to execute Python code on your computer.

You can enable reading pickles by setting this to `True`, or by setting the
environment variable `PINS_ALLOW_PICKLE_READ`. If both are set, this argument
takes precedence.

Notes
-----
The Databricks board uses the `databricks-sdk` library to authenticate and interact
with the Databricks Volume.

See <https://docs.databricks.com/aws/en/dev-tools/sdk-python>


Examples
--------

>>> import pytest; pytest.skip()

>>> import pins
>>> from dotenv import load_dotenv
>>> load_dotenv() # eg, for a .env file with DATABRICKS_HOST and DATABRICKS_TOKEN set
>>> board = pins.board_databricks("/Volumes/examples/my-board/test-volume")
>>> board.pin_list()
['df_csv']

>>> board.pin_read("df_csv")
x y z
0 1 a 3
1 2 b 4
"""
try:
import databricks.sdk # noqa: F401
except ModuleNotFoundError:
raise PinsError(
"Install the `databricks-sdk` package for Databricks board support."
)
return board("dbc", path, versioned, cache, allow_pickle_read)
Empty file added pins/databricks/__init__.py
Empty file.
196 changes: 196 additions & 0 deletions pins/databricks/fs.py
@@ -0,0 +1,196 @@
import shutil
from io import BytesIO
from pathlib import Path, PurePath

from fsspec import AbstractFileSystem

from pins.errors import PinsError


class DatabricksFs(AbstractFileSystem):
protocol = "dbc"

def ls(self, path, detail=False, **kwargs):
return self._databricks_ls(path, detail)

def exists(self, path: str, **kwargs):
return self._databricks_exists(path)

def open(self, path: str, mode: str = "rb", *args, **kwargs):
if mode != "rb":
raise NotImplementedError
return self._databricks_open(path)

def get(self, rpath, lpath, recursive=False, **kwargs):
self._databricks_get(rpath, lpath, recursive, **kwargs)

def mkdir(self, path, create_parents=True, **kwargs):
if not create_parents:
raise NotImplementedError
self._databricks_mkdir(path)

def put(
self,
lpath,
rpath,
recursive=True,
maxdepth=None,
**kwargs,
):
if not recursive:
raise NotImplementedError
if maxdepth is not None:
raise NotImplementedError
self._databricks_put(lpath, rpath)

def rm(self, path, recursive=True, maxdepth=None) -> None:
if not recursive:
raise NotImplementedError
if maxdepth is not None:
raise NotImplementedError
if self._databricks_exists(path):
self._databricks_rm_dir(path)

@staticmethod
def _databricks_put(lpath, rpath):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
path = Path(lpath).absolute()
orig_path = path

def _upload_files(path):
contents = Path(path)
for item in contents.iterdir():
abs_path = PurePath(path).joinpath(item)
is_file = Path(abs_path).is_file()
if is_file:
rel_path = abs_path.relative_to(orig_path)
db_path = PurePath(rpath).joinpath(rel_path)
file = open(abs_path, "rb")
w.files.upload(str(db_path), BytesIO(file.read()), overwrite=True)
else:
_upload_files(abs_path)

_upload_files(path)

def _databricks_get(self, board, rpath, lpath, recursive=False, **kwargs):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
file_type = self._databricks_is_type(rpath)
if file_type == "file":
board.fs.get(rpath, lpath, **kwargs)
return

def _get_files(path, recursive, **kwargs):
raw_contents = w.files.list_directory_contents(path)
contents = list(raw_contents)
details = list(map(self._databricks_content_details, contents))
for item in details:
item_path = item.get("path")
if item.get("is_directory"):
if recursive:
_get_files(item_path, recursive=recursive, **kwargs)
else:
rel_path = PurePath(item_path).relative_to(rpath)
target_path = PurePath(lpath).joinpath(rel_path)
board.fs.get(item_path, str(target_path))

_get_files(rpath, recursive, **kwargs)

def _databricks_open(self, path):
from databricks.sdk import WorkspaceClient

if not self._databricks_exists(path):
raise PinsError(f"File or directory does not exist at path: {path}")
w = WorkspaceClient()
resp = w.files.download(path)
f = BytesIO()
shutil.copyfileobj(resp.contents, f)
f.seek(0)
return f

def _databricks_exists(self, path: str):
if self._databricks_is_type(path) == "nothing":
return False
else:
return True

@staticmethod
def _databricks_is_type(path: str):
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound

w = WorkspaceClient()
try:
w.files.get_metadata(path)
except NotFound:
try:
w.files.get_directory_metadata(path)
except NotFound:
return "nothing"
else:
return "directory"
else:
return "file"

def _databricks_ls(self, path, detail):
from databricks.sdk import WorkspaceClient

if not self._databricks_exists(path):
raise PinsError(f"File or directory does not exist at path: {path}")
w = WorkspaceClient()
if self._databricks_is_type(path) == "file":
if detail:
return [dict(name=path, size=None, type="file")]
else:
return path

contents_raw = w.files.list_directory_contents(path)
contents = list(contents_raw)
items = []
for item in contents:
item = self._databricks_content_details(item)
item_path = item.get("path")
item_path = item_path.rstrip("/")
if detail:
if item.get("is_directory"):
item_type = "directory"
else:
item_type = "file"
items.append(dict(name=item_path, size=None, type=item_type))
else:
items.append(item_path)
return items

def _databricks_rm_dir(self, path):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
raw_contents = w.files.list_directory_contents(path)
contents = list(raw_contents)
details = list(map(self._databricks_content_details, contents))
for item in details:
item_path = item.get("path")
if item.get("is_directory"):
self._databricks_rm_dir(item_path)
else:
w.files.delete(item_path)
w.files.delete_directory(path)

@staticmethod
def _databricks_mkdir(path):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.files.create_directory(path)

@staticmethod
def _databricks_content_details(item):
details = {
"path": item.path,
"name": item.name,
"is_directory": item.is_directory,
}
return details
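
The filesystem can also be exercised directly. A minimal sketch, assuming DATABRICKS_HOST and DATABRICKS_TOKEN are set so that `WorkspaceClient()` can authenticate; paths are placeholders:

from pins.databricks.fs import DatabricksFs

fs = DatabricksFs()
vol = "/Volumes/my-catalog/my-schema/my-volume"
fs.mkdir(f"{vol}/demo")    # calls w.files.create_directory under the hood
fs.exists(f"{vol}/demo")   # True once the directory exists
fs.ls(vol, detail=True)    # [{'name': ..., 'size': None, 'type': 'directory'}, ...]
fs.rm(f"{vol}/demo")       # recursive delete via _databricks_rm_dir
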
2 changes: 2 additions & 0 deletions pins/drivers.py
@@ -24,6 +24,8 @@ def load_path(filename: str, path_to_version, pin_type=None):
filename = "data.csv"

if path_to_version is not None:
if isinstance(path_to_version, str):
path_to_version = path_to_version.rstrip("/")
path_to_file = f"{path_to_version}/{filename}"
else:
# BoardUrl doesn't have versions, and the file is the full url
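
The rstrip guards against a doubled separator when the version path already ends in a slash, roughly (hypothetical catalog/schema/volume names):

path_to_version = "/Volumes/my-catalog/my-schema/my-volume/df_csv/20220214T163720Z-9bfad/"
path_to_file = f"{path_to_version.rstrip('/')}/data.csv"
# without the rstrip: '.../20220214T163720Z-9bfad//data.csv'
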
10 changes: 9 additions & 1 deletion pins/tests/conftest.py
@@ -6,10 +6,17 @@
from importlib_resources import files
from pytest import mark as m

from pins.tests.helpers import BoardBuilder, RscBoardBuilder, Snapshot, rm_env
from pins.tests.helpers import (
BoardBuilder,
DbcBoardBuilder,
RscBoardBuilder,
Snapshot,
rm_env,
)

EXAMPLE_REL_PATH = "pins/tests/pins-compat"
PATH_TO_EXAMPLE_BOARD = files("pins") / "tests/pins-compat"
PATH_TO_EXAMPLE_BOARD_DBC = "/Volumes/workshops/my-board/my-volume/test"
PATH_TO_EXAMPLE_VERSION = PATH_TO_EXAMPLE_BOARD / "df_csv/20220214T163720Z-9bfad/"
EXAMPLE_PIN_NAME = "df_csv"

@@ -21,6 +28,7 @@
pytest.param(lambda: BoardBuilder("s3"), id="s3", marks=m.fs_s3),
pytest.param(lambda: BoardBuilder("gcs"), id="gcs", marks=m.fs_gcs),
pytest.param(lambda: BoardBuilder("abfs"), id="abfs", marks=m.fs_abfs),
pytest.param(lambda: DbcBoardBuilder("dbc"), id="dbc", marks=m.fs_dbc),
]

# rsc should only be used once, because users are created at docker setup time