-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathai4e_azure_utils.py
188 lines (156 loc) · 6.71 KB
/
ai4e_azure_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
Miscellaneous Azure Blob Storage utilities
Requires azure-storage-blob>=12.4.0
"""
import json
from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union
from azure.storage.blob import BlobPrefix, ContainerClient
import sas_blob_utils
def walk_container(container_client: ContainerClient,
                   max_depth: int = -1,
                   prefix: str = '',
                   store_folders: bool = True,
                   store_blobs: bool = True,
                   debug_max_items: int = -1) -> Tuple[List[str], List[str]]:
    """
    Recursively walk the folder hierarchy of an Azure Blob Storage container.

    Based on:
    https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/storage/azure-storage-blob/samples/blob_samples_walk_blob_hierarchy.py

    Args:
        container_client: client whose walk_blobs() method is used to
            enumerate one hierarchy level at a time
        max_depth: deepest level to enumerate, where level 1 is the listing
            made directly under `prefix`; values <= 0 disable the limit
        prefix: name prefix at which the walk starts
        store_folders: whether folder names are collected
        store_blobs: whether blob names are collected
        debug_max_items: if > 0, the walk is abandoned once more than this
            many names have been collected. The check only runs after
            returning from a sub-folder, so it is a coarse debugging aid
            rather than an exact cap.

    Returns: (folders, blobs), two lists of full names. Folder names have
        leading and trailing '/' characters stripped.
    """
    found_folders: List[str] = []
    found_blobs: List[str] = []

    def visit(current_prefix: str, level: int) -> None:
        # Levels beyond max_depth are not listed at all.
        if 0 < max_depth < level:
            return

        for entry in container_client.walk_blobs(
                name_starts_with=current_prefix):
            full_name = current_prefix + entry.name[len(current_prefix):]

            if not isinstance(entry, BlobPrefix):
                if store_blobs:
                    found_blobs.append(full_name)
                continue

            # A BlobPrefix is a virtual folder: record it, then descend.
            if store_folders:
                found_folders.append(full_name)
            visit(entry.name, level + 1)

            # The collections only ever grow, so once the debug limit is
            # exceeded every enclosing level bails out here as well.
            if (debug_max_items > 0
                    and len(found_folders) + len(found_blobs)
                    > debug_max_items):
                return

    visit(prefix, 1)

    assert all(name.endswith('/') for name in found_folders)
    return [name.strip('/') for name in found_folders], found_blobs
def list_top_level_blob_folders(container_client: ContainerClient) -> List[str]:
    """
    Return the names of the folders found at the top level of a container.

    Delegates to walk_container() with max_depth=1 and blob collection
    turned off, and keeps only the folder half of its result.
    """
    folders_and_blobs = walk_container(
        container_client, max_depth=1, store_blobs=False)
    return folders_and_blobs[0]
def concatenate_json_lists(input_files: Iterable[str],
                           output_file: Optional[str] = None
                           ) -> List[Any]:
    """
    Given a list of JSON files that contain lists (typically string
    filenames), concatenates the lists into a single list and optionally
    writes out this list to a new output JSON file.

    Args:
        input_files: paths of JSON files, each holding a top-level list
        output_file: optional path; if given, the concatenated list is
            written there as JSON with indent=1

    Returns: the concatenated list, in the order the input files were given

    Raises:
        TypeError: if the top-level value of an input file is not a list
    """
    output_list: List[Any] = []

    for fn in input_files:
        # JSON text is UTF-8; do not depend on the platform's locale
        # encoding, which would mis-decode non-ASCII names on some systems.
        with open(fn, 'r', encoding='utf-8') as f:
            file_list = json.load(f)

        # list.extend() would happily splice in dict keys or the characters
        # of a string, silently corrupting the result, so reject those.
        if not isinstance(file_list, list):
            raise TypeError(
                'Expected a JSON list in {}, found {}'.format(
                    fn, type(file_list).__name__))

        output_list.extend(file_list)

    if output_file is not None:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(output_list, f, indent=1)

    return output_list
def write_list_to_file(output_file: str, strings: Sequence[str]) -> None:
    """
    Save a list of strings to disk.

    A file name ending in '.json' gets a JSON array (indent=1); any other
    name gets the strings joined by newlines, with no trailing newline.
    """
    with open(output_file, 'w') as out:
        if not output_file.endswith('.json'):
            out.write('\n'.join(strings))
            return
        json.dump(strings, out, indent=1)
def read_list_from_file(filename: str) -> List[str]:
    """
    Load a list of strings from a JSON file.

    The file name must end in '.json', and the file must hold a JSON list
    whose elements are all strings; each of these expectations is enforced
    with an assert.
    """
    assert filename.endswith('.json')

    with open(filename, 'r') as source:
        loaded = json.load(source)

    assert isinstance(loaded, list)
    assert all(isinstance(entry, str) for entry in loaded)
    return loaded
def upload_file_to_blob(account_name: str,
                        container_name: str,
                        local_path: str,
                        blob_name: str,
                        sas_token: str,
                        overwrite: bool = False) -> str:
    """
    Upload a local file to a blob in Azure Blob Storage.

    The container URI is built with sas_blob_utils.build_azure_storage_uri()
    and the file, opened in binary mode, is handed to
    sas_blob_utils.upload_blob().

    Returns: the value returned by sas_blob_utils.upload_blob(), i.e. the
        uploaded blob URI with SAS token.
    """
    target_container = sas_blob_utils.build_azure_storage_uri(
        account=account_name, container=container_name, sas_token=sas_token)

    with open(local_path, 'rb') as stream:
        uploaded_uri = sas_blob_utils.upload_blob(
            container_uri=target_container, blob_name=blob_name, data=stream,
            overwrite=overwrite)

    return uploaded_uri
def enumerate_blobs_to_file(
        output_file: Optional[str],
        account_name: str,
        container_name: str,
        sas_token: Optional[str] = None,
        blob_prefix: Optional[str] = None,
        blob_suffix: Optional[Union[str, Tuple[str, ...]]] = None,
        rsearch: Optional[Union[str, List[str]]] = None,
        limit: Optional[int] = None,
        verbose: Optional[bool] = True
        ) -> List[str]:
    """
    Enumerates blobs in a container, and writes the blob names to an output
    file.

    Args:
        output_file: optional str, path to save list of files in container.
            If ends in '.json', writes a JSON string. Otherwise, writes a
            newline-delimited list. Can be None, in which case this is just
            a convenient wrapper for blob enumeration.
        account_name: str, Azure Storage account name
        container_name: str, Azure Blob Storage container name
        sas_token: optional str, container SAS token; a leading '?' is
            removed when the token is longer than 9 characters
        blob_prefix: optional str, returned results will only contain blob
            names with this prefix
        blob_suffix: optional str or tuple of str, returned results will
            only contain blob names with this/these suffix(es). The blob
            names will be lowercased first before comparing with the
            suffix(es).
        rsearch: optional str, returned results will only contain blob names
            that match this regex. Can also be a list of regexes, in which
            case blobs matching *any* of the regexes will be returned.
        limit: optional int, maximum # of blob names to list;
            if None, then returns all blob names
        verbose: passed through unchanged to
            sas_blob_utils.list_blobs_in_container()

    Returns: list of str, the blob names produced by
        sas_blob_utils.list_blobs_in_container() (sorted, of length limit or
        shorter, per that helper's contract).
    """
    # NOTE(review): tokens of 9 characters or fewer keep their leading '?';
    # the reason for this length threshold is not evident from this file.
    if sas_token is not None and len(sas_token) > 9 and sas_token[0] == '?':
        sas_token = sas_token[1:]

    container_uri = sas_blob_utils.build_azure_storage_uri(
        account=account_name, container=container_name, sas_token=sas_token)

    matched_blobs = sas_blob_utils.list_blobs_in_container(
        container_uri=container_uri, blob_prefix=blob_prefix,
        blob_suffix=blob_suffix, rsearch=rsearch, limit=limit, verbose=verbose)

    # Writing is optional so callers can use this purely for enumeration.
    if output_file is not None:
        write_list_to_file(output_file, matched_blobs)

    return matched_blobs