-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BugFix] Fix null exception when insert overwrite job run concurrecy #50628
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,7 @@ | |
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
@@ -183,6 +184,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) { | |
case OVERWRITE_RUNNING: | ||
job.setSourcePartitionIds(info.getSourcePartitionIds()); | ||
job.setTmpPartitionIds(info.getTmpPartitionIds()); | ||
job.setSourcePartitionNames(info.getSourcePartitionNames()); | ||
job.setJobState(InsertOverwriteJobState.OVERWRITE_RUNNING); | ||
break; | ||
case OVERWRITE_SUCCESS: | ||
|
@@ -225,8 +227,21 @@ private void prepare() throws Exception { | |
} | ||
|
||
try { | ||
OlapTable targetTable; | ||
targetTable = checkAndGetTable(db, tableId); | ||
List<String> sourcePartitionNames = Lists.newArrayList(); | ||
for (Long partitionId : job.getSourcePartitionIds()) { | ||
Partition partition = targetTable.getPartition(partitionId); | ||
if (partition == null) { | ||
throw new DmlException("partition id:%s does not exist in table id:%s", partitionId, tableId); | ||
} | ||
sourcePartitionNames.add(partition.getName()); | ||
} | ||
job.setSourcePartitionNames(sourcePartitionNames); | ||
|
||
InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), | ||
InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getTmpPartitionIds()); | ||
InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getSourcePartitionNames(), | ||
job.getTmpPartitionIds()); | ||
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); | ||
} finally { | ||
locker.unLockDatabase(db, tableId, LockType.WRITE); | ||
|
@@ -405,7 +420,8 @@ private void gc(boolean isReplay) { | |
sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete); | ||
|
||
InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), | ||
OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getTmpPartitionIds()); | ||
OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getSourcePartitionNames(), | ||
job.getTmpPartitionIds()); | ||
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); | ||
} | ||
} catch (Exception e) { | ||
|
@@ -429,9 +445,18 @@ private void doCommit(boolean isReplay) { | |
// try exception to release write lock finally | ||
final OlapTable targetTable = checkAndGetTable(db, tableId); | ||
tmpTargetTable = targetTable; | ||
List<String> sourcePartitionNames = job.getSourcePartitionIds().stream() | ||
.map(partitionId -> targetTable.getPartition(partitionId).getName()) | ||
.collect(Collectors.toList()); | ||
List<String> sourcePartitionNames = job.getSourcePartitionNames(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will still lead to behavioral changes. Is there any intention in the original design? @Astralidea There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not entirely sure, but I feel like it should be fine. |
||
if (sourcePartitionNames == null || sourcePartitionNames.isEmpty()) { | ||
sourcePartitionNames = new ArrayList<>(); | ||
for (Long partitionId : job.getSourcePartitionIds()) { | ||
Partition partition = targetTable.getPartition(partitionId); | ||
if (partition == null) { | ||
throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId); | ||
} else { | ||
sourcePartitionNames.add(partition.getName()); | ||
} | ||
} | ||
} | ||
List<String> tmpPartitionNames = job.getTmpPartitionIds().stream() | ||
.map(partitionId -> targetTable.getPartition(partitionId).getName()) | ||
.collect(Collectors.toList()); | ||
|
@@ -457,7 +482,8 @@ private void doCommit(boolean isReplay) { | |
sourceTablets.forEach(GlobalStateMgr.getCurrentState().getTabletInvertedIndex()::markTabletForceDelete); | ||
|
||
InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), | ||
InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getTmpPartitionIds()); | ||
InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getSourcePartitionNames(), | ||
job.getTmpPartitionIds()); | ||
GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); | ||
|
||
try { | ||
|
@@ -529,4 +555,8 @@ private OlapTable checkAndGetTable(Database db, long tableId) { | |
Preconditions.checkState(table instanceof OlapTable); | ||
return (OlapTable) table; | ||
} | ||
|
||
protected void testDoCommit(boolean isReplay) { | ||
doCommit(isReplay); | ||
} | ||
} | ||
sevev marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to the back of
sourcePartitionIds