From 40e54c8632336f386b28c1601a3edeee2a71f437 Mon Sep 17 00:00:00 2001
From: 924060929 <924060929@qq.com>
Date: Thu, 9 May 2024 17:48:50 +0800
Subject: [PATCH] save
---
fe/fe-core/pom.xml | 6 ++
.../main/java/org/apache/doris/common/Id.java | 9 ++-
.../apache/doris/nereids/NereidsPlanner.java | 3 +-
.../plans/distribute/DistributePlanner.java | 2 +-
.../plans/distribute/FragmentIdMapping.java | 24 +++---
.../doris/nereids/worker/BackendWorker.java | 64 ++++++++++++++++
.../doris/nereids/worker/ReplicaLocation.java | 73 +++++++++++++++++++
.../worker/RoundRobinWorkerSelector.java | 2 +-
.../doris/nereids/worker/TabletTopology.java | 64 ++++++++++++++++
.../doris/nereids/worker/WorkerManager.java | 53 ++++++++++++--
.../doris/nereids/worker/job/AssignedJob.java | 5 --
.../nereids/worker/job/AssignedJobImpl.java | 27 ++++---
.../doris/nereids/worker/job/DataIds.java | 26 +++++++
.../nereids/worker/job/DataLocation.java | 2 +-
.../nereids/worker/job/ExchangeInputs.java | 24 +++---
.../doris/nereids/worker/job/ListDataIds.java | 41 +++++++++++
.../worker/job/UnassignedGatherJob.java | 8 --
.../nereids/worker/job/UnassignedJob.java | 13 +++-
.../worker/job/UnassignedJobBuilder.java | 2 +-
.../worker/job/UnassignedNearStorageJob.java | 15 ++++
.../job/UnassignedQueryConstantJob.java | 9 ---
.../job/UnassignedScanNativeTableJob.java | 15 ++--
.../job/UnassignedScanRemoteTableJob.java | 7 --
.../worker/job/UnassignedShuffleJob.java | 7 --
24 files changed, 410 insertions(+), 91 deletions(-)
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ReplicaLocation.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/TabletTopology.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataIds.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ListDataIds.java
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index a9f429a0c7a425e..f803c5e6e921534 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -803,6 +803,12 @@ under the License.
io.airlift
concurrent
+
+ org.codehaus.groovy
+ groovy-all
+ 3.0.7
+ compile
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Id.java b/fe/fe-core/src/main/java/org/apache/doris/common/Id.java
index 9d6dad50a462380..ef0cbb91cde5a8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Id.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Id.java
@@ -20,12 +20,14 @@
package org.apache.doris.common;
+import org.jetbrains.annotations.NotNull;
+
import java.util.ArrayList;
/**
* Integer ids that cannot accidentally be compared with ints.
*/
-public class Id> {
+public class Id> implements Comparable> {
protected final int id;
public Id(int id) {
@@ -62,4 +64,9 @@ public ArrayList asList() {
public String toString() {
return Integer.toString(id);
}
+
+ @Override
+ public int compareTo(@NotNull Id idTypeId) {
+ return id - idTypeId.id;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index abc6bac5fe95660..20f80a7fd952158 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -53,7 +53,6 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
-import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
@@ -366,7 +365,7 @@ private void distribute(LogicalPlanAdapter logicalPlanAdapter) throws UserExcept
return;
}
DistributePlanner distributePlanner = new DistributePlanner(fragments);
- distributePlanner.plan();
+ // distributePlanner.plan();
}
private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 9da57d57df39af8..703997cf90c46d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -50,7 +50,7 @@ public Map plan() {
private Map buildDistributePlans(
Map idToUnassignedJobs,
ListMultimap idToAssignedJobs) {
- FragmentIdMapping idToDistributedPlans = new FragmentIdMapping<>(idToFragments.size());
+ FragmentIdMapping idToDistributedPlans = new FragmentIdMapping<>();
for (Entry kv : idToFragments.entrySet()) {
PlanFragmentId fragmentId = kv.getKey();
PlanFragment fragment = kv.getValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java
index 503708b26d02a98..59d43ad7969b9ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java
@@ -22,33 +22,33 @@
import com.google.common.collect.ImmutableList;
-import java.util.LinkedHashMap;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
/**
* FragmentIdMapping:
* key: PlanFragmentId
* value: T
+ *
+ * NOTE: this map should order by PlanFragmentId asc
*/
-public class FragmentIdMapping extends LinkedHashMap {
- public FragmentIdMapping(int initialCapacity, float loadFactor) {
- super(initialCapacity, loadFactor);
+public class FragmentIdMapping extends TreeMap {
+ public FragmentIdMapping() {
}
- public FragmentIdMapping(int initialCapacity) {
- super(initialCapacity);
- }
-
- public FragmentIdMapping() {
+ public FragmentIdMapping(Comparator super PlanFragmentId> comparator) {
+ super(comparator);
}
public FragmentIdMapping(Map extends PlanFragmentId, ? extends T> m) {
super(m);
}
- public FragmentIdMapping(int initialCapacity, float loadFactor, boolean accessOrder) {
- super(initialCapacity, loadFactor, accessOrder);
+ public FragmentIdMapping(SortedMap m) {
+ super(m);
}
public List getByChildrenFragments(PlanFragment fragment) {
@@ -61,7 +61,7 @@ public List getByChildrenFragments(PlanFragment fragment) {
}
public static FragmentIdMapping buildFragmentMapping(List fragments) {
- FragmentIdMapping idToFragments = new FragmentIdMapping<>(fragments.size());
+ FragmentIdMapping idToFragments = new FragmentIdMapping<>();
for (PlanFragment fragment : fragments) {
idToFragments.put(fragment.getFragmentId(), fragment);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java
new file mode 100644
index 000000000000000..e960a46877b3fea
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.worker;
+
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableList;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.List;
+
+/** BackendWorker */
+public class BackendWorker implements Worker {
+ private final Backend backend;
+
+ public BackendWorker(Backend backend) {
+ this.backend = backend;
+ }
+
+ @Override
+ public String address() {
+ return backend.getAddress();
+ }
+
+ @Override
+ public boolean available() {
+ return backend.isQueryAvailable();
+ }
+
+ @Override
+ public Workload workload() {
+ return new Workload() {
+ @Override
+ public int compareTo(@NotNull Workload workload) {
+ return 0;
+ }
+ };
+ }
+
+ @Override
+ public WorkerGroup team() {
+ return new WorkerGroup() {
+ @Override
+ public List filterWorkers(WorkerPool workerPool) {
+ return ImmutableList.of();
+ }
+ };
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ReplicaLocation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ReplicaLocation.java
new file mode 100644
index 000000000000000..553a749b1cf3dba
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ReplicaLocation.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.worker;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.nereids.worker.job.DataLocation;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.ReplicaId;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Objects;
+
+public class ReplicaLocation implements DataLocation {
+ public final long backendId;
+ public final long tabletId;
+ public final String host;
+ public final int port;
+
+ public ReplicaLocation(long backendId, long tabletId, String host, int port) {
+ this.backendId = backendId;
+ this.tabletId = tabletId;
+ this.host = Objects.requireNonNull(host, "host can not be null");
+ this.port = port;
+ }
+
+ @Override
+ public DataId dataId() {
+ return new ReplicaId(backendId, host, port);
+ }
+
+ @Override
+ public String location() {
+ return host + ":" + port;
+ }
+
+ @Override
+ public Worker toWorker() {
+ try {
+ ImmutableMap backendsWithIdByCurrentCluster = Env.getCurrentSystemInfo()
+ .getBackendsWithIdByCurrentCluster();
+ Backend backend = backendsWithIdByCurrentCluster.get(backendId);
+ if (backend == null) {
+ throw new IllegalStateException("Backend " + backendId + " is not exist");
+ }
+ return new BackendWorker(backend);
+ } catch (Exception t) {
+ throw new NereidsException("Get worker failed: " + this + ". Cause: " + t, t);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "backendId=" + location() + ", tabletId=" + tabletId;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/RoundRobinWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/RoundRobinWorkerSelector.java
index 5344943bcbb1be2..f47a9e5abc8c263 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/RoundRobinWorkerSelector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/RoundRobinWorkerSelector.java
@@ -48,7 +48,7 @@ public Multimap selectScanDatasource(UnassignedNearStorageJob un
for (Entry> entry : tabletToReplicas.entrySet()) {
List replicas = entry.getValue();
DataLocation selectedReplica = selectScanReplica(replicas);
- workerToReplicas.put(selectedReplica.toWorker(), selectedReplica.id());
+ workerToReplicas.put(selectedReplica.toWorker(), selectedReplica.dataId());
}
return workerToReplicas;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/TabletTopology.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/TabletTopology.java
new file mode 100644
index 000000000000000..c6f198d394b0387
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/TabletTopology.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.worker;
+
+import org.apache.doris.nereids.worker.job.DataLocation;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.TabletId;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.ArrayListMultimap;
+
+import java.util.List;
+import java.util.Map;
+
+/** TabletTopology */
+public class TabletTopology implements DataTopology {
+ private Map> topology;
+
+ public TabletTopology(List olapScanNodes) {
+ ArrayListMultimap topology = ArrayListMultimap.create();
+ for (OlapScanNode olapScanNode : olapScanNodes) {
+ List tabletLocations = olapScanNode.getScanRangeLocations(0);
+ if (tabletLocations == null || tabletLocations.isEmpty()) {
+ continue;
+ }
+ for (TScanRangeLocations tabletLocation : tabletLocations) {
+ long tabletId = tabletLocation.getScanRange().getPaloScanRange().getTabletId();
+ List replicaLocations = tabletLocation.getLocations();
+ for (TScanRangeLocation replicaLocation : replicaLocations) {
+ long backendId = replicaLocation.getBackendId();
+ TNetworkAddress server = replicaLocation.getServer();
+ topology.put(
+ new TabletId(tabletId),
+ new ReplicaLocation(backendId, tabletId, server.getHostname(), server.getPort())
+ );
+ }
+ }
+ }
+ this.topology = (Map) topology.asMap();
+ }
+
+ @Override
+ public Map> dataToReplicas() {
+ return topology;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java
index 3a27815b18ba8b4..6258bbdf239a985 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java
@@ -18,19 +18,27 @@
package org.apache.doris.nereids.worker;
import org.apache.doris.nereids.worker.job.AssignedJob;
+import org.apache.doris.nereids.worker.job.AssignedJobImpl;
import org.apache.doris.nereids.worker.job.CustomAssignmentJob;
+import org.apache.doris.nereids.worker.job.DataIds;
import org.apache.doris.nereids.worker.job.DataLocation;
+import org.apache.doris.nereids.worker.job.ExchangeInputs.AllInputs;
+import org.apache.doris.nereids.worker.job.UnassignedGatherJob;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedLeafJob;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.ScanNode;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import java.util.Collection;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -47,18 +55,20 @@ default List offerJob(
return ((CustomAssignmentJob) unassignedJob).customAssignment(inputJobs);
} else if (unassignedJob instanceof UnassignedNearStorageJob) {
// scan native olap table, we can assign a worker near the storage
- return offerScanJob(workerSelector, (UnassignedNearStorageJob) unassignedJob);
+ return offerNearStorageJob(workerSelector, (UnassignedNearStorageJob) unassignedJob);
} else if (unassignedJob instanceof UnassignedLeafJob) {
// it should be a leaf job which not contains scan native olap table node,
// for example, select literal without table, or scan an external table
- return offerNonScanDatasourceJob(workerSelector, (UnassignedLeafJob) unassignedJob);
+ return offerRemoteStorageJob(workerSelector, (UnassignedLeafJob) unassignedJob);
+ } else if (unassignedJob instanceof UnassignedGatherJob) {
+ return offerGatherJob((UnassignedGatherJob) unassignedJob, inputJobs);
} else {
return offerExchangeJob(workerSelector, unassignedJob, inputJobs);
}
}
/** offerScanJob */
- default List offerScanJob(
+ default List offerNearStorageJob(
WorkerSelector workerSelector, UnassignedNearStorageJob unassignedNearStorageJob) {
DataTopology topology = unassignedNearStorageJob.usedDataTopology();
Map> tabletToReplicas = topology.dataToReplicas();
@@ -71,14 +81,18 @@ default List offerScanJob(
for (Entry> entry : workerToReplicas.asMap().entrySet()) {
Worker selectedWorker = entry.getKey();
List scanReplicaIds = (List) entry.getValue();
- AssignedJob instanceJob =
- unassignedNearStorageJob.assignWorkerAndDataSources(selectedWorker, scanReplicaIds);
- assignments.add(instanceJob);
+
+
+
+ // AssignedJob instanceJob = unassignedNearStorageJob.assignWorkerAndDataSources(
+ // selectedWorker, new ListDataIds(scanReplicaIds)
+ // );
+ // assignments.add(instanceJob);
}
return assignments;
}
- default List offerNonScanDatasourceJob(WorkerSelector workerSelector, UnassignedLeafJob leafJob) {
+ default List offerRemoteStorageJob(WorkerSelector workerSelector, UnassignedLeafJob leafJob) {
return null;
}
@@ -91,4 +105,29 @@ default List offerExchangeJob(
return null;
}
+ /** offerExchangeJob */
+ default List offerGatherJob(
+ UnassignedGatherJob gatherJob, ListMultimap inputJobs) {
+ PlanFragmentId childFragmentId = gatherJob.child(0).getFragment().getFragmentId();
+ List childFragmentInstances = inputJobs.get(childFragmentId);
+
+ // if someone backend execute lots of instance, it will be more likely selected as the gather worker
+ AssignedJob randomChildInstance = childFragmentInstances.get(
+ (int) (Math.random() * childFragmentInstances.size())
+ );
+
+ ExchangeNode exchangeNode = gatherJob.getExchangeNode();
+
+ IdentityHashMap exchangeInputs = new IdentityHashMap<>();
+ exchangeInputs.put(exchangeNode, new AllInputs(childFragmentInstances.size()));
+
+ IdentityHashMap scanInputs = new IdentityHashMap<>();
+
+ Worker randomChildFragmentWorker = randomChildInstance.getAssignedWorker();
+ return ImmutableList.of(
+ new AssignedJobImpl(0, gatherJob, randomChildFragmentWorker,
+ scanInputs, exchangeInputs
+ )
+ );
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java
index 1c67e34d3c4b016..d5e08df96295ac6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java
@@ -18,9 +18,6 @@
package org.apache.doris.nereids.worker.job;
import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
-
-import java.util.List;
/**
* AssignedJob.
@@ -32,6 +29,4 @@ public interface AssignedJob {
UnassignedJob unassignedJob();
Worker getAssignedWorker();
-
- List getAssignedDataSourceIds();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java
index bf1da586dd1452e..1302eb260b225b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java
@@ -18,23 +18,29 @@
package org.apache.doris.nereids.worker.job;
import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.ScanNode;
-import java.util.List;
+import java.util.IdentityHashMap;
+import java.util.Objects;
/** AssignedJobImpl */
public class AssignedJobImpl implements AssignedJob {
private final int indexInUnassignedJob;
private final UnassignedJob unassignedJob;
private final Worker worker;
- private final List dataSourceIds;
+ private final IdentityHashMap scanInputs;
+ private final IdentityHashMap exchangeInputs;
public AssignedJobImpl(
- int indexInUnassignedJob, UnassignedJob unassignedJob, Worker worker, List dataSourceIds) {
+ int indexInUnassignedJob, UnassignedJob unassignedJob, Worker worker,
+ IdentityHashMap scanInputs,
+ IdentityHashMap exchangeInputs) {
this.indexInUnassignedJob = indexInUnassignedJob;
- this.unassignedJob = unassignedJob;
+ this.unassignedJob = Objects.requireNonNull(unassignedJob, "unassignedJob can not be null");
this.worker = worker;
- this.dataSourceIds = dataSourceIds;
+ this.scanInputs = Objects.requireNonNull(scanInputs, "scanInputs can not be null");
+ this.exchangeInputs = Objects.requireNonNull(exchangeInputs, "exchangeInputs can not be null");
}
@Override
@@ -52,8 +58,11 @@ public Worker getAssignedWorker() {
return worker;
}
- @Override
- public List getAssignedDataSourceIds() {
- return dataSourceIds;
+ public IdentityHashMap getScanInputs() {
+ return scanInputs;
+ }
+
+ public IdentityHashMap getExchangeInputs() {
+ return exchangeInputs;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataIds.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataIds.java
new file mode 100644
index 000000000000000..204361ac092c295
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataIds.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.worker.job;
+
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+
+import java.util.Iterator;
+
+public interface DataIds {
+ Iterator toDataIdIterator();
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataLocation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataLocation.java
index fc86935f05fb9dc..df5059d0dcb1ce1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataLocation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DataLocation.java
@@ -22,7 +22,7 @@
/** DataLocation */
public interface DataLocation {
- DataId id();
+ DataId dataId();
String location();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ExchangeInputs.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ExchangeInputs.java
index 29787240e626bc9..00182a2d8d3460d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ExchangeInputs.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ExchangeInputs.java
@@ -17,14 +17,17 @@
package org.apache.doris.nereids.worker.job;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.ExchangeSourceId;
+
import org.apache.commons.collections.iterators.SingletonIterator;
import java.util.Iterator;
-import java.util.stream.IntStream;
+import java.util.stream.LongStream;
/** InputIndexes */
-public interface ExchangeInputs {
- Iterator inputIndexes();
+public interface ExchangeInputs extends DataIds {
+ Iterator toDataIdIterator();
/**
* AllInputs.
@@ -38,8 +41,11 @@ public AllInputs(int inputNum) {
}
@Override
- public Iterator inputIndexes() {
- return IntStream.range(0, inputNum).iterator();
+ public Iterator toDataIdIterator() {
+ return LongStream.range(0, inputNum)
+ .mapToObj(ExchangeSourceId::new)
+ .map(DataId.class::cast)
+ .iterator();
}
}
@@ -48,15 +54,15 @@ public Iterator inputIndexes() {
* use for bucket shuffle join and colocate join
*/
class SingleExchangeInput implements ExchangeInputs {
- private final int input;
+ private final long input;
- public SingleExchangeInput(int input) {
+ public SingleExchangeInput(long input) {
this.input = input;
}
@Override
- public Iterator inputIndexes() {
- return new SingletonIterator(input);
+ public Iterator toDataIdIterator() {
+ return new SingletonIterator(new ExchangeSourceId(input));
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ListDataIds.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ListDataIds.java
new file mode 100644
index 000000000000000..16c68f638a36dd8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ListDataIds.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.worker.job;
+
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/** ListDataIds */
+public class ListDataIds implements DataIds {
+ private List sourceIds;
+
+ public ListDataIds(List sourceIds) {
+ this.sourceIds = Utils.fastToImmutableList(
+ Objects.requireNonNull(sourceIds, "sourceIds can not be null")
+ );
+ }
+
+ @Override
+ public Iterator toDataIdIterator() {
+ return sourceIds.iterator();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherJob.java
index 1b91d2076812d01..e31c2f5330c442b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherJob.java
@@ -17,14 +17,11 @@
package org.apache.doris.nereids.worker.job;
-import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
import com.google.common.collect.ImmutableList;
-import java.util.List;
import java.util.Objects;
/** UnassignedGatherJob */
@@ -40,9 +37,4 @@ public UnassignedGatherJob(PlanFragment fragment, ExchangeNode exchangeNode, Una
public ExchangeNode getExchangeNode() {
return exchangeNode;
}
-
- @Override
- public AssignedJob assignWorkerAndDataSources(Worker worker, List dataSourceIds) {
- return new AssignedJobImpl(assignedJobNum++, this, worker, dataSourceIds);
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java
index ddce6b1647fcde4..87abfdf9217eec1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java
@@ -19,10 +19,11 @@
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
+import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.ScanNode;
-import java.util.List;
+import java.util.IdentityHashMap;
/**
* WorkerJob.
@@ -31,12 +32,16 @@
public interface UnassignedJob extends TreeNode {
default int computeDegreeOfParallelism() {
- return 1;
+ return getFragment().getParallelExecNum();
}
PlanFragment getFragment();
// generate an instance job
// e.g. build an instance job by a backends and the replica ids it contains
- AssignedJob assignWorkerAndDataSources(Worker worker, List dataSourceIds);
+ default AssignedJob assignWorkerAndDataSources(
+ Worker worker, IdentityHashMap scanInputs,
+ IdentityHashMap exchangeInputs) {
+ return new AssignedJobImpl(0, this, worker, scanInputs, exchangeInputs);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java
index 1a832349311af51..1b185094b6bd871 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java
@@ -40,7 +40,7 @@ public class UnassignedJobBuilder {
*/
public static FragmentIdMapping buildJobs(FragmentIdMapping fragments) {
// build from leaf to parent
- FragmentIdMapping unassignedJobs = new FragmentIdMapping<>(fragments.size());
+ FragmentIdMapping unassignedJobs = new FragmentIdMapping<>();
for (Entry kv : fragments.entrySet()) {
PlanFragmentId fragmentId = kv.getKey();
PlanFragment fragment = kv.getValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java
index 58aae20f617306f..641472f15fe9aae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java
@@ -35,10 +35,25 @@ class TabletId implements DataId {
public final long id;
}
+ /** ExchangeSourceId */
+ @lombok.Data
+ @lombok.AllArgsConstructor
+ class ExchangeSourceId implements DataId {
+ public final long id;
+ }
+
/** ReplicaId */
@lombok.Data
@lombok.AllArgsConstructor
class ReplicaId implements DataId {
+ public final long tabletId;
+ public final String host;
+ public final int port;
+ }
+
+ @lombok.Data
+ @lombok.AllArgsConstructor
+ class BackendId implements DataId {
public final long id;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java
index 5788414d6a2cb5d..a95c0c1e504d268 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java
@@ -17,20 +17,11 @@
package org.apache.doris.nereids.worker.job;
-import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.planner.PlanFragment;
-import java.util.List;
-
/** UnassignedQueryConstantJob */
public class UnassignedQueryConstantJob extends AbstractUnassignedJob implements UnassignedLeafJob {
public UnassignedQueryConstantJob(PlanFragment fragment) {
super(fragment);
}
-
- @Override
- public AssignedJob assignWorkerAndDataSources(Worker worker, List dataSourceIds) {
- return null;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java
index 2e66b4ccc237b66..58221c3deb4c08b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java
@@ -19,31 +19,32 @@
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.worker.DataTopology;
-import org.apache.doris.nereids.worker.Worker;
+import org.apache.doris.nereids.worker.TabletTopology;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
+import com.google.common.base.Suppliers;
+
import java.util.List;
import java.util.Objects;
+import java.util.function.Supplier;
public class UnassignedScanNativeTableJob extends AbstractUnassignedJob implements UnassignedNearStorageJob {
private final List olapScanNodes;
private int assignedJobNum;
+ private final Supplier dataTopologyCache;
public UnassignedScanNativeTableJob(
PlanFragment fragment, List olapScanNodes, List inputJobs) {
super(fragment, inputJobs);
this.olapScanNodes = Utils.fastToImmutableList(
Objects.requireNonNull(olapScanNodes, "olapScanNodes can not be null"));
- }
- @Override
- public DataTopology usedDataTopology() {
- return null;
+ this.dataTopologyCache = Suppliers.memoize(() -> new TabletTopology(olapScanNodes));
}
@Override
- public AssignedJob assignWorkerAndDataSources(Worker worker, List dataSourceIds) {
- return new AssignedJobImpl(assignedJobNum++, this, worker, dataSourceIds);
+ public DataTopology usedDataTopology() {
+ return dataTopologyCache.get();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java
index 54cb3d722df377d..8e752c2506570d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java
@@ -18,8 +18,6 @@
package org.apache.doris.nereids.worker.job;
import org.apache.doris.nereids.util.Utils;
-import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
@@ -37,9 +35,4 @@ public UnassignedScanRemoteTableJob(
this.scanNodes = Utils.fastToImmutableList(
Objects.requireNonNull(scanNodes, "scanNodes can not be null"));
}
-
- @Override
- public AssignedJob assignWorkerAndDataSources(Worker worker, List dataSourceIds) {
- return new AssignedJobImpl(assignedJobNum++, this, worker, dataSourceIds);
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java
index 43368ed6d437a09..7b0babb346b13d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java
@@ -18,8 +18,6 @@
package org.apache.doris.nereids.worker.job;
import org.apache.doris.nereids.util.Utils;
-import org.apache.doris.nereids.worker.Worker;
-import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
@@ -40,9 +38,4 @@ public UnassignedShuffleJob(PlanFragment fragment, List exchangeNo
public List getExchangeNodes() {
return exchangeNodes;
}
-
- @Override
- public AssignedJob assignWorkerAndDataSources(Worker worker, List dataSourceIds) {
- return new AssignedJobImpl(assignedJobNum++, this, worker, dataSourceIds);
- }
}