From 54771bc917aa1b7509e758b7d5c1344ce00e7246 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Sat, 22 Oct 2022 12:59:23 +0000 Subject: [PATCH 1/6] Enable ZeRO with auto parallel in the first setting and speed up --- oneflow/core/auto_parallel/sbp_constructor.cpp | 6 +++--- .../job_rewriter/optimizer_placement_optimization_pass.cpp | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/oneflow/core/auto_parallel/sbp_constructor.cpp b/oneflow/core/auto_parallel/sbp_constructor.cpp index 189410d8a57..d451680c2cb 100644 --- a/oneflow/core/auto_parallel/sbp_constructor.cpp +++ b/oneflow/core/auto_parallel/sbp_constructor.cpp @@ -264,7 +264,7 @@ Maybe SbpConstructor::InitCopyCost(const OpGraph& op_graph) { } // Find all those cases with wait time // Do not skip edges carrying no lbi - sbp_node_consumer->InitializeCopyCost(false, use_sbp_collector_); + sbp_node_consumer->InitializeCopyCost(true, use_sbp_collector_); for (auto* sbp_edge : sbp_node_consumer->edges_in_) { // skip it if proxy if (!sbp_edge->start_node_->op_node_) { continue; } @@ -272,13 +272,13 @@ Maybe SbpConstructor::InitCopyCost(const OpGraph& op_graph) { for (int32_t i = 0; i < sbp_edge->cost_.size(); ++i) { for (int32_t j = 0; j < sbp_edge->cost_[i].size(); ++j) { // If transferring between devices, we need to add wait time. - if (sbp_edge->cost_[i][j] > 0.0) { sbp_edge->cost_[i][j] = sbp_edge->wait_time_; } + if (sbp_edge->cost_[i][j] > 0.0) { sbp_edge->cost_[i][j] += sbp_edge->wait_time_; } } } } // Re-compute the costs, skip edges carrying no lbi - sbp_node_consumer->InitializeCopyCost(true, use_sbp_collector_); + // sbp_node_consumer->InitializeCopyCost(true, use_sbp_collector_); }); return Maybe::Ok(); } diff --git a/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp b/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp index 2c6e16a8bb8..bdc3d8d5686 100644 --- a/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp +++ b/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp @@ -521,7 +521,8 @@ class OptimizerPlacementOptimizationPass final : public JobPass { && ctx->job_desc().job_conf().optimizer_placement_optimization_mode() != "none")) { return Maybe::Ok(); } - if (job->job_conf().enable_auto_parallel()) { + if (job->job_conf().enable_auto_parallel() + && job->job_conf().enable_auto_parallel_ignore_user_sbp_config()) { LOG(WARNING) << "ZeRO optimization will be ignored when enabling AutoParallel"; return Maybe::Ok(); } From d7ab8c950a14a12e7dbd7bc2ebe385349d33a808 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 3 Nov 2022 13:45:30 +0800 Subject: [PATCH 2/6] Remove compute_cost parameter from Initialization of copy cost --- oneflow/core/auto_parallel/sbp_constructor.cpp | 5 +---- oneflow/core/auto_parallel/sbp_edge.cpp | 7 +++---- oneflow/core/auto_parallel/sbp_edge.h | 4 +--- oneflow/core/auto_parallel/sbp_node.cpp | 4 ++-- oneflow/core/auto_parallel/sbp_node.h | 2 +- 5 files changed, 8 insertions(+), 14 deletions(-) diff --git a/oneflow/core/auto_parallel/sbp_constructor.cpp b/oneflow/core/auto_parallel/sbp_constructor.cpp index d451680c2cb..1a8b4f63d01 100644 --- a/oneflow/core/auto_parallel/sbp_constructor.cpp +++ b/oneflow/core/auto_parallel/sbp_constructor.cpp @@ -264,7 +264,7 @@ Maybe SbpConstructor::InitCopyCost(const OpGraph& op_graph) { } // Find all those cases with wait time // Do not skip edges carrying no lbi - sbp_node_consumer->InitializeCopyCost(true, use_sbp_collector_); + sbp_node_consumer->InitializeCopyCost(use_sbp_collector_); for (auto* sbp_edge : sbp_node_consumer->edges_in_) { // skip it if proxy if (!sbp_edge->start_node_->op_node_) { continue; } @@ -276,9 +276,6 @@ Maybe SbpConstructor::InitCopyCost(const OpGraph& op_graph) { } } } - - // Re-compute the costs, skip edges carrying no lbi - // sbp_node_consumer->InitializeCopyCost(true, use_sbp_collector_); }); return Maybe::Ok(); } diff --git a/oneflow/core/auto_parallel/sbp_edge.cpp b/oneflow/core/auto_parallel/sbp_edge.cpp index c198ecbdd00..649cf3b1515 100644 --- a/oneflow/core/auto_parallel/sbp_edge.cpp +++ b/oneflow/core/auto_parallel/sbp_edge.cpp @@ -225,8 +225,7 @@ double SbpEdge::GetMaxCost() const { // Assemble copy cost -void SbpEdge::InitializeCopyCost(const std::string& ibn, bool compute_cost, - bool use_sbp_collector) { +void SbpEdge::InitializeCopyCost(const std::string& ibn, bool use_sbp_collector) { // In this part, we assemble the cost from nodes to nodes. if (start_node_->op_node_ && end_node_->op_node_) { OpNode* consumer = end_node_->op_node_; @@ -235,7 +234,7 @@ void SbpEdge::InitializeCopyCost(const std::string& ibn, bool compute_cost, const LogicalBlobId& lbi = consumer->op().BnInOp2Lbi(ibn); // Check whether lbi is transferred by this edge - if (use_sbp_collector && compute_cost && !SearchLbi(lbi)) { return; } + if (use_sbp_collector && !SearchLbi(lbi)) { return; } OpNode* producer = start_node_->op_node_; const std::string& producer_lbn = *CHECK_JUST(producer->op().obn4lbi(lbi)); @@ -250,7 +249,7 @@ void SbpEdge::InitializeCopyCost(const std::string& ibn, bool compute_cost, const std::string& obn = *CHECK_JUST(producer->op().obn4lbi(lbi)); // If we are deciding whether we need the wait time, then make require_same_sbp true. // B->S cause cudaEventSynchronize in current implementation. - bool require_same_sbp = (!compute_cost) || RequireSameSbp(consumer, ibn); + bool require_same_sbp = RequireSameSbp(consumer, ibn); int32_t consumer_sbp_size = end_node_->sbp_sig_list_.size(); LazyMode::Guard enable_lazy_mode(true); diff --git a/oneflow/core/auto_parallel/sbp_edge.h b/oneflow/core/auto_parallel/sbp_edge.h index 9066f1810ce..83ada89eb53 100644 --- a/oneflow/core/auto_parallel/sbp_edge.h +++ b/oneflow/core/auto_parallel/sbp_edge.h @@ -86,9 +86,7 @@ class SbpEdge final { double GetMaxCost() const; // Assemble copy cost - // compute_cost = true: It is computing cost - // compute_cost = false: It is deciding whether this edge needs the wait time. - void InitializeCopyCost(const std::string& ibn, bool compute_cost, bool use_sbp_collector); + void InitializeCopyCost(const std::string& ibn, bool use_sbp_collector); // find the cut ratio // (#c>GetValidMaxCopyCost() in Cost)/(#c in Cost) diff --git a/oneflow/core/auto_parallel/sbp_node.cpp b/oneflow/core/auto_parallel/sbp_node.cpp index d2a7e90cf1f..286eda47101 100644 --- a/oneflow/core/auto_parallel/sbp_node.cpp +++ b/oneflow/core/auto_parallel/sbp_node.cpp @@ -619,7 +619,7 @@ void SbpNode::DropAvailWaitTime(double curr_trunk_cost) { // Assemble copy cost for all the incoming edges -void SbpNode::InitializeCopyCost(bool compute_cost, bool use_sbp_collector) { +void SbpNode::InitializeCopyCost(bool use_sbp_collector) { for (SbpEdge* this_edge : edges_in_) { const auto* sbp_node_producer = this_edge->start_node_; OpNode* producer = sbp_node_producer->op_node_; @@ -630,7 +630,7 @@ void SbpNode::InitializeCopyCost(bool compute_cost, bool use_sbp_collector) { // look through input blobs for (const std::string& ibn : op_node_->op().input_bns()) { if (producer->op().op_name() == op_node_->SrcNode4Ibn(ibn).op().op_name()) { - this_edge->InitializeCopyCost(ibn, compute_cost, use_sbp_collector); + this_edge->InitializeCopyCost(ibn, use_sbp_collector); } } } diff --git a/oneflow/core/auto_parallel/sbp_node.h b/oneflow/core/auto_parallel/sbp_node.h index 42e0d0d33f4..5b1f29b6b99 100644 --- a/oneflow/core/auto_parallel/sbp_node.h +++ b/oneflow/core/auto_parallel/sbp_node.h @@ -125,7 +125,7 @@ class SbpNode final { void SetTrunkWaitTime(double trunk_wait_time); // Assemble copy cost for all the incoming edges - void InitializeCopyCost(bool compute_cost, bool use_sbp_collector); + void InitializeCopyCost(bool use_sbp_collector); private: friend class SbpEdge; From 021f30e87b22b31cbb49bd2ade12b895e43ad2db Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 3 Nov 2022 14:46:59 +0800 Subject: [PATCH 3/6] Move the addition of wait time into sbp_node --- oneflow/core/auto_parallel/sbp_constructor.cpp | 11 ----------- oneflow/core/auto_parallel/sbp_node.cpp | 8 +++++++- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/oneflow/core/auto_parallel/sbp_constructor.cpp b/oneflow/core/auto_parallel/sbp_constructor.cpp index 1a8b4f63d01..f98a2d3826a 100644 --- a/oneflow/core/auto_parallel/sbp_constructor.cpp +++ b/oneflow/core/auto_parallel/sbp_constructor.cpp @@ -265,17 +265,6 @@ Maybe SbpConstructor::InitCopyCost(const OpGraph& op_graph) { // Find all those cases with wait time // Do not skip edges carrying no lbi sbp_node_consumer->InitializeCopyCost(use_sbp_collector_); - for (auto* sbp_edge : sbp_node_consumer->edges_in_) { - // skip it if proxy - if (!sbp_edge->start_node_->op_node_) { continue; } - // Reset Wait time - for (int32_t i = 0; i < sbp_edge->cost_.size(); ++i) { - for (int32_t j = 0; j < sbp_edge->cost_[i].size(); ++j) { - // If transferring between devices, we need to add wait time. - if (sbp_edge->cost_[i][j] > 0.0) { sbp_edge->cost_[i][j] += sbp_edge->wait_time_; } - } - } - } }); return Maybe::Ok(); } diff --git a/oneflow/core/auto_parallel/sbp_node.cpp b/oneflow/core/auto_parallel/sbp_node.cpp index 286eda47101..4baaf4a5e2b 100644 --- a/oneflow/core/auto_parallel/sbp_node.cpp +++ b/oneflow/core/auto_parallel/sbp_node.cpp @@ -626,13 +626,19 @@ void SbpNode::InitializeCopyCost(bool use_sbp_collector) { // skip it if proxy if (use_sbp_collector && !producer) { continue; } - // look through input blobs for (const std::string& ibn : op_node_->op().input_bns()) { if (producer->op().op_name() == op_node_->SrcNode4Ibn(ibn).op().op_name()) { this_edge->InitializeCopyCost(ibn, use_sbp_collector); } } + // Add Wait time + for (auto& cost_row : this_edge->cost_) { + for (auto& cost_value : cost_row) { + // If transferring between devices, we need to add wait time. + if (cost_value > 0.0) { cost_value += this_edge->wait_time_; } + } + } } } From ad38ff1e37e447131e16dda969a8fa99c8ad8230 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 3 Nov 2022 17:36:34 +0800 Subject: [PATCH 4/6] Remove transfer cost since it is merged into the GetTransferCost() --- oneflow/core/auto_parallel/sbp_collector.cpp | 4 ++-- oneflow/core/auto_parallel/sbp_constructor.cpp | 1 - oneflow/core/auto_parallel/sbp_constructor.h | 1 - oneflow/core/auto_parallel/sbp_graph.cpp | 12 +++--------- oneflow/core/auto_parallel/sbp_graph.h | 5 ----- oneflow/core/auto_parallel/sbp_node.cpp | 9 +++------ oneflow/core/auto_parallel/sbp_node.h | 4 ++-- oneflow/core/job/job_conf.proto | 7 +++---- python/oneflow/nn/graph/graph_config.py | 11 ----------- 9 files changed, 13 insertions(+), 41 deletions(-) diff --git a/oneflow/core/auto_parallel/sbp_collector.cpp b/oneflow/core/auto_parallel/sbp_collector.cpp index 8b19bb87ffc..fb2169263cf 100644 --- a/oneflow/core/auto_parallel/sbp_collector.cpp +++ b/oneflow/core/auto_parallel/sbp_collector.cpp @@ -357,8 +357,8 @@ void SbpCollector::ProxySbpCandidate(const OpGraph& op_graph, // clip this edge if it no longer carries any blob // We don't clip edges before since we have transfer cost // Now we clip edges, which makes the topology simpler - if (edge_found->EmptyLbi() && edge_found->wait_time_ <= 0.0 && edge_found->wait_time_ > -0.5 - && sbp_graph.transfer_cost_ <= 0.0) { + if (edge_found->EmptyLbi() && edge_found->wait_time_ <= 0.0 + && edge_found->wait_time_ > -0.5) { sbp_graph.ClipEdge(edge_found); } } diff --git a/oneflow/core/auto_parallel/sbp_constructor.cpp b/oneflow/core/auto_parallel/sbp_constructor.cpp index f98a2d3826a..a3aa4f9d3ee 100644 --- a/oneflow/core/auto_parallel/sbp_constructor.cpp +++ b/oneflow/core/auto_parallel/sbp_constructor.cpp @@ -381,7 +381,6 @@ Maybe>> SbpConstructor::GetMutableOp void SbpConstructor::PrintSBPGraphDebugInfo() { // sbp constructor information std::cout << "cost_ratio_:" << cost_ratio_ << std::endl; - std::cout << "transfer_cost_:" << sbp_graph_.transfer_cost_ << std::endl; std::cout << "wait_time_:" << sbp_graph_.wait_time_ << std::endl; std::cout << "use_sbp_collector_" << use_sbp_collector_ << std::endl; // test debug diff --git a/oneflow/core/auto_parallel/sbp_constructor.h b/oneflow/core/auto_parallel/sbp_constructor.h index ffa70383c25..40c01ffba7f 100644 --- a/oneflow/core/auto_parallel/sbp_constructor.h +++ b/oneflow/core/auto_parallel/sbp_constructor.h @@ -45,7 +45,6 @@ class SbpConstructor final { && job->job_conf().enable_auto_parallel_sbp_collector()), op_graph_(&op_graph) { sbp_graph_.SetWaitTime(job->job_conf().auto_parallel_wait_time()); - sbp_graph_.SetTransferCost(job->job_conf().auto_parallel_transfer_cost()); CHECK_JUST(Init(op_graph, job)); } ~SbpConstructor() = default; diff --git a/oneflow/core/auto_parallel/sbp_graph.cpp b/oneflow/core/auto_parallel/sbp_graph.cpp index fd7fc5e1507..17352ff2294 100644 --- a/oneflow/core/auto_parallel/sbp_graph.cpp +++ b/oneflow/core/auto_parallel/sbp_graph.cpp @@ -758,8 +758,7 @@ void SbpGraph::FindTrunk(int32_t max_min_layer, for (SbpNode* this_node : node_list_) { this_node->RaiseConsumerNum(op_name2sbp_node); } // Reduce the wait time for tributaries for (SbpNode* this_node : node_list_) { - this_node->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time_, - transfer_cost_); + this_node->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time_); } // Reduce the wait time for trunk from the end to the begin @@ -787,11 +786,11 @@ void SbpGraph::FindTrunk(int32_t max_min_layer, // This code maintains ( acc_tributary_cost + used_tributary_cost ) if (acc_tributary_cost > 0.0) { if (acc_tributary_cost > wait_time_) { - curr_wait_time = transfer_cost_; + curr_wait_time = 0.0; acc_tributary_cost -= wait_time_; used_tributary_cost += wait_time_; } else { - curr_wait_time = transfer_cost_ + wait_time_ - acc_tributary_cost; + curr_wait_time = wait_time_ - acc_tributary_cost; used_tributary_cost += acc_tributary_cost; acc_tributary_cost = 0.0; } @@ -804,12 +803,7 @@ void SbpGraph::FindTrunk(int32_t max_min_layer, } // Set wait time - void SbpGraph::SetWaitTime(double wait_time) { wait_time_ = wait_time; } -// Set transfer cost - -void SbpGraph::SetTransferCost(double transfer_cost) { transfer_cost_ = transfer_cost; } - } // namespace auto_parallel } // namespace oneflow diff --git a/oneflow/core/auto_parallel/sbp_graph.h b/oneflow/core/auto_parallel/sbp_graph.h index db05a771c74..81c87a18f33 100644 --- a/oneflow/core/auto_parallel/sbp_graph.h +++ b/oneflow/core/auto_parallel/sbp_graph.h @@ -91,9 +91,6 @@ class SbpGraph final { // Set wait time void SetWaitTime(double wait_time); - // Set transfer cost - void SetTransferCost(double transfer_cost); - private: friend class SbpCollector; friend class SbpConstructor; @@ -106,8 +103,6 @@ class SbpGraph final { int32_t threshold_ = 100; // Overlayable wait time for copy cost, which occurs before communication between devices. double wait_time_ = 16500.0; - // Uncovered wait time for copy cost, which is already set up somewhere else - double transfer_cost_ = 0.0; // Remove a node from the node list void RemoveFromNodeList(SbpNode* this_node); diff --git a/oneflow/core/auto_parallel/sbp_node.cpp b/oneflow/core/auto_parallel/sbp_node.cpp index 4baaf4a5e2b..45133ae17e7 100644 --- a/oneflow/core/auto_parallel/sbp_node.cpp +++ b/oneflow/core/auto_parallel/sbp_node.cpp @@ -537,7 +537,7 @@ void SbpNode::RaiseConsumerNum(const HashMap& op_name2sbp void SbpNode::SpreadAvailWaitTime(const std::vector& trunk_cost, const std::vector& acc_trunk_cost, const HashMap& op_name2sbp_node, - double wait_time, double transfer_cost) { + double wait_time) { // skip the proxy nodes and the sources if (min_layer_ <= 0) { return; } // Have not finished spreading for consumers or downstream nodes or already visited. @@ -577,15 +577,13 @@ void SbpNode::SpreadAvailWaitTime(const std::vector& trunk_cost, // (1) P->S0->S0->S0->B // (2) p->B->B->B->B // We would use (2) when the tensor is relatively tiny. - this_edge->wait_time_ += transfer_cost; // Do not inherit trunk cost for nodes on the trunk if (!producer->on_trunk_) { // Inherit the minimal of the trunk cost from consumers producer->DropAvailWaitTime(curr_trunk_cost); } producer->counter_--; - producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time, - transfer_cost); + producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time); } // Put the rest the trunk cost in the upstream nodes. for (const auto& ctrl_in_op_name : op_node_->op().op_conf().ctrl_in_op_name()) { @@ -601,8 +599,7 @@ void SbpNode::SpreadAvailWaitTime(const std::vector& trunk_cost, producer->DropAvailWaitTime(curr_trunk_cost); } producer->counter_--; - producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time, - transfer_cost); + producer->SpreadAvailWaitTime(trunk_cost, acc_trunk_cost, op_name2sbp_node, wait_time); } } // Set counter_ to be -1, do not visit it again. diff --git a/oneflow/core/auto_parallel/sbp_node.h b/oneflow/core/auto_parallel/sbp_node.h index 5b1f29b6b99..0d093a3d88c 100644 --- a/oneflow/core/auto_parallel/sbp_node.h +++ b/oneflow/core/auto_parallel/sbp_node.h @@ -119,8 +119,8 @@ class SbpNode final { // Compute the minimal available wait time for producers or upstream nodes void SpreadAvailWaitTime(const std::vector& trunk_cost, const std::vector& acc_trunk_cost, - const HashMap& op_name2sbp_node, double wait_time, - double transfer_cost); + const HashMap& op_name2sbp_node, + double wait_time); // Reduce and set the wait time for op in the trunk void SetTrunkWaitTime(double trunk_wait_time); diff --git a/oneflow/core/job/job_conf.proto b/oneflow/core/job/job_conf.proto index 68dfdde357b..90054ba6fa5 100644 --- a/oneflow/core/job/job_conf.proto +++ b/oneflow/core/job/job_conf.proto @@ -267,10 +267,9 @@ message JobConfigProto { optional bool enable_auto_parallel = 700 [default = false]; optional double auto_parallel_computation_cost_ratio = 701 [default = 0.05]; optional double auto_parallel_wait_time = 702 [default = 1.65e4]; - optional double auto_parallel_transfer_cost = 703 [default = 0.0]; - optional bool enable_auto_parallel_mainstem_algo = 704 [default = true]; - optional bool enable_auto_parallel_sbp_collector = 705 [default = false]; - optional bool enable_auto_parallel_ignore_user_sbp_config = 706 [default = false]; + optional bool enable_auto_parallel_mainstem_algo = 703 [default = true]; + optional bool enable_auto_parallel_sbp_collector = 704 [default = false]; + optional bool enable_auto_parallel_ignore_user_sbp_config = 705 [default = false]; optional StraightenAlgorithmTag straighten_algorithm_tag_in_task_graph = 800 [default = kCompressMemory]; diff --git a/python/oneflow/nn/graph/graph_config.py b/python/oneflow/nn/graph/graph_config.py index 48fab9df220..39ea295885b 100644 --- a/python/oneflow/nn/graph/graph_config.py +++ b/python/oneflow/nn/graph/graph_config.py @@ -342,17 +342,6 @@ def set_auto_parallel_wait_time(self, cost): """ self.proto.auto_parallel_wait_time = cost - def set_auto_parallel_transfer_cost(self, cost): - """ - Set transfer cost for auto-parallel algorithm. - - transfer cost: An auto-parallel parameter. Describe the fixed extra time it will take when - communication between devices occurs. It will be added to the copy cost and can not be reduced. - Default value: 0 - Using a positive number such as 1.65e8 would reduce the frequency of data transfer. - """ - self.proto.auto_parallel_transfer_cost = cost - def enable_auto_parallel_mainstem_algo(self, mode: bool = True): """ Find the mainstem of the SBP graph, then reduce the wait time for tributaries. From 02486affe258117fcebc0137aa4f1978d3529d32 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 3 Nov 2022 17:44:32 +0800 Subject: [PATCH 5/6] Rename mainstem to trunk --- docs/source/auto_parallel.rst | 2 +- oneflow/core/auto_parallel/sbp_constructor.h | 2 +- oneflow/core/graph/straighten_nodes.cpp | 36 +++++++++---------- oneflow/core/job/job_conf.proto | 2 +- python/oneflow/nn/graph/graph_config.py | 6 ++-- .../test/graph/test_alexnet_auto_parallel.py | 4 +-- 6 files changed, 25 insertions(+), 27 deletions(-) diff --git a/docs/source/auto_parallel.rst b/docs/source/auto_parallel.rst index d80c725f438..ffa29712403 100644 --- a/docs/source/auto_parallel.rst +++ b/docs/source/auto_parallel.rst @@ -65,6 +65,6 @@ Configuration API for auto parallelism enable_auto_parallel_ignore_user_sbp_config set_auto_parallel_computation_cost_ratio set_auto_parallel_wait_time - enable_auto_parallel_mainstem_algo + enable_auto_parallel_trunk_algo enable_auto_parallel_sbp_collector diff --git a/oneflow/core/auto_parallel/sbp_constructor.h b/oneflow/core/auto_parallel/sbp_constructor.h index 40c01ffba7f..a92b88a5d77 100644 --- a/oneflow/core/auto_parallel/sbp_constructor.h +++ b/oneflow/core/auto_parallel/sbp_constructor.h @@ -38,7 +38,7 @@ class SbpConstructor final { SbpConstructor() = delete; SbpConstructor(const OpGraph& op_graph, Job* job) : cost_ratio_(job->job_conf().auto_parallel_computation_cost_ratio()), - enable_trunk_algo_(job->job_conf().enable_auto_parallel_mainstem_algo()), + enable_trunk_algo_(job->job_conf().enable_auto_parallel_trunk_algo()), use_sbp_collector_(!Singleton::Get() ->resource() .disable_group_boxing_by_dst_parallel() diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 80a0cf875b2..d113e2df6bf 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -40,7 +40,7 @@ class TopoStruct { TaskNode* node = nullptr; int32_t min_layer = -1; int32_t tributary_layer = -1; - bool on_mainstem = false; + bool on_trunk = false; int32_t counter = 0; int32_t min_distance2transfer = -1; int64_t memory_increment = -1; @@ -55,7 +55,7 @@ class TopoStruct { void SpreadTributaryLayer(HashMap* task_node2topo_struct); - void SpreadMainstem(HashMap* task_node2topo_struct); + void SpreadTrunk(HashMap* task_node2topo_struct); // The minimum computation distance from the beginning of this op to the next transfer int32_t GetMinDistance2Transfer(HashMap* task_node2topo_struct); @@ -180,7 +180,7 @@ void TopoStruct::DropTributaryLayer(int32_t upper_bound) { void TopoStruct::SpreadTributaryLayer(HashMap* task_node2topo_struct) { if (counter || min_layer <= 0) { return; } int32_t producer_max_lay = 0; - if (on_mainstem) { + if (on_trunk) { producer_max_lay = min_layer - 1; } else { // On a tributary, the operator could be run later. @@ -196,15 +196,15 @@ void TopoStruct::SpreadTributaryLayer(HashMap* task_node2 counter--; } -// Judge if this node is on the mainstem +// Judge if this node is on the trunk // If so, judge it for its producer/upstream nodes -void TopoStruct::SpreadMainstem(HashMap* task_node2topo_struct) { +void TopoStruct::SpreadTrunk(HashMap* task_node2topo_struct) { // Skip it if this node is already judged. - if (on_mainstem) { return; } + if (on_trunk) { return; } CHECK_GE(min_layer, 0) << "TopoStruct not initialized!"; - on_mainstem = true; - // If I am in the mainstem, then all the children with (min_layer >= my layer id - 1) would be - // considered as in the mainstem + on_trunk = true; + // If I am in the trunk, then all the children with (min_layer >= my layer id - 1) would be + // considered as in the trunk node->ForEachNodeOnInEdge([&](TaskNode* in) { auto& topo_struct_in = task_node2topo_struct->at(in); if (topo_struct_in.min_layer == min_layer - 1) { @@ -281,25 +281,23 @@ int64_t TopoStruct::GetDecidingParameter(int32_t i) const { return 0; } -// Find the mainstem of the task graph, then reduce the wait time for tributaries -void FindMainstem(HashMap* task_node2topo_struct) { +// Find the trunk of the task graph, then reduce the wait time for tributaries +void FindTrunk(HashMap* task_node2topo_struct) { // Find the maximum layer number int32_t max_min_layer = -1; for (const auto& pair : *task_node2topo_struct) { if (max_min_layer < pair.second.min_layer) { max_min_layer = pair.second.min_layer; } } - // All the nodes with min_layer>=mainstem_end_id would be considered as mainstem nodes - // The last 5 layers would be considered as in mainstem anyway. - int32_t mainstem_end_id = max_min_layer - 4; + // All the nodes with min_layer>=trunk_end_id would be considered as trunk nodes + // The last 5 layers would be considered as in trunk anyway. + int32_t trunk_end_id = max_min_layer - 4; for (auto& pair : *task_node2topo_struct) { auto& topo_struct = pair.second; // Initialize the counter and Tributary Layer topo_struct.counter = pair.first->out_edges().size(); topo_struct.tributary_layer = max_min_layer; - // Find out all the nodes on the mainstem. - if (topo_struct.min_layer >= mainstem_end_id) { - topo_struct.SpreadMainstem(task_node2topo_struct); - } + // Find out all the nodes on the trunk. + if (topo_struct.min_layer >= trunk_end_id) { topo_struct.SpreadTrunk(task_node2topo_struct); } } for (auto& pair : *task_node2topo_struct) { @@ -384,7 +382,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task }); // Generate other parameters in the topological data structure - FindMainstem(&task_node2topo_struct); + FindTrunk(&task_node2topo_struct); // Decide which node should run first InitDecideParameters(GlobalJobDesc().job_conf().straighten_algorithm_tag_in_task_graph()); diff --git a/oneflow/core/job/job_conf.proto b/oneflow/core/job/job_conf.proto index 90054ba6fa5..5ab9e524818 100644 --- a/oneflow/core/job/job_conf.proto +++ b/oneflow/core/job/job_conf.proto @@ -267,7 +267,7 @@ message JobConfigProto { optional bool enable_auto_parallel = 700 [default = false]; optional double auto_parallel_computation_cost_ratio = 701 [default = 0.05]; optional double auto_parallel_wait_time = 702 [default = 1.65e4]; - optional bool enable_auto_parallel_mainstem_algo = 703 [default = true]; + optional bool enable_auto_parallel_trunk_algo = 703 [default = true]; optional bool enable_auto_parallel_sbp_collector = 704 [default = false]; optional bool enable_auto_parallel_ignore_user_sbp_config = 705 [default = false]; diff --git a/python/oneflow/nn/graph/graph_config.py b/python/oneflow/nn/graph/graph_config.py index 39ea295885b..74a140db006 100644 --- a/python/oneflow/nn/graph/graph_config.py +++ b/python/oneflow/nn/graph/graph_config.py @@ -342,11 +342,11 @@ def set_auto_parallel_wait_time(self, cost): """ self.proto.auto_parallel_wait_time = cost - def enable_auto_parallel_mainstem_algo(self, mode: bool = True): + def enable_auto_parallel_trunk_algo(self, mode: bool = True): """ - Find the mainstem of the SBP graph, then reduce the wait time for tributaries. + Find the trunk of the SBP graph, then reduce the wait time for tributaries. """ - self.proto.enable_auto_parallel_mainstem_algo = mode + self.proto.enable_auto_parallel_trunk_algo = mode def enable_auto_parallel_sbp_collector(self, mode: bool = True): """ diff --git a/python/oneflow/test/graph/test_alexnet_auto_parallel.py b/python/oneflow/test/graph/test_alexnet_auto_parallel.py index 0749ba8d0f3..ab99f41cfcc 100644 --- a/python/oneflow/test/graph/test_alexnet_auto_parallel.py +++ b/python/oneflow/test/graph/test_alexnet_auto_parallel.py @@ -125,7 +125,7 @@ def __init__(self): self.add_optimizer(of_sgd) self.config.enable_auto_parallel(True) self.config.enable_auto_parallel_ignore_user_sbp_config(True) - self.config.enable_auto_parallel_mainstem_algo(True) + self.config.enable_auto_parallel_trunk_algo(True) self.config.enable_auto_parallel_sbp_collector(True) def build(self, image, label): @@ -142,7 +142,7 @@ def __init__(self): self.alexnet = alexnet_module self.config.enable_auto_parallel(True) self.config.enable_auto_parallel_ignore_user_sbp_config(True) - self.config.enable_auto_parallel_mainstem_algo(True) + self.config.enable_auto_parallel_trunk_algo(True) self.config.enable_auto_parallel_sbp_collector(True) def build(self, image): From 17956e05be4616e21bd389ef16abb636d211155a Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 3 Nov 2022 18:47:10 +0800 Subject: [PATCH 6/6] Update warning --- .../job_rewriter/optimizer_placement_optimization_pass.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp b/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp index bdc3d8d5686..dc1a27c2c75 100644 --- a/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp +++ b/oneflow/core/job_rewriter/optimizer_placement_optimization_pass.cpp @@ -523,7 +523,8 @@ class OptimizerPlacementOptimizationPass final : public JobPass { } if (job->job_conf().enable_auto_parallel() && job->job_conf().enable_auto_parallel_ignore_user_sbp_config()) { - LOG(WARNING) << "ZeRO optimization will be ignored when enabling AutoParallel"; + LOG(WARNING) << "ZeRO optimization will be ignored when enabling AutoParallel to ignore user " + "sbp configuration"; return Maybe::Ok(); } const std::string& mode = ctx->job_desc().job_conf().optimizer_placement_optimization_mode();