Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[🍒]Wrap error thrown in Storage Client #1503

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 103 additions & 31 deletions src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,7 +72,14 @@ public Blob pickABlob(String path) {
return null;
}
GCSPath gcsPath = GCSPath.from(path);
Page<Blob> blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
Page<Blob> blobPage;
try {
blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
} catch (Exception e) {
String errorReason = String.format("Unable to list objects in bucket %s.", gcsPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
Iterator<Blob> iterator = blobPage.getValues().iterator();
while (iterator.hasNext()) {
Blob blob = iterator.next();
Expand All @@ -89,7 +100,13 @@ public void setMetaData(Blob blob, Map<String, String> metaData) {
if (blob == null || metaData == null || metaData.isEmpty()) {
return;
}
storage.update(BlobInfo.newBuilder(blob.getBlobId()).setMetadata(metaData).build());
try {
storage.update(BlobInfo.newBuilder(blob.getBlobId()).setMetadata(metaData).build());
} catch (Exception e) {
String errorReason = String.format("Unable to update metadata for blob %s.", blob.getName());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
}

/**
Expand All @@ -102,7 +119,14 @@ public void mapMetaDataForAllBlobs(String path, Consumer<Map<String, String>> fu
return;
}
GCSPath gcsPath = GCSPath.from(path);
Page<Blob> blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
Page<Blob> blobPage;
try {
blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
} catch (Exception e) {
String errorReason = String.format("Unable to list objects in bucket %s.", gcsPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
Iterator<Blob> blobIterator = blobPage.iterateAll().iterator();
while (blobIterator.hasNext()) {
Blob blob = blobIterator.next();
Expand Down Expand Up @@ -132,9 +156,11 @@ public void createBucketIfNotExists(GCSPath path, @Nullable String location, @Nu
LOG.warn("Getting 409 Conflict: {} Bucket at destination path {} may already exist.",
e.getMessage(), path.getUri());
} else {
throw new RuntimeException(
String errorReason =
String.format("Unable to create bucket %s. Ensure you entered the correct bucket path and " +
"have permissions for it.", path.getBucket()), e);
"have permissions for it.", path.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
}
}
Expand Down Expand Up @@ -173,9 +199,16 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea
* Get all the matching wildcard paths given the regex input.
*/
public List<GCSPath> getMatchedPaths(GCSPath sourcePath, boolean recursive, Pattern wildcardRegex) {
Page<Blob> blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
Page<Blob> blobPage;
try {
blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
getWildcardPathPrefix(sourcePath, wildcardRegex)
));
));
} catch (Exception e) {
String errorReason = String.format("Unable to list objects in bucket %s.", sourcePath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
List<String> blobPageNames = new ArrayList<>();
blobPage.getValues().forEach(blob -> blobPageNames.add(blob.getName()));
return getFilterMatchedPaths(sourcePath, blobPageNames, recursive);
Expand Down Expand Up @@ -212,58 +245,84 @@ static List<GCSPath> getFilterMatchedPaths(GCSPath sourcePath, List<String> blob
private void pairTraverse(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolean overwrite,
Consumer<BlobPair> consumer) {

Bucket sourceBucket = null;
Bucket sourceBucket;
try {
sourceBucket = storage.get(sourcePath.getBucket());
} catch (StorageException e) {
} catch (Exception e) {
// Add more descriptive error message
throw new RuntimeException(
String.format("Unable to access source bucket %s. ", sourcePath.getBucket())
+ "Ensure you entered the correct bucket path.", e);
String errorReason = String.format("Unable to access GCS bucket '%s'", sourcePath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
if (sourceBucket == null) {
throw new IllegalArgumentException(
String.format("Source bucket '%s' does not exist.", sourcePath.getBucket()));
String errorReason = String.format("Source bucket '%s' does not exist.", sourcePath.getBucket());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorReason, ErrorType.USER, true, null);
}
Bucket destBucket = null;
Bucket destBucket;
try {
destBucket = storage.get(destPath.getBucket());
} catch (StorageException e) {
} catch (Exception e) {
// Add more descriptive error message
throw new RuntimeException(
String.format("Unable to access destination bucket %s. ", destPath.getBucket())
+ "Ensure you entered the correct bucket path.", e);
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
if (destBucket == null) {
throw new IllegalArgumentException(
String.format("Destination bucket '%s' does not exist. Please create it first.", destPath.getBucket()));
String errorReason =
String.format("Destination bucket '%s' does not exist. Please create it first.", destPath.getBucket());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorReason, ErrorType.USER, true, null);
}

boolean destinationBaseExists;
String baseDestName = destPath.getName();
if (destPath.isBucket() || storage.get(BlobId.of(destPath.getBucket(), baseDestName)) != null) {
boolean destinationBlobExists;
try {
destinationBlobExists = destPath.isBucket() || storage.get(BlobId.of(destPath.getBucket(), baseDestName)) != null;
} catch (Exception e) {
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
if (destinationBlobExists) {
destinationBaseExists = true;
} else {
// if gs://bucket2/subdir doesn't exist, check if gs://bucket2/subdir/ exists
// similarly, if gs://bucket2/subdir/ doesn't exist, check if gs://bucket2/subdir exists
// this is because "cp dir0 subdir" and "cp dir0 subdir/" are equivalent if the 'subdir' directory exists
String modifiedName = baseDestName.endsWith("/") ?
baseDestName.substring(0, baseDestName.length() - 1) : baseDestName + "/";
destinationBaseExists = storage.get(BlobId.of(destPath.getBucket(), modifiedName)) != null;
try {
destinationBaseExists = storage.get(BlobId.of(destPath.getBucket(), modifiedName)) != null;
} catch (Exception e) {
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
}

List<BlobPair> copyList = new ArrayList<>();
traverse(BlobId.of(sourcePath.getBucket(), sourcePath.getName()), recursive, sourceBlob -> {
BlobId destBlobID = resolve(sourcePath.getName(), sourceBlob.getBlobId().getName(),
destPath, destinationBaseExists);
if (!overwrite) {
Blob destBlob = storage.get(destBlobID);
Blob destBlob;
try {
destBlob = storage.get(destBlobID);
} catch (Exception e) {
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
// we can't just use Blob's isDirectory() because the cloud console will create a 'directory' by creating
// a 0 size placeholder blob that ends with '/'. This placeholder blob's isDirectory() method returns false,
// but we don't want the overwrite check to fail on it. So we explicitly ignore the check for these 0 size
// placeholder blobs.
if (destBlob != null && !destBlob.getName().endsWith("/") && destBlob.getSize() != 0) {
throw new IllegalArgumentException(String.format("%s already exists.", toPath(destBlobID)));
String errorReason = String.format("%s already exists.", toPath(destBlobID));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorReason, ErrorType.USER, true, null);
}
}
copyList.add(new BlobPair(sourceBlob, destBlobID));
Expand Down Expand Up @@ -347,8 +406,15 @@ static String append(String base, String part) {
* @param consumer the blob consumer
*/
private void traverse(BlobId blobId, boolean recursive, Consumer<Blob> consumer) {
Page<Blob> blobList = storage.list(blobId.getBucket(), Storage.BlobListOption.currentDirectory(),
Storage.BlobListOption.prefix(blobId.getName()));
Page<Blob> blobList;
try {
blobList = storage.list(blobId.getBucket(), Storage.BlobListOption.currentDirectory(),
Storage.BlobListOption.prefix(blobId.getName()));
} catch (Exception e) {
String errorReason = String.format("");
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
}
for (Blob blob : blobList.iterateAll()) {
if (!blob.isDirectory()) {
consumer.accept(blob);
Expand All @@ -363,11 +429,17 @@ private static String toPath(BlobId blobId) {
}

public static StorageClient create(String project, @Nullable String serviceAccount,
Boolean isServiceAccountFilePath, @Nullable Integer readTimeout)
throws IOException {
Boolean isServiceAccountFilePath, @Nullable Integer readTimeout) {
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(project);
if (serviceAccount != null) {
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
try {
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
} catch (IOException e) {
String errorReason = "Unable to load service account credentials.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()),
ErrorType.UNKNOWN, false, null);
}
}
if (readTimeout != null) {
builder.setTransportOptions(HttpTransportOptions.newBuilder().setReadTimeout(readTimeout * 1000).build());
Expand All @@ -376,7 +448,7 @@ public static StorageClient create(String project, @Nullable String serviceAccou
return new StorageClient(storage);
}

public static StorageClient create(GCPConnectorConfig config) throws IOException {
public static StorageClient create(GCPConnectorConfig config) {
return create(config.getProject(), config.getServiceAccount(), config.isServiceAccountFilePath(), null);
}

Expand Down
Loading