From ad10954d1c5d25d25450699036e29d141ed0e679 Mon Sep 17 00:00:00 2001 From: vikasrathee-cs Date: Thu, 8 Aug 2024 20:43:17 +0530 Subject: [PATCH] bump up to 1.6.3 for release --- pom.xml | 2 +- .../plugin/salesforce/SalesforceBulkUtil.java | 16 +-- .../batch/SalesforceBulkRecordReader.java | 6 +- .../util/BulkConnectionRetryWrapper.java | 100 ++++++++++++++++++ .../batch/util/SalesforceSplitUtil.java | 13 +-- 5 files changed, 121 insertions(+), 16 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java diff --git a/pom.xml b/pom.xml index 4dd31361..32916987 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ 2.1.3 2.10.0 3.8.1 - 53.0.0 + 61.0.0 4.0.0 4.7.2 2.23.0 diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java index 0b841087..1e8397d1 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java @@ -32,6 +32,7 @@ import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; import com.sforce.ws.SessionRenewer; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -68,6 +69,7 @@ public final class SalesforceBulkUtil { public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum, @Nullable String externalIdField, ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException { + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); JobInfo job = new JobInfo(); job.setObject(sObject); job.setOperation(operationEnum); @@ -77,10 +79,10 @@ public static JobInfo createJob(BulkConnection bulkConnection, String 
sObject, O job.setExternalIdFieldName(externalIdField); } - job = bulkConnection.createJob(job); + job = bulkConnectionRetryWrapper.createJob(job); Preconditions.checkState(job.getId() != null, "Couldn't get job ID. There was a problem in creating the " + "batch job"); - return bulkConnection.getJobStatus(job.getId()); + return bulkConnectionRetryWrapper.getJobStatus(job.getId()); } /** @@ -91,10 +93,11 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O * @throws AsyncApiException if there is an issue creating the job */ public static void closeJob(BulkConnection bulkConnection, String jobId) throws AsyncApiException { + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); JobInfo job = new JobInfo(); job.setId(jobId); job.setState(JobStateEnum.Closed); - bulkConnection.updateJob(job); + bulkConnectionRetryWrapper.updateJob(job); } /** @@ -110,13 +113,13 @@ public static void closeJob(BulkConnection bulkConnection, String jobId) throws public static void checkResults(BulkConnection bulkConnection, JobInfo job, List batchInfoList, boolean ignoreFailures) throws AsyncApiException, IOException { - + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); for (BatchInfo batchInfo : batchInfoList) { /* The response is a CSV with the following headers: Id,Success,Created,Error */ - CSVReader rdr = new CSVReader(bulkConnection.getBatchResultStream(job.getId(), batchInfo.getId())); + CSVReader rdr = new CSVReader(bulkConnectionRetryWrapper.getBatchResultStream(job.getId(), batchInfo.getId())); List resultHeader = rdr.nextRecord(); int successRowId = resultHeader.indexOf("Success"); int errorRowId = resultHeader.indexOf("Error"); @@ -148,6 +151,7 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job, */ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job, List batchInfoList, boolean 
ignoreFailures) { + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); Set incomplete = batchInfoList .stream() .map(BatchInfo::getId) @@ -164,7 +168,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job, .until(() -> { try { BatchInfo[] statusList = - bulkConnection.getBatchInfoList(job.getId()).getBatchInfo(); + bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo(); for (BatchInfo b : statusList) { if (b.getState() == BatchStateEnum.Failed) { diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java index bbf876d8..7fa6883a 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java @@ -28,13 +28,12 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.salesforce.BulkAPIBatchException; import io.cdap.plugin.salesforce.SalesforceConnectionUtil; -import io.cdap.plugin.salesforce.SalesforceConstants; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; - import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -261,10 +260,11 @@ public InputStream getQueryResultStream(BulkConnection bulkConnection) */ private String[] waitForBatchResults(BulkConnection bulkConnection) throws 
AsyncApiException, InterruptedException, SalesforceQueryExecutionException { + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); BatchInfo info = null; for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) { try { - info = bulkConnection.getBatchInfo(jobId, batchId); + info = bulkConnectionRetryWrapper.getBatchInfo(jobId, batchId); } catch (AsyncApiException e) { if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) { throw e; diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java new file mode 100644 index 00000000..1337ebe5 --- /dev/null +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java @@ -0,0 +1,100 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.salesforce.plugin.source.batch.util; + +import com.sforce.async.AsyncApiException; +import com.sforce.async.BatchInfo; +import com.sforce.async.BatchInfoList; +import com.sforce.async.BulkConnection; +import com.sforce.async.JobInfo; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.time.Duration; + +/** + * BulkConnectionRetryWrapper class to retry all the salesforce api calls in case of failure. + */ +public class BulkConnectionRetryWrapper { + + private final BulkConnection bulkConnection; + private final RetryPolicy retryPolicy; + private static final Logger LOG = LoggerFactory.getLogger(BulkConnectionRetryWrapper.class); + + public BulkConnectionRetryWrapper(BulkConnection bulkConnection) { + this.bulkConnection = bulkConnection; + this.retryPolicy = getRetryPolicy(5L, 80L, 5); + } + + public JobInfo createJob(JobInfo jobInfo) { + Object resultJobInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while creating job")) + .get(() -> bulkConnection.createJob(jobInfo)); + return (JobInfo) resultJobInfo; + } + + public JobInfo getJobStatus(String jobId) { + Object resultJobInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting job status")) + .get(() -> bulkConnection.getJobStatus(jobId)); + return (JobInfo) resultJobInfo; + } + + public void updateJob(JobInfo jobInfo) { + Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while updating job.")) + .get(() -> bulkConnection.updateJob(jobInfo)); + } + + public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException { + Object batchInfoList = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch info list")) + .get(() -> bulkConnection.getBatchInfoList(jobId)); + return (BatchInfoList) batchInfoList; + } + + public BatchInfo getBatchInfo(String jobId, String 
batchId) throws AsyncApiException { + Object batchInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch status")) + .get(() -> bulkConnection.getBatchInfo(jobId, batchId)); + return (BatchInfo) batchInfo; + } + + public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException { + Object inputStream = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting batch result stream")) + .get(() -> bulkConnection.getBatchResultStream(jobId, batchId)); + return (InputStream) inputStream; + } + + public static RetryPolicy getRetryPolicy(Long initialRetryDuration, Long maxRetryDuration, + Integer maxRetryCount) { + // Exponential backoff with initial retry of 5 seconds and max retry of 80 seconds. + return RetryPolicy.builder() + .handle(AsyncApiException.class) + .withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration), 2) + .withMaxRetries(maxRetryCount) + .onRetry(event -> LOG.info("Retrying Salesforce Bulk Query. 
Retry count: {}", event + .getAttemptCount())) + .onSuccess(event -> LOG.debug("Salesforce api call executed successfully.")) + .onRetriesExceeded(event -> LOG.error("Retry limit reached for Salesforce Bulk Query.")) + .build(); + } + +} diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index 6223c292..39510e34 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -119,7 +119,7 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) throws AsyncApiException, IOException, InterruptedException { - + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query); JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), getOperationEnum(operation), null, ConcurrencyMode.Parallel, ContentType.CSV); @@ -139,8 +139,8 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu return waitForBatchChunks(bulkConnection, job.getId(), batchInfo.getId()); } LOG.debug("PKChunking is not enabled"); - BatchInfo[] batchInfos = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo(); - LOG.info("Job id {}, status: {}", job.getId(), bulkConnection.getJobStatus(job.getId()).getState()); + BatchInfo[] batchInfos = bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo(); + LOG.info("Job id {}, status: {}", job.getId(), bulkConnectionRetryWrapper.getJobStatus(job.getId()).getState()); if (batchInfos.length > 0) { LOG.info("Batch size {}, state {}", batchInfos.length, 
batchInfos[0].getState()); } @@ -201,14 +201,15 @@ public static BulkConnection getBulkConnection(AuthenticatorCredentials authenti private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, String jobId, String initialBatchId) throws AsyncApiException { BatchInfo initialBatchInfo = null; + BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection); for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) { //check if the job is aborted - if (bulkConnection.getJobStatus(jobId).getState() == JobStateEnum.Aborted) { + if (bulkConnectionRetryWrapper.getJobStatus(jobId).getState() == JobStateEnum.Aborted) { LOG.info(String.format("Job with Id: '%s' is aborted", jobId)); return new BatchInfo[0]; } try { - initialBatchInfo = bulkConnection.getBatchInfo(jobId, initialBatchId); + initialBatchInfo = bulkConnectionRetryWrapper.getBatchInfo(jobId, initialBatchId); } catch (AsyncApiException e) { if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) { throw e; @@ -218,7 +219,7 @@ private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, Str } if (initialBatchInfo.getState() == BatchStateEnum.NotProcessed) { - BatchInfo[] result = bulkConnection.getBatchInfoList(jobId).getBatchInfo(); + BatchInfo[] result = bulkConnectionRetryWrapper.getBatchInfoList(jobId).getBatchInfo(); return Arrays.stream(result).filter(batchInfo -> batchInfo.getState() != BatchStateEnum.NotProcessed) .toArray(BatchInfo[]::new); } else if (initialBatchInfo.getState() == BatchStateEnum.Failed) {