From 2165056445410a5661d2463ef985c249d5b5aee3 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Fri, 10 Jan 2025 06:18:09 +0000 Subject: [PATCH] Fix error messages in GCS & BQ --- .../bigquery/sink/AbstractBigQuerySink.java | 24 +++++++++++++-- .../gcp/bigquery/sink/BigQuerySinkUtils.java | 8 +++-- .../gcp/bigquery/source/BigQuerySource.java | 29 +++++++++++++++---- .../gcp/common/GCPErrorDetailsProvider.java | 26 ++++++++++------- .../plugin/gcp/gcs/sink/GCSBatchSink.java | 18 +++++++++--- .../gcp/gcs/sink/GCSMultiBatchSink.java | 20 +++++++++---- .../cdap/plugin/gcp/gcs/source/GCSSource.java | 12 +++++++- 7 files changed, 106 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java index b2e309f05..9cc580ed5 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java @@ -30,11 +30,14 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorContext; import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.plugin.common.Asset; import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider; import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema; @@ -43,6 +46,7 @@ import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; import io.cdap.plugin.gcp.common.GCPUtils; +import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; @@ -90,7 +94,6 @@ public final void prepareRun(BatchSinkContext context) throws Exception { Credentials credentials = serviceAccount == null ? null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath()); String project = config.getProject(); - bigQuery = GCPUtils.getBigQuery(project, credentials, null); FailureCollector collector = context.getFailureCollector(); CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector); collector.getOrThrowException(); @@ -98,10 +101,25 @@ public final void prepareRun(BatchSinkContext context) throws Exception { // Get required dataset ID and dataset instance (if it exists) DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset()); - Dataset dataset = bigQuery.getDataset(datasetId); + Dataset dataset; + try { + bigQuery = GCPUtils.getBigQuery(project, credentials, null); + dataset = bigQuery.getDataset(datasetId); + } catch (Exception e) { + ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.WRITING)); + throw ex == null ? e : ex; + } // Get the required bucket name and bucket instance (if it exists) - Storage storage = GCPUtils.getStorage(project, credentials); + Storage storage; + try { + storage = GCPUtils.getStorage(project, credentials);; + } catch (Exception e) { + ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.WRITING)); + throw ex == null ? e : ex; + } String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(), dataset, config.getBucket()); bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString()); diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java index ca1d29ecf..07d5b8943 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java @@ -199,8 +199,10 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode()); String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction(), GCPUtils.BQ_SUPPORTED_DOC_URL); + String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(), + e.getClass().getName(), e.getMessage()); throw ErrorUtils.getProgramFailureException( - new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(), + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal, pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()), GCPUtils.BQ_SUPPORTED_DOC_URL, e); } @@ -249,8 +251,10 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode()); String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction(), GCPUtils.GCS_SUPPORTED_DOC_URL); + String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(), + e.getClass().getName(), e.getMessage()); throw ErrorUtils.getProgramFailureException( - new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(), + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal, pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()), GCPUtils.GCS_SUPPORTED_DOC_URL, e); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java index 77e63ee40..a115b642f 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java @@ -39,6 +39,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -48,7 +49,9 @@ import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput; +import io.cdap.cdap.etl.api.exception.ErrorContext; import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.cdap.etl.api.validation.ValidationFailure; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; @@ -60,6 +63,7 @@ import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.common.CmekUtils; import io.cdap.plugin.gcp.common.GCPUtils; +import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider; import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -147,9 +151,16 @@ public void prepareRun(BatchSourceContext context) throws Exception { collector.getOrThrowException(); } - BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null); - Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset())); - Storage storage = GCPUtils.getStorage(config.getProject(), credentials); + BigQuery bigQuery; + Dataset dataset; + try { + bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null); + dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset())); + } catch (Exception e) { + ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.READING)); + throw ex == null ? e : ex; + } // Get Configuration for this run bucketPath = UUID.randomUUID().toString(); @@ -169,10 +180,18 @@ public void prepareRun(BatchSourceContext context) throws Exception { dataset, config.getBucket()); // Configure GCS Bucket to use + Storage storage; + try { + storage = GCPUtils.getStorage(config.getProject(), credentials);; + } catch (Exception e) { + ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.READING)); + throw ex == null ? e : ex; + } String bucket = null; try { - bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath, - cmekKeyName); + bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, + bucketPath, cmekKeyName); } catch (Exception e) { String errorReason = "Failed to create bucket."; collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java index 3b962610e..bf034c1f6 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java @@ -36,6 +36,7 @@ * A custom ErrorDetailsProvider for GCP plugins. */ public class GCPErrorDetailsProvider implements ErrorDetailsProvider { + private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s"; /** * Get a ProgramFailureException with the given error @@ -71,12 +72,12 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err * @param e The HttpResponseException to get the error information from. * @return A ProgramFailureException with the given error information. */ - private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) { + private ProgramFailureException getProgramFailureException(HttpResponseException e, + ErrorContext errorContext) { Integer statusCode = e.getStatusCode(); ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode); String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(), pair.getCorrectiveAction()); - String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; String errorMessage = e.getMessage(); String externalDocumentationLink = null; @@ -95,7 +96,8 @@ private ProgramFailureException getProgramFailureException(HttpResponseException } return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), - errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), + errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), + e.getClass().getName(), errorMessage), pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(), externalDocumentationLink, e); } @@ -122,11 +124,12 @@ private String getErrorMessage(GoogleJsonResponseException exception) { * @param e The IllegalArgumentException to get the error information from. * @return A ProgramFailureException with the given error information. */ - private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) { + private ProgramFailureException getProgramFailureException(IllegalArgumentException e, + ErrorContext errorContext) { String errorMessage = e.getMessage(); - String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; - return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage, - String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), + e.getClass().getName(), errorMessage), ErrorType.USER, false, e); } /** @@ -136,11 +139,12 @@ private ProgramFailureException getProgramFailureException(IllegalArgumentExcept * @param e The IllegalStateException to get the error information from. * @return A ProgramFailureException with the given error information. */ - private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) { + private ProgramFailureException getProgramFailureException(IllegalStateException e, + ErrorContext errorContext) { String errorMessage = e.getMessage(); - String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; - return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage, - String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), + errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), + e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e); } /** diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java index 6789676e3..998f31c1e 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java @@ -32,6 +32,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -39,7 +40,9 @@ import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.exception.ErrorContext; import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; @@ -138,10 +141,14 @@ public void prepareRun(BatchSinkContext context) throws Exception { } String bucketName = config.getBucket(collector); - Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); - String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; - String correctiveAction = "Ensure you entered the correct bucket path and " - + "have permissions for it."; + Storage storage; + try { + storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + } catch (Exception e) { + ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.READING)); + throw ex == null ? e : ex; + } Bucket bucket; String location = null; try { @@ -153,6 +160,9 @@ public void prepareRun(BatchSinkContext context) throws Exception { GCPUtils.createBucket(storage, bucketName, location, cmekKeyName); } } catch (StorageException e) { + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; + String correctiveAction = "Ensure you entered the correct bucket path and " + + "have permissions for it."; String errorReason = String.format(errorReasonFormat, e.getCode()); collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) .withStacktrace(e.getStackTrace()); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java index 33310421e..ee7bfabc3 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java @@ -32,6 +32,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.api.plugin.InvalidPluginConfigException; import io.cdap.cdap.api.plugin.InvalidPluginProperty; import io.cdap.cdap.api.plugin.PluginProperties; @@ -41,7 +42,9 @@ import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.exception.ErrorContext; import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; import io.cdap.plugin.format.FileFormat; @@ -126,7 +129,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { } @Override - public void prepareRun(BatchSinkContext context) throws IOException, InstantiationException { + public void prepareRun(BatchSinkContext context) throws Exception { FailureCollector collector = context.getFailureCollector(); config.validate(collector, context.getArguments().asMap()); collector.getOrThrowException(); @@ -156,15 +159,22 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati } String bucketName = config.getBucket(collector); - Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); - String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; - String correctiveAction = "Ensure you entered the correct bucket path and " - + "have permissions for it."; + Storage storage; + try { + storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + } catch (Exception e) { + ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.READING)); + throw ex == null ? e : ex; + } try { if (storage.get(bucketName) == null) { GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName); } } catch (StorageException e) { + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; + String correctiveAction = "Ensure you entered the correct bucket path and " + + "have permissions for it."; String errorReason = String.format(errorReasonFormat, e.getCode()); collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) .withStacktrace(e.getStackTrace()); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java index 4287b2a69..8a805e646 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java @@ -28,12 +28,15 @@ import io.cdap.cdap.api.annotation.MetadataProperty; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.cdap.etl.api.exception.ErrorContext; import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; @@ -118,7 +121,14 @@ public void prepareRun(BatchSourceContext context) throws Exception { collector.getOrThrowException(); } - Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + Storage storage; + try { + storage = GCPUtils.getStorage(config.connection.getProject(), credentials); + } catch (Exception e) { + ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e, + new ErrorContext(ErrorPhase.READING)); + throw ex == null ? e : ex; + } String location = null; try { // Get location of the source for lineage