diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 0153700863d3bef..43a3327a3788845 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -40,6 +40,7 @@ import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; @@ -139,7 +140,28 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys // set schema and partition info for tablet id shuffle exchange if (fragment.getPlanRoot() instanceof ExchangeNode && fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) { - DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink()); + DataSink childFragmentSink = fragment.getChild(0).getSink(); + DataStreamSink dataStreamSink = null; + if (childFragmentSink instanceof MultiCastDataSink) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink; + int outputExchangeId = (fragment.getPlanRoot()).getId().asInt(); + // which DataStreamSink link to the output exchangeNode? + for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) { + int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt(); + if (outputExchangeId == sinkExchangeId) { + dataStreamSink = currentDataStreamSink; + break; + } + } + if (dataStreamSink == null) { + throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink"); + } + } else if (childFragmentSink instanceof DataStreamSink) { + dataStreamSink = (DataStreamSink) childFragmentSink; + } else { + throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink); + } + Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get()); dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema( database.getId(), olapTableSink.getDstTable(), analyzer));