Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ZeRO with auto parallel #9288

Merged
merged 10 commits into from
Nov 4, 2022
2 changes: 1 addition & 1 deletion docs/source/auto_parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ Configuration API for auto parallelism
enable_auto_parallel_ignore_user_sbp_config
set_auto_parallel_computation_cost_ratio
set_auto_parallel_wait_time
enable_auto_parallel_mainstem_algo
enable_auto_parallel_trunk_algo
enable_auto_parallel_sbp_collector

4 changes: 2 additions & 2 deletions oneflow/core/auto_parallel/sbp_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ void SbpCollector::ProxySbpCandidate(const OpGraph& op_graph,
// clip this edge if it no longer carries any blob
// We don't clip edges before since we have transfer cost
// Now we clip edges, which makes the topology simpler
if (edge_found->EmptyLbi() && edge_found->wait_time_ <= 0.0 && edge_found->wait_time_ > -0.5
&& sbp_graph.transfer_cost_ <= 0.0) {
if (edge_found->EmptyLbi() && edge_found->wait_time_ <= 0.0
&& edge_found->wait_time_ > -0.5) {
sbp_graph.ClipEdge(edge_found);
}
}
Expand Down
17 changes: 1 addition & 16 deletions oneflow/core/auto_parallel/sbp_constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,21 +264,7 @@ Maybe<void> SbpConstructor::InitCopyCost(const OpGraph& op_graph) {
}
// Find all those cases with wait time
// Do not skip edges carrying no lbi
sbp_node_consumer->InitializeCopyCost(false, use_sbp_collector_);
for (auto* sbp_edge : sbp_node_consumer->edges_in_) {
// skip it if proxy
if (!sbp_edge->start_node_->op_node_) { continue; }
// Reset Wait time
for (int32_t i = 0; i < sbp_edge->cost_.size(); ++i) {
for (int32_t j = 0; j < sbp_edge->cost_[i].size(); ++j) {
// If transferring between devices, we need to add wait time.
if (sbp_edge->cost_[i][j] > 0.0) { sbp_edge->cost_[i][j] = sbp_edge->wait_time_; }
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是因为这里不做 edge 的遍历,所以可以变快?

这样改完后,逻辑还等价不?

Copy link
Contributor Author

@Yipeng1994 Yipeng1994 Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里我测了一下,时间基本是一样的。逻辑都是等价的。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Re-compute the costs, skip edges carrying no lbi
sbp_node_consumer->InitializeCopyCost(true, use_sbp_collector_);
sbp_node_consumer->InitializeCopyCost(use_sbp_collector_);
});
return Maybe<void>::Ok();
}
Expand Down Expand Up @@ -395,7 +381,6 @@ Maybe<HashMap<const OpNode*, HashSet<std::string>>> SbpConstructor::GetMutableOp
void SbpConstructor::PrintSBPGraphDebugInfo() {
// sbp constructor information
std::cout << "cost_ratio_:" << cost_ratio_ << std::endl;
std::cout << "transfer_cost_:" << sbp_graph_.transfer_cost_ << std::endl;
std::cout << "wait_time_:" << sbp_graph_.wait_time_ << std::endl;
std::cout << "use_sbp_collector_" << use_sbp_collector_ << std::endl;
// test debug
Expand Down
3 changes: 1 addition & 2 deletions oneflow/core/auto_parallel/sbp_constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@ class SbpConstructor final {
SbpConstructor() = delete;
SbpConstructor(const OpGraph& op_graph, Job* job)
: cost_ratio_(job->job_conf().auto_parallel_computation_cost_ratio()),
enable_trunk_algo_(job->job_conf().enable_auto_parallel_mainstem_algo()),
enable_trunk_algo_(job->job_conf().enable_auto_parallel_trunk_algo()),
use_sbp_collector_(!Singleton<ResourceDesc, ForSession>::Get()
->resource()
.disable_group_boxing_by_dst_parallel()
&& job->job_conf().enable_auto_parallel_sbp_collector()),
op_graph_(&op_graph) {
sbp_graph_.SetWaitTime(job->job_conf().auto_parallel_wait_time());
sbp_graph_.SetTransferCost(job->job_conf().auto_parallel_transfer_cost());
CHECK_JUST(Init(op_graph, job));
}
~SbpConstructor() = default;
Expand Down
7 changes: 3 additions & 4 deletions oneflow/core/auto_parallel/sbp_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ double SbpEdge::GetMaxCost() const {

// Assemble copy cost

void SbpEdge::InitializeCopyCost(const std::string& ibn, bool compute_cost,
bool use_sbp_collector) {
void SbpEdge::InitializeCopyCost(const std::string& ibn, bool use_sbp_collector) {
// In this part, we assemble the cost from nodes to nodes.
if (start_node_->op_node_ && end_node_->op_node_) {
OpNode* consumer = end_node_->op_node_;
Expand All @@ -235,7 +234,7 @@ void SbpEdge::InitializeCopyCost(const std::string& ibn, bool compute_cost,
const LogicalBlobId& lbi = consumer->op().BnInOp2Lbi(ibn);

// Check whether lbi is transferred by this edge
if (use_sbp_collector && compute_cost && !SearchLbi(lbi)) { return; }
if (use_sbp_collector && !SearchLbi(lbi)) { return; }

OpNode* producer = start_node_->op_node_;
const std::string& producer_lbn = *CHECK_JUST(producer->op().obn4lbi(lbi));
Expand All @@ -250,7 +249,7 @@ void SbpEdge::InitializeCopyCost(const std::string& ibn, bool compute_cost,
const std::string& obn = *CHECK_JUST(producer->op().obn4lbi(lbi));
// If we are deciding whether we need the wait time, then make require_same_sbp true.
// B->S causes cudaEventSynchronize in the current implementation.
bool require_same_sbp = (!compute_cost) || RequireSameSbp(consumer, ibn);
bool require_same_sbp = RequireSameSbp(consumer, ibn);
int32_t consumer_sbp_size = end_node_->sbp_sig_list_.size();
LazyMode::Guard enable_lazy_mode(true);

Expand Down
4 changes: 1 addition & 3 deletions oneflow/core/auto_parallel/sbp_edge.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ class SbpEdge final {
double GetMaxCost() const;

// Assemble copy cost
// compute_cost = true: It is computing cost
// compute_cost = false: It is deciding whether this edge needs the wait time.
void InitializeCopyCost(const std::string& ibn, bool compute_cost, bool use_sbp_collector);
void InitializeCopyCost(const std::string& ibn, bool use_sbp_collector);

// find the cut ratio
// (#c>GetValidMaxCopyCost() in Cost)/(#c in Cost)
Expand Down
12 changes: 3 additions & 9 deletions oneflow/core/auto_parallel/sbp_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,7 @@ void SbpGraph::FindTrunk(int32_t max_min_layer,
for (SbpNode* this_node : node_list_) { this_node->RaiseConsumerNum(op_name2sbp_node); }
// Reduce the wait time for tributaries
for (SbpNode* this_node : node_list_) {
this_node->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time_,
transfer_cost_);
this_node->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time_);
}

// Reduce the wait time for the trunk from the end to the beginning
Expand Down Expand Up @@ -787,11 +786,11 @@ void SbpGraph::FindTrunk(int32_t max_min_layer,
// This code maintains ( acc_tributary_cost + used_tributary_cost )
if (acc_tributary_cost > 0.0) {
if (acc_tributary_cost > wait_time_) {
curr_wait_time = transfer_cost_;
curr_wait_time = 0.0;
acc_tributary_cost -= wait_time_;
used_tributary_cost += wait_time_;
} else {
curr_wait_time = transfer_cost_ + wait_time_ - acc_tributary_cost;
curr_wait_time = wait_time_ - acc_tributary_cost;
used_tributary_cost += acc_tributary_cost;
acc_tributary_cost = 0.0;
}
Expand All @@ -804,12 +803,7 @@ void SbpGraph::FindTrunk(int32_t max_min_layer,
}

// Set wait time

void SbpGraph::SetWaitTime(double wait_time) { wait_time_ = wait_time; }

// Set transfer cost

void SbpGraph::SetTransferCost(double transfer_cost) { transfer_cost_ = transfer_cost; }

} // namespace auto_parallel
} // namespace oneflow
5 changes: 0 additions & 5 deletions oneflow/core/auto_parallel/sbp_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ class SbpGraph final {
// Set wait time
void SetWaitTime(double wait_time);

// Set transfer cost
void SetTransferCost(double transfer_cost);

private:
friend class SbpCollector;
friend class SbpConstructor;
Expand All @@ -106,8 +103,6 @@ class SbpGraph final {
int32_t threshold_ = 100;
// Overlayable wait time for copy cost, which occurs before communication between devices.
double wait_time_ = 16500.0;
// Uncovered wait time for copy cost, which is already set up somewhere else
double transfer_cost_ = 0.0;

// Remove a node from the node list
void RemoveFromNodeList(SbpNode* this_node);
Expand Down
21 changes: 12 additions & 9 deletions oneflow/core/auto_parallel/sbp_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ void SbpNode::RaiseConsumerNum(const HashMap<std::string, SbpNode*>& op_name2sbp
void SbpNode::SpreadAvailWaitTime(const std::vector<double>& trunk_cost,
const std::vector<double>& acc_trunk_cost,
const HashMap<std::string, SbpNode*>& op_name2sbp_node,
double wait_time, double transfer_cost) {
double wait_time) {
// skip the proxy nodes and the sources
if (min_layer_ <= 0) { return; }
// Have not finished spreading for consumers or downstream nodes or already visited.
Expand Down Expand Up @@ -577,15 +577,13 @@ void SbpNode::SpreadAvailWaitTime(const std::vector<double>& trunk_cost,
// (1) P->S0->S0->S0->B
// (2) P->B->B->B->B
// We would use (2) when the tensor is relatively tiny.
this_edge->wait_time_ += transfer_cost;
// Do not inherit trunk cost for nodes on the trunk
if (!producer->on_trunk_) {
// Inherit the minimal of the trunk cost from consumers
producer->DropAvailWaitTime(curr_trunk_cost);
}
producer->counter_--;
producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time,
transfer_cost);
producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time);
}
// Put the rest of the trunk cost in the upstream nodes.
for (const auto& ctrl_in_op_name : op_node_->op().op_conf().ctrl_in_op_name()) {
Expand All @@ -601,8 +599,7 @@ void SbpNode::SpreadAvailWaitTime(const std::vector<double>& trunk_cost,
producer->DropAvailWaitTime(curr_trunk_cost);
}
producer->counter_--;
producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time,
transfer_cost);
producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time);
}
}
// Set counter_ to be -1, do not visit it again.
Expand All @@ -619,18 +616,24 @@ void SbpNode::DropAvailWaitTime(double curr_trunk_cost) {

// Assemble copy cost for all the incoming edges

void SbpNode::InitializeCopyCost(bool compute_cost, bool use_sbp_collector) {
void SbpNode::InitializeCopyCost(bool use_sbp_collector) {
for (SbpEdge* this_edge : edges_in_) {
const auto* sbp_node_producer = this_edge->start_node_;
OpNode* producer = sbp_node_producer->op_node_;

// skip it if proxy
if (use_sbp_collector && !producer) { continue; }

// look through input blobs
for (const std::string& ibn : op_node_->op().input_bns()) {
if (producer->op().op_name() == op_node_->SrcNode4Ibn(ibn).op().op_name()) {
this_edge->InitializeCopyCost(ibn, compute_cost, use_sbp_collector);
this_edge->InitializeCopyCost(ibn, use_sbp_collector);
}
}
// Add Wait time
for (auto& cost_row : this_edge->cost_) {
for (auto& cost_value : cost_row) {
// If transferring between devices, we need to add wait time.
if (cost_value > 0.0) { cost_value += this_edge->wait_time_; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来最主要的增加的逻辑是 Add Wait time 这里?

最主要的删除逻辑是删掉了很多 compute_cost 的开关

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是原有的,来自于 https://github.com/Oneflow-Inc/oneflow/pull/9288/files#diff-40b436fe2eff96c43760f1c9abc5c2a1518c696046d8df8279b5bb6d70b6beaaL268-L278

只不过由赋值改为了增加。当然这些实现在最终结果上都是一样的

}
}
}
Expand Down
6 changes: 3 additions & 3 deletions oneflow/core/auto_parallel/sbp_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ class SbpNode final {
// Compute the minimal available wait time for producers or upstream nodes
void SpreadAvailWaitTime(const std::vector<double>& trunk_cost,
const std::vector<double>& acc_trunk_cost,
const HashMap<std::string, SbpNode*>& op_name2sbp_node, double wait_time,
double transfer_cost);
const HashMap<std::string, SbpNode*>& op_name2sbp_node,
double wait_time);
// Reduce and set the wait time for op in the trunk
void SetTrunkWaitTime(double trunk_wait_time);

// Assemble copy cost for all the incoming edges
void InitializeCopyCost(bool compute_cost, bool use_sbp_collector);
void InitializeCopyCost(bool use_sbp_collector);

private:
friend class SbpEdge;
Expand Down
36 changes: 17 additions & 19 deletions oneflow/core/graph/straighten_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class TopoStruct {
TaskNode* node = nullptr;
int32_t min_layer = -1;
int32_t tributary_layer = -1;
bool on_mainstem = false;
bool on_trunk = false;
int32_t counter = 0;
int32_t min_distance2transfer = -1;
int64_t memory_increment = -1;
Expand All @@ -55,7 +55,7 @@ class TopoStruct {

void SpreadTributaryLayer(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct);

void SpreadMainstem(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct);
void SpreadTrunk(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct);

// The minimum computation distance from the beginning of this op to the next transfer
int32_t GetMinDistance2Transfer(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct);
Expand Down Expand Up @@ -180,7 +180,7 @@ void TopoStruct::DropTributaryLayer(int32_t upper_bound) {
void TopoStruct::SpreadTributaryLayer(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct) {
if (counter || min_layer <= 0) { return; }
int32_t producer_max_lay = 0;
if (on_mainstem) {
if (on_trunk) {
producer_max_lay = min_layer - 1;
} else {
// On a tributary, the operator could be run later.
Expand All @@ -196,15 +196,15 @@ void TopoStruct::SpreadTributaryLayer(HashMap<TaskNode*, TopoStruct>* task_node2
counter--;
}

// Judge if this node is on the mainstem
// Judge if this node is on the trunk
// If so, judge it for its producer/upstream nodes
void TopoStruct::SpreadMainstem(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct) {
void TopoStruct::SpreadTrunk(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct) {
// Skip it if this node is already judged.
if (on_mainstem) { return; }
if (on_trunk) { return; }
CHECK_GE(min_layer, 0) << "TopoStruct not initialized!";
on_mainstem = true;
// If I am in the mainstem, then all the children with (min_layer >= my layer id - 1) would be
// considered as in the mainstem
on_trunk = true;
// If I am in the trunk, then all the children with (min_layer >= my layer id - 1) would be
// considered as in the trunk
node->ForEachNodeOnInEdge([&](TaskNode* in) {
auto& topo_struct_in = task_node2topo_struct->at(in);
if (topo_struct_in.min_layer == min_layer - 1) {
Expand Down Expand Up @@ -281,25 +281,23 @@ int64_t TopoStruct::GetDecidingParameter(int32_t i) const {
return 0;
}

// Find the mainstem of the task graph, then reduce the wait time for tributaries
void FindMainstem(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct) {
// Find the trunk of the task graph, then reduce the wait time for tributaries
void FindTrunk(HashMap<TaskNode*, TopoStruct>* task_node2topo_struct) {
// Find the maximum layer number
int32_t max_min_layer = -1;
for (const auto& pair : *task_node2topo_struct) {
if (max_min_layer < pair.second.min_layer) { max_min_layer = pair.second.min_layer; }
}
// All the nodes with min_layer>=mainstem_end_id would be considered as mainstem nodes
// The last 5 layers would be considered as in mainstem anyway.
int32_t mainstem_end_id = max_min_layer - 4;
// All the nodes with min_layer>=trunk_end_id would be considered as trunk nodes
// The last 5 layers would be considered as in trunk anyway.
int32_t trunk_end_id = max_min_layer - 4;
for (auto& pair : *task_node2topo_struct) {
auto& topo_struct = pair.second;
// Initialize the counter and Tributary Layer
topo_struct.counter = pair.first->out_edges().size();
topo_struct.tributary_layer = max_min_layer;
// Find out all the nodes on the mainstem.
if (topo_struct.min_layer >= mainstem_end_id) {
topo_struct.SpreadMainstem(task_node2topo_struct);
}
// Find out all the nodes on the trunk.
if (topo_struct.min_layer >= trunk_end_id) { topo_struct.SpreadTrunk(task_node2topo_struct); }
}

for (auto& pair : *task_node2topo_struct) {
Expand Down Expand Up @@ -384,7 +382,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector<TaskNode*>* ordered_task
});

// Generate other parameters in the topological data structure
FindMainstem(&task_node2topo_struct);
FindTrunk(&task_node2topo_struct);

// Decide which node should run first
InitDecideParameters(GlobalJobDesc().job_conf().straighten_algorithm_tag_in_task_graph());
Expand Down
7 changes: 3 additions & 4 deletions oneflow/core/job/job_conf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,9 @@ message JobConfigProto {
optional bool enable_auto_parallel = 700 [default = false];
optional double auto_parallel_computation_cost_ratio = 701 [default = 0.05];
optional double auto_parallel_wait_time = 702 [default = 1.65e4];
optional double auto_parallel_transfer_cost = 703 [default = 0.0];
optional bool enable_auto_parallel_mainstem_algo = 704 [default = true];
optional bool enable_auto_parallel_sbp_collector = 705 [default = false];
optional bool enable_auto_parallel_ignore_user_sbp_config = 706 [default = false];
optional bool enable_auto_parallel_trunk_algo = 703 [default = true];
optional bool enable_auto_parallel_sbp_collector = 704 [default = false];
optional bool enable_auto_parallel_ignore_user_sbp_config = 705 [default = false];

optional StraightenAlgorithmTag straighten_algorithm_tag_in_task_graph = 800 [default = kCompressMemory];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,8 @@ class OptimizerPlacementOptimizationPass final : public JobPass {
&& ctx->job_desc().job_conf().optimizer_placement_optimization_mode() != "none")) {
return Maybe<void>::Ok();
}
if (job->job_conf().enable_auto_parallel()) {
if (job->job_conf().enable_auto_parallel()
&& job->job_conf().enable_auto_parallel_ignore_user_sbp_config()) {
LOG(WARNING) << "ZeRO optimization will be ignored when enabling AutoParallel";
return Maybe<void>::Ok();
}
Expand Down
17 changes: 3 additions & 14 deletions python/oneflow/nn/graph/graph_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,22 +342,11 @@ def set_auto_parallel_wait_time(self, cost):
"""
self.proto.auto_parallel_wait_time = cost

def set_auto_parallel_transfer_cost(self, cost):
def enable_auto_parallel_trunk_algo(self, mode: bool = True):
"""
Set transfer cost for auto-parallel algorithm.

transfer cost: An auto-parallel parameter. Describe the fixed extra time it will take when
communication between devices occurs. It will be added to the copy cost and can not be reduced.
Default value: 0
Using a positive number such as 1.65e8 would reduce the frequency of data transfer.
Find the trunk of the SBP graph, then reduce the wait time for tributaries.
"""
self.proto.auto_parallel_transfer_cost = cost

def enable_auto_parallel_mainstem_algo(self, mode: bool = True):
"""
Find the mainstem of the SBP graph, then reduce the wait time for tributaries.
"""
self.proto.enable_auto_parallel_mainstem_algo = mode
self.proto.enable_auto_parallel_trunk_algo = mode

def enable_auto_parallel_sbp_collector(self, mode: bool = True):
"""
Expand Down
Loading