From 9a1d8732ab9a27a9b9ec55b68cec7d7d8b18d03f Mon Sep 17 00:00:00 2001 From: sevev Date: Mon, 2 Sep 2024 21:00:52 +0800 Subject: [PATCH 1/4] BugFix: fix null exception when insert overwrite job run concurrency Signed-off-by: sevev --- .../starrocks/load/InsertOverwriteJob.java | 11 ++++++ .../load/InsertOverwriteJobRunner.java | 35 +++++++++++++++++-- .../InsertOverwriteStateChangeInfo.java | 12 ++++++- .../load/InsertOverwriteJobManagerTest.java | 4 +-- .../load/InsertOverwriteJobRunnerTest.java | 2 +- .../InsertOverwriteStateChangeInfoTest.java | 2 +- 6 files changed, 58 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java index 5c7ebc809b13c..e6715231debac 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java @@ -42,6 +42,9 @@ public class InsertOverwriteJob { @SerializedName(value = "warehouseId") private long warehouseId; + @SerializedName(value = "sourcePartitionNames") + private List sourcePartitionNames; + private transient InsertStmt insertStmt; @@ -89,6 +92,14 @@ public void setSourcePartitionIds(List sourcePartitionIds) { this.sourcePartitionIds = sourcePartitionIds; } + public List getSourcePartitionNames() { + return sourcePartitionNames; + } + + public void setSourcePartitionNames(List sourcePartitionNames) { + this.sourcePartitionNames = sourcePartitionNames; + } + public List getTmpPartitionIds() { return tmpPartitionIds; } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java index f8683c9ab83e2..00703b0f75087 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java @@ -183,6 +183,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) { case OVERWRITE_RUNNING: job.setSourcePartitionIds(info.getSourcePartitionIds()); job.setTmpPartitionIds(info.getTmpPartitionIds()); + job.setSourcePartitionNames(info.getSourcePartitionNames()); job.setJobState(InsertOverwriteJobState.OVERWRITE_RUNNING); break; case OVERWRITE_SUCCESS: @@ -225,8 +226,21 @@ private void prepare() throws Exception { } try { + OlapTable targetTable; + targetTable = checkAndGetTable(db, tableId); + List sourcePartitionNames = Lists.newArrayList(); + for (Long id : job.getSourcePartitionIds()) { + Partition partition = targetTable.getPartition(partitionId); + if (partition == null) { + throw new DmlException("partition id:%s does not exist in table id:%s", partitionId, tableId); + } + sourcePartitionNames.add(partition.getName()); + } + job.setSourcePartitionNames(sourcePartitionNames); + InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), - InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getTmpPartitionIds()); + InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getSourcePartitionNames(), + job.getTmpPartitionIds()); GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); } finally { locker.unLockDatabase(db, tableId, LockType.WRITE); @@ -405,7 +419,8 @@ private void gc(boolean isReplay) { sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete); InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), - OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getTmpPartitionIds()); + OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getSourcePartitionNames(), + job.getTmpPartitionIds()); GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); } } catch (Exception e) { @@ -429,6 +444,19 @@ private void doCommit(boolean isReplay) { // try exception to release write lock finally final OlapTable targetTable = checkAndGetTable(db, tableId); tmpTargetTable = targetTable; + List sourcePartitionNames = job.getSourcePartitionNames(); + if (sourcePartitionNames == null || sourcePartitionNames.isEmpty()) { + sourcePartitionNames = job.getSourcePartitionIds().stream() + .map(partitionId -> { + Partition partition = targetTable.getPartition(partitionId); + if (partition == null) { + throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId); + } + return partition.getName(); + }) + .collect(Collectors.toList()); + } + List sourcePartitionNames = job.getSourcePartitionIds().stream() .map(partitionId -> targetTable.getPartition(partitionId).getName()) .collect(Collectors.toList()); @@ -457,7 +485,8 @@ private void doCommit(boolean isReplay) { sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete); InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), - InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getTmpPartitionIds()); + InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getSourcePartitionNames(), + job.getTmpPartitionIds()); GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); try { diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java b/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java index 63a41a1bbb0bf..b64a8773ffe25 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java @@ -42,13 +42,19 @@ public class InsertOverwriteStateChangeInfo implements Writable { @SerializedName(value = "tmpPartitionIds") private List tmpPartitionIds; + @SerializedName(value = "sourcePartitionNames") + private List sourcePartitionNames = null; + public InsertOverwriteStateChangeInfo(long jobId, InsertOverwriteJobState fromState, InsertOverwriteJobState toState, - List sourcePartitionIds, List tmpPartitionIds) { + List sourcePartitionIds, + List sourcePartitionNames, + List tmpPartitionIds) { this.jobId = jobId; this.fromState = fromState; this.toState = toState; this.sourcePartitionIds = sourcePartitionIds; + this.sourcePartitionNames = sourcePartitionNames; this.tmpPartitionIds = tmpPartitionIds; } @@ -72,6 +78,10 @@ public List getTmpPartitionIds() { return tmpPartitionIds; } + public List getSourcePartitionNames() { + return sourcePartitionNames; + } + @Override public String toString() { return "InsertOverwriteStateChangeInfo{" + diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java index 99afec42dec20..c84e4eabe08bd 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java @@ -132,12 +132,12 @@ public void testReplay() throws Exception { List newPartitionNames = Lists.newArrayList(10001L); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(1100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, - sourcePartitionNames, newPartitionNames); + sourcePartitionNames, null, newPartitionNames); insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo); InsertOverwriteStateChangeInfo stateChangeInfo2 = new InsertOverwriteStateChangeInfo(1100L, InsertOverwriteJobState.OVERWRITE_RUNNING, InsertOverwriteJobState.OVERWRITE_SUCCESS, - sourcePartitionNames, newPartitionNames); + sourcePartitionNames, null, newPartitionNames); insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo2); } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java index 944cf9c976db4..9570bec04a303 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java @@ -97,7 +97,7 @@ public void testReplayInsertOverwrite() { Lists.newArrayList(olapTable.getPartition("t1").getId())); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, - Lists.newArrayList(2000L), Lists.newArrayList(2001L)); + Lists.newArrayList(2000L), null, Lists.newArrayList(2001L)); Assert.assertEquals(100L, stateChangeInfo.getJobId()); Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_PENDING, stateChangeInfo.getFromState()); Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_RUNNING, stateChangeInfo.getToState()); diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java index 9cc37af4c72f1..2f5c31cfa05d3 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java @@ -35,7 +35,7 @@ public void testBasic() throws IOException { List newPartitionNames = Lists.newArrayList(1000L, 1001L); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, - sourcePartitionNames, newPartitionNames); + sourcePartitionNames, null, newPartitionNames); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(outputStream); stateChangeInfo.write(dataOutputStream); From 75525efd6e6e1c16f07a1ae9a5b7631b124723bb Mon Sep 17 00:00:00 2001 From: sevev Date: Tue, 3 Sep 2024 17:31:02 +0800 Subject: [PATCH 2/4] add ut Signed-off-by: sevev --- .../load/InsertOverwriteJobRunner.java | 29 ++++++++++--------- .../load/InsertOverwriteJobRunnerTest.java | 21 ++++++++++++++ 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java index 00703b0f75087..dc1f65db5f5b6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -229,7 +230,7 @@ private void prepare() throws Exception { OlapTable targetTable; targetTable = checkAndGetTable(db, tableId); List sourcePartitionNames = Lists.newArrayList(); - for (Long id : job.getSourcePartitionIds()) { + for (Long partitionId : job.getSourcePartitionIds()) { Partition partition = targetTable.getPartition(partitionId); if (partition == null) { throw new DmlException("partition id:%s does not exist in table id:%s", partitionId, tableId); @@ -446,20 +447,16 @@ private void doCommit(boolean isReplay) { tmpTargetTable = targetTable; List sourcePartitionNames = job.getSourcePartitionNames(); if (sourcePartitionNames == null || sourcePartitionNames.isEmpty()) { - sourcePartitionNames = job.getSourcePartitionIds().stream() - .map(partitionId -> { - Partition partition = targetTable.getPartition(partitionId); - if (partition == null) { - throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId); - } - return partition.getName(); - }) - .collect(Collectors.toList()); + sourcePartitionNames = new ArrayList<>(); + for (Long partitionId : job.getSourcePartitionIds()) { + Partition partition = targetTable.getPartition(partitionId); + if (partition == null) { + throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId); + } else { + sourcePartitionNames.add(partition.getName()); + } + } } - - List sourcePartitionNames = job.getSourcePartitionIds().stream() - .map(partitionId -> targetTable.getPartition(partitionId).getName()) - .collect(Collectors.toList()); List tmpPartitionNames = job.getTmpPartitionIds().stream() .map(partitionId -> targetTable.getPartition(partitionId).getName()) .collect(Collectors.toList()); @@ -558,4 +555,8 @@ private OlapTable checkAndGetTable(Database db, long tableId) { Preconditions.checkState(table instanceof OlapTable); return (OlapTable) table; } + + public void testDoCommit(boolean isReplay) { + doCommit(isReplay); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java index 9570bec04a303..6ba39efefc051 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java @@ -28,11 +28,13 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ast.InsertStmt; +//import com.starrocks.sql.common.DmlException; import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +//import org.junit.jupiter.api.Assertions; import java.sql.SQLException; @@ -139,4 +141,23 @@ public void testInsertOverwriteWithDuplicatePartitions() throws SQLException { String sql = "insert overwrite t3 partitions(p1, p1) select * from t4"; cluster.runSql("insert_overwrite_test", sql); } + + @Test + public void testInsertOverwriteConcurrencyWithSamePartitions() throws Exception { + Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("insert_overwrite_test"); + Table table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getFullName(), "t1"); + Assert.assertTrue(table instanceof OlapTable); + OlapTable olapTable = (OlapTable) table; + InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(), + Lists.newArrayList(olapTable.getPartition("t1").getId())); + InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob); + + connectContext.getSessionVariable().setOptimizerExecuteTimeout(300000000); + String sql = "insert overwrite t1 partitions(t1) select * from t2"; + cluster.runSql("insert_overwrite_test", sql); + + Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false)); + insertOverwriteJob.setSourcePartitionNames(Lists.newArrayList("t1")); + Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false)); + } } From 811e141e894d99da97249b4da414ff6c0134bf7b Mon Sep 17 00:00:00 2001 From: sevev Date: Tue, 3 Sep 2024 18:45:29 +0800 Subject: [PATCH 3/4] update Signed-off-by: sevev --- .../java/com/starrocks/load/InsertOverwriteJobRunnerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java index 6ba39efefc051..182d93386374f 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java @@ -28,13 +28,13 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ast.InsertStmt; -//import com.starrocks.sql.common.DmlException; +import com.starrocks.sql.common.DmlException; import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -//import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assertions; import java.sql.SQLException; From 1b3dab55f060ab1ffbf20731ad45aed1e5c2b5c7 Mon Sep 17 00:00:00 2001 From: sevev Date: Thu, 5 Sep 2024 14:58:41 +0800 Subject: [PATCH 4/4] update Signed-off-by: sevev --- .../main/java/com/starrocks/load/InsertOverwriteJobRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java index dc1f65db5f5b6..0cb54470e487c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java @@ -556,7 +556,7 @@ private OlapTable checkAndGetTable(Database db, long tableId) { return (OlapTable) table; } - public void testDoCommit(boolean isReplay) { + protected void testDoCommit(boolean isReplay) { doCommit(isReplay); } }