Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1706] Implement Retry logic in Salesforce batch source and Multi Source Plugin for connection timeout issues #225

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/Salesforce-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,5 @@ Support also includes custom objects, and any Sharing and History tables that su

**Query Operation:**
Specify the query operation to run on the table. If query is selected, only current records will be returned.
If queryAll is selected, all current and deleted records will be returned. Default operation is query.
If queryAll is selected, all current and deleted records will be returned. Default operation is query.

1 change: 1 addition & 0 deletions docs/SalesforceMultiObjects-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,4 @@ Below is a non-comprehensive list of **sObjects** that are not currently availab
- UserRecordAccess
- WorkOrderLineItemStatus
- WorkOrderStatus

8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<powermock.version>2.0.2</powermock.version>
<failsafe.version>3.3.2</failsafe.version>
</properties>

<repositories>
Expand Down Expand Up @@ -323,6 +324,11 @@
<version>${commons-logging.version}</version>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>

<!-- tests -->
<dependency>
Expand Down Expand Up @@ -783,7 +789,7 @@
<dependency>
<groupId>io.cdap.tests.e2e</groupId>
<artifactId>cdap-e2e-framework</artifactId>
<version>0.3.0-SNAPSHOT</version>
<version>0.4.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ public abstract class SalesforceBaseSourceConfig extends ReferencePluginConfig {
@Nullable
private String operation;

@Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION)
@Description("Time taken for the first retry. Default is 5 seconds.")
@Nullable
private Long initialRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION)
@Description("Maximum time in seconds retries can take. Default is 80 seconds.")
@Nullable
private Long maxRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT)
@Description("Maximum number of retries allowed. Default is 5.")
@Nullable
private Integer maxRetryCount;

@Name(SalesforceSourceConstants.PROPERTY_RETRY_REQUIRED)
@Description("Retry is required or not for some of the internal call failures")
private Boolean retryOnBackendError;

