Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jul 30, 2024
1 parent 39f9f1c commit 7b08447
Showing 1 changed file with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -139,7 +140,28 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys
// set schema and partition info for tablet id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
&& fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink());
DataSink childFragmentSink = fragment.getChild(0).getSink();
DataStreamSink dataStreamSink = null;
if (childFragmentSink instanceof MultiCastDataSink) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink;
int outputExchangeId = (fragment.getPlanRoot()).getId().asInt();
// which DataStreamSink link to the output exchangeNode?
for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) {
int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt();
if (outputExchangeId == sinkExchangeId) {
dataStreamSink = currentDataStreamSink;
break;
}
}
if (dataStreamSink == null) {
throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink");
}
} else if (childFragmentSink instanceof DataStreamSink) {
dataStreamSink = (DataStreamSink) childFragmentSink;
} else {
throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink);
}

Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
database.getId(), olapTableSink.getDstTable(), analyzer));
Expand Down

0 comments on commit 7b08447

Please sign in to comment.