This repository has been archived by the owner on Sep 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbucketfs_location.py
147 lines (128 loc) · 5.38 KB
/
bucketfs_location.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from typing import Any, Tuple, IO, Iterable, Union, Optional
from pathlib import PurePosixPath, Path
from urllib.parse import ParseResult
from exasol_bucketfs_utils_python import download, upload, list_files, \
delete, bucketfs_utils
from exasol_bucketfs_utils_python import load_file_from_local_fs as from_BFS
from exasol_bucketfs_utils_python.bucket_config import BucketConfig
from exasol_bucketfs_utils_python.abstract_bucketfs_location import \
AbstractBucketFSLocation
class BucketFSLocation(AbstractBucketFSLocation):
"""
BucketFSLocation implements AbstractBucketFSLocation.
BucketFSLocation is used to upload fileobjects, strings or joblib objects to
the BucketFS given a path and the object, or to download objects into
strings, fileobjects or joblib objects from the BucketFS given a file path.
Also able to read files from the BucketFS directly, if called from inside of
an UDF. If reading an object via joblib inside of an UDF, make sure the
object type is known inside the UDF.
"""
def __init__(self, bucket_config: BucketConfig,
base_path: Optional[PurePosixPath]):
self.base_path = "" if base_path is None else base_path
self.bucket_config = bucket_config
def generate_bucket_udf_path(
self, path_in_bucket: Optional[Union[str, PurePosixPath]] = None) \
-> PurePosixPath:
return bucketfs_utils.generate_bucket_udf_path(
self.bucket_config,
self.get_complete_file_path_in_bucket(path_in_bucket))
def get_complete_file_path_in_bucket(
self, bucket_file_path: Optional[Union[str, PurePosixPath]] = None) -> str:
if bucket_file_path is not None:
bucket_file_path = bucketfs_utils \
.make_path_relative(bucket_file_path)
else:
bucket_file_path = ""
return str(PurePosixPath(self.base_path, bucket_file_path))
def download_from_bucketfs_to_string(
self,
bucket_file_path: str) -> str:
return download.download_from_bucketfs_to_string(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path)
)
def download_object_from_bucketfs_via_joblib(
self,
bucket_file_path: str) -> Any:
return download.download_object_from_bucketfs_via_joblib(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path)
)
def upload_string_to_bucketfs(
self,
bucket_file_path: str,
string: str) -> Tuple[ParseResult, PurePosixPath]:
return upload.upload_string_to_bucketfs(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path),
string
)
def upload_object_to_bucketfs_via_joblib(
self, object: Any,
bucket_file_path: str,
**kwargs) -> Tuple[ParseResult, PurePosixPath]:
return upload.upload_object_to_bucketfs_via_joblib(
object,
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path),
**kwargs
)
def upload_fileobj_to_bucketfs(
self,
fileobj: IO,
bucket_file_path: str) -> Tuple[ParseResult, PurePosixPath]:
return upload.upload_fileobj_to_bucketfs(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path),
fileobj
)
def read_file_from_bucketfs_to_string(
self,
bucket_file_path: str) -> str:
return from_BFS.read_file_from_bucketfs_to_string(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config
)
def read_file_from_bucketfs_to_file(
self,
bucket_file_path: str,
local_file_path: Path) -> None:
from_BFS.read_file_from_bucketfs_to_file(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config,
local_file_path
)
def read_file_from_bucketfs_to_fileobj(
self,
bucket_file_path: str,
fileobj: IO) -> None:
from_BFS.read_file_from_bucketfs_to_fileobj(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config,
fileobj
)
def read_file_from_bucketfs_via_joblib(
self,
bucket_file_path: str) -> Any:
return from_BFS.read_file_from_bucketfs_via_joblib(
self.get_complete_file_path_in_bucket(bucket_file_path),
self.bucket_config
)
def list_files_in_bucketfs(
self,
bucket_file_path: str) -> Iterable[str]:
return list_files.list_files_in_bucketfs(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path)
)
def delete_file_in_bucketfs(
self,
bucket_file_path: str) -> None:
delete.delete_file_in_bucketfs(
self.bucket_config,
self.get_complete_file_path_in_bucket(bucket_file_path)
)
def joinpath(self, *others: Union[str, PurePosixPath]) -> AbstractBucketFSLocation:
return BucketFSLocation(bucket_config=self.bucket_config,
base_path=PurePosixPath(self.base_path).joinpath(*others))