Skip to content

Commit

Permalink
bump up to 1.6.3 for release
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs committed Aug 8, 2024
1 parent 2ff505f commit ad10954
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<spark2.version>2.1.3</spark2.version>
<hydrator.version>2.10.0</hydrator.version>
<commons.version>3.8.1</commons.version>
<salesforce.api.version>53.0.0</salesforce.api.version>
<salesforce.api.version>61.0.0</salesforce.api.version>
<cometd.java.client.version>4.0.0</cometd.java.client.version>
<antlr.version>4.7.2</antlr.version>
<mockito.version>2.23.0</mockito.version>
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.SessionRenewer;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -68,6 +69,7 @@ public final class SalesforceBulkUtil {
public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
JobInfo job = new JobInfo();
job.setObject(sObject);
job.setOperation(operationEnum);
Expand All @@ -77,10 +79,10 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O
job.setExternalIdFieldName(externalIdField);
}

job = bulkConnection.createJob(job);
job = bulkConnectionRetryWrapper.createJob(job);
Preconditions.checkState(job.getId() != null, "Couldn't get job ID. There was a problem in creating the " +
"batch job");
return bulkConnection.getJobStatus(job.getId());
return bulkConnectionRetryWrapper.getJobStatus(job.getId());
}

/**
Expand All @@ -91,10 +93,11 @@ public static JobInfo createJob(BulkConnection bulkConnection, String sObject, O
* @throws AsyncApiException if there is an issue creating the job
*/
public static void closeJob(BulkConnection bulkConnection, String jobId) throws AsyncApiException {
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
JobInfo job = new JobInfo();
job.setId(jobId);
job.setState(JobStateEnum.Closed);
bulkConnection.updateJob(job);
bulkConnectionRetryWrapper.updateJob(job);
}

