-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PLUGIN-1706] Implement Retry logic in Salesforce batch source and Mutli Source Plugin for connection timeout issues #225
[PLUGIN-1706] Implement Retry logic in Salesforce batch source and Mutli Source Plugin for connection timeout issues #225
Conversation
9a76af8
to
c8b6161
Compare
src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add unit tests.
2f78859
to
8c56415
Compare
Added |
@@ -50,7 +55,11 @@ | |||
public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String, ?>> { | |||
|
|||
private static final Logger LOG = LoggerFactory.getLogger(SalesforceBulkRecordReader.class); | |||
|
|||
public static final Set<String> RETRY_ON_REASON = ImmutableSet.of("Unknown", "InternalServerError", | |||
"ClientInputError", "Timeout"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is ClientInputError
returned? Sounds like a bad request 400 which should not be retried.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClientInputError is coming when bulk api library not able to parse it properly and this is coming intermittently, we have added fix for this only.
...va/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceQueryExecutionException.java
Show resolved
Hide resolved
src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java
Outdated
Show resolved
Hide resolved
src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java
Outdated
Show resolved
Hide resolved
src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java
Show resolved
Hide resolved
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount)) | ||
.run(() -> batchInfoList[0] = getQueryResultList(bulkConnection)); | ||
return batchInfoList[0]; | ||
//return bulkConnection.getQueryResultList(jobId, batchId).getResult(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove comment not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount)) | ||
.run(() -> batchInfoList[0] = getQueryResultList(bulkConnection)); | ||
return batchInfoList[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the use of storing batchInfoList
and then returning?
return Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.run(() -> getQueryResultList(bulkConnection));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it is used inside the Failsafe block there r chances of exception while retrying. Just used this variable first to capture the successful result before returning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not make sense, how does storing it in the variable help here?
Can you share an example explaining this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done the change
Mockito.when(mockConnection.getQueryResultStream(jobId, batchId, resultId)) | ||
.thenThrow(new SalesforceQueryExecutionException("Simulated error")) | ||
.thenThrow(new SalesforceQueryExecutionException("Simulated error")) | ||
.thenReturn(new ByteArrayInputStream(csvStrings[1].getBytes(StandardCharsets.UTF_8))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: identation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
final InputStream[] queryResponseStream = new InputStream[1]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this an array of InputStream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As lambda function is used with Failsafe library, we need to initialize the array of inputstream to use final keyword.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What error did you get with this code?
final InputStream queryResponseStream =
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.run(() -> getQueryResultStream(bulkConnection));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
throws AsyncApiException, InterruptedException { | ||
|
||
throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException { | ||
final String[][] batchInfoList = new String[1][]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java
Outdated
Show resolved
Hide resolved
|
||
@Test | ||
public void testSetupParserWithRetrySuccess() throws Exception { | ||
MapToRecordTransformer transformer = new MapToRecordTransformer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first usage of transformer
seem to be at line 457, as a recommended practice, local variables shouldn't be initialized more than 2-3 lines before they are used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
final BatchInfo[] batchInfo = new BatchInfo[1]; | ||
Failsafe.with(getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount)) | ||
.run(() -> batchInfo[0] = createBatchFromStream(bulkConnection, query, job)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix this as well similarly.
final BatchInfo batchInfo =
Failsafe.with(getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.run(() -> createBatchFromStream(bulkConnection, query, job));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
LOG.error("The bulk query job {} failed. Job State: {}", job.getId(), job.getState()); | ||
if (SalesforceBulkRecordReader.RETRY_ON_REASON.contains(exception.getExceptionCode())) { | ||
throw new SalesforceQueryExecutionException(exception.getMessage()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the error is not retryable, we are ignoring the error here, is it expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error logged
LOG.error("The bulk query job {} failed.", jobId); | ||
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) { | ||
throw new SalesforceQueryExecutionException(exception.getMessage()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the error is not retryable, we are ignoring the error here, is it expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error logged
LOG.error("The bulk query job {} failed.", jobId); | ||
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) { | ||
throw new SalesforceQueryExecutionException(exception.getMessage()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the error is not retryable, we are ignoring the error here, is it expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error logged
src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java
Show resolved
Hide resolved
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) { | ||
throw new SalesforceQueryExecutionException(exception.getMessage()); | ||
} | ||
throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we overriding the AsyncApiException
, can't we just throw exception
? Same applies to all places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
} catch (AsyncApiException exception) { | ||
LOG.error("The bulk query job {} failed.", jobId); | ||
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) { | ||
throw new SalesforceQueryExecutionException(exception.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This way you are loosing all the additional information from original AsyncApiException
stack trace. There are only some case where you would want to hide all details of the underlying exception by using e.getMessage
instead of attaching a root cause. I don't see why would we want to have it here.
Same applies to all the places in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Long initialRetryDuration = 5L; | ||
Long maxRetryDuration = 80L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably want immediate retries, otherwise this test would be waiting unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users should also be able to disable the retry.
We should add a Retry Configuration
section in the UI.
There should be a toggle to disable the behavior and fine tune the retries(which is already added).
Please don't remove the |
try { | ||
return bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]); | ||
} catch (AsyncApiException exception) { | ||
LOG.error("The bulk query job {} failed.", jobId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you catch it and log it and rethrow, it is considered as an anti-pattern, ref: https://stackoverflow.com/questions/6639963/why-is-log-and-throw-considered-an-anti-pattern
It is okay to log if you think this information will not be logged anywhere above in the stack, but it should not be an error log since we are also retrying the error, it can be a warn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment applies to all places in PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logging the jobId
as warn might still be useful as I cannot see it getting logged in upper stack anywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logged as a warning
return Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount)) | ||
.get(() -> getQueryResultList(bulkConnection)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we would need to unwrap those unchecked exceptions to prevent unneeded layers / cluttered message, see: https://github.com/data-integrations/bigquery-delta-plugins/pull/40/files#diff-427b4fbd30b2c80fb381e0e8fc3a6e2424ec9f0c94e8ed0881b0bc49402fc385R268-R279 for reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to be changed at all the three places where it is used or here only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment applies to all similar places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
} | ||
throw e; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove extra empty line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
0554370
to
7d64c9d
Compare
Added |
null, ConcurrencyMode.Parallel, ContentType.CSV); | ||
final BatchInfo batchInfo; | ||
try { | ||
if (retryOnBackendError) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of having if else for whole code, if !retryEnabled, set maxretryattempts to 0 at all places. Keep failsafe with 0 maxRetryCount.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this at all places
@@ -73,6 +90,10 @@ public SalesforceBulkRecordReader(Schema schema) { | |||
this.jobId = jobId; | |||
this.batchId = batchId; | |||
this.resultIds = resultIds; | |||
initialRetryDuration = SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this code, instead use like this in initialize method.
conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is used for testing purpose only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still it is not solving any purpose, if those properties are null in config, these value will be overidden by null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will require extra mocking for conf and stubs for all the values required for the test cases.
@Nullable String consumerKey, | ||
@Nullable String consumerSecret, | ||
@Nullable String username, | ||
@Nullable String password, | ||
@Nullable String loginUrl, | ||
@Nullable Integer connectTimeout, | ||
@Nullable Integer readTimeout, | ||
@Nullable String query, | ||
@Nullable String sObjectName, | ||
@Nullable String datetimeAfter, | ||
@Nullable String datetimeBefore, | ||
@Nullable String duration, | ||
@Nullable String offset, | ||
@Nullable String schema, | ||
@Nullable String securityToken, | ||
@Nullable String operation, | ||
@Nullable Long initialRetryDuration, | ||
@Nullable Long maxRetryDuration, | ||
@Nullable Integer maxRetryCount, | ||
Boolean retryOnBackendError, | ||
@Nullable OAuthInfo oAuthInfo, | ||
@Nullable Boolean enablePKChunk, | ||
@Nullable Integer chunkSize, | ||
@Nullable String parent, | ||
@Nullable String proxyUrl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat); | ||
if (csvParser.getHeaderMap().isEmpty()) { | ||
throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected."); | ||
int maxRetryCount = isRetryRequired ? maxRetryAttempts : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when maxRetryCount
is 0
it should not use FailSafe
to call the salesforce query method.
This comment applies to all places in the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done the changes
* the License. | ||
*/ | ||
|
||
import io.cdap.cdap.api.retry.RetryableException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused imports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
} | ||
|
||
|
||
public Map<ChronoUnit, Integer> getDuration() { | ||
return extractRangeValue(SalesforceSourceConstants.PROPERTY_DURATION, duration); | ||
} | ||
|
||
public Boolean isRetryRequired() { | ||
return retryOnBackendError; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be null in case of older pipelines. Did we verify it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My intent was that we should handle it somewhere at the top level or in this class.
I leave it up to you to decide.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done the change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My intent was that we should handle it somewhere at the top level or in this class.
I leave it up to you to decide.
ya changed it in config as well
@@ -0,0 +1,39 @@ | |||
package io.cdap.plugin.salesforce.plugin.source.batch.util; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this statement should be below the license.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* The FailsafeException is thrown with the expected error message. | ||
* The retry policy is correctly configured with the specified configurations. | ||
*/ | ||
@Test(expected = FailsafeException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why FailsafeException
here?
Javadoc comment suggests that The FailsafeException is thrown with the expected error message
. Where are we comparing the expected error message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FailsafeException is expected as retry limit will get exceeded, changed the javadoc accordingly
Please squash commits before merge. |
8030eb4
to
5e490ac
Compare
5e490ac
to
b92576e
Compare
PLUGIN-1706: Implement Retry logic in Salesforce batch source and Multi-Source Plugin for connection timeout issues