Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1706] Implement Retry logic in Salesforce batch source and Mutli Source Plugin for connection timeout issues #225

Merged

Conversation

sgarg-CS
Copy link
Contributor

@sgarg-CS sgarg-CS commented Nov 17, 2023

PLUGIN-1706: Implement Retry logic in Salesforce batch source and Multi-Source Plugin for connection timeout issues

@sgarg-CS sgarg-CS force-pushed the feature/retry-bulk-query branch 2 times, most recently from 9a76af8 to c8b6161 Compare November 17, 2023 15:35
@sgarg-CS sgarg-CS changed the title Implement Retry logic in Salesforce batch source and Mutli Source Plugin for connection timeout issues [PLUGIN-1706] Implement Retry logic in Salesforce batch source and Mutli Source Plugin for connection timeout issues Nov 24, 2023
Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests.

@Vipinofficial11 Vipinofficial11 force-pushed the feature/retry-bulk-query branch from 2f78859 to 8c56415 Compare November 29, 2023 08:36
@Shubhangi-cs
Copy link
Contributor

Please add unit tests.

Added

@@ -50,7 +55,11 @@
public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String, ?>> {

private static final Logger LOG = LoggerFactory.getLogger(SalesforceBulkRecordReader.class);

public static final Set<String> RETRY_ON_REASON = ImmutableSet.of("Unknown", "InternalServerError",
"ClientInputError", "Timeout");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is ClientInputError returned? Sounds like a bad request 400 which should not be retried.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientInputError is coming when bulk api library not able to parse it properly and this is coming intermittently, we have added fix for this only.

@vikasrathee-cs vikasrathee-cs requested review from itsankit-google and removed request for vikasrathee-cs December 1, 2023 05:59
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.run(() -> batchInfoList[0] = getQueryResultList(bulkConnection));
return batchInfoList[0];
//return bulkConnection.getQueryResultList(jobId, batchId).getResult();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment not required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Comment on lines 250 to 252
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.run(() -> batchInfoList[0] = getQueryResultList(bulkConnection));
return batchInfoList[0];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use of storing batchInfoList and then returning?

return Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
          .run(() -> getQueryResultList(bulkConnection));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is used inside the Failsafe block there r chances of exception while retrying. Just used this variable first to capture the successful result before returning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not make sense, how does storing it in the variable help here?

Can you share an example explaining this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done the change

Comment on lines 443 to 446
Mockito.when(mockConnection.getQueryResultStream(jobId, batchId, resultId))
.thenThrow(new SalesforceQueryExecutionException("Simulated error"))
.thenThrow(new SalesforceQueryExecutionException("Simulated error"))
.thenReturn(new ByteArrayInputStream(csvStrings[1].getBytes(StandardCharsets.UTF_8)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: identation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


final InputStream[] queryResponseStream = new InputStream[1];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this an array of InputStream?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As lambda function is used with Failsafe library, we need to initialize the array of inputstream to use final keyword.

Copy link
Member

@itsankit-google itsankit-google Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What error did you get with this code?

final InputStream queryResponseStream = 
  Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
    .run(() -> getQueryResultStream(bulkConnection));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

throws AsyncApiException, InterruptedException {

throws AsyncApiException, InterruptedException, SalesforceQueryExecutionException {
final String[][] batchInfoList = new String[1][];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


@Test
public void testSetupParserWithRetrySuccess() throws Exception {
MapToRecordTransformer transformer = new MapToRecordTransformer();
Copy link
Member

@itsankit-google itsankit-google Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first usage of transformer seem to be at line 457, as a recommended practice, local variables shouldn't be initialized more than 2-3 lines before they are used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Comment on lines 126 to 128
final BatchInfo[] batchInfo = new BatchInfo[1];
Failsafe.with(getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.run(() -> batchInfo[0] = createBatchFromStream(bulkConnection, query, job));
Copy link
Member

@itsankit-google itsankit-google Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix this as well similarly.

final BatchInfo batchInfo = 
  Failsafe.with(getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
    .run(() -> createBatchFromStream(bulkConnection, query, job));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

LOG.error("The bulk query job {} failed. Job State: {}", job.getId(), job.getState());
if (SalesforceBulkRecordReader.RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the error is not retryable, we are ignoring the error here, is it expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error logged

LOG.error("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the error is not retryable, we are ignoring the error here, is it expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error logged

LOG.error("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the error is not retryable, we are ignoring the error here, is it expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error logged

if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
}
throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, exception);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we overriding the AsyncApiException, can't we just throw exception? Same applies to all places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

} catch (AsyncApiException exception) {
LOG.error("The bulk query job {} failed.", jobId);
if (RETRY_ON_REASON.contains(exception.getExceptionCode())) {
throw new SalesforceQueryExecutionException(exception.getMessage());
Copy link
Member

@itsankit-google itsankit-google Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way you are loosing all the additional information from original AsyncApiException stack trace. There are only some case where you would want to hide all details of the underlying exception by using e.getMessage instead of attaching a root cause. I don't see why would we want to have it here.

Same applies to all the places in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Comment on lines 326 to 327
Long initialRetryDuration = 5L;
Long maxRetryDuration = 80L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want immediate retries, otherwise this test would be waiting unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users should also be able to disable the retry.

We should add a Retry Configuration section in the UI.

There should be a toggle to disable the behavior and fine tune the retries(which is already added).

@itsankit-google
Copy link
Member

Please don't remove the build label, it helps ensuring that we are not introducing any compile time errors.

try {
return bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
} catch (AsyncApiException exception) {
LOG.error("The bulk query job {} failed.", jobId);
Copy link
Member

@itsankit-google itsankit-google Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you catch it and log it and rethrow, it is considered as an anti-pattern, ref: https://stackoverflow.com/questions/6639963/why-is-log-and-throw-considered-an-anti-pattern

It is okay to log if you think this information will not be logged anywhere above in the stack, but it should not be an error log since we are also retrying the error, it can be a warn.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment applies to all places in PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the jobId as warn might still be useful as I cannot see it getting logged in upper stack anywhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged as a warning

Comment on lines 249 to 250
return Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.get(() -> getQueryResultList(bulkConnection));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we would need to unwrap those unchecked exceptions to prevent unneeded layers / cluttered message, see: https://github.com/data-integrations/bigquery-delta-plugins/pull/40/files#diff-427b4fbd30b2c80fb381e0e8fc3a6e2424ec9f0c94e8ed0881b0bc49402fc385R268-R279 for reference.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be changed at all the three places where it is used or here only?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment applies to all similar places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@sau42shri sau42shri self-requested a review December 8, 2023 12:04
}
throw e;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra empty line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

@Shubhangi-cs Shubhangi-cs force-pushed the feature/retry-bulk-query branch from 0554370 to 7d64c9d Compare December 12, 2023 05:27
@Shubhangi-cs
Copy link
Contributor

Users should also be able to disable the retry.

We should add a Retry Configuration section in the UI.

There should be a toggle to disable the behavior and fine tune the retries(which is already added).

Added

null, ConcurrencyMode.Parallel, ContentType.CSV);
final BatchInfo batchInfo;
try {
if (retryOnBackendError) {
Copy link
Contributor

@vikasrathee-cs vikasrathee-cs Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of having if else for whole code, if !retryEnabled, set maxretryattempts to 0 at all places. Keep failsafe with 0 maxRetryCount.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change this at all places

@@ -73,6 +90,10 @@ public SalesforceBulkRecordReader(Schema schema) {
this.jobId = jobId;
this.batchId = batchId;
this.resultIds = resultIds;
initialRetryDuration = SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this code, instead use like this in initialize method.
conf.get(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used for testing purpose only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still it is not solving any purpose, if those properties are null in config, these value will be overidden by null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will require extra mocking for conf and stubs for all the values required for the test cases.

Comment on lines 92 to 116
@Nullable String consumerKey,
@Nullable String consumerSecret,
@Nullable String username,
@Nullable String password,
@Nullable String loginUrl,
@Nullable Integer connectTimeout,
@Nullable Integer readTimeout,
@Nullable String query,
@Nullable String sObjectName,
@Nullable String datetimeAfter,
@Nullable String datetimeBefore,
@Nullable String duration,
@Nullable String offset,
@Nullable String schema,
@Nullable String securityToken,
@Nullable String operation,
@Nullable Long initialRetryDuration,
@Nullable Long maxRetryDuration,
@Nullable Integer maxRetryCount,
Boolean retryOnBackendError,
@Nullable OAuthInfo oAuthInfo,
@Nullable Boolean enablePKChunk,
@Nullable Integer chunkSize,
@Nullable String parent,
@Nullable String proxyUrl) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

csvParser = CSVParser.parse(queryResponseStream, StandardCharsets.UTF_8, csvFormat);
if (csvParser.getHeaderMap().isEmpty()) {
throw new IllegalStateException("Empty response was received from Salesforce, but csv header was expected.");
int maxRetryCount = isRetryRequired ? maxRetryAttempts : 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when maxRetryCount is 0 it should not use FailSafe to call the salesforce query method.

This comment applies to all places in the PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done the changes

* the License.
*/

import io.cdap.cdap.api.retry.RetryableException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unused imports.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

}


public Map<ChronoUnit, Integer> getDuration() {
return extractRangeValue(SalesforceSourceConstants.PROPERTY_DURATION, duration);
}

public Boolean isRetryRequired() {
return retryOnBackendError;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be null in case of older pipelines. Did we verify it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intent was that we should handle it somewhere at the top level or in this class.

I leave it up to you to decide.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done the change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intent was that we should handle it somewhere at the top level or in this class.

I leave it up to you to decide.

ya changed it in config as well

@@ -0,0 +1,39 @@
package io.cdap.plugin.salesforce.plugin.source.batch.util;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this statement should be below the license.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* The FailsafeException is thrown with the expected error message.
* The retry policy is correctly configured with the specified configurations.
*/
@Test(expected = FailsafeException.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why FailsafeException here?

Javadoc comment suggests that The FailsafeException is thrown with the expected error message. Where are we comparing the expected error message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FailsafeException is expected as retry limit will get exceeded, changed the javadoc accordingly

@itsankit-google
Copy link
Member

Please squash commits before merge.

@Shubhangi-cs Shubhangi-cs force-pushed the feature/retry-bulk-query branch 4 times, most recently from 8030eb4 to 5e490ac Compare December 18, 2023 12:05
@Shubhangi-cs Shubhangi-cs force-pushed the feature/retry-bulk-query branch from 5e490ac to b92576e Compare December 18, 2023 12:06
@Shubhangi-cs Shubhangi-cs merged commit a84eecd into data-integrations:develop Dec 18, 2023
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants