
Commit

fix
924060929 committed Oct 15, 2024
1 parent 1b8dd5e · commit 542c3a6
Showing 4 changed files with 11 additions and 3 deletions.
Changed file 1 of 4:

@@ -144,6 +144,7 @@ private void linkPipelinePlan(
         DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
         if (sink instanceof MultiCastDataSink) {
             MultiCastDataSink multiCastDataSink = (MultiCastDataSink) sink;
+            receiverPlan.getFragmentJob().getFragment().setOutputPartition(multiCastDataSink.getOutputPartition());
             for (DataStreamSink realSink : multiCastDataSink.getDataStreamSinks()) {
                 if (realSink.getExchNodeId() == linkNode.getId()) {
                     senderPlan.addDestinations(realSink, receiverInstances);

Changed file 2 of 4:

@@ -145,6 +145,10 @@ public QueryProcessor asQueryProcessor() {
         return coordinatorContext.asQueryProcessor();
     }

+    public JobProcessor getJobProcessor() {
+        return coordinatorContext.getJobProcessor();
+    }
+
     public LoadProcessor asLoadProcessor() {
         return coordinatorContext.asLoadProcessor();
     }

Changed file 3 of 4:

@@ -193,7 +193,8 @@ public synchronized Status readCloneStatus() {
     public synchronized Status updateStatusIfOk(Status newStatus) {
         // If query is done, we will ignore their cancelled updates, and let the remote fragments to clean up async.
         Status originStatus = readCloneStatus();
-        if (coordinator.isEof() && newStatus.isCancelled()) {
+        if (coordinator.getJobProcessor() instanceof QueryProcessor && coordinator.isEof()
+                && newStatus.isCancelled()) {
             return originStatus;
         }
         // nothing to update

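The guard above originally ignored any cancellation that arrived after the coordinator had already reached EOF. With the getJobProcessor() accessor added earlier in this commit, that early return is now taken only for query jobs, so other job kinds still see the cancelled status. Below is a minimal, self-contained sketch of that pattern; JobProcessor, QueryProcessor, LoadProcessor, Status, and StatusHolder here are simplified stand-ins for illustration, not the Doris classes touched by this commit.

// Sketch only: stand-in types, not the Doris implementation.
interface JobProcessor {}
class QueryProcessor implements JobProcessor {}
class LoadProcessor implements JobProcessor {}

class Status {
    private final boolean cancelled;
    Status(boolean cancelled) { this.cancelled = cancelled; }
    boolean isCancelled() { return cancelled; }
}

class StatusHolder {
    private final JobProcessor jobProcessor;
    private final boolean eof;
    private Status current = new Status(false);

    StatusHolder(JobProcessor jobProcessor, boolean eof) {
        this.jobProcessor = jobProcessor;
        this.eof = eof;
    }

    synchronized Status updateStatusIfOk(Status newStatus) {
        Status origin = current;
        // Ignore a late cancellation only when the finished job is a query.
        if (jobProcessor instanceof QueryProcessor && eof && newStatus.isCancelled()) {
            return origin;
        }
        current = newStatus;
        return current;
    }
}

public class GuardExample {
    public static void main(String[] args) {
        StatusHolder queryDone = new StatusHolder(new QueryProcessor(), true);
        StatusHolder loadDone = new StatusHolder(new LoadProcessor(), true);
        System.out.println(queryDone.updateStatusIfOk(new Status(true)).isCancelled()); // false: ignored
        System.out.println(loadDone.updateStatusIfOk(new Status(true)).isCancelled());  // true: applied
    }
}
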
Changed file 4 of 4:

@@ -99,8 +99,10 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToTh
         Multiset<DistributedPlanWorker> workerProcessInstanceNum = computeInstanceNumPerWorker(distributedPlans);
         Map<DistributedPlanWorker, TPipelineFragmentParamsList> fragmentsGroupByWorker = Maps.newLinkedHashMap();
         int currentInstanceIndex = 0;
+        Map<Integer, TFileScanRangeParams> sharedFileScanRangeParams = Maps.newLinkedHashMap();
         for (PipelineDistributedPlan currentFragmentPlan : distributedPlans) {
-            Map<Integer, TFileScanRangeParams> fileScanRangeParams = computeFileScanRangeParams(currentFragmentPlan);
+            sharedFileScanRangeParams.putAll(computeFileScanRangeParams(currentFragmentPlan));
+
             Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum(currentFragmentPlan);
             Map<DistributedPlanWorker, TPipelineFragmentParams> workerToCurrentFragment = Maps.newLinkedHashMap();

@@ -110,7 +112,7 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToTh
                 AssignedJob instanceJob = currentFragmentPlan.getInstanceJobs().get(instanceNumInCurrentFragment);
                 TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent(
                         currentFragmentPlan, instanceJob, workerToCurrentFragment,
-                        exchangeSenderNum, fileScanRangeParams,
+                        exchangeSenderNum, sharedFileScanRangeParams,
                         workerProcessInstanceNum, coordinatorContext);

                 TPipelineInstanceParams instanceParam = instanceToThrift(
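
Together, these two hunks hoist the file-scan-range parameter map out of the per-fragment loop: one map is created before the loop, each fragment's params are merged into it with putAll, and fragmentToThriftIfAbsent now receives the accumulated map, so entries computed for earlier fragments remain available when later fragments are converted. A minimal sketch of that hoisting pattern follows; Plan, ScanParams, and the local computeFileScanRangeParams here are hypothetical stand-ins, not the Doris types.

// Sketch only: hoisting a per-iteration map into a shared accumulator.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SharedParamsExample {
    record Plan(int id) {}
    record ScanParams(String desc) {}

    static Map<Integer, ScanParams> computeFileScanRangeParams(Plan plan) {
        Map<Integer, ScanParams> params = new LinkedHashMap<>();
        params.put(plan.id(), new ScanParams("params of plan " + plan.id()));
        return params;
    }

    public static void main(String[] args) {
        List<Plan> distributedPlans = List.of(new Plan(1), new Plan(2), new Plan(3));

        // Shared across all fragments, mirroring sharedFileScanRangeParams in the diff.
        Map<Integer, ScanParams> sharedFileScanRangeParams = new LinkedHashMap<>();
        for (Plan currentFragmentPlan : distributedPlans) {
            sharedFileScanRangeParams.putAll(computeFileScanRangeParams(currentFragmentPlan));
            // ... build the thrift params for this fragment using the accumulated map ...
        }
        System.out.println(sharedFileScanRangeParams.keySet()); // [1, 2, 3]
    }
}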