@Name(ConfigUtil.NAME_USE_CONNECTION)
@Nullable
@Description("Whether to use an existing connection.")
Expand Down Expand Up @@ -128,6 +147,10 @@ protected SalesforceBaseSourceConfig(String referenceName,
@Nullable String securityToken,
@Nullable OAuthInfo oAuthInfo,
@Nullable String operation,
@Nullable Long initialRetryDuration,
@Nullable Long maxRetryDuration,
@Nullable Integer maxRetryCount,
Boolean retryOnBackendError,
@Nullable String proxyUrl) {
super(referenceName);
this.connection = new SalesforceConnectorConfig(consumerKey, consumerSecret, username, password, loginUrl,
Expand All @@ -137,13 +160,21 @@ protected SalesforceBaseSourceConfig(String referenceName,
this.duration = duration;
this.offset = offset;
this.operation = operation;
this.initialRetryDuration = initialRetryDuration;
this.maxRetryDuration = maxRetryDuration;
this.retryOnBackendError = retryOnBackendError;
this.maxRetryCount = maxRetryCount;
}


public Map<ChronoUnit, Integer> getDuration() {
return extractRangeValue(SalesforceSourceConstants.PROPERTY_DURATION, duration);
}

public Boolean isRetryRequired() {
  // Retries are enabled unless the property was explicitly set to false;
  // an absent (null) value therefore defaults to true.
  return !Boolean.FALSE.equals(retryOnBackendError);
}

public Map<ChronoUnit, Integer> getOffset() {
return extractRangeValue(SalesforceSourceConstants.PROPERTY_OFFSET, offset);
}
Expand Down Expand Up @@ -188,6 +219,19 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
return partnerConnection.getUserInfo().getOrganizationId();
}

public Long getInitialRetryDuration() {
  // Fall back to the plugin-wide default when the user left the field blank.
  if (initialRetryDuration != null) {
    return initialRetryDuration;
  }
  return SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS;
}

public Long getMaxRetryDuration() {
  // Null means "not configured" — use the default maximum retry window.
  // NOTE(review): the constant name carries an existing "DEFULT" typo; it is
  // declared elsewhere, so it is preserved here to keep the reference valid.
  if (maxRetryDuration == null) {
    return SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS;
  }
  return maxRetryDuration;
}

public Integer getMaxRetryCount() {
  // Null means "not configured" — use the default retry attempt limit.
  if (maxRetryCount == null) {
    return SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT;
  }
  return maxRetryCount;
}

public void validateFilters(FailureCollector collector) {
try {
validateIntervalFilterProperty(SalesforceSourceConstants.PROPERTY_DATETIME_AFTER, getDatetimeAfter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation()))
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired()))
.flatMap(Collection::stream).collect(Collectors.toList());
// store the jobIds so be used in onRunFinish() to close the connections
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public static List<SalesforceSplit> getSplits(
String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
enablePKChunk, config.getOperation());
enablePKChunk, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired());
return querySplits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@
package io.cdap.plugin.salesforce.plugin.source.batch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.sforce.async.AsyncApiException;
import com.sforce.async.AsyncExceptionCode;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.TimeoutExceededException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
Expand All @@ -42,6 +51,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
* RecordReader implementation, which reads a single Salesforce batch from bulk job
Expand All @@ -50,7 +60,14 @@
public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String, ?>> {

private static final Logger LOG = LoggerFactory.getLogger(SalesforceBulkRecordReader.class);

public static final Set<AsyncExceptionCode> RETRY_ON_REASON = ImmutableSet.of(AsyncExceptionCode.Unknown,
AsyncExceptionCode.InternalServerError,
AsyncExceptionCode.ClientInputError,
AsyncExceptionCode.Timeout);
private static Long initialRetryDuration;
private static Long maxRetryDuration;
private static Integer maxRetryCount;
private Boolean isRetryRequired;
private final Schema schema;

private CSVParser csvParser;
Expand All @@ -73,6 +90,10 @@ public SalesforceBulkRecordReader(Schema schema) {
this.jobId = jobId;
this.batchId = batchId;
this.resultIds = resultIds;
initialRetryDuration = SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this code, instead use like this in initialize method.
conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used for testing purpose only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still it is not solving any purpose, if those properties are null in config, these value will be overidden by null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will require extra mocking for conf and stubs for all the values required for the test cases.

maxRetryDuration = SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS;
maxRetryCount = SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT;
isRetryRequired = true;
}

/**
Expand All @@ -87,13 +108,18 @@ public SalesforceBulkRecordReader(Schema schema) {
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
initialRetryDuration = Long.valueOf(conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION,
String.valueOf((SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS))));
maxRetryDuration = Long.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION,
String.valueOf(SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS)));
maxRetryCount = Integer.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT,
String.valueOf(SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT)));
isRetryRequired = Boolean.valueOf(conf.get(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, String.valueOf(true)));
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
initialize(inputSplit, credentials);
}

public SalesforceBulkRecordReader initialize(
InputSplit inputSplit,
AuthenticatorCredentials credentials)
public SalesforceBulkRecordReader initialize(InputSplit inputSplit, AuthenticatorCredentials credentials)
throws IOException, InterruptedException {
SalesforceSplit salesforceSplit = (SalesforceSplit) inputSplit;
jobId = salesforceSplit.getJobId();
Expand All @@ -105,7 +131,7 @@ public SalesforceBulkRecordReader initialize(
resultIds = waitForBatchResults(bulkConnection);
LOG.debug("Batch {} returned {} results", batchId, resultIds.length);
setupParser();
} catch (AsyncApiException e) {
} catch (AsyncApiException | SalesforceQueryExecutionException e) {
throw new RuntimeException(
String.format("Failed to wait for the result of a batch: %s", e.getMessage()),
e);
Expand Down Expand Up @@ -173,23 +199,56 @@ public void close() throws IOException {
}

@VisibleForTesting
void setupParser() throws IOException, AsyncApiException {
void setupParser() throws IOException, AsyncApiException, InterruptedException {
if (resultIdIndex >= resultIds.length) {
throw new IllegalArgumentException(String.format("Invalid resultIdIndex %d, should be less than %d",
resultIdIndex, resultIds.length));
resultIdIndex, resultIds.length));
}
InputStream queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);

CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
.withQuoteMode(QuoteMode.ALL)
.withAllowMissingColumnNames(false);
csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat);
if (csvParser.getHeaderMap().isEmpty()) {
throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected.");
try {
final InputStream queryResponseStream;
if (isRetryRequired) {
queryResponseStream =
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.get(() -> getQueryResultStream(bulkConnection));
} else {
queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
}

CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
.withQuoteMode(QuoteMode.ALL)
.withAllowMissingColumnNames(false);
csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat);

if (csvParser.getHeaderMap().isEmpty()) {
throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected.");
}
parserIterator = csvParser.iterator();
resultIdIndex++;
} catch (TimeoutExceededException e) {
throw new AsyncApiException("Exhausted retries trying to get query result stream", AsyncExceptionCode.Timeout);
} catch (FailsafeException e) {
if (e.getCause() instanceof InterruptedException) {
throw (InterruptedException) e.getCause();
}
if (e.getCause() instanceof AsyncApiException) {
throw (AsyncApiException) e.getCause();
}
throw e;
}
}

public InputStream getQueryResultStream(BulkConnection bulkConnection)
throws SalesforceQueryExecutionException, AsyncApiException {
try {
return bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
} catch (AsyncApiException exception) {
LOG.warn("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the error is not retryable, we are ignoring the error here, is it expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error logged

throw exception;
}
parserIterator = csvParser.iterator();
resultIdIndex++;
}

/**
Expand All @@ -201,8 +260,7 @@ void setupParser() throws IOException, AsyncApiException {
* @throws InterruptedException sleep interrupted
*/
private String[] waitForBatchResults(BulkConnection bulkConnection)
throws AsyncApiException, InterruptedException {

throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
BatchInfo info = null;
for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
try {
Expand All @@ -216,7 +274,26 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
}

if (info.getState() == BatchStateEnum.Completed) {
return bulkConnection.getQueryResultList(jobId, batchId).getResult();
try {
if (isRetryRequired) {
return Failsafe.with(
SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.get(() -> getQueryResultList(bulkConnection));
} else {
return bulkConnection.getQueryResultList(jobId, batchId).getResult();
}

} catch (TimeoutExceededException e) {
throw new AsyncApiException("Exhausted retries trying to get query result list", AsyncExceptionCode.Timeout);
} catch (FailsafeException e) {
if (e.getCause() instanceof InterruptedException) {
throw (InterruptedException) e.getCause();
}
if (e.getCause() instanceof AsyncApiException) {
throw (AsyncApiException) e.getCause();
}
throw e;
}
} else if (info.getState() == BatchStateEnum.Failed) {
throw new BulkAPIBatchException("Batch failed", info);
} else {
Expand All @@ -226,4 +303,17 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
}
throw new BulkAPIBatchException("Timeout waiting for batch results", info);
}

private String[] getQueryResultList(BulkConnection bulkConnection)
throws SalesforceQueryExecutionException, AsyncApiException {
try {
return bulkConnection.getQueryResultList(jobId, batchId).getResult();
} catch (AsyncApiException exception) {
LOG.warn("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the error is not retryable, we are ignoring the error here, is it expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error logged

throw exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public SalesforceInputFormatProvider(SalesforceBaseSourceConfig config,
.put(SalesforceSourceConstants.CONFIG_SCHEMAS, GSON.toJson(schemas));
configBuilder.put(SalesforceSourceConstants.CONFIG_QUERY_SPLITS, GSON.toJson(querySplits))
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString())
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString());
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString())
.put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, config.getInitialRetryDuration().toString())
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, config.getMaxRetryDuration().toString())
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, config.getMaxRetryCount().toString())
.put(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, config.isRetryRequired().toString());

if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) {
configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,14 @@ public SalesforceMultiSourceConfig(String referenceName,
@Nullable String securityToken,
@Nullable OAuthInfo oAuthInfo,
@Nullable String operation,
@Nullable Long initialRetryDuration,
@Nullable Long maxRetryDuration,
@Nullable Integer maxRetryCount,
Boolean retryOnBackendError,
@Nullable String proxyUrl) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, connectTimeout, readTimeout,
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation, proxyUrl);
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation, initialRetryDuration,
maxRetryDuration, maxRetryCount, retryOnBackendError, proxyUrl);
this.whiteList = whiteList;
this.blackList = blackList;
this.sObjectNameField = sObjectNameField;
Expand Down
Loading