Skip to content

Commit

Permalink
Merge pull request #1482 from data-integrations/PLUGIN-1807-5
Browse files Browse the repository at this point in the history
[PLUGIN-1807] Fix error messages in GCS & BQ
  • Loading branch information
itsankit-google authored Jan 10, 2025
2 parents 3fa80a2 + 2165056 commit b8ca08a
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
Expand All @@ -43,6 +46,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,18 +94,32 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = serviceAccount == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
String project = config.getProject();
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
FailureCollector collector = context.getFailureCollector();
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
baseConfiguration = getBaseConfiguration(cmekKeyName);

// Get required dataset ID and dataset instance (if it exists)
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
Dataset dataset = bigQuery.getDataset(datasetId);
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
dataset = bigQuery.getDataset(datasetId);
} catch (Exception e) {
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}

// Get the required bucket name and bucket instance (if it exists)
Storage storage = GCPUtils.getStorage(project, credentials);
Storage storage;
try {
storage = GCPUtils.getStorage(project, credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.WRITING));
throw ex == null ? e : ex;
}
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
dataset, config.getBucket());
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.BQ_SUPPORTED_DOC_URL);
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
GCPUtils.BQ_SUPPORTED_DOC_URL, e);
}
Expand Down Expand Up @@ -249,8 +251,10 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.GCS_SUPPORTED_DOC_URL);
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
GCPUtils.GCS_SUPPORTED_DOC_URL, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand All @@ -48,7 +49,9 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
Expand All @@ -60,6 +63,7 @@
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
Expand Down Expand Up @@ -147,9 +151,16 @@ public void prepareRun(BatchSourceContext context) throws Exception {
collector.getOrThrowException();
}

BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
BigQuery bigQuery;
Dataset dataset;
try {
bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
} catch (Exception e) {
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}

// Get Configuration for this run
bucketPath = UUID.randomUUID().toString();
Expand All @@ -169,10 +180,18 @@ public void prepareRun(BatchSourceContext context) throws Exception {
dataset, config.getBucket());

// Configure GCS Bucket to use
Storage storage;
try {
storage = GCPUtils.getStorage(config.getProject(), credentials);;
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
String bucket = null;
try {
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
cmekKeyName);
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
bucketPath, cmekKeyName);
} catch (Exception e) {
String errorReason = "Failed to create bucket.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* A custom ErrorDetailsProvider for GCP plugins.
*/
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";

/**
* Get a ProgramFailureException with the given error
Expand Down Expand Up @@ -71,12 +72,12 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

String errorMessage = e.getMessage();
String externalDocumentationLink = null;
Expand All @@ -95,7 +96,8 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
}

return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage),
errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage),
pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(),
externalDocumentationLink, e);
}
Expand All @@ -122,11 +124,12 @@ private String getErrorMessage(GoogleJsonResponseException exception) {
* @param e The IllegalArgumentException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage), ErrorType.USER, false, e);
}

/**
Expand All @@ -136,11 +139,12 @@ private ProgramFailureException getProgramFailureException(IllegalArgumentExcept
* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(IllegalStateException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
}

/**
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
Expand Down Expand Up @@ -138,10 +141,14 @@ public void prepareRun(BatchSinkContext context) throws Exception {
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
Bucket bucket;
String location = null;
try {
Expand All @@ -153,6 +160,9 @@ public void prepareRun(BatchSinkContext context) throws Exception {
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
}
} catch (StorageException e) {
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
Expand Down
20 changes: 15 additions & 5 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginProperties;
Expand All @@ -41,7 +42,9 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
Expand Down Expand Up @@ -126,7 +129,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
}

@Override
public void prepareRun(BatchSinkContext context) throws IOException, InstantiationException {
public void prepareRun(BatchSinkContext context) throws Exception {
FailureCollector collector = context.getFailureCollector();
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();
Expand Down Expand Up @@ -156,15 +159,22 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
try {
if (storage.get(bucketName) == null) {
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.exception.ErrorPhase;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
Expand Down Expand Up @@ -118,7 +121,14 @@ public void prepareRun(BatchSourceContext context) throws Exception {
collector.getOrThrowException();
}

Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
Storage storage;
try {
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
} catch (Exception e) {
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
new ErrorContext(ErrorPhase.READING));
throw ex == null ? e : ex;
}
String location = null;
try {
// Get location of the source for lineage
Expand Down

0 comments on commit b8ca08a

Please sign in to comment.