
Commit

Fixed testing issue
vikasrathee-cs authored and Vipinofficial11 committed Nov 29, 2023
1 parent 5c85c9b commit 8c56415
Showing 3 changed files with 12 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -789,7 +789,7 @@
<dependency>
<groupId>io.cdap.tests.e2e</groupId>
<artifactId>cdap-e2e-framework</artifactId>
-<version>0.3.0-SNAPSHOT</version>
+<version>0.4.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
29 changes: 9 additions & 20 deletions SalesforceBulkRecordReader.java
@@ -22,14 +22,14 @@
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import dev.failsafe.Failsafe;
-import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -43,9 +43,7 @@

import java.io.IOException;
import java.io.InputStream;
-import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -122,7 +120,7 @@ public SalesforceBulkRecordReader initialize(
resultIds = waitForBatchResults(bulkConnection);
LOG.debug("Batch {} returned {} results", batchId, resultIds.length);
setupParser();
-} catch (AsyncApiException e) {
+} catch (AsyncApiException | SalesforceQueryExecutionException e) {
throw new RuntimeException(
String.format("Failed to wait for the result of a batch: %s", e.getMessage()),
e);
@@ -195,7 +193,8 @@ void setupParser() throws IOException, AsyncApiException {
}

final InputStream[] queryResponseStream = new InputStream[1];
-Failsafe.with(getRetryPolicy()).run(() -> queryResponseStream[0] = (getQueryResultStream(bulkConnection)));
+Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
+  .run(() -> queryResponseStream[0] = (getQueryResultStream(bulkConnection)));

CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
@@ -230,8 +229,8 @@ private InputStream getQueryResultStream(BulkConnection bulkConnection) throws S
* @throws InterruptedException sleep interrupted
*/
private String[] waitForBatchResults(BulkConnection bulkConnection)
-throws AsyncApiException, InterruptedException {
-
+throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
+final String[][] batchInfoList = new String[1][];
BatchInfo info = null;
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
try {
@@ -245,7 +244,9 @@
}

if (info.getState() == BatchStateEnum.Completed) {
-Failsafe.with(getRetryPolicy()).run(() -> getQueryResultList(bulkConnection));
+Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
+  .run(() -> batchInfoList[0] = getQueryResultList(bulkConnection));
+return batchInfoList[0];
//return bulkConnection.getQueryResultList(jobId, batchId).getResult();
} else if (info.getState() == BatchStateEnum.Failed) {
throw new BulkAPIBatchException("Batch failed", info);
@@ -269,16 +270,4 @@ private String[] getQueryResultList(BulkConnection bulkConnection) throws Salesf
return null;
}

-private static RetryPolicy<Object> getRetryPolicy() {
-  // Exponential backoff with initial retry of 5 seconds and max retry of 80 seconds.
-  return RetryPolicy.builder()
-    .handle(SocketTimeoutException.class, AsyncApiException.class)
-    .withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration), 2)
-    .withMaxRetries(maxRetryCount)
-    .onRetry(event -> LOG.debug("Retrying Salesforce Create Batch From Stream. Retry count: {}", event
-      .getAttemptCount()))
-    .onSuccess(event -> LOG.debug("Salesforce Create Batch From Stream executed successfully."))
-    .onRetriesExceeded(event -> LOG.error("Retry limit reached for Salesforce Create Batch From Stream."))
-    .build();
-}
}
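
For context on the change above: the reader's private getRetryPolicy() (deleted in the last hunk) is replaced by the shared SalesforceSplitUtil.getRetryPolicy(...), and the retried lambda now stores its result in a one-element array so waitForBatchResults can return it. Below is a minimal, self-contained sketch of the same Failsafe pattern; FlakyFetch, fetchOnce() and IllegalStateException are illustrative stand-ins (the plugin retries getQueryResultStream()/getQueryResultList() on SalesforceQueryExecutionException), and the 5 s/80 s/doubling backoff follows the comment in the source.

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;

import java.time.Duration;

public class FlakyFetch {

  private static int attempts = 0;

  public static void main(String[] args) {
    // Exponential backoff: 5 s initial delay, 80 s cap, delay doubled on each retry.
    RetryPolicy<Object> retryPolicy = RetryPolicy.builder()
      .handle(IllegalStateException.class)   // stand-in for SalesforceQueryExecutionException
      .withBackoff(Duration.ofSeconds(5), Duration.ofSeconds(80), 2)
      .withMaxRetries(5)
      .onRetry(event -> System.out.println("Retrying, attempt " + event.getAttemptCount()))
      .onRetriesExceeded(event -> System.err.println("Retry limit reached"))
      .build();

    // The lambda is re-run while it throws a handled exception; the result is
    // captured in a one-element array, mirroring batchInfoList[0] above.
    final String[] result = new String[1];
    Failsafe.with(retryPolicy).run(() -> result[0] = fetchOnce());
    System.out.println(result[0]);
  }

  // Fails twice, then succeeds, to exercise the retry path.
  private static String fetchOnce() {
    if (++attempts < 3) {
      throw new IllegalStateException("transient failure");
    }
    return "fetched after " + attempts + " attempts";
  }
}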
5 changes: 2 additions & 3 deletions SalesforceSplitUtil.java
@@ -40,7 +40,6 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -248,11 +247,11 @@ private static OperationEnum getOperationEnum(String operation) {
}
}

-private static RetryPolicy<Object> getRetryPolicy(Long initialRetryDuration, Long maxRetryDuration,
+public static RetryPolicy<Object> getRetryPolicy(Long initialRetryDuration, Long maxRetryDuration,
Integer maxRetryCount) {
// Exponential backoff with initial retry of 5 seconds and max retry of 80 seconds.
return RetryPolicy.builder()
-.handle(SocketTimeoutException.class, AsyncApiException.class)
+.handle(SalesforceQueryExecutionException.class)
.withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration), 2)
.withMaxRetries(maxRetryCount)
.onRetry(event -> LOG.debug("Retrying Salesforce Create Batch From Stream. Retry count: {}", event
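
With getRetryPolicy now public, and handling SalesforceQueryExecutionException rather than SocketTimeoutException/AsyncApiException, callers such as SalesforceBulkRecordReader can share one policy definition instead of duplicating it. A small usage sketch follows, assuming the plugin classes are on the classpath; SharedRetryUsage and pollSalesforce() are hypothetical names, and 5L/80L/5 are illustrative values for initialRetryDuration, maxRetryDuration and maxRetryCount.

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

public class SharedRetryUsage {

  public static void main(String[] args) {
    // Same backoff as documented in the source: 5 s initial delay, 80 s cap.
    RetryPolicy<Object> policy = SalesforceSplitUtil.getRetryPolicy(5L, 80L, 5);

    // Any SalesforceQueryExecutionException thrown inside the lambda triggers a retry.
    Failsafe.with(policy).run(() -> pollSalesforce());
  }

  // Hypothetical stand-in for a Bulk API call such as getQueryResultStream().
  private static void pollSalesforce() throws SalesforceQueryExecutionException {
    // ...
  }
}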
