[PLUGIN-1706] Implement Retry logic in Salesforce batch source and Multi Source Plugin for connection timeout issues #225
Changes from all commits
SalesforceBulkRecordReader.java

@@ -16,16 +16,25 @@
package io.cdap.plugin.salesforce.plugin.source.batch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.sforce.async.AsyncApiException;
import com.sforce.async.AsyncExceptionCode;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.TimeoutExceededException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -42,6 +51,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * RecordReader implementation, which reads a single Salesforce batch from bulk job
@@ -50,7 +60,14 @@
public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String, ?>> {

  private static final Logger LOG = LoggerFactory.getLogger(SalesforceBulkRecordReader.class);

  public static final Set<AsyncExceptionCode> RETRY_ON_REASON = ImmutableSet.of(AsyncExceptionCode.Unknown,
                                                                                AsyncExceptionCode.InternalServerError,
                                                                                AsyncExceptionCode.ClientInputError,
                                                                                AsyncExceptionCode.Timeout);
  private static Long initialRetryDuration;
  private static Long maxRetryDuration;
  private static Integer maxRetryCount;
  private Boolean isRetryRequired;
  private final Schema schema;

  private CSVParser csvParser;
@@ -73,6 +90,10 @@ public SalesforceBulkRecordReader(Schema schema) {
    this.jobId = jobId;
    this.batchId = batchId;
    this.resultIds = resultIds;
    initialRetryDuration = SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS;
    maxRetryDuration = SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS;
    maxRetryCount = SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT;
    isRetryRequired = true;
  }

  /**
@@ -87,13 +108,18 @@ public SalesforceBulkRecordReader(Schema schema) {
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
    Configuration conf = taskAttemptContext.getConfiguration();
    initialRetryDuration = Long.valueOf(conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION,
      String.valueOf(SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS)));
    maxRetryDuration = Long.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION,
      String.valueOf(SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS)));
    maxRetryCount = Integer.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT,
      String.valueOf(SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT)));
    isRetryRequired = Boolean.valueOf(conf.get(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, String.valueOf(true)));
    AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
    initialize(inputSplit, credentials);
  }

  public SalesforceBulkRecordReader initialize(
    InputSplit inputSplit,
    AuthenticatorCredentials credentials)
  public SalesforceBulkRecordReader initialize(InputSplit inputSplit, AuthenticatorCredentials credentials)
    throws IOException, InterruptedException {
    SalesforceSplit salesforceSplit = (SalesforceSplit) inputSplit;
    jobId = salesforceSplit.getJobId();
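For orientation, the retry settings above travel through the Hadoop Configuration, so they have to be set by the caller (for example, by the batch source when it configures the job, or by a unit test) before initialize runs. A minimal sketch of that setup, assuming only the constants referenced in this hunk and the standard org.apache.hadoop.conf.Configuration API; the numeric values are illustrative:

import org.apache.hadoop.conf.Configuration;

// Hypothetical caller/test setup; keys are the constants referenced above, values are examples only.
Configuration conf = new Configuration();
conf.set(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, "5");   // seconds before the first retry
conf.set(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, "80");      // upper bound on backoff, in seconds
conf.set(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, "5");          // maximum number of retry attempts
conf.set(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, "true");        // set to "false" to make a single call
// taskAttemptContext.getConfiguration() returns this conf when initialize(...) runs.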
@@ -105,7 +131,7 @@ public SalesforceBulkRecordReader initialize(
      resultIds = waitForBatchResults(bulkConnection);
      LOG.debug("Batch {} returned {} results", batchId, resultIds.length);
      setupParser();
    } catch (AsyncApiException e) {
    } catch (AsyncApiException | SalesforceQueryExecutionException e) {
      throw new RuntimeException(
        String.format("Failed to wait for the result of a batch: %s", e.getMessage()),
        e);
@@ -173,23 +199,56 @@ public void close() throws IOException {
  }

  @VisibleForTesting
  void setupParser() throws IOException, AsyncApiException {
  void setupParser() throws IOException, AsyncApiException, InterruptedException {
    if (resultIdIndex >= resultIds.length) {
      throw new IllegalArgumentException(String.format("Invalid resultIdIndex %d, should be less than %d",
        resultIdIndex, resultIds.length));
                                                       resultIdIndex, resultIds.length));
    }
    InputStream queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);

    CSVFormat csvFormat = CSVFormat.DEFAULT
      .withHeader()
      .withQuoteMode(QuoteMode.ALL)
      .withAllowMissingColumnNames(false);
    csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat);
    if (csvParser.getHeaderMap().isEmpty()) {
      throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected.");
    try {
      final InputStream queryResponseStream;
      if (isRetryRequired) {
        queryResponseStream =
          Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
            .get(() -> getQueryResultStream(bulkConnection));
      } else {
        queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
      }

      CSVFormat csvFormat = CSVFormat.DEFAULT
        .withHeader()
        .withQuoteMode(QuoteMode.ALL)
        .withAllowMissingColumnNames(false);
      csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat);

      if (csvParser.getHeaderMap().isEmpty()) {
        throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected.");
      }
      parserIterator = csvParser.iterator();
      resultIdIndex++;
    } catch (TimeoutExceededException e) {
      throw new AsyncApiException("Exhausted retries trying to get query result stream", AsyncExceptionCode.Timeout);
    } catch (FailsafeException e) {
      if (e.getCause() instanceof InterruptedException) {
        throw (InterruptedException) e.getCause();
      }
      if (e.getCause() instanceof AsyncApiException) {
        throw (AsyncApiException) e.getCause();
      }
      throw e;
    }
  }

  public InputStream getQueryResultStream(BulkConnection bulkConnection)
    throws SalesforceQueryExecutionException, AsyncApiException {
    try {
      return bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
    } catch (AsyncApiException exception) {
      LOG.warn("The bulk query job {} failed.", jobId);
      if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
        throw new SalesforceQueryExecutionException(exception);
      }
[Review comment] If the error is not retryable, we are ignoring it here; is that expected?
[Reply] The error is logged.
      throw exception;
    }
    parserIterator = csvParser.iterator();
    resultIdIndex++;
  }

  /**
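SalesforceSplitUtil.getRetryPolicy itself is not shown in this diff. A plausible sketch of what such a helper could look like with Failsafe 3.x, assuming it retries only on SalesforceQueryExecutionException and backs off exponentially between the configured initial and maximum durations (this is an illustration, not the plugin's actual utility):

import dev.failsafe.RetryPolicy;
import java.time.temporal.ChronoUnit;

// Hypothetical shape of SalesforceSplitUtil.getRetryPolicy, for illustration only.
public static RetryPolicy<Object> getRetryPolicy(Long initialRetryDuration, Long maxRetryDuration,
                                                 Integer maxRetryCount) {
  return RetryPolicy.builder()
    // Only the wrapper thrown for retryable AsyncExceptionCodes triggers a retry.
    .handle(SalesforceQueryExecutionException.class)
    // Exponential backoff from the initial delay up to the configured maximum, in seconds.
    .withBackoff(initialRetryDuration, maxRetryDuration, ChronoUnit.SECONDS)
    .withMaxRetries(maxRetryCount)
    .build();
}

The catch of TimeoutExceededException in the calling code suggests the real helper may also attach a Failsafe Timeout policy bounded by maxRetryDuration, but that is not visible in this change.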
@@ -201,8 +260,7 @@ void setupParser() throws IOException, AsyncApiException {
   * @throws InterruptedException sleep interrupted
   */
  private String[] waitForBatchResults(BulkConnection bulkConnection)
    throws AsyncApiException, InterruptedException {

    throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
    BatchInfo info = null;
    for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
      try {
@@ -216,7 +274,26 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
    }

    if (info.getState() == BatchStateEnum.Completed) {
      return bulkConnection.getQueryResultList(jobId, batchId).getResult();
      try {
        if (isRetryRequired) {
          return Failsafe.with(
            SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
            .get(() -> getQueryResultList(bulkConnection));
        } else {
          return bulkConnection.getQueryResultList(jobId, batchId).getResult();
        }

      } catch (TimeoutExceededException e) {
        throw new AsyncApiException("Exhausted retries trying to get query result list", AsyncExceptionCode.Timeout);
      } catch (FailsafeException e) {
        if (e.getCause() instanceof InterruptedException) {
          throw (InterruptedException) e.getCause();
        }
        if (e.getCause() instanceof AsyncApiException) {
          throw (AsyncApiException) e.getCause();
        }
        throw e;
      }
    } else if (info.getState() == BatchStateEnum.Failed) {
      throw new BulkAPIBatchException("Batch failed", info);
    } else {
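The FailsafeException handling above (here and in setupParser) follows from how Failsafe surfaces checked exceptions: get(...) only throws unchecked exceptions, so a checked exception thrown by the retried supplier, such as AsyncApiException or InterruptedException, arrives wrapped in a FailsafeException and must be unwrapped to preserve the method's declared throws contract. A small, self-contained illustration of that wrapping (class name and exception choices here are hypothetical):

import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import java.io.IOException;

public class FailsafeWrappingDemo {
  public static void main(String[] args) {
    RetryPolicy<Object> policy = RetryPolicy.builder()
      .handle(IllegalStateException.class)  // the IOException below is not a handled failure type
      .withMaxRetries(2)
      .build();
    try {
      // The supplier throws a checked exception; Failsafe rethrows it wrapped in a FailsafeException.
      Failsafe.with(policy).get(() -> {
        throw new IOException("simulated connection timeout");
      });
    } catch (FailsafeException e) {
      System.out.println("unwrapped cause: " + e.getCause());  // prints the IOException
    }
  }
}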
@@ -226,4 +303,17 @@ private String[] waitForBatchResults(BulkConnection bulkConnection)
    }
    throw new BulkAPIBatchException("Timeout waiting for batch results", info);
  }

  private String[] getQueryResultList(BulkConnection bulkConnection)
    throws SalesforceQueryExecutionException, AsyncApiException {
    try {
      return bulkConnection.getQueryResultList(jobId, batchId).getResult();
    } catch (AsyncApiException exception) {
      LOG.warn("The bulk query job {} failed.", jobId);
      if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
        throw new SalesforceQueryExecutionException(exception);
      }
[Review comment] If the error is not retryable, we are ignoring it here; is that expected?
[Reply] The error is logged.
      throw exception;
    }
  }
}
[Review comment] Remove this code; instead, read the value in the initialize method, like this:
conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS)

[Reply] This is used for testing purposes only.

[Reply] It still does not serve any purpose: if those properties are null in the config, these values will be overridden by null.

[Reply] It would require extra mocking for conf and stubs for all the values required by the test cases.
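For reference on the point being debated: org.apache.hadoop.conf.Configuration.get(name, defaultValue) returns the supplied default when the property is absent, which is what the initialize method relies on. A tiny sketch of that behavior (the constant comes from the plugin and needs it on the classpath; the default value is illustrative):

import org.apache.hadoop.conf.Configuration;

public class ConfDefaultDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);  // no properties loaded or set
    // Falls back to "5" because the key is absent; it does not return null.
    String maxRetryCount = conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, "5");
    System.out.println(maxRetryCount);  // prints 5
  }
}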