Skip to content

Commit

Permalink
Add retry to Bulk query jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sgarg-CS committed Nov 17, 2023
1 parent cef543f commit c8b6161
Show file tree
Hide file tree
Showing 17 changed files with 329 additions and 22 deletions.
11 changes: 10 additions & 1 deletion docs/Salesforce-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,13 @@ Support also includes custom objects, and any Sharing and History tables that su

**Query Operation:**
Specify the query operation to run on the table. If query is selected, only current records will be returned.
If queryAll is selected, all current and deleted records will be returned. Default operation is query.
If queryAll is selected, all current and deleted records will be returned. Default operation is query.

**Initial Retry Duration**
Time taken for the first retry. Default is 5 seconds.

**Max Retry Duration**
Maximum time in seconds retries can take. Default is 80 seconds.

**Max Retry Count**
Maximum number of retries allowed. Default is 5.
9 changes: 9 additions & 0 deletions docs/SalesforceMultiObjects-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,12 @@ Below is a non-comprehensive list of **sObjects** that are not currently availab
- UserRecordAccess
- WorkOrderLineItemStatus
- WorkOrderStatus

**Initial Retry Duration**
Time taken for the first retry. Default is 5 seconds.

**Max Retry Duration**
Maximum time in seconds retries can take. Default is 80 seconds.

**Max Retry Count**
Maximum number of retries allowed. Default is 5.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<powermock.version>2.0.2</powermock.version>
<failsafe.version>3.3.2</failsafe.version>
</properties>

<repositories>
Expand Down Expand Up @@ -323,6 +324,11 @@
<version>${commons-logging.version}</version>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>

<!-- tests -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,21 @@ public abstract class SalesforceBaseSourceConfig extends ReferencePluginConfig {
@Nullable
private String operation;

@Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION)
@Description("Time taken for the first retry. Default is 5 seconds.")
@Nullable
private Long initialRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION)
@Description("Maximum time in seconds retries can take. Default is 80 seconds.")
@Nullable
private Long maxRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT)
@Description("Maximum number of retries allowed. Default is 5.")
@Nullable
private Integer maxRetryCount;

@Name(ConfigUtil.NAME_USE_CONNECTION)
@Nullable
@Description("Whether to use an existing connection.")
Expand Down Expand Up @@ -127,6 +142,9 @@ protected SalesforceBaseSourceConfig(String referenceName,
@Nullable String securityToken,
@Nullable OAuthInfo oAuthInfo,
@Nullable String operation,
@Nullable Long initialRetryDuration,
@Nullable Long maxRetryDuration,
@Nullable Integer maxRetryCount,
@Nullable String proxyUrl) {
super(referenceName);
this.connection = new SalesforceConnectorConfig(consumerKey, consumerSecret, username, password, loginUrl,
Expand All @@ -136,6 +154,9 @@ protected SalesforceBaseSourceConfig(String referenceName,
this.duration = duration;
this.offset = offset;
this.operation = operation;
this.initialRetryDuration = initialRetryDuration;
this.maxRetryDuration = maxRetryDuration;
this.maxRetryCount = maxRetryCount;
}


Expand Down Expand Up @@ -186,6 +207,21 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
return partnerConnection.getUserInfo().getOrganizationId();
}

@Nullable
public Long getInitialRetryDuration() {
return initialRetryDuration == null ? 5L : initialRetryDuration;
}

@Nullable
public Long getMaxRetryDuration() {
return maxRetryDuration == null ? 80L : initialRetryDuration;
}

@Nullable
public Integer getMaxRetryCount() {
return maxRetryCount == null ? 5 : maxRetryCount;
}

public void validateFilters(FailureCollector collector) {
try {
validateIntervalFilterProperty(SalesforceSourceConstants.PROPERTY_DATETIME_AFTER, getDatetimeAfter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation()))
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount()))
.flatMap(Collection::stream).collect(Collectors.toList());
// store the jobIds so be used in onRunFinish() to close the connections
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public static List<SalesforceSplit> getSplits(
String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
enablePKChunk, config.getOperation());
enablePKChunk, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
return querySplits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
package io.cdap.plugin.salesforce.plugin.source.batch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
Expand All @@ -39,9 +43,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
* RecordReader implementation, which reads a single Salesforce batch from bulk job
Expand All @@ -50,7 +57,11 @@
public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String, ?>> {

private static final Logger LOG = LoggerFactory.getLogger(SalesforceBulkRecordReader.class);

public static final Set<String> RETRY_ON_REASON = ImmutableSet.of("Unknown", "InternalServerError",
"ClientInputError", "Timeout");
private static Long initialRetryDuration;
private static Long maxRetryDuration;
private static Integer maxRetryCount;
private final Schema schema;

private CSVParser csvParser;
Expand All @@ -73,6 +84,9 @@ public SalesforceBulkRecordReader(Schema schema) {
this.jobId = jobId;
this.batchId = batchId;
this.resultIds = resultIds;
this.initialRetryDuration = 5L;
this.maxRetryDuration = 80L;
this.maxRetryCount = 5;
}

/**
Expand All @@ -87,6 +101,9 @@ public SalesforceBulkRecordReader(Schema schema) {
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
initialRetryDuration = Long.valueOf(conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION));
maxRetryDuration = Long.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION));
maxRetryCount = Integer.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT));
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
initialize(inputSplit, credentials);
}
Expand Down Expand Up @@ -176,20 +193,34 @@ void setupParser() throws IOException, AsyncApiException {
throw new IllegalArgumentException(String.format("Invalid resultIdIndex %d, should be less than %d",
resultIdIndex, resultIds.length));
}
InputStream queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);

final InputStream[] queryResponseStream = new InputStream[1];
Failsafe.with(getRetryPolicy()).run(() -> queryResponseStream[0] = (getQueryResultStream(bulkConnection)));

CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
.withQuoteMode(QuoteMode.ALL)
.withAllowMissingColumnNames(false);
csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat);
csvParser = CSVParser.parse(queryResponseStream[0], StandardCharsets.UTF_8, csvFormat);
if (csvParser.getHeaderMap().isEmpty()) {
throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected.");
}
parserIterator = csvParser.iterator();
resultIdIndex++;
}

private InputStream getQueryResultStream(BulkConnection bulkConnection) throws SalesforceQueryExecutionException {
try {
return bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
} catch (AsyncApiException exception) {
LOG.error("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode().name())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
}
}
return null;
}

/**
* Wait until a batch with given batchId succeeds, or throw an exception
*
Expand All @@ -214,7 +245,8 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
}

if (info.getState() == BatchStateEnum.Completed) {
return bulkConnection.getQueryResultList(jobId, batchId).getResult();
Failsafe.with(getRetryPolicy()).run(() -> getQueryResultList(bulkConnection));
//return bulkConnection.getQueryResultList(jobId, batchId).getResult();
} else if (info.getState() == BatchStateEnum.Failed) {
throw new BulkAPIBatchException("Batch failed", info);
} else {
Expand All @@ -224,4 +256,29 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
}
throw new BulkAPIBatchException("Timeout waiting for batch results", info);
}

private String[] getQueryResultList(BulkConnection bulkConnection) throws SalesforceQueryExecutionException {
try {
return bulkConnection.getQueryResultList(jobId, batchId).getResult();
} catch (AsyncApiException exception) {
LOG.error("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode().name())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
}
}
return null;
}

private static RetryPolicy<Object> getRetryPolicy() {
// Exponential backoff with initial retry of 5 seconds and max retry of 80 seconds.
return RetryPolicy.builder()
.handle(SocketTimeoutException.class, AsyncApiException.class)
.withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration), 2)
.withMaxRetries(maxRetryCount)
.onRetry(event -> LOG.debug("Retrying Salesforce Create Batch From Stream. Retry count: {}", event
.getAttemptCount()))
.onSuccess(event -> LOG.debug("Salesforce Create Batch From Stream executed successfully."))
.onRetriesExceeded(event -> LOG.error("Retry limit reached for Salesforce Create Batch From Stream."))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public SalesforceInputFormatProvider(SalesforceBaseSourceConfig config,
ImmutableMap.Builder<String, String> configBuilder = new ImmutableMap.Builder<String, String>()
.put(SalesforceSourceConstants.CONFIG_SCHEMAS, GSON.toJson(schemas));
configBuilder.put(SalesforceSourceConstants.CONFIG_QUERY_SPLITS, GSON.toJson(querySplits))
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString());
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString())
.put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, config.getInitialRetryDuration().toString())
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, config.getMaxRetryCount().toString())
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, config.getMaxRetryCount().toString());
if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) {
configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ public SalesforceMultiSourceConfig(String referenceName,
@Nullable String securityToken,
@Nullable OAuthInfo oAuthInfo,
@Nullable String operation,
@Nullable Long initialRetryDuration,
@Nullable Long maxRetryDuration,
@Nullable Integer maxRetryCount,
@Nullable String proxyUrl) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, connectTimeout,
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation, proxyUrl);
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation, initialRetryDuration,
maxRetryDuration, maxRetryCount, proxyUrl);
this.whiteList = whiteList;
this.blackList = blackList;
this.sObjectNameField = sObjectNameField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class SalesforceMultiSourceConfigBuilder {
private String securityToken;
private OAuthInfo oAuthInfo;
private String operation;
private Long initialRetryDuration;
private Long maxRetryDuration;
private Integer maxRetryCount;
private Integer connectTimeout;
private String proxyUrl;

Expand Down Expand Up @@ -121,6 +124,21 @@ public SalesforceMultiSourceConfigBuilder setOperation(String operation) {
return this;
}

public SalesforceMultiSourceConfigBuilder setInitialRetryDuration(Long initialRetryDuration) {
this.initialRetryDuration = initialRetryDuration;
return this;
}

public SalesforceMultiSourceConfigBuilder setMaxRetryDuration(Long maxRetryDuration) {
this.maxRetryDuration = maxRetryDuration;
return this;
}

public SalesforceMultiSourceConfigBuilder setMaxRetryCount(Integer maxRetryCount) {
this.maxRetryCount = maxRetryCount;
return this;
}

public SalesforceMultiSourceConfigBuilder setConnectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
Expand All @@ -134,6 +152,7 @@ public SalesforceMultiSourceConfigBuilder setProxyUrl(String proxyUrl) {
public SalesforceMultiSourceConfig build() {
return new SalesforceMultiSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
connectTimeout, datetimeAfter, datetimeBefore, duration, offset, whiteList,
blackList, sObjectNameField, securityToken, oAuthInfo, operation, proxyUrl);
blackList, sObjectNameField, securityToken, oAuthInfo, operation,
initialRetryDuration, maxRetryDuration, maxRetryCount, proxyUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,17 @@ public SalesforceSourceConfig(String referenceName,
@Nullable String schema,
@Nullable String securityToken,
@Nullable String operation,
@Nullable Long initialRetryDuration,
@Nullable Long maxRetryDuration,
@Nullable Integer maxRetryCount,
@Nullable OAuthInfo oAuthInfo,
@Nullable Boolean enablePKChunk,
@Nullable Integer chunkSize,
@Nullable String parent,
@Nullable String proxyUrl) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, connectTimeout,
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation, proxyUrl);
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation, initialRetryDuration,
maxRetryDuration, maxRetryCount, proxyUrl);
this.query = query;
this.sObjectName = sObjectName;
this.schema = schema;
Expand Down
Loading

0 comments on commit c8b6161

Please sign in to comment.