diff --git a/pom.xml b/pom.xml
index 4dd31361..32916987 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
2.1.3
2.10.0
3.8.1
- 53.0.0
+ 61.0.0
4.0.0
4.7.2
2.23.0
diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
index 0b841087..1e8397d1 100644
--- a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
+++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
@@ -32,6 +32,7 @@
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.SessionRenewer;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
@@ -68,6 +69,7 @@ public final class SalesforceBulkUtil {
public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
+ BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
JobInfo job = new JobInfo();
job.setObject(sObject);
job.setOperation(operationEnum);
@@ -77,10 +79,10 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O
job.setExternalIdFieldName(externalIdField);
}
- job = bulkConnection.createJob(job);
+ job = bulkConnectionRetryWrapper.createJob(job);
Preconditions.checkState(job.getId() != null, "Couldn't get job ID. There was a problem in creating the " +
"batch job");
- return bulkConnection.getJobStatus(job.getId());
+ return bulkConnectionRetryWrapper.getJobStatus(job.getId());
}
/**
@@ -91,10 +93,11 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O
* @throws AsyncApiException if there is an issue creating the job
*/
public static void closeJob(BulkConnection bulkConnection, String jobId) throws AsyncApiException {
+ BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
JobInfo job = new JobInfo();
job.setId(jobId);
job.setState(JobStateEnum.Closed);
- bulkConnection.updateJob(job);
+ bulkConnectionRetryWrapper.updateJob(job);
}
/**
@@ -110,13 +113,13 @@ public static void closeJob(BulkConnection bulkConnection, String jobId) throws
public static void checkResults(BulkConnection bulkConnection, JobInfo job,
List batchInfoList, boolean ignoreFailures)
throws AsyncApiException, IOException {
-
+ BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
for (BatchInfo batchInfo : batchInfoList) {
/*
The response is a CSV with the following headers:
Id,Success,Created,Error
*/
- CSVReader rdr = new CSVReader(bulkConnection.getBatchResultStream(job.getId(), batchInfo.getId()));
+ CSVReader rdr = new CSVReader(bulkConnectionRetryWrapper.getBatchResultStream(job.getId(), batchInfo.getId()));
List resultHeader = rdr.nextRecord();
int successRowId = resultHeader.indexOf("Success");
int errorRowId = resultHeader.indexOf("Error");
@@ -148,6 +151,7 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job,
*/
public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
List batchInfoList, boolean ignoreFailures) {
+ BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
Set incomplete = batchInfoList
.stream()
.map(BatchInfo::getId)
@@ -164,7 +168,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
.until(() -> {
try {
BatchInfo[] statusList =
- bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
+ bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo();
for (BatchInfo b : statusList) {
if (b.getState() == BatchStateEnum.Failed) {
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java
index bbf876d8..7fa6883a 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java
@@ -28,13 +28,12 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
-import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
-
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -261,10 +260,11 @@ public InputStream getQueryResultStream(BulkConnection bulkConnection)
*/
private String[] waitForBatchResults(BulkConnection bulkConnection)
throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
+ BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
BatchInfo info = null;
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
try {
- info = bulkConnection.getBatchInfo(jobId, batchId);
+ info = bulkConnectionRetryWrapper.getBatchInfo(jobId, batchId);
} catch (AsyncApiException e) {
if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) {
throw e;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java
new file mode 100644
index 00000000..1337ebe5
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.salesforce.plugin.source.batch.util;
+
+import com.sforce.async.AsyncApiException;
+import com.sforce.async.BatchInfo;
+import com.sforce.async.BatchInfoList;
+import com.sforce.async.BulkConnection;
+import com.sforce.async.JobInfo;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.time.Duration;
+
+/**
+ * BulkConnectionRetryWrapper class to retry all the salesforce api calls in case of failure.
+ */
+public class BulkConnectionRetryWrapper {
+
+ private final BulkConnection bulkConnection;
+ private final RetryPolicy retryPolicy;
+ private static final Logger LOG = LoggerFactory.getLogger(BulkConnectionRetryWrapper.class);
+
+ public BulkConnectionRetryWrapper(BulkConnection bulkConnection) {
+ this.bulkConnection = bulkConnection;
+ this.retryPolicy = getRetryPolicy(5L, 80L, 5);
+ }
+
+ public JobInfo createJob(JobInfo jobInfo) {
+ Object resultJobInfo = Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while creating job"))
+ .get(() -> bulkConnection.createJob(jobInfo));
+ return (JobInfo) resultJobInfo;
+ }
+
+ public JobInfo getJobStatus(String jobId) {
+ Object resultJobInfo = Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while getting job status"))
+ .get(() -> bulkConnection.getJobStatus(jobId));
+ return (JobInfo) resultJobInfo;
+ }
+
+ public void updateJob(JobInfo jobInfo) {
+ Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while updating job."))
+ .get(() -> bulkConnection.updateJob(jobInfo));
+ }
+
+ public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
+ Object batchInfoList = Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while getting batch info list"))
+ .get(() -> bulkConnection.getBatchInfoList(jobId));
+ return (BatchInfoList) batchInfoList;
+ }
+
+ public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException {
+ Object batchInfo = Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while getting batc status"))
+ .get(() -> bulkConnection.getBatchInfo(jobId, batchId));
+ return (BatchInfo) batchInfo;
+ }
+
+ public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
+ Object inputStream = Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while getting batch result stream"))
+ .get(() -> bulkConnection.getBatchResultStream(jobId, batchId));
+ return (InputStream) inputStream;
+ }
+
+ public static RetryPolicy