Skip to content

Commit

Permalink
Merge pull request data-integrations#236 from cloudsufi/job-abort-fix
Browse files Browse the repository at this point in the history
Added code to abort the job in case of failure, and added diagnostic logs
  • Loading branch information
vikasrathee-cs authored Feb 5, 2024
2 parents a73e481 + 48ccda5 commit 07222d9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
21 changes: 11 additions & 10 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -120,23 +118,20 @@ public static void checkResults(BulkConnection bulkConnection, JobInfo job,
*/
CSVReader rdr = new CSVReader(bulkConnection.getBatchResultStream(job.getId(), batchInfo.getId()));
List<String> resultHeader = rdr.nextRecord();
int resultCols = resultHeader.size();
int successRowId = resultHeader.indexOf("Success");
int errorRowId = resultHeader.indexOf("Error");

List<String> row;
while ((row = rdr.nextRecord()) != null) {
Map<String, String> resultInfo = new HashMap<>();
for (int i = 0; i < resultCols; i++) {
resultInfo.put(resultHeader.get(i), row.get(i));
}
boolean success = Boolean.parseBoolean(resultInfo.get("Success"));
boolean success = Boolean.parseBoolean(row.get(successRowId));
if (!success) {
String error = resultInfo.get("Error");
String error = row.get(errorRowId);
String errorMessage = String.format("Failed to create row with error: '%s'. BatchId='%s'",
error, batchInfo.getId());
if (ignoreFailures) {
LOG.error(errorMessage);
} else {
throw new RuntimeException(errorMessage);
throw new BulkAPIBatchException(errorMessage, batchInfo);
}
}
}
Expand Down Expand Up @@ -181,6 +176,12 @@ public static void awaitCompletion(BulkConnection bulkConnection, JobInfo job,
throw new BulkAPIBatchException("Batch failed", b);
}
} else if (b.getState() == BatchStateEnum.Completed) {
LOG.debug("Batch Completed with Batch Id:{}, Total Processed Records: {}, Failed Records: {}," +
" Successful records: {}",
b.getId(),
b.getNumberRecordsProcessed(),
b.getNumberRecordsFailed(),
b.getNumberRecordsProcessed() - b.getNumberRecordsFailed());
incomplete.remove(b.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,20 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException {
SalesforceBulkUtil.awaitCompletion(bulkConnection, jobInfo, batchInfoList,
errorHandling.equals(ErrorHandling.SKIP));
SalesforceBulkUtil.checkResults(bulkConnection, jobInfo, batchInfoList, errorHandling.equals(ErrorHandling.SKIP));
} catch (AsyncApiException | ConditionTimeoutException | BulkAPIBatchException e) {
} catch (AsyncApiException | ConditionTimeoutException e) {
throw new RuntimeException(String.format("Failed to check the result of a batch for writes: %s",
e.getMessage()), e);
} catch (BulkAPIBatchException e) {
// This exception is thrown when a batch fails, or when any record insertion fails and
// the user has selected the "Fail on Error" option in Error Handling.
// In that case, abort the job to avoid creating duplicate batches from Spark task retries on failure.
// NOTE(review): the inner catch below rethrows with cause `e` (the original BulkAPIBatchException)
// rather than `ex` (the abort failure) — consider chaining `ex` so the abort failure is not lost.
try {
bulkConnection.abortJob(jobInfo.getId());
LOG.debug("Job aborted with Id: {}", jobInfo.getId());
} catch (AsyncApiException ex) {
throw new RuntimeException(String.format("Failed to abort job %s", jobInfo.getId()), e);
}
throw new RuntimeException(String.format("Batch write failed with error: %s", e.getMessage()), e);
} catch (Exception e) {
throw new RuntimeException(String.format("Pipeline Failed due to error: %s", e.getMessage()), e);
} finally {
Expand All @@ -174,6 +185,7 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException {
} catch (IOException ex) {
throw ex;
} finally {
csvBuffer.reset();
csvBuffer.close();
}
}
Expand Down

0 comments on commit 07222d9

Please sign in to comment.