Skip to content

Commit

Permalink
[BugFix] Fix null exception when insert overwrite jobs run concurrently …
Browse files Browse the repository at this point in the history
…(backport #50628) (#50984)

Co-authored-by: zhangqiang <[email protected]>
  • Loading branch information
mergify[bot] and sevev committed Sep 18, 2024
1 parent 67a12c7 commit 476d45b
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class InsertOverwriteJob {
@SerializedName(value = "warehouseId")
private long warehouseId;

@SerializedName(value = "sourcePartitionNames")
private List<String> sourcePartitionNames;


private transient InsertStmt insertStmt;

Expand Down Expand Up @@ -89,6 +92,14 @@ public void setSourcePartitionIds(List<Long> sourcePartitionIds) {
this.sourcePartitionIds = sourcePartitionIds;
}

/**
 * Returns the source partition names recorded on this job.
 *
 * @return the stored list itself (no defensive copy is made); may be {@code null} if no
 *         names have been set yet — NOTE(review): the constructor is not visible here, confirm
 */
public List<String> getSourcePartitionNames() {
return sourcePartitionNames;
}

/**
 * Records the source partition names on this job.
 *
 * @param sourcePartitionNames the names to store; the reference is kept as-is, without copying
 */
public void setSourcePartitionNames(List<String> sourcePartitionNames) {
this.sourcePartitionNames = sourcePartitionNames;
}

/**
 * Returns the temporary partition ids recorded on this job.
 *
 * @return the stored list itself (no defensive copy is made)
 */
public List<Long> getTmpPartitionIds() {
return tmpPartitionIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -183,6 +184,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) {
case OVERWRITE_RUNNING:
job.setSourcePartitionIds(info.getSourcePartitionIds());
job.setTmpPartitionIds(info.getTmpPartitionIds());
job.setSourcePartitionNames(info.getSourcePartitionNames());
job.setJobState(InsertOverwriteJobState.OVERWRITE_RUNNING);
break;
case OVERWRITE_SUCCESS:
Expand Down Expand Up @@ -225,8 +227,21 @@ private void prepare() throws Exception {
}

try {
OlapTable targetTable;
targetTable = checkAndGetTable(db, tableId);
List<String> sourcePartitionNames = Lists.newArrayList();
for (Long partitionId : job.getSourcePartitionIds()) {
Partition partition = targetTable.getPartition(partitionId);
if (partition == null) {
throw new DmlException("partition id:%s does not exist in table id:%s", partitionId, tableId);
}
sourcePartitionNames.add(partition.getName());
}
job.setSourcePartitionNames(sourcePartitionNames);

InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(),
InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getTmpPartitionIds());
InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getSourcePartitionNames(),
job.getTmpPartitionIds());
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info);
} finally {
locker.unLockDatabase(db, tableId, LockType.WRITE);
Expand Down Expand Up @@ -405,7 +420,8 @@ private void gc(boolean isReplay) {
sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete);

InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(),
OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getTmpPartitionIds());
OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getSourcePartitionNames(),
job.getTmpPartitionIds());
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info);
}
} catch (Exception e) {
Expand All @@ -429,9 +445,18 @@ private void doCommit(boolean isReplay) {
// try exception to release write lock finally
final OlapTable targetTable = checkAndGetTable(db, tableId);
tmpTargetTable = targetTable;
List<String> sourcePartitionNames = job.getSourcePartitionIds().stream()
.map(partitionId -> targetTable.getPartition(partitionId).getName())
.collect(Collectors.toList());
List<String> sourcePartitionNames = job.getSourcePartitionNames();
if (sourcePartitionNames == null || sourcePartitionNames.isEmpty()) {
sourcePartitionNames = new ArrayList<>();
for (Long partitionId : job.getSourcePartitionIds()) {
Partition partition = targetTable.getPartition(partitionId);
if (partition == null) {
throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId);
} else {
sourcePartitionNames.add(partition.getName());
}
}
}
List<String> tmpPartitionNames = job.getTmpPartitionIds().stream()
.map(partitionId -> targetTable.getPartition(partitionId).getName())
.collect(Collectors.toList());
Expand All @@ -457,7 +482,8 @@ private void doCommit(boolean isReplay) {
sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete);

InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(),
InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getTmpPartitionIds());
InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getSourcePartitionNames(),
job.getTmpPartitionIds());
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info);

