forked from GoogleCloudPlatform/ml-on-gcp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcs_helper.py
107 lines (73 loc) · 2.99 KB
/
gcs_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for accessing Google Cloud Storage in Python code.
`pickle_and_upload`: Upload a Python object after pickling to
user-specified `bucket_name` and `object_name`.
`download_and_unpickle`: The opposite of `pickle_and_upload`.
For more information:
https://cloud.google.com/storage/
"""
import os
import pickle
import re
import shutil
import tempfile

from google.cloud import storage
def _make_gcs_uri(bucket_name, object_name):
return 'gs://{}/{}'.format(bucket_name, object_name)
def _split_uri(gcs_uri):
"""Splits gs://bucket_name/object_name to (bucket_name, object_name)"""
pattern = r'gs://([^/]+)/(.+)'
match = re.match(pattern, gcs_uri)
bucket_name = match.group(1)
object_name = match.group(2)
return bucket_name, object_name
def get_blob(bucket_name, object_name):
    """Return a ``Blob`` handle for ``object_name`` inside ``bucket_name``.

    Creates a fresh ``storage.Client`` per call; the returned blob is a
    local handle — no data is transferred until it is read or written.
    """
    client = storage.Client()
    return client.get_bucket(bucket_name).blob(object_name)
def get_uri_blob(gcs_uri):
    """Return the ``Blob`` referenced by a full ``gs://`` uri."""
    return get_blob(*_split_uri(gcs_uri))
def archive_and_upload(bucket_name, directory, extension='zip', object_name=None):
    """Archives a directory and uploads the archive to GCS.

    Args:
        bucket_name: Destination GCS bucket.
        directory: Local directory to archive; also used as the base of
            the default object name.
        extension: Archive format accepted by ``shutil.make_archive``
            (e.g. ``'zip'``, ``'tar'``, ``'gztar'``).
        object_name: Destination object name; defaults to
            ``'<directory>.<extension>'``.

    Returns:
        The uploaded object's GCS uri.
    """
    storage_client = storage.Client()
    object_name = object_name or '{}.{}'.format(directory, extension)
    # Build the archive inside a unique temp directory: the original
    # hard-coded '_tmp' base in the CWD collides between concurrent
    # callers, and the file leaked if the upload raised.
    tmp_dir = tempfile.mkdtemp()
    try:
        temp_filename = shutil.make_archive(
            os.path.join(tmp_dir, 'archive'), extension, directory)
        bucket = storage_client.get_bucket(bucket_name)
        bucket.blob(object_name).upload_from_filename(temp_filename)
    finally:
        # Always remove the local archive, even when the upload fails.
        shutil.rmtree(tmp_dir, ignore_errors=True)
    return _make_gcs_uri(bucket_name, object_name)
def pickle_and_upload(obj, bucket_name, object_name):
    """Pickle ``obj`` and upload it to ``gs://bucket_name/object_name``.

    Returns the object's GCS uri.
    """
    print('pickling data')
    payload = pickle.dumps(obj)
    client = storage.Client()
    target = client.get_bucket(bucket_name).blob(object_name)
    print('uploading object {} to bucket {}'.format(object_name, bucket_name))
    target.upload_from_string(payload)
    return _make_gcs_uri(bucket_name, object_name)
def download_and_unpickle(bucket_name, object_name):
    """Download ``gs://bucket_name/object_name`` and unpickle its contents.

    NOTE(review): ``download_as_string`` is deprecated in newer
    google-cloud-storage releases in favor of ``download_as_bytes`` —
    confirm the pinned client version before switching.
    NOTE(review): ``pickle.loads`` on remote data is unsafe if the bucket
    is writable by untrusted parties — confirm the trust model.
    """
    raw = get_blob(bucket_name, object_name).download_as_string()
    return pickle.loads(raw)
def download_uri_and_unpickle(gcs_uri):
    """Download the object at a full ``gs://`` uri and unpickle it."""
    return download_and_unpickle(*_split_uri(gcs_uri))