Bring back GCS ops. #1229

Status: Open. Wants to merge 85 commits into base: master.

Changes shown below are from 5 of the 85 commits.

Commits
59ddbc4
Add a standalone binary build for GCS ops
michaelbanfield Jun 18, 2020
ca8e327
Revert "Deprecate gcs-config (#1024)"
michaelbanfield Dec 14, 2020
cb8ffe7
Rebase change
michaelbanfield Dec 14, 2020
3536338
Clean up merge
michaelbanfield Dec 14, 2020
11c8a51
Fix lint errors
michaelbanfield Dec 14, 2020
3560ac6
Skip tests for windows
michaelbanfield Mar 29, 2021
42c2867
Move build target to exclude windows
michaelbanfield Mar 29, 2021
7c04ee3
Update the API Compatibility test to include tf-nightly vs. tensorflo…
yongtang Dec 15, 2020
760dd25
Bump Apache Arrow to 2.0.0 (#1231)
yongtang Dec 16, 2020
9a3663c
Bump Avro to 1.10.1 (#1235)
yongtang Dec 17, 2020
2e6936f
Add emulator for gcs (#1234)
vnghia Dec 18, 2020
04d6913
fix nightly build because of missing `google-cloud-storage` (#1238)
vnghia Dec 18, 2020
aa1a95d
Remove the CI build for CentOS 8 (#1237)
yongtang Dec 18, 2020
6c29813
[MongoDB] update API docstrings (#1243)
kvignesh1420 Dec 22, 2020
371877e
Remove redundant output of dataset.element_spec in PostgreSQL tutoria…
terrytangyuan Dec 22, 2020
88b9d8d
add tf-c-header rule (#1244)
vnghia Dec 22, 2020
d695644
Add `fail-fast: false` to API Compatibility GitHub Actions (#1246)
yongtang Dec 23, 2020
b0ffa2e
Skip tf-nightly:tensorflow-io==0.17.0 on API compatibility test (#1247)
yongtang Dec 23, 2020
a68633e
S3 Improvements (#1248)
vnghia Dec 23, 2020
e29e8a3
Add missed function RecursivelyCreateDir for hdfs file system impleme…
yongtang Dec 24, 2020
fd7bc1a
update bazel version to 3.7.2 (#1251)
kvignesh1420 Dec 26, 2020
7bb3d30
[audio] cleanup vorbis file after usage (#1249)
kvignesh1420 Dec 27, 2020
9966944
[s3] add support for testing on macOS (#1253)
kvignesh1420 Jan 4, 2021
0342a16
add notebook formatting instruction in README (#1256)
burgerkingeater Jan 5, 2021
b56ad5d
[docs] Restructure README.md content (#1257)
kvignesh1420 Jan 5, 2021
ab9004d
Update libtiff/libgeotiff dependency (#1258)
yongtang Jan 5, 2021
0381b91
Update openjpeg to 2.4.0 (#1259)
yongtang Jan 5, 2021
ac8da58
[arrow] using eager exec for examples in README.md (#1261)
kvignesh1420 Jan 6, 2021
3641d2a
remove unstable elasticsearch test setup on macOS (#1263)
kvignesh1420 Jan 6, 2021
4f340e0
Exposes num_parallel_reads and num_parallel_calls (#1232)
i-ony Jan 7, 2021
0663d38
Fix incomplete row reading issue in parquet files (#1262)
yongtang Jan 7, 2021
7cb6b0f
Tests to train a keras model using MongoDBIODataset (#1264)
kvignesh1420 Jan 7, 2021
cebc613
add avro tutorial testing data (#1267)
burgerkingeater Jan 11, 2021
fc8d472
Update Kafka tutorial to work with Apache Kafka (#1266)
dalelane Jan 12, 2021
5df32c6
Update pulsar download link. (#1270)
yongtang Jan 14, 2021
2bbdd40
add github workflow for performance benchmarking (#1269)
kvignesh1420 Jan 14, 2021
f8efb10
handle missing dependencies while benchmarking (#1271)
kvignesh1420 Jan 18, 2021
544740a
Disable s3 macOS for now as docker is not working on GitHub Actions f…
yongtang Jan 19, 2021
b71fcb3
rename testing data files (#1278)
burgerkingeater Jan 19, 2021
337ef96
Add tutorial for avro dataset API (#1250)
burgerkingeater Jan 19, 2021
c0d56ee
remove docker based mongodb tests in macos (#1279)
kvignesh1420 Jan 20, 2021
f16c613
trigger benchmarks workflow only on commits (#1282)
kvignesh1420 Jan 26, 2021
ef2927c
Bump Apache Arrow to 3.0.0 (#1285)
yongtang Jan 27, 2021
03b77de
Add bazel cache (#1287)
yongtang Jan 29, 2021
f492bc8
Add initial bigtable stub test (#1286)
yongtang Jan 29, 2021
2808ac3
Update azure lite v0.3.0 (#1288)
yongtang Jan 30, 2021
5a42d1e
Add reference to github-pages benchmarks in README (#1289)
kvignesh1420 Jan 31, 2021
ff6245a
Update _toc.yaml (#1290)
burgerkingeater Jan 31, 2021
8a1fead
Clear outputs (#1292)
MarkDaoust Feb 2, 2021
b07d1ab
fix kafka online-learning section in tutorial notebook (#1274)
kvignesh1420 Feb 2, 2021
5299d14
Only enable bazel caching writes for tensorflow/io github actions (#1…
yongtang Feb 2, 2021
f02af15
Enable ready-only bazel cache (#1294)
yongtang Feb 2, 2021
84bba4c
Update xz to 5.2.5, and switch the download link. (#1296)
yongtang Feb 3, 2021
0a7c5a2
Enable bazel remote cache for kokoro tests (#1295)
yongtang Feb 3, 2021
880c8b3
Rename tests (#1297)
yongtang Feb 3, 2021
7010a48
Combine Ubuntu 20.04 and CentOS 7 tests into one GitHub jobs (#1299)
yongtang Feb 4, 2021
5091b94
Update names of api tests (#1300)
yongtang Feb 4, 2021
79ccf5e
Fix wrong benchmark tests names (#1301)
yongtang Feb 4, 2021
7945ec5
Patch arrow to temporarily resolve the ARROW-11518 issue (#1304)
yongtang Feb 8, 2021
a398d26
Avoid error if plugins .so module is not available (#1302)
yongtang Feb 8, 2021
df04e37
Remove AWS headers from tensorflow, and use headers from third_party …
yongtang Feb 8, 2021
cc93afa
Fix docstring. (#1305)
MarkDaoust Feb 8, 2021
f34d193
Switch to use github to download libgeotiff (#1307)
yongtang Feb 9, 2021
801569f
Add @com_google_absl//absl/strings:cord (#1308)
yongtang Feb 9, 2021
a871e52
Switch to modular file system for hdfs (#1309)
yongtang Feb 11, 2021
5b77f96
Disable test_write_kafka test for now. (#1310)
yongtang Feb 12, 2021
53c9a71
Modify --plat-name for macosx wheels (#1311)
kvignesh1420 Feb 13, 2021
3f7f292
Switch to modular file system for s3 (#1312)
yongtang Feb 13, 2021
33fca56
Update to enable python 3.9 building on Linux (#1314)
yongtang Feb 26, 2021
314f406
Add python 3.9 on Windows (#1316)
yongtang Feb 27, 2021
fb5cab8
Use `-p 9000:9000` (and hide 8088) when launch hadoop (#1317)
yongtang Mar 2, 2021
221e221
Experimental: Add initial wavefront/obj parser for vertices (#1315)
yongtang Mar 5, 2021
3b81b85
update `protobuf` version to `3.11.4` to match tensorflow-nightly (#1…
vnghia Mar 7, 2021
1c85b77
Revert "update `protobuf` version to `3.11.4` to match tensorflow-nig…
vnghia Mar 12, 2021
ef46f8c
Enable python 3.9 build on macOS (#1324)
yongtang Mar 13, 2021
3121308
switch mnist dataset mirror to a more reliable one (#1327)
vnghia Mar 14, 2021
57d840b
remove flaky centos 7 based build action (#1328)
kvignesh1420 Mar 14, 2021
c7e99a5
Adds AVRO_PARSER_NUM_MINIBATCH to override num_minibatches and logs t…
i-ony Mar 18, 2021
de54c3c
Super Serial- automatically save and load TFRecords from Tensorflow d…
markemus Mar 18, 2021
9644be3
Fix link in avro reader notebook (#1333)
oliverhu Mar 21, 2021
8d7d28f
Bump abseil-cpp to 6f9d96a1f41439ac172ee2ef7ccd8edf0e5d068c (#1336)
yongtang Mar 23, 2021
3de431d
Release nightly even if test fails (#1339)
yongtang Mar 23, 2021
ef8a5d5
remove unused/stale azure_ops (#1338)
kvignesh1420 Mar 23, 2021
4154a2c
gcs switch to env (#1319)
vnghia Mar 24, 2021
64eb761
improvements for `s3` environements variables (#1343)
vnghia Mar 29, 2021
34 changes: 34 additions & 0 deletions tensorflow_io/gcs/BUILD
@@ -0,0 +1,34 @@
licenses(["notice"]) # Apache 2.0

package(default_visibility = ["//visibility:public"])

load(
"//:tools/build/tensorflow_io.bzl",
"tf_io_copts",
)

cc_binary(
name = "_gcs_config_ops.so",
copts = tf_io_copts(),
linkshared = 1,
deps = [
":gcs_config_ops",
],
)

cc_library(
name = "gcs_config_ops",
srcs = [
"kernels/gcs_config_op_kernels.cc",
"ops/gcs_config_ops.cc",
],
copts = tf_io_copts(),
linkstatic = True,
deps = [
"@curl",
"@jsoncpp_git//:jsoncpp",
"@local_config_tf//:libtensorflow_framework",
"@local_config_tf//:tf_header_lib",
],
alwayslink = 1,
)
3 changes: 3 additions & 0 deletions tensorflow_io/gcs/README.md
@@ -0,0 +1,3 @@
## Cloud Storage (GCS) ##

The Google Cloud Storage ops allow the user to configure the GCS File System.
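To make the configuration flow concrete, the sketch below builds the credentials JSON blob that the `IO>GcsConfigureCredentials` op in this PR expects. All field values are fake placeholders, and the commented-out `configure_gcs` call is an assumed usage based on the symbols exported by `tensorflow_io/gcs/__init__.py`, not a documented API:

```python
import json

# Placeholder service-account credentials; the field names follow the
# IO>GcsConfigureCredentials op doc in this PR, but every value is fake.
creds = {
    "type": "service_account",
    "project_id": "my-project",
    "private_key_id": "0123abcd",
    "private_key": "-----BEGIN PRIVATE KEY-----\n<fake>\n-----END PRIVATE KEY-----\n",
    "client_email": "sa@my-project.iam.gserviceaccount.com",
    "client_id": "42",
}
json_blob = json.dumps(creds)

# The op rejects blobs with neither `refresh_token` nor `private_key`.
assert "private_key" in json.loads(json_blob)

# Assumed usage with this PR applied (names from tensorflow_io/gcs/__init__.py):
#   from tensorflow_io.gcs import configure_gcs
#   configure_gcs(credentials=json_blob)
```

Feeding the blob in at runtime (rather than baking it into the graph) matters for the security note in the op documentation further down.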
29 changes: 29 additions & 0 deletions tensorflow_io/gcs/__init__.py
@@ -0,0 +1,29 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Module for cloud ops."""


from tensorflow.python.util.all_util import remove_undocumented

# pylint: disable=line-too-long,wildcard-import,g-import-not-at-top
from tensorflow_io.gcs.python.ops.gcs_config_ops import *

_allowed_symbols = [
"configure_colab_session",
"configure_gcs",
"BlockCacheParams",
"ConfigureGcsHook",
]
remove_undocumented(__name__, _allowed_symbols)
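The `remove_undocumented` call above prunes every public name of the module that is not in `_allowed_symbols`, so only the four GCS config symbols survive the wildcard import. A rough pure-Python approximation of that pruning (the real TF helper also touches docs and `__all__`; this is only a sketch):

```python
def prune_symbols(namespace, allowed):
    """Drop public names not on the allow-list: a rough sketch of
    tensorflow.python.util.all_util.remove_undocumented."""
    for name in list(namespace):
        if not name.startswith("_") and name not in allowed:
            del namespace[name]
    return namespace

# Hypothetical module namespace after the wildcard import.
module_ns = {"configure_gcs": 1, "stray_helper": 2, "_private": 3}
prune_symbols(module_ns, ["configure_gcs", "configure_colab_session",
                          "BlockCacheParams", "ConfigureGcsHook"])
assert sorted(module_ns) == ["_private", "configure_gcs"]
```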
206 changes: 206 additions & 0 deletions tensorflow_io/gcs/kernels/gcs_config_op_kernels.cc
@@ -0,0 +1,206 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include <sstream>

#include "include/json/json.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/platform/cloud/curl_http_request.h"
#include "tensorflow/core/platform/cloud/gcs_file_system.h"
#include "tensorflow/core/platform/cloud/oauth_client.h"
#include "tensorflow/core/util/ptr_util.h"

namespace tensorflow {
namespace {

// The default initial delay between retries with exponential backoff.
constexpr int kInitialRetryDelayUsec = 500000; // 0.5 sec

// The minimum time delta between now and the token expiration time
// for the token to be re-used.
constexpr int kExpirationTimeMarginSec = 60;

// The URL to retrieve the auth bearer token via OAuth with a refresh token.
constexpr char kOAuthV3Url[] = "https://www.googleapis.com/oauth2/v3/token";

// The URL to retrieve the auth bearer token via OAuth with a private key.
constexpr char kOAuthV4Url[] = "https://www.googleapis.com/oauth2/v4/token";

// The authentication token scope to request.
constexpr char kOAuthScope[] = "https://www.googleapis.com/auth/cloud-platform";

Status RetrieveGcsFs(OpKernelContext* ctx, RetryingGcsFileSystem** fs) {
DCHECK(fs != nullptr);
*fs = nullptr;

FileSystem* filesystem = nullptr;
TF_RETURN_IF_ERROR(
ctx->env()->GetFileSystemForFile("gs://fake/file.text", &filesystem));
if (filesystem == nullptr) {
return errors::FailedPrecondition("The GCS file system is not registered.");
}

*fs = dynamic_cast<RetryingGcsFileSystem*>(filesystem);
if (*fs == nullptr) {
return errors::Internal(
"The filesystem registered under the 'gs://' scheme was not a "
"tensorflow::RetryingGcsFileSystem*.");
}
Review comment on lines +56 to +61 by @kvignesh1420 (Member), Mar 30, 2021:

@michaelbanfield I think the dynamic_cast<RetryingGcsFileSystem*>(filesystem) is returning a null pointer, which in turn is raising the error. I think this is because the gs file system plugin has already been registered and there is a type mismatch. Can we remove this typecast operation? Even the tests are failing for linux and macos: https://github.com/tensorflow/io/pull/1229/checks?check_run_id=2221952977

return Status::OK();
}

template <typename T>
Status ParseScalarArgument(OpKernelContext* ctx, StringPiece argument_name,
T* output) {
const Tensor* argument_t;
TF_RETURN_IF_ERROR(ctx->input(argument_name, &argument_t));
if (!TensorShapeUtils::IsScalar(argument_t->shape())) {
return errors::InvalidArgument(argument_name, " must be a scalar");
}
*output = argument_t->scalar<T>()();
return Status::OK();
}

// GcsCredentialsOpKernel overrides the credentials used by the gcs_filesystem.
class GcsCredentialsOpKernel : public OpKernel {
public:
explicit GcsCredentialsOpKernel(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override {
// Get a handle to the GCS file system.
RetryingGcsFileSystem* gcs = nullptr;
OP_REQUIRES_OK(ctx, RetrieveGcsFs(ctx, &gcs));

tstring json_string;
OP_REQUIRES_OK(ctx,
ParseScalarArgument<tstring>(ctx, "json", &json_string));

Json::Value json;
Json::Reader reader;
std::stringstream json_stream(json_string);
OP_REQUIRES(ctx, reader.parse(json_stream, json),
errors::InvalidArgument("Could not parse json: ", json_string));

OP_REQUIRES(
ctx, json.isMember("refresh_token") || json.isMember("private_key"),
errors::InvalidArgument("JSON format incompatible; did not find fields "
"`refresh_token` or `private_key`."));

auto provider =
tensorflow::MakeUnique<ConstantAuthProvider>(json, ctx->env());

// Test getting a token
string dummy_token;
OP_REQUIRES_OK(ctx, provider->GetToken(&dummy_token));
OP_REQUIRES(ctx, !dummy_token.empty(),
errors::InvalidArgument(
"Could not retrieve a token with the given credentials."));

// Set the provider.
gcs->underlying()->SetAuthProvider(std::move(provider));
}

private:
class ConstantAuthProvider : public AuthProvider {
public:
ConstantAuthProvider(const Json::Value& json,
std::unique_ptr<OAuthClient> oauth_client, Env* env,
int64 initial_retry_delay_usec)
: json_(json),
oauth_client_(std::move(oauth_client)),
env_(env),
initial_retry_delay_usec_(initial_retry_delay_usec) {}

ConstantAuthProvider(const Json::Value& json, Env* env)
: ConstantAuthProvider(json, tensorflow::MakeUnique<OAuthClient>(), env,
kInitialRetryDelayUsec) {}

~ConstantAuthProvider() override {}

Status GetToken(string* token) override {
mutex_lock l(mu_);
const uint64 now_sec = env_->NowSeconds();

if (!current_token_.empty() &&
now_sec + kExpirationTimeMarginSec < expiration_timestamp_sec_) {
*token = current_token_;
return Status::OK();
}
if (json_.isMember("refresh_token")) {
TF_RETURN_IF_ERROR(oauth_client_->GetTokenFromRefreshTokenJson(
json_, kOAuthV3Url, &current_token_, &expiration_timestamp_sec_));
} else if (json_.isMember("private_key")) {
TF_RETURN_IF_ERROR(oauth_client_->GetTokenFromServiceAccountJson(
json_, kOAuthV4Url, kOAuthScope, &current_token_,
&expiration_timestamp_sec_));
} else {
return errors::FailedPrecondition(
"Unexpected content of the JSON credentials file.");
}

*token = current_token_;
return Status::OK();
}

private:
Json::Value json_;
std::unique_ptr<OAuthClient> oauth_client_;
Env* env_;

mutex mu_;
string current_token_ TF_GUARDED_BY(mu_);
uint64 expiration_timestamp_sec_ TF_GUARDED_BY(mu_) = 0;

// The initial delay for exponential backoffs when retrying failed calls.
const int64 initial_retry_delay_usec_;
TF_DISALLOW_COPY_AND_ASSIGN(ConstantAuthProvider);
};
};

REGISTER_KERNEL_BUILDER(Name("IO>GcsConfigureCredentials").Device(DEVICE_CPU),
GcsCredentialsOpKernel);

class GcsBlockCacheOpKernel : public OpKernel {
public:
explicit GcsBlockCacheOpKernel(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override {
// Get a handle to the GCS file system.
RetryingGcsFileSystem* gcs = nullptr;
OP_REQUIRES_OK(ctx, RetrieveGcsFs(ctx, &gcs));

size_t max_cache_size, block_size, max_staleness;
OP_REQUIRES_OK(ctx, ParseScalarArgument<size_t>(ctx, "max_cache_size",
&max_cache_size));
OP_REQUIRES_OK(ctx,
ParseScalarArgument<size_t>(ctx, "block_size", &block_size));
OP_REQUIRES_OK(
ctx, ParseScalarArgument<size_t>(ctx, "max_staleness", &max_staleness));

if (gcs->underlying()->block_size() == block_size &&
gcs->underlying()->max_bytes() == max_cache_size &&
gcs->underlying()->max_staleness() == max_staleness) {
LOG(INFO) << "Skipping resetting the GCS block cache.";
return;
}
gcs->underlying()->ResetFileBlockCache(block_size, max_cache_size,
max_staleness);
}
};

REGISTER_KERNEL_BUILDER(Name("IO>GcsConfigureBlockCache").Device(DEVICE_CPU),
GcsBlockCacheOpKernel);

} // namespace
} // namespace tensorflow
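The heart of `ConstantAuthProvider::GetToken` above is a small caching policy: serve the cached token while it is still more than `kExpirationTimeMarginSec` from expiry, otherwise refresh via the refresh-token or service-account path. The Python sketch below mirrors that logic; the fake clock and `fetch_fn` are stand-ins for `Env::NowSeconds` and `OAuthClient` (assumptions, not the real interfaces):

```python
EXPIRATION_MARGIN_SEC = 60  # mirrors kExpirationTimeMarginSec

class ConstantAuthProvider:
    def __init__(self, creds, now_fn, fetch_fn):
        self.creds = creds        # dict parsed from the JSON input
        self.now_fn = now_fn      # stand-in for Env::NowSeconds
        self.fetch_fn = fetch_fn  # stand-in for OAuthClient; returns (token, expiry)
        self.token = ""
        self.expiry = 0

    def get_token(self):
        now = self.now_fn()
        # Reuse the cached token while it is comfortably far from expiry.
        if self.token and now + EXPIRATION_MARGIN_SEC < self.expiry:
            return self.token
        if "refresh_token" in self.creds or "private_key" in self.creds:
            self.token, self.expiry = self.fetch_fn(self.creds)
        else:
            raise ValueError("Unexpected content of the JSON credentials file.")
        return self.token

clock = [1000]
calls = []

def fake_fetch(creds):
    calls.append(1)
    return ("tok%d" % len(calls), clock[0] + 3600)

provider = ConstantAuthProvider({"refresh_token": "r"}, lambda: clock[0], fake_fetch)
assert provider.get_token() == "tok1"   # first call fetches a fresh token
clock[0] += 10
assert provider.get_token() == "tok1"   # still valid: served from cache
assert len(calls) == 1                  # no second fetch happened
```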
66 changes: 66 additions & 0 deletions tensorflow_io/gcs/ops/gcs_config_ops.cc
@@ -0,0 +1,66 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/framework/common_shape_fns.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/shape_inference.h"

namespace tensorflow {

using shape_inference::InferenceContext;

REGISTER_OP("IO>GcsConfigureCredentials")
.Input("json: string")
.SetShapeFn(shape_inference::NoOutputs)
.Doc(R"doc(
Configures the credentials used by the GCS client of the local TF runtime.
The json input can be of the format:
1. Refresh Token:
{
"client_id": "<redacted>",
"client_secret": "<redacted>",
"refresh_token": "<redacted>",
"type": "authorized_user",
}
2. Service Account:
{
"type": "service_account",
"project_id": "<redacted>",
"private_key_id": "<redacted>",
"private_key": "-----BEGIN PRIVATE KEY-----\n<REDACTED>\n-----END PRIVATE KEY-----\n",
"client_email": "<REDACTED>@<REDACTED>.iam.gserviceaccount.com",
"client_id": "<REDACTED>",
# Some additional fields elided
}
Note that the credentials established through this method are shared across all
sessions run on this runtime.
Be sure to feed the inputs to this op so the credentials are not stored in a
constant op within the graph, where they might accidentally be checkpointed or
otherwise persisted or exfiltrated.
)doc");

REGISTER_OP("IO>GcsConfigureBlockCache")
.Input("max_cache_size: uint64")
.Input("block_size: uint64")
.Input("max_staleness: uint64")
.SetShapeFn(shape_inference::NoOutputs)
.Doc(R"doc(
Re-configures the GCS block cache with the new configuration values.
If the values are the same as the already-configured values, this op is a no-op.
If they are different, the current contents of the block cache are dropped and a
fresh block cache is created.
)doc");

} // namespace tensorflow
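The no-op behavior documented for `IO>GcsConfigureBlockCache` (and implemented by `GcsBlockCacheOpKernel` earlier in this diff) boils down to: compare the requested values with the current ones and only reset the cache when something changed. A minimal sketch of that state machine, with a `resets` counter added purely for illustration:

```python
class BlockCache:
    """Stand-in for the GCS file block cache state (sketch only)."""
    def __init__(self, block_size=0, max_bytes=0, max_staleness=0):
        self.block_size = block_size
        self.max_bytes = max_bytes
        self.max_staleness = max_staleness
        self.resets = 0  # instrumentation for this sketch, not in the kernel

    def configure(self, max_cache_size, block_size, max_staleness):
        # Mirrors GcsBlockCacheOpKernel::Compute: skip when nothing changed,
        # matching the "Skipping resetting the GCS block cache." log line.
        if (self.block_size == block_size and
                self.max_bytes == max_cache_size and
                self.max_staleness == max_staleness):
            return
        # Otherwise drop the old cache and adopt the new configuration.
        self.block_size = block_size
        self.max_bytes = max_cache_size
        self.max_staleness = max_staleness
        self.resets += 1

cache = BlockCache()
cache.configure(max_cache_size=1 << 20, block_size=1 << 16, max_staleness=30)
cache.configure(max_cache_size=1 << 20, block_size=1 << 16, max_staleness=30)  # no-op
assert cache.resets == 1
```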
16 changes: 16 additions & 0 deletions tensorflow_io/gcs/python/__init__.py
@@ -0,0 +1,16 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""This module contains Python API methods for GCS integration."""
16 changes: 16 additions & 0 deletions tensorflow_io/gcs/python/ops/__init__.py
@@ -0,0 +1,16 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""This module contains the Python API methods for GCS integration."""