1
1
import os
2
- import glob
3
2
from typing import Optional , List , Union
4
3
5
4
import fire
6
5
6
+ from databricks .sdk import WorkspaceClient
7
7
from rocket .file_watcher import FileWatcher
8
8
from rocket .logger import logger
9
9
from rocket .utils import (
@@ -54,49 +54,63 @@ def launch(
54
54
project_location : str = "." ,
55
55
dbfs_path : Optional [str ] = None ,
56
56
watch : bool = True ,
57
- glob_path : Optional [Union [str , List [str ]]] = None
57
+ glob_path : Optional [Union [str , List [str ]]] = None ,
58
+ use_volumes : Optional [bool ] = False ,
59
+ dst_path : Optional [str ] = None ,
58
60
) -> None :
59
61
"""
60
62
Entrypoint of the application, triggers a build and deploy
61
63
:param project_location: path to project code, default: `"."`
62
- :param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
64
+ :param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject. Only support dbfs path.
63
65
:param watch: Set to false if you don't want to automatically sync your files
64
66
:param glob_path: glob string or list of strings for additional files to deploy, e.g. "*.json"
67
+ :param use_volumes: upload files to unity catalog volumes.
68
+ :param dst_path: Destination path to store the files. Support both dbfs:/ and /Volumes. Ideally, we should use dst_path and deprecate dbfs_path.
65
69
:return:
66
70
"""
67
71
if os .getenv ("DATABRICKS_TOKEN" ) is None :
68
72
raise Exception ("DATABRICKS_TOKEN must be set for db-rocket to work" )
69
73
70
- if dbfs_path is not None and not dbfs_path .startswith ("dbfs:/" ):
71
- raise Exception ("`dbfs_path` must start with dbfs:/" )
72
-
73
- try :
74
- execute_shell_command (f"databricks fs ls dbfs:/" )
75
- except Exception as e :
76
- raise Exception (
77
- f"Error accessing DBFS via databricks-cli. Please check if your databricks token is set and valid? Try to generate a new token and update existing one with `databricks configure --token`. Error details: { e } "
78
- )
74
+ base_dbfs_access_error_message = ("Please check if your databricks token is set and valid? "
75
+ "Try to generate a new token and update existing one with "
76
+ "`databricks configure --token`." )
77
+ if use_volumes :
78
+ try :
79
+ workspace_client = WorkspaceClient ()
80
+ workspace_client .dbutils .fs .ls ("dbfs:/" )
81
+ except Exception as e :
82
+ raise Exception (
83
+ f"Could not access dbfs using databricks SDK. { base_dbfs_access_error_message } Error details: { e } "
84
+ )
85
+ db_path = self .get_volumes_path (dst_path )
86
+ else :
87
+ try :
88
+ execute_shell_command (f"databricks fs ls dbfs:/" )
89
+ except Exception as e :
90
+ raise Exception (
91
+ f"Error accessing DBFS via databricks-cli. { base_dbfs_access_error_message } Error details: { e } "
92
+ )
93
+ path_to_use = dst_path if dst_path else dbfs_path
94
+ db_path = self .get_dbfs_path (path_to_use )
79
95
80
- if not dbfs_path :
81
- dbfs_path = f"dbfs:/temp/{ os .environ ['USER' ]} "
82
96
if watch :
83
97
project_name = os .path .abspath (project_location ).split ("/" )[- 1 ]
84
- dbfs_path = f"{ dbfs_path } /{ project_name } "
98
+ db_path = f"{ db_path } /{ project_name } "
85
99
86
100
glob_paths = []
87
101
if isinstance (glob_path , str ):
88
102
glob_paths = [os .path .join (project_location , glob_path )]
89
103
elif isinstance (glob_path , list ):
90
104
glob_paths = [os .path .join (project_location , path ) for path in glob_path ]
91
105
92
- self ._build_and_deploy (watch = watch , project_location = project_location , dbfs_path = dbfs_path , glob_paths = glob_paths )
106
+ self ._build_and_deploy (watch = watch , project_location = project_location , db_path = db_path , glob_paths = glob_paths )
93
107
if watch :
94
108
watcher = FileWatcher (
95
109
project_location ,
96
110
lambda x : self ._build_and_deploy (
97
111
watch = watch ,
98
112
modified_files = watcher .modified_files ,
99
- dbfs_path = dbfs_path ,
113
+ db_path = db_path ,
100
114
project_location = project_location ,
101
115
glob_paths = glob_path
102
116
),
@@ -108,15 +122,15 @@ def _build_and_deploy(
108
122
self ,
109
123
watch : bool ,
110
124
project_location : str ,
111
- dbfs_path : str ,
125
+ db_path : str ,
112
126
modified_files : Optional [List [str ]] = None ,
113
127
glob_paths : Optional [List [str ]] = None
114
128
) -> None :
115
129
if modified_files :
116
130
logger .info (f"Found changes in { modified_files } . Overwriting them." )
117
131
self ._deploy (
118
132
file_paths = modified_files ,
119
- dbfs_path = dbfs_path ,
133
+ db_path = db_path ,
120
134
project_location = project_location ,
121
135
)
122
136
return
@@ -128,10 +142,10 @@ def _build_and_deploy(
128
142
wheel_path , wheel_file = self ._create_python_project_wheel (project_location )
129
143
self ._deploy (
130
144
file_paths = [wheel_path ],
131
- dbfs_path = dbfs_path ,
145
+ db_path = db_path ,
132
146
project_location = os .path .dirname (wheel_path ),
133
147
)
134
- install_path = f' { dbfs_path . replace ( "dbfs:/" , "/dbfs/" )} /{ wheel_file } '
148
+ install_path = f" { self . get_install_path ( db_path )} /{ wheel_file } "
135
149
136
150
dependency_files = ["requirements.in" , "requirements.txt" ]
137
151
index_urls = []
@@ -183,10 +197,10 @@ def _build_and_deploy(
183
197
line .strip () for line in f .readlines () if "index-url" in line
184
198
]
185
199
self ._deploy (
186
- file_paths = list (files ), dbfs_path = dbfs_path , project_location = project_location
200
+ file_paths = list (files ), db_path = db_path , project_location = project_location
187
201
)
188
202
189
- install_path = f' { dbfs_path . replace ( "dbfs:/" , "/dbfs/" ) } '
203
+ install_path = self . get_install_path ( db_path )
190
204
index_urls_options = " " .join (index_urls )
191
205
192
206
if dependency_file_exist :
@@ -215,16 +229,54 @@ def _build_and_deploy(
215
229
def _deploy (
216
230
self ,
217
231
file_paths : List [str ],
218
- dbfs_path : str ,
232
+ db_path : str ,
219
233
project_location : str
220
234
) -> None :
235
+ if self .is_dbfs (db_path ):
236
+ self ._deploy_dbfs (file_paths , db_path , project_location )
237
+ else :
238
+ w = WorkspaceClient ()
239
+ self ._deploy_volumes (file_paths , db_path , project_location , w )
240
+
241
+ def _deploy_dbfs (
242
+ self ,
243
+ file_paths : List [str ],
244
+ db_path : str ,
245
+ project_location : str
246
+ ):
221
247
def helper (file : str ) -> None :
222
- target_path = f"{ dbfs_path } /{ os .path .relpath (file , project_location )} "
248
+ target_path = f"{ db_path } /{ os .path .relpath (file , project_location )} "
223
249
execute_shell_command (f"databricks fs cp --recursive --overwrite { file } { target_path } " )
224
250
logger .info (f"Uploaded { file } to { target_path } " )
225
251
226
252
execute_for_each_multithreaded (file_paths , lambda x : helper (x ))
227
253
254
+ def _deploy_volumes (
255
+ self ,
256
+ file_paths : List [str ],
257
+ db_path : str ,
258
+ project_location : str ,
259
+ workspace_client
260
+ ):
261
+ def helper (wc , file : str ) -> None :
262
+ # sdk asks an absolute path
263
+ if not os .path .isabs (file ):
264
+ cwd = os .getcwd ()
265
+ file = f"{ cwd } /{ file } "
266
+ target_path = f"{ db_path } /{ os .path .relpath (file , project_location )} "
267
+ # if the file already exists, sdk returns error message: The file being created already exists.
268
+ # a feature request is already here: https://github.com/databricks/databricks-sdk-py/issues/548
269
+ try :
270
+ wc .dbutils .fs .rm (target_path )
271
+ except Exception :
272
+ pass
273
+ # sdk uses urllibs3 to parse paths.
274
+ # It need to be file:// to be recognized as a local file. Otherwise it raises file not exist error
275
+ wc .dbutils .fs .cp (f"file://{ file } " , target_path )
276
+ logger .info (f"Uploaded { file } to { target_path } " )
277
+
278
+ execute_for_each_multithreaded (file_paths , lambda x : helper (workspace_client , x ))
279
+
228
280
def _create_python_project_wheel (self , project_location : str ) -> (str , str ):
229
281
dist_location = f"{ project_location } /dist"
230
282
execute_shell_command (f"rm { dist_location } /* 2>/dev/null || true" )
@@ -250,6 +302,26 @@ def _create_python_project_wheel(self, project_location: str) -> (str, str):
250
302
wheel_path = f"{ dist_location } /{ wheel_file } "
251
303
return wheel_path , wheel_file
252
304
305
+ def get_dbfs_path (self , path : Optional [str ]) -> str :
306
+ if path :
307
+ logger .warning ("The `dbfs_path` parameter is planned for deprecation. Please use the `dst_path` parameter instead." )
308
+ if not self .is_dbfs (path ):
309
+ raise Exception ("`dbfs_path` must start with dbfs:/" )
310
+ return path or f"dbfs:/temp/{ os .environ ['USER' ]} "
311
+
312
+ def get_volumes_path (self , path : Optional [str ]) -> str :
313
+ if path and not path .startswith ("/Volumes" ):
314
+ raise Exception ("`use_volumes` is true. `dst_path` must start with /Volumes" )
315
+ return path or f"/Volumes/main/data_products/volume/db_rocket/{ os .environ ['USER' ]} "
316
+
317
+ def get_install_path (self , db_path ):
318
+ if self .is_dbfs (db_path ):
319
+ return f'{ db_path .replace ("dbfs:/" , "/dbfs/" )} '
320
+ return db_path
321
+
322
+ def is_dbfs (self , db_path : str ):
323
+ return db_path .startswith ("dbfs:/" )
324
+
253
325
254
326
def main():
    """Console entry point: expose the Rocket class as a CLI via python-fire."""
    fire.Fire(Rocket)
0 commit comments