Skip to content

Commit

Permalink
[BugFix] Fix the npe of show routine load (backport #50963) (#51141)
Browse files Browse the repository at this point in the history
Co-authored-by: trueeyu <[email protected]>
  • Loading branch information
mergify[bot] and trueeyu authored Sep 20, 2024
1 parent 30fd372 commit fe137ea
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob {
// pulsar properties, property prefix will be mapped to pulsar custom parameters, which can be extended in the future
@SerializedName("cpt")
private Map<String, String> customProperties = Maps.newHashMap();
private Map<String, String> convertedCustomProperties = Maps.newHashMap();
private final Map<String, String> convertedCustomProperties = Maps.newHashMap();

public static final String POSITION_EARLIEST = "POSITION_EARLIEST"; // 1
public static final String POSITION_LATEST = "POSITION_LATEST"; // 0
Expand All @@ -101,6 +101,8 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob {
public PulsarRoutineLoadJob() {
// for serialization, id is dummy
super(-1, LoadDataSourceType.PULSAR);
this.progress = new PulsarProgress();
this.timestampProgress = new PulsarProgress();
}

public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId,
Expand All @@ -110,6 +112,7 @@ public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId,
this.topic = topic;
this.subscription = subscription;
this.progress = new PulsarProgress();
this.timestampProgress = new PulsarProgress();
}

public String getTopic() {
Expand Down Expand Up @@ -262,8 +265,7 @@ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttac
// For compatible reason, the default behavior of empty load is still returning
// "No partitions have data available for loading" and abort transaction.
// In this situation, we also need update commit info.
if (txnStatusChangeReason != null &&
txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
if (txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
// Because the max_filter_ratio of routine load task is always 1.
// Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows".
// If no data is imported, the error "No partitions have data available for loading" may only be returned.
Expand Down Expand Up @@ -388,17 +390,17 @@ protected void unprotectUpdateCurrentPartitions(List<String> newCurrentPartition
@Override
protected String getStatistic() {
Map<String, Object> summary = Maps.newHashMap();
summary.put("totalRows", Long.valueOf(totalRows));
summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows));
summary.put("errorRows", Long.valueOf(errorRows));
summary.put("unselectedRows", Long.valueOf(unselectedRows));
summary.put("receivedBytes", Long.valueOf(receivedBytes));
summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
summary.put("totalRows", totalRows);
summary.put("loadedRows", totalRows - errorRows - unselectedRows);
summary.put("errorRows", errorRows);
summary.put("unselectedRows", unselectedRows);
summary.put("receivedBytes", receivedBytes);
summary.put("taskExecuteTimeMs", totalTaskExcutionTimeMs);
summary.put("receivedBytesRate", receivedBytes / totalTaskExcutionTimeMs * 1000);
summary.put("loadRowsRate",
Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
(totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000);
summary.put("committedTaskNum", committedTaskNum);
summary.put("abortedTaskNum", abortedTaskNum);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1618,25 +1618,25 @@ public String jobPropertiesToSql() {
StringBuilder sb = new StringBuilder();
sb.append("(\n");
sb.append("\"").append(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).append("\"=\"");
sb.append(String.valueOf(desireTaskConcurrentNum)).append("\",\n");
sb.append(desireTaskConcurrentNum).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxErrorNum)).append("\",\n");
sb.append(maxErrorNum).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxFilterRatio)).append("\",\n");
sb.append(maxFilterRatio).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY).append("\"=\"");
sb.append(String.valueOf(taskSchedIntervalS)).append("\",\n");
sb.append(taskSchedIntervalS).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxBatchRows)).append("\",\n");
sb.append(maxBatchRows).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.TASK_CONSUME_SECOND).append("\"=\"");
sb.append(String.valueOf(taskConsumeSecond)).append("\",\n");
sb.append(taskConsumeSecond).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.TASK_TIMEOUT_SECOND).append("\"=\"");
sb.append(String.valueOf(taskTimeoutSecond)).append("\",\n");
sb.append(taskTimeoutSecond).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.FORMAT).append("\"=\"");
sb.append(getFormat()).append("\",\n");
Expand All @@ -1645,27 +1645,27 @@ public String jobPropertiesToSql() {
sb.append(getJsonPaths()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY).append("\"=\"");
sb.append(Boolean.toString(isStripOuterArray())).append("\",\n");
sb.append(isStripOuterArray()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.JSONROOT).append("\"=\"");
sb.append(getJsonRoot()).append("\",\n");

sb.append("\"").append(LoadStmt.STRICT_MODE).append("\"=\"");
sb.append(Boolean.toString(isStrictMode())).append("\",\n");
sb.append(isStrictMode()).append("\",\n");

sb.append("\"").append(LoadStmt.TIMEZONE).append("\"=\"");
sb.append(getTimezone()).append("\",\n");

sb.append("\"").append(LoadStmt.PARTIAL_UPDATE).append("\"=\"");
sb.append(Boolean.toString(isPartialUpdate())).append("\",\n");
sb.append(isPartialUpdate()).append("\",\n");

if (getMergeCondition() != null) {
sb.append("\"").append(LoadStmt.MERGE_CONDITION).append("\"=\"");
sb.append(getMergeCondition()).append("\",\n");
}

sb.append("\"").append(CreateRoutineLoadStmt.TRIMSPACE).append("\"=\"");
sb.append(Boolean.toString(isTrimspace())).append("\",\n");
sb.append(isTrimspace()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.ENCLOSE).append("\"=\"");
sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEnclose()))).append("\",\n");
Expand All @@ -1674,7 +1674,7 @@ public String jobPropertiesToSql() {
sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEscape()))).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.LOG_REJECTED_RECORD_NUM_PROPERTY).append("\"=\"");
sb.append(String.valueOf(getLogRejectedRecordNum()));
sb.append(getLogRejectedRecordNum());

if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
sb.append("\",\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ void writeUnlock() {
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertTrue(routineLoadJob.getOtherMsg(), routineLoadJob.getOtherMsg().endsWith(txnStatusChangeReasonString));

Assert.assertEquals(new Long(prevValue + 1), entity.counterRoutineLoadAbortedTasksTotal.getValue());
Assert.assertEquals(Long.valueOf(prevValue + 1), entity.counterRoutineLoadAbortedTasksTotal.getValue());

routineLoadTaskInfoList.clear();
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
Assert.assertEquals(new Long(2), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(new Long(prevValue + 2), entity.counterRoutineLoadAbortedTasksTotal.getValue());
Assert.assertEquals(Long.valueOf(2), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(Long.valueOf(prevValue + 2), entity.counterRoutineLoadAbortedTasksTotal.getValue());
}

@Test
Expand Down Expand Up @@ -235,13 +235,43 @@ void writeUnlock() {
routineLoadJob.afterCommitted(transactionState, true);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "committedTaskNum"));
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "committedTaskNum"));

Assert.assertEquals(new Long(prevValue + 1), entity.counterRoutineLoadCommittedTasksTotal.getValue());
Assert.assertEquals(Long.valueOf(prevValue + 1), entity.counterRoutineLoadCommittedTasksTotal.getValue());
}

@Test
public void testGetShowInfo() throws UserException {
public void testPulsarGetShowInfo() {
{
// PAUSE state
PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);

List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(errorReason.toString())));
}

{
// PAUSE state
PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob(
1L, "task1", 1, 1, "http://url", "task-1", "sub-1");
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);

List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(errorReason.toString())));
}
}

@Test
public void testKafkaGetShowInfo() throws UserException {
{
// PAUSE state
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Expand Down

0 comments on commit fe137ea

Please sign in to comment.