/**
Expand All @@ -110,13 +113,13 @@ public static void closeJob(BulkConnection bulkConnection, String jobId) throws
public static void checkResults(BulkConnection bulkConnection, JobInfo job,
List<BatchInfo> batchInfoList, boolean ignoreFailures)
throws AsyncApiException, IOException {

BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
for (BatchInfo batchInfo : batchInfoList) {
/*
The response is a CSV with the following headers:
Id,Success,Created,Error
*/
CSVReader rdr = new CSVReader(bulkConnection.getBatchResultStream(job.getId(), batchInfo.getId()));
CSVReader rdr = new CSVReader(bulkConnectionRetryWrapper.getBatchResultStream(job.getId(), batchInfo.getId()));
List<String> resultHeader = rdr.nextRecord();
int successRowId = resultHeader.indexOf("Success");
int errorRowId = resultHeader.indexOf("Error");
Expand Down Expand Up @@ -148,6 +151,7 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job,
*/
public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
List<BatchInfo> batchInfoList, boolean ignoreFailures) {
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
Set<String> incomplete = batchInfoList
.stream()
.map(BatchInfo::getId)
Expand All @@ -164,7 +168,7 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
.until(() -> {
try {
BatchInfo[] statusList =
bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo();

for (BatchInfo b : statusList) {
if (b.getState() == BatchStateEnum.Failed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
Expand Down Expand Up @@ -261,10 +260,11 @@ public InputStream getQueryResultStream(BulkConnection bulkConnection)
*/
private String[] waitForBatchResults(BulkConnection bulkConnection)
throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
BatchInfo info = null;
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
try {
info = bulkConnection.getBatchInfo(jobId, batchId);
info = bulkConnectionRetryWrapper.getBatchInfo(jobId, batchId);
} catch (AsyncApiException e) {
if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
 * Copyright © 2024 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package io.cdap.plugin.salesforce.plugin.source.batch.util;

import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.time.Duration;

/**
 * Wraps a {@link BulkConnection} and retries every Salesforce Bulk API call
 * with exponential backoff when the call fails with an {@link AsyncApiException}.
 *
 * <p>NOTE(review): Failsafe rethrows checked exceptions wrapped in
 * {@code dev.failsafe.FailsafeException}; the {@code throws AsyncApiException}
 * clauses below are kept for caller compatibility but exhausted retries will
 * surface as an unchecked {@code FailsafeException} — confirm callers handle this.
 */
public class BulkConnectionRetryWrapper {

  private static final Logger LOG = LoggerFactory.getLogger(BulkConnectionRetryWrapper.class);

  // Default backoff used by the single-arg constructor: first retry after 5s,
  // doubling up to an 80s cap, at most 5 retries.
  private static final long DEFAULT_INITIAL_RETRY_SECONDS = 5L;
  private static final long DEFAULT_MAX_RETRY_SECONDS = 80L;
  private static final int DEFAULT_MAX_RETRY_COUNT = 5;

  private final BulkConnection bulkConnection;
  // Parameterized (was a raw RetryPolicy) so Failsafe.with(...) type-checks cleanly.
  private final RetryPolicy<Object> retryPolicy;

  /**
   * Creates a wrapper around the given connection using the default retry policy.
   *
   * @param bulkConnection the underlying Salesforce Bulk API connection; all calls delegate to it
   */
  public BulkConnectionRetryWrapper(BulkConnection bulkConnection) {
    this.bulkConnection = bulkConnection;
    this.retryPolicy =
      getRetryPolicy(DEFAULT_INITIAL_RETRY_SECONDS, DEFAULT_MAX_RETRY_SECONDS, DEFAULT_MAX_RETRY_COUNT);
  }

  /**
   * Creates a bulk job, retrying on {@link AsyncApiException}.
   *
   * @param jobInfo description of the job to create
   * @return the created job as reported by Salesforce
   */
  public JobInfo createJob(JobInfo jobInfo) {
    Object resultJobInfo = Failsafe.with(retryPolicy)
      .onFailure(event -> LOG.warn("Failed while creating job"))
      .get(() -> bulkConnection.createJob(jobInfo));
    return (JobInfo) resultJobInfo;
  }

  /**
   * Fetches the current status of a job, retrying on {@link AsyncApiException}.
   *
   * @param jobId id of the job to look up
   * @return the job's current state
   */
  public JobInfo getJobStatus(String jobId) {
    Object resultJobInfo = Failsafe.with(retryPolicy)
      .onFailure(event -> LOG.warn("Failed while getting job status"))
      .get(() -> bulkConnection.getJobStatus(jobId));
    return (JobInfo) resultJobInfo;
  }

  /**
   * Updates a job (e.g. to close it), retrying on {@link AsyncApiException}.
   * The updated JobInfo returned by Salesforce is intentionally discarded.
   *
   * @param jobInfo the job fields to update; must carry the job id
   */
  public void updateJob(JobInfo jobInfo) {
    Failsafe.with(retryPolicy)
      .onFailure(event -> LOG.warn("Failed while updating job."))
      .get(() -> bulkConnection.updateJob(jobInfo));
  }

  /**
   * Lists all batches of a job, retrying on {@link AsyncApiException}.
   *
   * @param jobId id of the job whose batches are listed
   * @return the batch list for the job
   * @throws AsyncApiException declared for caller compatibility (see class note)
   */
  public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
    Object batchInfoList = Failsafe.with(retryPolicy)
      .onFailure(event -> LOG.warn("Failed while getting batch info list"))
      .get(() -> bulkConnection.getBatchInfoList(jobId));
    return (BatchInfoList) batchInfoList;
  }

  /**
   * Fetches the status of a single batch, retrying on {@link AsyncApiException}.
   *
   * @param jobId   id of the owning job
   * @param batchId id of the batch to look up
   * @return the batch's current state
   * @throws AsyncApiException declared for caller compatibility (see class note)
   */
  public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException {
    Object batchInfo = Failsafe.with(retryPolicy)
      .onFailure(event -> LOG.warn("Failed while getting batch status"))
      .get(() -> bulkConnection.getBatchInfo(jobId, batchId));
    return (BatchInfo) batchInfo;
  }

  /**
   * Opens the result stream of a finished batch, retrying on {@link AsyncApiException}.
   * The caller owns the returned stream and must close it.
   *
   * @param jobId   id of the owning job
   * @param batchId id of the batch whose results are read
   * @return stream over the batch results
   * @throws AsyncApiException declared for caller compatibility (see class note)
   */
  public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
    Object inputStream = Failsafe.with(retryPolicy)
      .onFailure(event -> LOG.warn("Failed while getting batch result stream"))
      .get(() -> bulkConnection.getBatchResultStream(jobId, batchId));
    return (InputStream) inputStream;
  }

  /**
   * Builds an exponential-backoff retry policy for {@link AsyncApiException} failures.
   *
   * @param initialRetryDuration seconds before the first retry
   * @param maxRetryDuration     ceiling, in seconds, on the backoff delay
   * @param maxRetryCount        maximum number of retries before giving up
   * @return the configured retry policy (delay doubles each attempt up to the ceiling)
   */
  public static RetryPolicy<Object> getRetryPolicy(Long initialRetryDuration, Long maxRetryDuration,
                                                   Integer maxRetryCount) {
    return RetryPolicy.builder()
      .handle(AsyncApiException.class)
      .withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration), 2)
      .withMaxRetries(maxRetryCount)
      .onRetry(event -> LOG.info("Retrying Salesforce Bulk Query. Retry count: {}", event
        .getAttemptCount()))
      .onSuccess(event -> LOG.debug("Salesforce api call executed successfully."))
      .onRetriesExceeded(event -> LOG.error("Retry limit reached for Salesforce Bulk Query."))
      .build();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu
Long initialRetryDuration, Long maxRetryDuration,
Integer maxRetryCount, Boolean retryOnBackendError)
throws AsyncApiException, IOException, InterruptedException {

BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), getOperationEnum(operation),
null, ConcurrencyMode.Parallel, ContentType.CSV);
Expand All @@ -139,8 +139,8 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu
return waitForBatchChunks(bulkConnection, job.getId(), batchInfo.getId());
}
LOG.debug("PKChunking is not enabled");
BatchInfo[] batchInfos = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
LOG.info("Job id {}, status: {}", job.getId(), bulkConnection.getJobStatus(job.getId()).getState());
BatchInfo[] batchInfos = bulkConnectionRetryWrapper.getBatchInfoList(job.getId()).getBatchInfo();
LOG.info("Job id {}, status: {}", job.getId(), bulkConnectionRetryWrapper.getJobStatus(job.getId()).getState());
if (batchInfos.length > 0) {
LOG.info("Batch size {}, state {}", batchInfos.length, batchInfos[0].getState());
}
Expand Down Expand Up @@ -201,14 +201,15 @@ public static BulkConnection getBulkConnection(AuthenticatorCredentials authenti
private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, String jobId, String initialBatchId)
throws AsyncApiException {
BatchInfo initialBatchInfo = null;
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection);
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
//check if the job is aborted
if (bulkConnection.getJobStatus(jobId).getState() == JobStateEnum.Aborted) {
if (bulkConnectionRetryWrapper.getJobStatus(jobId).getState() == JobStateEnum.Aborted) {
LOG.info(String.format("Job with Id: '%s' is aborted", jobId));
return new BatchInfo[0];
}
try {
initialBatchInfo = bulkConnection.getBatchInfo(jobId, initialBatchId);
initialBatchInfo = bulkConnectionRetryWrapper.getBatchInfo(jobId, initialBatchId);
} catch (AsyncApiException e) {
if (i == SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES - 1) {
throw e;
Expand All @@ -218,7 +219,7 @@ private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, Str
}

if (initialBatchInfo.getState() == BatchStateEnum.NotProcessed) {
BatchInfo[] result = bulkConnection.getBatchInfoList(jobId).getBatchInfo();
BatchInfo[] result = bulkConnectionRetryWrapper.getBatchInfoList(jobId).getBatchInfo();
return Arrays.stream(result).filter(batchInfo -> batchInfo.getState() != BatchStateEnum.NotProcessed)
.toArray(BatchInfo[]::new);
} else if (initialBatchInfo.getState() == BatchStateEnum.Failed) {
Expand Down

0 comments on commit ad10954

Please sign in to comment.