Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix null exception when insert overwrite job run concurrecy #50628

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class InsertOverwriteJob {
@SerializedName(value = "warehouseId")
private long warehouseId;

@SerializedName(value = "sourcePartitionNames")
private List<String> sourcePartitionNames;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to the back of sourcePartitionIds



private transient InsertStmt insertStmt;

Expand Down Expand Up @@ -89,6 +92,14 @@ public void setSourcePartitionIds(List<Long> sourcePartitionIds) {
this.sourcePartitionIds = sourcePartitionIds;
}

public List<String> getSourcePartitionNames() {
return sourcePartitionNames;
}

public void setSourcePartitionNames(List<String> sourcePartitionNames) {
this.sourcePartitionNames = sourcePartitionNames;
}

public List<Long> getTmpPartitionIds() {
return tmpPartitionIds;
}
sevev marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -183,6 +184,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) {
case OVERWRITE_RUNNING:
job.setSourcePartitionIds(info.getSourcePartitionIds());
job.setTmpPartitionIds(info.getTmpPartitionIds());
job.setSourcePartitionNames(info.getSourcePartitionNames());
job.setJobState(InsertOverwriteJobState.OVERWRITE_RUNNING);
break;
case OVERWRITE_SUCCESS:
Expand Down Expand Up @@ -225,8 +227,21 @@ private void prepare() throws Exception {
}

try {
OlapTable targetTable;
targetTable = checkAndGetTable(db, tableId);
List<String> sourcePartitionNames = Lists.newArrayList();
for (Long partitionId : job.getSourcePartitionIds()) {
Partition partition = targetTable.getPartition(partitionId);
if (partition == null) {
throw new DmlException("partition id:%s does not exist in table id:%s", partitionId, tableId);
}
sourcePartitionNames.add(partition.getName());
}
job.setSourcePartitionNames(sourcePartitionNames);

InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(),
InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getTmpPartitionIds());
InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getSourcePartitionNames(),
job.getTmpPartitionIds());
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info);
} finally {
locker.unLockDatabase(db, tableId, LockType.WRITE);
Expand Down Expand Up @@ -405,7 +420,8 @@ private void gc(boolean isReplay) {
sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete);

InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(),
OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getTmpPartitionIds());
OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getSourcePartitionNames(),
job.getTmpPartitionIds());
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info);
}
} catch (Exception e) {
Expand All @@ -429,9 +445,18 @@ private void doCommit(boolean isReplay) {
// try exception to release write lock finally
final OlapTable targetTable = checkAndGetTable(db, tableId);
tmpTargetTable = targetTable;
List<String> sourcePartitionNames = job.getSourcePartitionIds().stream()
.map(partitionId -> targetTable.getPartition(partitionId).getName())
.collect(Collectors.toList());
List<String> sourcePartitionNames = job.getSourcePartitionNames();
meegoo marked this conversation as resolved.
Show resolved Hide resolved
if (sourcePartitionNames == null || sourcePartitionNames.isEmpty()) {
sourcePartitionNames = new ArrayList<>();
for (Long partitionId : job.getSourcePartitionIds()) {
Partition partition = targetTable.getPartition(partitionId);
if (partition == null) {
throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId);
} else {
sourcePartitionNames.add(partition.getName());
}
}
}
List<String> tmpPartitionNames = job.getTmpPartitionIds().stream()
.map(partitionId -> targetTable.getPartition(partitionId).getName())
.collect(Collectors.toList());
Expand All @@ -457,7 +482,8 @@ private void doCommit(boolean isReplay) {
sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete);

InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(),
InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getTmpPartitionIds());
InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getSourcePartitionNames(),
job.getTmpPartitionIds());
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info);

try {
Expand Down Expand Up @@ -529,4 +555,8 @@ private OlapTable checkAndGetTable(Database db, long tableId) {
Preconditions.checkState(table instanceof OlapTable);
return (OlapTable) table;
}

protected void testDoCommit(boolean isReplay) {
doCommit(isReplay);
}
}
sevev marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ public class InsertOverwriteStateChangeInfo implements Writable {
@SerializedName(value = "tmpPartitionIds")
private List<Long> tmpPartitionIds;

@SerializedName(value = "sourcePartitionNames")
private List<String> sourcePartitionNames = null;

public InsertOverwriteStateChangeInfo(long jobId, InsertOverwriteJobState fromState,
InsertOverwriteJobState toState,
List<Long> sourcePartitionIds, List<Long> tmpPartitionIds) {
List<Long> sourcePartitionIds,
List<String> sourcePartitionNames,
List<Long> tmpPartitionIds) {
this.jobId = jobId;
this.fromState = fromState;
this.toState = toState;
this.sourcePartitionIds = sourcePartitionIds;
this.sourcePartitionNames = sourcePartitionNames;
this.tmpPartitionIds = tmpPartitionIds;
}

Expand All @@ -72,6 +78,10 @@ public List<Long> getTmpPartitionIds() {
return tmpPartitionIds;
}

public List<String> getSourcePartitionNames() {
return sourcePartitionNames;
}

@Override
public String toString() {
return "InsertOverwriteStateChangeInfo{" +
sevev marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ public void testReplay() throws Exception {
List<Long> newPartitionNames = Lists.newArrayList(10001L);
InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(1100L,
InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING,
sourcePartitionNames, newPartitionNames);
sourcePartitionNames, null, newPartitionNames);
insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo);

InsertOverwriteStateChangeInfo stateChangeInfo2 = new InsertOverwriteStateChangeInfo(1100L,
InsertOverwriteJobState.OVERWRITE_RUNNING, InsertOverwriteJobState.OVERWRITE_SUCCESS,
sourcePartitionNames, newPartitionNames);
sourcePartitionNames, null, newPartitionNames);
insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.common.DmlException;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.sql.SQLException;

Expand Down Expand Up @@ -97,7 +99,7 @@ public void testReplayInsertOverwrite() {
Lists.newArrayList(olapTable.getPartition("t1").getId()));
InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L,
InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING,
Lists.newArrayList(2000L), Lists.newArrayList(2001L));
Lists.newArrayList(2000L), null, Lists.newArrayList(2001L));
Assert.assertEquals(100L, stateChangeInfo.getJobId());
Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_PENDING, stateChangeInfo.getFromState());
Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_RUNNING, stateChangeInfo.getToState());
Expand Down Expand Up @@ -139,4 +141,23 @@ public void testInsertOverwriteWithDuplicatePartitions() throws SQLException {
String sql = "insert overwrite t3 partitions(p1, p1) select * from t4";
cluster.runSql("insert_overwrite_test", sql);
}

@Test
public void testInsertOverwriteConcurrencyWithSamePartitions() throws Exception {
Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("insert_overwrite_test");
Table table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getFullName(), "t1");
Assert.assertTrue(table instanceof OlapTable);
OlapTable olapTable = (OlapTable) table;
InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(),
Lists.newArrayList(olapTable.getPartition("t1").getId()));
InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob);

connectContext.getSessionVariable().setOptimizerExecuteTimeout(300000000);
String sql = "insert overwrite t1 partitions(t1) select * from t2";
cluster.runSql("insert_overwrite_test", sql);

Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false));
insertOverwriteJob.setSourcePartitionNames(Lists.newArrayList("t1"));
Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testBasic() throws IOException {
List<Long> newPartitionNames = Lists.newArrayList(1000L, 1001L);
InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L,
InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING,
sourcePartitionNames, newPartitionNames);
sourcePartitionNames, null, newPartitionNames);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
stateChangeInfo.write(dataOutputStream);
Expand Down
Loading