diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java index 59e6acaebeb97..b057af0477c42 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java @@ -145,6 +145,8 @@ public enum Type { private long startPreparingTimeMs; @SerializedName(value = "finishPreparingTimeMs") private long finishPreparingTimeMs; + @SerializedName(value = "commitTimeMs") + private long commitTimeMs; @SerializedName(value = "endTimeMs") private long endTimeMs; @SerializedName(value = "txnId") @@ -616,7 +618,7 @@ public void waitCoordFinishAndPrepareTxn(TransactionResult resp) { boolean exception = false; writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { if (state == State.CANCELLED) { resp.setOKMsg("txn could not be prepared because task state is: " + state + ", error_msg: " + errorMsg); @@ -688,7 +690,7 @@ public void commitTxn(TransactionResult resp) throws UserException { boolean exception = false; readLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { if (state == State.CANCELLED) { resp.setOKMsg("txn could not be committed because task state is: " + state + ", error_msg: " + errorMsg); @@ -868,7 +870,7 @@ public String cancelTask(String msg) { } readLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { if (state == State.CANCELLED) { return "cur task state is: " + state + ", error_msg: " + errorMsg; @@ -956,7 +958,7 @@ public boolean checkNeedPrepareTxn() { public void beforePrepared(TransactionState txnState) throws TransactionException { writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { throw new TransactionException("txn could not be prepared because task state is: " + state); } } finally { @@ -1000,7 +1002,7 @@ public void replayOnPrepared(TransactionState txnState) { public void beforeCommitted(TransactionState txnState) throws TransactionException { writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { throw new TransactionException("txn could not be commited because task state is: " + state); } isCommitting = true; @@ -1026,8 +1028,8 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw this.channels.set(i, State.COMMITED); } this.state = State.COMMITED; + commitTimeMs = System.currentTimeMillis(); isCommitting = false; - endTimeMs = System.currentTimeMillis(); } finally { writeUnlock(); // sync stream load related query info should unregister here @@ -1118,8 +1120,8 @@ public void replayOnCommitted(TransactionState txnState) { this.channels.set(i, State.COMMITED); } this.state = State.COMMITED; + commitTimeMs = txnState.getCommitTime(); this.preparedChannelNum = this.channelNum; - this.endTimeMs = txnState.getCommitTime(); } finally { writeUnlock(); } @@ -1138,7 +1140,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { return; } if (coord != null && !isSyncStreamLoad) { @@ -1279,11 +1281,19 @@ public long createTimeMs() { return createTimeMs; } + public long commitTimeMs() { + return commitTimeMs; + } + public long endTimeMs() { return endTimeMs; } public boolean isFinalState() { + return state == State.CANCELLED || state == State.FINISHED; + } + + public boolean isUnreversibleState() { return state == State.CANCELLED || state == State.COMMITED || state == State.FINISHED; } @@ -1528,7 +1538,7 @@ public TLoadInfo toThrift() { info.setCreate_time(TimeUtils.longToTimeString(createTimeMs)); info.setLoad_start_time(TimeUtils.longToTimeString(startLoadingTimeMs)); - info.setLoad_commit_time(TimeUtils.longToTimeString(finishPreparingTimeMs)); + info.setLoad_commit_time(TimeUtils.longToTimeString(commitTimeMs)); info.setLoad_finish_time(TimeUtils.longToTimeString(endTimeMs)); info.setType(getStringByType()); diff --git a/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java index 34439ac4e5bff..66a356163c442 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java @@ -300,14 +300,13 @@ public void testStreamLoadTaskAfterCommit() throws UserException { TransactionState state = new TransactionState(); task.afterCommitted(state, true); - Assert.assertNotEquals(-1, task.endTimeMs()); + Assert.assertNotEquals(-1, task.commitTimeMs()); - state.setCommitTime(task.endTimeMs()); - task.replayOnCommitted(state); - Assert.assertEquals(task.endTimeMs(), state.getCommitTime()); + Assert.assertTrue(task.isUnreversibleState()); + Assert.assertFalse(task.isFinalState()); streamLoadManager.cleanSyncStreamLoadTasks(); - Assert.assertEquals(0, streamLoadManager.getStreamLoadTaskCount()); + Assert.assertEquals(1, streamLoadManager.getStreamLoadTaskCount()); } }