diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java index aa23fd5f66..500cbf39c3 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java @@ -48,8 +48,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.function.Function; import javax.annotation.Nullable; /** @@ -100,9 +103,6 @@ public void run(ActionContext context) throws Exception { // Enable legacy SQL builder.setUseLegacySql(config.isLegacySQL()); - // Location must match that of the dataset(s) referenced in the query. - JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); - // API request - starts the query. Credentials credentials = config.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), @@ -126,13 +126,17 @@ public void run(ActionContext context) throws Exception { QueryJobConfiguration queryConfig = builder.build(); - Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); - LOG.info("Executing SQL as job {}.", jobId.getJob()); - LOG.debug("The BigQuery SQL is {}", config.getSql()); + // Setting external retry strategy for BigQuery client due to BigQuery Client not retrying when a job clashes + // with another job, due to error being 400. - // Wait for the query to complete - queryJob.waitFor(); + final String retryableStringPattern = "Retrying the job with back-off"; + List> retryRules = new ArrayList<>(); + retryRules.add( + (BigQueryException e) -> !((e.getCode() == 400) + && (e.getMessage().contains(retryableStringPattern) || e.getReason().contains(retryableStringPattern))) + ); + Job queryJob = executeQueryJobWithCustomRetry(bigQuery, queryConfig, retryRules); // Check for errors if (queryJob.getStatus().getError() != null) { @@ -169,6 +173,47 @@ public void run(ActionContext context) throws Exception { context.getMetrics().gauge(RECORDS_PROCESSED, rows); } + /** + * Executes Query with added retry rules following: + * https://cloud.google.com/bigquery/sla + */ + private Job executeQueryJobWithCustomRetry(BigQuery bigQuery, QueryJobConfiguration queryConfig, + List> retryRules) throws Exception { + // The longest amount of time to wait in-between retries. + final int maximum_backoff = 32; + + // The maximum number of retries. + final int max_retries = 20; + + int retries = 0; + + while (true) { + try { + // Location must match that of the dataset(s) referenced in the query. + JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build(); + Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + LOG.info("Executing SQL as job {}.", jobId.getJob()); + LOG.debug("The BigQuery SQL is {}", config.getSql()); + + // Wait for the query to complete + queryJob.waitFor(); + return queryJob; + } catch (BigQueryException bigQueryException) { + if (retries >= max_retries) { + LOG.error("Run out of retries while executing query with backoff."); + throw bigQueryException; + } + if (retryRules.stream().noneMatch((f -> f.apply(bigQueryException)))) { + throw bigQueryException; + } + LOG.warn("Received {} error from BigQuery, retrying...", bigQueryException.getMessage()); + long sleep_time = Math.round((Math.min(Math.pow(2, retries), maximum_backoff) + Math.random()) * 1000); + Thread.sleep(sleep_time); + retries += 1; + } + } + } + @Override public AbstractBigQueryActionConfig getConfig() { return config;