Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed May 9, 2024
1 parent 9c5c874 commit 40e54c8
Show file tree
Hide file tree
Showing 24 changed files with 410 additions and 91 deletions.
6 changes: 6 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,12 @@ under the License.
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>3.0.7</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<!-- for huawei obs sdk -->
Expand Down
9 changes: 8 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/common/Id.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

package org.apache.doris.common;

import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;

/**
* Integer ids that cannot accidentally be compared with ints.
*/
public class Id<IdType extends Id<IdType>> {
public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
protected final int id;

public Id(int id) {
Expand Down Expand Up @@ -62,4 +64,9 @@ public ArrayList<IdType> asList() {
public String toString() {
return Integer.toString(id);
}

@Override
public int compareTo(@NotNull Id<IdType> idTypeId) {
return id - idTypeId.id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
Expand Down Expand Up @@ -366,7 +365,7 @@ private void distribute(LogicalPlanAdapter logicalPlanAdapter) throws UserExcept
return;
}
DistributePlanner distributePlanner = new DistributePlanner(fragments);
distributePlanner.plan();
// distributePlanner.plan();
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Map<PlanFragmentId, DistributedPlan> plan() {
private Map<PlanFragmentId, DistributedPlan> buildDistributePlans(
Map<PlanFragmentId, UnassignedJob> idToUnassignedJobs,
ListMultimap<PlanFragmentId, AssignedJob> idToAssignedJobs) {
FragmentIdMapping<DistributedPlan> idToDistributedPlans = new FragmentIdMapping<>(idToFragments.size());
FragmentIdMapping<DistributedPlan> idToDistributedPlans = new FragmentIdMapping<>();
for (Entry<PlanFragmentId, PlanFragment> kv : idToFragments.entrySet()) {
PlanFragmentId fragmentId = kv.getKey();
PlanFragment fragment = kv.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,33 @@

import com.google.common.collect.ImmutableList;

import java.util.LinkedHashMap;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* FragmentIdMapping:
* key: PlanFragmentId
* value: T
*
* NOTE: this map should order by PlanFragmentId asc
*/
public class FragmentIdMapping<T> extends LinkedHashMap<PlanFragmentId, T> {
public FragmentIdMapping(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor);
public class FragmentIdMapping<T> extends TreeMap<PlanFragmentId, T> {
public FragmentIdMapping() {
}

public FragmentIdMapping(int initialCapacity) {
super(initialCapacity);
}

public FragmentIdMapping() {
public FragmentIdMapping(Comparator<? super PlanFragmentId> comparator) {
super(comparator);
}

public FragmentIdMapping(Map<? extends PlanFragmentId, ? extends T> m) {
super(m);
}

public FragmentIdMapping(int initialCapacity, float loadFactor, boolean accessOrder) {
super(initialCapacity, loadFactor, accessOrder);
public FragmentIdMapping(SortedMap<PlanFragmentId, ? extends T> m) {
super(m);
}

public List<T> getByChildrenFragments(PlanFragment fragment) {
Expand All @@ -61,7 +61,7 @@ public List<T> getByChildrenFragments(PlanFragment fragment) {
}

public static FragmentIdMapping<PlanFragment> buildFragmentMapping(List<PlanFragment> fragments) {
FragmentIdMapping<PlanFragment> idToFragments = new FragmentIdMapping<>(fragments.size());
FragmentIdMapping<PlanFragment> idToFragments = new FragmentIdMapping<>();
for (PlanFragment fragment : fragments) {
idToFragments.put(fragment.getFragmentId(), fragment);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;

import org.apache.doris.system.Backend;

import com.google.common.collect.ImmutableList;
import org.jetbrains.annotations.NotNull;

import java.util.List;

/** BackendWorker */
public class BackendWorker implements Worker {
private final Backend backend;

public BackendWorker(Backend backend) {
this.backend = backend;
}

@Override
public String address() {
return backend.getAddress();
}

@Override
public boolean available() {
return backend.isQueryAvailable();
}

@Override
public Workload workload() {
return new Workload() {
@Override
public int compareTo(@NotNull Workload workload) {
return 0;
}
};
}

@Override
public WorkerGroup team() {
return new WorkerGroup() {
@Override
public List<Worker> filterWorkers(WorkerPool workerPool) {
return ImmutableList.of();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.NereidsException;
import org.apache.doris.nereids.worker.job.DataLocation;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.ReplicaId;
import org.apache.doris.system.Backend;

import com.google.common.collect.ImmutableMap;

import java.util.Objects;

public class ReplicaLocation implements DataLocation {
public final long backendId;
public final long tabletId;
public final String host;
public final int port;

public ReplicaLocation(long backendId, long tabletId, String host, int port) {
this.backendId = backendId;
this.tabletId = tabletId;
this.host = Objects.requireNonNull(host, "host can not be null");
this.port = port;
}

@Override
public DataId dataId() {
return new ReplicaId(backendId, host, port);
}

@Override
public String location() {
return host + ":" + port;
}

@Override
public Worker toWorker() {
try {
ImmutableMap<Long, Backend> backendsWithIdByCurrentCluster = Env.getCurrentSystemInfo()
.getBackendsWithIdByCurrentCluster();
Backend backend = backendsWithIdByCurrentCluster.get(backendId);
if (backend == null) {
throw new IllegalStateException("Backend " + backendId + " is not exist");
}
return new BackendWorker(backend);
} catch (Exception t) {
throw new NereidsException("Get worker failed: " + this + ". Cause: " + t, t);
}
}

@Override
public String toString() {
return "backendId=" + location() + ", tabletId=" + tabletId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Multimap<Worker, DataId> selectScanDatasource(UnassignedNearStorageJob un
for (Entry<DataId, List<DataLocation>> entry : tabletToReplicas.entrySet()) {
List<DataLocation> replicas = entry.getValue();
DataLocation selectedReplica = selectScanReplica(replicas);
workerToReplicas.put(selectedReplica.toWorker(), selectedReplica.id());
workerToReplicas.put(selectedReplica.toWorker(), selectedReplica.dataId());
}
return workerToReplicas;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;

import org.apache.doris.nereids.worker.job.DataLocation;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.TabletId;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.collect.ArrayListMultimap;

import java.util.List;
import java.util.Map;

/** TabletTopology */
public class TabletTopology implements DataTopology {
private Map<DataId, List<DataLocation>> topology;

public TabletTopology(List<OlapScanNode> olapScanNodes) {
ArrayListMultimap<DataId, DataLocation> topology = ArrayListMultimap.create();
for (OlapScanNode olapScanNode : olapScanNodes) {
List<TScanRangeLocations> tabletLocations = olapScanNode.getScanRangeLocations(0);
if (tabletLocations == null || tabletLocations.isEmpty()) {
continue;
}
for (TScanRangeLocations tabletLocation : tabletLocations) {
long tabletId = tabletLocation.getScanRange().getPaloScanRange().getTabletId();
List<TScanRangeLocation> replicaLocations = tabletLocation.getLocations();
for (TScanRangeLocation replicaLocation : replicaLocations) {
long backendId = replicaLocation.getBackendId();
TNetworkAddress server = replicaLocation.getServer();
topology.put(
new TabletId(tabletId),
new ReplicaLocation(backendId, tabletId, server.getHostname(), server.getPort())
);
}
}
}
this.topology = (Map) topology.asMap();
}

@Override
public Map<DataId, List<DataLocation>> dataToReplicas() {
return topology;
}
}
Loading

0 comments on commit 40e54c8

Please sign in to comment.