try {
Expand Down Expand Up @@ -529,4 +555,8 @@ private OlapTable checkAndGetTable(Database db, long tableId) {
Preconditions.checkState(table instanceof OlapTable);
return (OlapTable) table;
}

/**
 * Test-only entry point that forwards directly to the private {@code doCommit(boolean)}.
 *
 * @param isReplay passed through unchanged to {@code doCommit}
 */
protected void testDoCommit(boolean isReplay) {
doCommit(isReplay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ public class InsertOverwriteStateChangeInfo implements Writable {
@SerializedName(value = "tmpPartitionIds")
private List<Long> tmpPartitionIds;

@SerializedName(value = "sourcePartitionNames")
private List<String> sourcePartitionNames = null;

public InsertOverwriteStateChangeInfo(long jobId, InsertOverwriteJobState fromState,
InsertOverwriteJobState toState,
List<Long> sourcePartitionIds, List<Long> tmpPartitionIds) {
List<Long> sourcePartitionIds,
List<String> sourcePartitionNames,
List<Long> tmpPartitionIds) {
this.jobId = jobId;
this.fromState = fromState;
this.toState = toState;
this.sourcePartitionIds = sourcePartitionIds;
this.sourcePartitionNames = sourcePartitionNames;
this.tmpPartitionIds = tmpPartitionIds;
}

Expand All @@ -72,6 +78,10 @@ public List<Long> getTmpPartitionIds() {
return tmpPartitionIds;
}

/**
 * Returns the source partition names carried by this state change.
 *
 * @return the stored list itself (no defensive copy is made); may be {@code null}, since the
 *         field defaults to {@code null} and the constructor stores whatever it is given
 */
public List<String> getSourcePartitionNames() {
return sourcePartitionNames;
}

@Override
public String toString() {
return "InsertOverwriteStateChangeInfo{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ public void testReplay() throws Exception {
List<Long> newPartitionNames = Lists.newArrayList(10001L);
InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(1100L,
InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING,
sourcePartitionNames, newPartitionNames);
sourcePartitionNames, null, newPartitionNames);
insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo);

InsertOverwriteStateChangeInfo stateChangeInfo2 = new InsertOverwriteStateChangeInfo(1100L,
InsertOverwriteJobState.OVERWRITE_RUNNING, InsertOverwriteJobState.OVERWRITE_SUCCESS,
sourcePartitionNames, newPartitionNames);
sourcePartitionNames, null, newPartitionNames);
insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.common.DmlException;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.sql.SQLException;

Expand Down Expand Up @@ -97,7 +99,7 @@ public void testReplayInsertOverwrite() {
Lists.newArrayList(olapTable.getPartition("t1").getId()));
InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L,
InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING,
Lists.newArrayList(2000L), Lists.newArrayList(2001L));
Lists.newArrayList(2000L), null, Lists.newArrayList(2001L));
Assert.assertEquals(100L, stateChangeInfo.getJobId());
Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_PENDING, stateChangeInfo.getFromState());
Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_RUNNING, stateChangeInfo.getToState());
Expand Down Expand Up @@ -139,4 +141,23 @@ public void testInsertOverwriteWithDuplicatePartitions() throws SQLException {
String sql = "insert overwrite t3 partitions(p1, p1) select * from t4";
cluster.runSql("insert_overwrite_test", sql);
}

/**
 * Checks that committing an insert overwrite job fails with a {@link DmlException} after another
 * insert overwrite has already run against the same partition of the same table.
 */
@Test
public void testInsertOverwriteConcurrencyWithSamePartitions() throws Exception {
Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("insert_overwrite_test");
Table table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getFullName(), "t1");
Assert.assertTrue(table instanceof OlapTable);
OlapTable olapTable = (OlapTable) table;
// Build the job from the id that partition "t1" has right now, before the overwrite below runs.
InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(),
Lists.newArrayList(olapTable.getPartition("t1").getId()));
InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob);

// NOTE(review): large optimizer timeout, presumably so the statement below is not cut short — confirm.
connectContext.getSessionVariable().setOptimizerExecuteTimeout(300000000);
// Run a real insert overwrite on the same partition; presumably this leaves the id captured
// above stale, simulating a concurrent job — TODO confirm against the runner implementation.
String sql = "insert overwrite t1 partitions(t1) select * from t2";
cluster.runSql("insert_overwrite_test", sql);

// Case 1: no source partition names are set on the job, so doCommit has to resolve them from ids.
Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false));
// Case 2: names are pre-populated on the job; the commit is still expected to fail with
// DmlException (the code path that raises it is not visible in this view).
insertOverwriteJob.setSourcePartitionNames(Lists.newArrayList("t1"));
Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testBasic() throws IOException {
List<Long> newPartitionNames = Lists.newArrayList(1000L, 1001L);
InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L,
InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING,
sourcePartitionNames, newPartitionNames);
sourcePartitionNames, null, newPartitionNames);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
stateChangeInfo.write(dataOutputStream);
Expand Down

0 comments on commit 476d45b

Please sign in to comment.