Add sync data for to_global function #10433

Open · wants to merge 2 commits into base: master
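This PR threads a `sync_data` flag through `functional::ToGlobal` and exposes it as a keyword argument on `Tensor.local_to_global`, `Tensor.global_to_global`, and `Tensor.to_global`. Below is a minimal usage sketch modeled on the docstring examples further down in this diff; it assumes a 2-rank launch, and since the effect of `sync_data=False` is not documented in this capture, only the default `sync_data=True` is shown.

```python
# Sketch based on the signatures added in this PR; run on 2 ranks, e.g.:
#   python3 -m oneflow.distributed.launch --nproc_per_node 2 demo.py
import oneflow as flow

placement = flow.placement("cpu", ranks=[0, 1])
sbp = [flow.sbp.split(0)]

# Each rank holds its own local shard.
local = flow.tensor([0.0, 1.0], dtype=flow.float32)

# New keyword: sync_data (defaults to True, matching the previous hard-coded behavior).
out = local.local_to_global(placement=placement, sbp=sbp, check_meta=False, sync_data=True)
print(out.size())

# The same keyword is exposed on to_global.
out2 = local.to_global(placement=placement, sbp=sbp, check_meta=False, sync_data=True)
```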
34 changes: 21 additions & 13 deletions oneflow/api/python/framework/tensor_functions.cpp
@@ -694,14 +694,17 @@ static PyObject* PyTensorObject_local_to_global(PyObject* self, PyObject* args,
PyObject* placement_obj = Py_None;
PyObject* sbp_obj = Py_None;
PyObject* check_meta_obj = Py_True;
PyObject* sync_data_obj = Py_True;
PyObject* copy_obj = Py_False;
static const char* keywords[5] = {"placement", "sbp", "check_meta", "copy", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$O!O!:local_to_global",
static const char* keywords[6] = {"placement", "sbp", "check_meta", "sync_data", "copy", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$O!O!O!:local_to_global",
const_cast<char**>(keywords), &placement_obj, &sbp_obj,
&PyBool_Type, &check_meta_obj, &PyBool_Type, &copy_obj)) {
&PyBool_Type, &check_meta_obj, &PyBool_Type, &sync_data_obj,
&PyBool_Type, &copy_obj)) {
return NULL;
}
const bool check_meta = (check_meta_obj == Py_True);
const bool sync_data = (sync_data_obj == Py_True);
const bool copy = (copy_obj == Py_True);

CHECK_OR_THROW(placement_obj != Py_None && sbp_obj != Py_None)
@@ -720,8 +723,9 @@ static PyObject* PyTensorObject_local_to_global(PyObject* self, PyObject* args,
<< functional::PyStringAsString(PyObject_Str((PyObject*)Py_TYPE(sbp_obj)));
sbp = functional::PyUnpackSbpParallelSequence(sbp_obj);
}
return PyTensor_New(ASSERT_PTR(functional::ToGlobal(
tensor, functional::PyUnpackParallelDesc(placement_obj), sbp, {}, check_meta, copy)));
return PyTensor_New(
ASSERT_PTR(functional::ToGlobal(tensor, functional::PyUnpackParallelDesc(placement_obj), sbp,
{}, check_meta, sync_data, copy)));
END_HANDLE_ERRORS
}

@@ -736,14 +740,18 @@ static PyObject* PyTensorObject_global_to_global(PyObject* self, PyObject* args,
std::vector<Symbol<SbpParallel>> sbp;
std::vector<Symbol<SbpParallel>> grad_sbp;
PyObject* check_meta_obj = Py_False;
PyObject* sync_data_obj = Py_True;
PyObject* copy_obj = Py_False;
static const char* keywords[6] = {"placement", "sbp", "grad_sbp", "check_meta", "copy", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$OO!O!:global_to_global",
static const char* keywords[7] = {"placement", "sbp", "grad_sbp", "check_meta",
"sync_data", "copy", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$OO!O!O!:global_to_global",
const_cast<char**>(keywords), &placement_obj, &sbp_obj,
&grad_sbp_obj, &PyBool_Type, &check_meta_obj, &copy_obj)) {
&grad_sbp_obj, &PyBool_Type, &check_meta_obj, &PyBool_Type,
&sync_data_obj, &PyBool_Type, &copy_obj)) {
return NULL;
}
const bool check_meta = (check_meta_obj == Py_True);
const bool sync_data = (sync_data_obj == Py_True);
const bool copy = (copy_obj == Py_True);

// sbp
@@ -780,8 +788,8 @@ static PyObject* PyTensorObject_global_to_global(PyObject* self, PyObject* args,
} else if (functional::PySbpParallelSequenceCheck(grad_sbp_obj)) {
grad_sbp = functional::PyUnpackSbpParallelSequence(grad_sbp_obj);
}
return PyTensor_New(
ASSERT_PTR(functional::ToGlobal(tensor, placement, sbp, grad_sbp, check_meta, copy)));
return PyTensor_New(ASSERT_PTR(
functional::ToGlobal(tensor, placement, sbp, grad_sbp, check_meta, sync_data, copy)));
END_HANDLE_ERRORS
}

@@ -845,8 +853,8 @@ static PyObject* PyTensorObject_type_as(PyObject* self, PyObject* args, PyObject
for (int32_t i = 0; i < ndsbp->sbp_parallel_size(); i++) {
sbp.emplace_back(ndsbp->sbp_parallel(i));
}
return PyTensor_New(
ASSERT_PTR(functional::ToGlobal(value_tensor, placement, sbp, {}, true, /*copy=*/false)));
return PyTensor_New(ASSERT_PTR(
functional::ToGlobal(value_tensor, placement, sbp, {}, true, true, /*copy=*/false)));
END_HANDLE_ERRORS
}

@@ -945,7 +953,7 @@ int PyTensorObject_setitem(PyObject* self, PyObject* item, PyObject* value) {
<< Error::RuntimeError()
<< "tensor_setitem(): value must be a global tensor when self is global";
value_tensor = ASSERT_PTR(
functional::ToGlobal(value_tensor, placement, sbp, {}, true, /*copy=*/false));
functional::ToGlobal(value_tensor, placement, sbp, {}, true, true, /*copy=*/false));
}
} else {
if (functional::PyScalarCheck(value)) {
9 changes: 5 additions & 4 deletions oneflow/api/python/utils/tensor_utils.cpp
@@ -266,7 +266,7 @@ Maybe<Tensor> MakeGlobalTensorFromData(PyObject* data, const Optional<Symbol<DTy
std::vector<Symbol<SbpParallel>> grad_sbp_tuple;
auto global_tensor =
JUST(functional::ToGlobal(broadcast_tensor, placement, sbp_tuple, grad_sbp_tuple,
/* check_meta */ false, /*copy=*/false));
/* check_meta */ false, /* sync_data */ true, /*copy=*/false));
JUST(global_tensor->set_requires_grad(requires_grad));
return global_tensor;
}
@@ -282,7 +282,7 @@ Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other,
std::vector<Symbol<SbpParallel>> grad_sbp_tuple;
// TODO:(zhaoluyang) global case support pin_memory
return functional::ToGlobal(other, JUST(other->parallel_desc()), sbp_tuple, grad_sbp_tuple,
/* check_meta */ false, /*copy=*/false);
/* check_meta */ false, /* sync_data */ true, /*copy=*/false);
}
}

@@ -318,8 +318,9 @@ Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other,
const bool requires_grad) {
std::vector<Symbol<SbpParallel>> grad_sbp_tuple;
bool check_meta = other->is_global() ? false : true;
std::shared_ptr<Tensor> tensor = JUST(functional::ToGlobal(
other, placement, sbp_tuple, grad_sbp_tuple, check_meta, /*copy=*/false));
std::shared_ptr<Tensor> tensor =
JUST(functional::ToGlobal(other, placement, sbp_tuple, grad_sbp_tuple, check_meta,
/* sync_data */ true, /*copy=*/false));
if (dtype) {
const Symbol<DType>& dtype_ = JUST(dtype);
if (tensor->dtype() != dtype_) {
3 changes: 2 additions & 1 deletion oneflow/core/autograd/autograd_engine.cpp
@@ -169,7 +169,8 @@ Maybe<void> FunctionNode::AccGrad4LeafTensor(bool create_graph) {
const auto& nd_sbp = JUST(tensor_info.sbp());
JUST(out->set_acc_grad(
JUST(functional::ToGlobal(acc_grad, placement, *JUST(GetSbpList(nd_sbp)),
GetNoneSbpList(), /* check_meta */ false, /*copy=*/false))));
GetNoneSbpList(), /* check_meta */ false, /*sync_data*/ true,
/*copy=*/false))));
}
}
}
7 changes: 4 additions & 3 deletions oneflow/core/autograd/gradient_funcs/global_cast.cpp
@@ -60,9 +60,10 @@ class LocalToGlobal : public OpExprGradFunction<CastGlobalCaptureState> {
{
Symbol<NdSbp> nd_sbp_constraint = ctx->nd_sbp;
Symbol<ParallelDesc> parallel_desc_constraint = ctx->parallel_desc;
out_grad = JUST(functional::ToGlobal(out_grad, parallel_desc_constraint,
*JUST(GetSbpList(nd_sbp_constraint)), GetNoneSbpList(),
/* check_meta */ false, /*copy=*/false));
out_grad =
JUST(functional::ToGlobal(out_grad, parallel_desc_constraint,
*JUST(GetSbpList(nd_sbp_constraint)), GetNoneSbpList(),
/* check_meta */ false, /* sync_data */ true, /*copy=*/false));
}
in_grads->at(0) = JUST(OpInterpUtil::Dispatch<Tensor>(*grad_op_, {out_grad}));
return Maybe<void>::Ok();
8 changes: 5 additions & 3 deletions oneflow/core/autograd/gradient_funcs/global_to_global.cpp
@@ -60,13 +60,15 @@ class GlobalToGlobalGradFunction : public OpExprGradFunction<GlobalToGlobalState
const auto& grad_sbp_list = JUST(GetSbpList(grad_nd_sbp));

if (LazyMode::is_enabled()) {
(*in_grads)[0] = JUST(one::functional::ToGlobal(out_grad, ctx->parallel_desc, *grad_sbp_list,
{}, /* check_meta */ false, /*copy=*/false));
(*in_grads)[0] =
JUST(one::functional::ToGlobal(out_grad, ctx->parallel_desc, *grad_sbp_list, {},
/* check_meta */ false, /* sync_data */ true,
/*copy=*/false));
} else {
const auto& grad_grad_sbp_list = JUST(GetSbpList(ctx->nd_sbp));
(*in_grads)[0] = JUST(one::functional::ToGlobal(out_grad, ctx->parallel_desc, *grad_sbp_list,
*grad_grad_sbp_list, /* check_meta */ false,
/*copy=*/false));
/* sync_data */ true, /*copy=*/false));
}
return Maybe<void>::Ok();
}
5 changes: 3 additions & 2 deletions oneflow/core/framework/nn_graph.cpp
@@ -747,7 +747,8 @@ Maybe<void> NNGraph::GetVariableRealBlobAfterSyncPlan() {
// Cast to consistent (global) from a local or global tensor.
bool check_meta = load_tensor_iter->second->is_global() ? false : true;
tensor = JUST(one::functional::ToGlobal(load_tensor_iter->second, placement, *sbp_tuple,
grad_sbp_tuple, check_meta, /*copy=*/false));
grad_sbp_tuple, check_meta, /* sync_data */ true,
/*copy=*/false));
JUST(vm::CurrentRankSync());
VLOG(2) << "Lazy nn.Graph name " << name_ << " op: " << op_attribute.op_conf().name()
<< " created in JobPass, nn.Graph has loaded the tensor from state dict for this "
@@ -782,7 +783,7 @@ Maybe<void> NNGraph::GetVariableRealBlobAfterSyncPlan() {
auto lazy_mode_disabled_guard = LazyMode::Guard(/* is_enabled */ false);
const auto& new_tensor = JUST(one::functional::ToGlobal(
tensor, JUST(tensor->parallel_desc()), optimized_sbp_parallels, {},
/* check_meta */ false, /*copy=*/false));
/* check_meta */ false, /* sync_data */ true, /*copy=*/false));
JUST(vm::CurrentRankSync());
// Use the tensor.set_data interface and make a new TensorImpl instead of the old one.
JUST(tensor->set_data(new_tensor));
3 changes: 2 additions & 1 deletion oneflow/core/framework/tensor.cpp
@@ -226,7 +226,8 @@ Maybe<Tensor> GlobalTensor::clone() const {
std::shared_ptr<Tensor> input = std::const_pointer_cast<Tensor>(shared_from_this());
DisableCheckGlobalTensorMetaScope disable_meta_check{};
return JUST(functional::ToGlobal(input, JUST(parallel_desc()), *JUST(GetSbpList(JUST(nd_sbp()))),
/*grad_sbp_parallels=*/{}, /* sync_data */ true, /*copy=*/true));
/*grad_sbp_parallels=*/{}, /* check_meta */ true,
/* sync_data */ true, /*copy=*/true));
}

Maybe<GlobalTensor> GlobalTensor::MakeTensor(const std::shared_ptr<const Shape>& shape,
2 changes: 1 addition & 1 deletion oneflow/core/framework/tensor_util.cpp
@@ -94,7 +94,7 @@ Maybe<void> GetItemInScalarTensor(const std::shared_ptr<Tensor>& scalar_tensor,
}
const auto& broadcast_sbp = JUST(MakeBroadcastSbpParallel());
tensor = JUST(functional::ToGlobal(tensor, parallel_desc, {broadcast_sbp}, /*grad_sbp=*/{},
/*check_meta=*/false, /*copy=*/false));
/*check_meta=*/false, /*sync_data=*/true, /*copy=*/false));
tensor = JUST(functional::GlobalToLocal(tensor, /*copy=*/false));
}
local_tensor = JUST(tensor->AsLocalTensor());
2 changes: 1 addition & 1 deletion oneflow/core/functional/functional_api.yaml
@@ -2404,7 +2404,7 @@
bind_python: False

- name: "to_global"
signature: "Tensor (Tensor x, Placement placement, SbpList sbp, SbpList grad_sbp, Bool check_meta, Bool copy=False) => ToGlobal"
signature: "Tensor (Tensor x, Placement placement, SbpList sbp, SbpList grad_sbp, Bool check_meta, Bool sync_data, Bool copy=False) => ToGlobal"
bind_python: True

- name: "to_local"
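For C++ call sites, the new `Bool sync_data` argument sits between `check_meta` and `copy`, as the updated call sites throughout this diff show. A representative call-site fragment follows (not a standalone program; `tensor`, `placement`, and `sbp_tuple` stand in for whatever the caller already has):

```cpp
// Updated argument order: ..., check_meta, sync_data, copy.
std::vector<Symbol<SbpParallel>> grad_sbp_tuple;  // empty: no explicit grad sbp
std::shared_ptr<Tensor> global_tensor =
    JUST(functional::ToGlobal(tensor, placement, sbp_tuple, grad_sbp_tuple,
                              /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
```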
4 changes: 2 additions & 2 deletions oneflow/core/functional/impl/array_functor.cpp
@@ -146,7 +146,7 @@ class GlobalTensorConstantFunctor {
const auto& fixed_sbp_tuple = JUST(NdSbpReplacePartialByBroadcast(sbp_tuple));
const auto& tensor = JUST(dispatch_constant(*fixed_sbp_tuple));
return functional::ToGlobal(tensor, placement, sbp_tuple, {}, /* check_meta */ false,
/*copy*/ false);
/* sync_data */ true, /*copy*/ false);
} else {
return dispatch_constant(sbp_tuple);
}
@@ -237,7 +237,7 @@ class GlobalConstantFunctor {
const auto& fixed_sbp_tuple = JUST(NdSbpReplacePartialByBroadcast(sbp_tuple));
const auto& tensor = JUST(dispatch_constant(*fixed_sbp_tuple));
return functional::ToGlobal(tensor, placement, sbp_tuple, {}, /* check_meta */ false,
/*copy*/ false);
/* sync_data */ true, /*copy*/ false);
} else {
return dispatch_constant(sbp_tuple);
}
9 changes: 5 additions & 4 deletions oneflow/core/functional/impl/global_cast.cpp
@@ -520,7 +520,7 @@ class ToGlobalFunctor {
Symbol<ParallelDesc> parallel_desc,
const std::vector<Symbol<SbpParallel>>& sbp_parallels,
const std::vector<Symbol<SbpParallel>>& grad_sbp_parallels,
bool check_meta, bool copy) const {
bool check_meta, bool sync_data, bool copy) const {
JUST(CheckDeviceIdsIsValid(parallel_desc));
NonRecursiveMetaInfoConsistencyCheckScope scope;
JUST(MetaInfoConsistencyCheck(parallel_desc, sbp_parallels, grad_sbp_parallels, 1,
@@ -531,8 +531,9 @@
} else {
DeviceType device_type = parallel_desc->device_type();
if (ccl::IsBroadcastRegistered(device_type)) {
tensor = JUST(LocalToGlobal(x, parallel_desc, sbp_parallels, NullOpt, NullOpt,
local_to_global_op_, check_meta, /* sync_data */ true, copy));
tensor =
JUST(LocalToGlobal(x, parallel_desc, sbp_parallels, NullOpt, NullOpt,
local_to_global_op_, check_meta, /* sync_data */ sync_data, copy));
} else {
// Assuming that the newly adapted hardware device does not support collective
// communication, since local to global may need to synchronize data (through the
@@ -543,7 +544,7 @@
JUST(ReplaceDeviceType(parallel_desc, DeviceType::kCPU));
std::shared_ptr<Tensor> cpu_tensor =
JUST(LocalToGlobal(x, cpu_parallel_desc, sbp_parallels, NullOpt, NullOpt,
local_to_global_op_, check_meta, /* sync_data */ true, copy));
local_to_global_op_, check_meta, /* sync_data */ sync_data, copy));
tensor =
JUST(GlobalToGlobal(cpu_tensor, parallel_desc, sbp_parallels, GetNoneSbpList(), copy));
}
3 changes: 2 additions & 1 deletion oneflow/core/functional/impl/math_functor.cpp
@@ -1625,7 +1625,8 @@ class GlobalHannWindowFunctor {
result = JUST(ScalarDiv(JUST(ScalarSub(1, JUST(Cos(div_result)), 1)), 2));
}
}
result = JUST(ToGlobal(result, placement, sbp, {}, true, /*copy=*/false));
result = JUST(ToGlobal(result, placement, sbp, {}, /* check_meta */ true,
/* sync_data */ true, /*copy=*/false));
JUST(result->set_requires_grad(requires_grad));
return result;
}
9 changes: 5 additions & 4 deletions oneflow/core/functional/impl/nn_functor.cpp
@@ -2316,10 +2316,10 @@ class SparseSoftmaxCrossEntropyFunctor {
s0s1_sbp_parallels.emplace_back(logits_nd_sbp.sbp_parallel(1));
max_global_stage_input0 = JUST(functional::ToGlobal(
(*max_device_stage)[0], JUST((*max_device_stage)[0]->parallel_desc()), new_sbp_parallels,
s0s1_sbp_parallels, /* check_meta */ false, /*copy=*/false));
s0s1_sbp_parallels, /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
max_global_stage_input1 = JUST(functional::ToGlobal(
(*max_device_stage)[2], JUST((*max_device_stage)[0]->parallel_desc()), new_sbp_parallels,
s0s1_sbp_parallels, /* check_meta */ false, /*copy=*/false));
s0s1_sbp_parallels, /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
}
// op_reduce_max_global_stage_
auto& reduce_max_global_attrs = THREAD_CACHED_MUTABLE_ATTR_MAP("axis", "keepdims");
@@ -2331,7 +2331,7 @@
if (logits_nd_sbp.sbp_parallel_size() == 2) {
broadcast_sub_input = JUST(functional::ToGlobal(
broadcast_sub_input, JUST((*max_device_stage)[0]->parallel_desc()), new_sbp_parallels,
new_sbp_parallels, /* check_meta */ false, /*copy=*/false));
new_sbp_parallels, /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
}
// op_broadcast_sub_
const auto& output_broadcast_sub = JUST(
@@ -2349,7 +2349,8 @@
std::vector<Symbol<SbpParallel>> empty_grad_sbp_parallels;
broadcast_div_input1 = JUST(functional::ToGlobal(
(*output_reduce_sum)[0], JUST((*output_reduce_sum)[0]->parallel_desc()),
new_sbp_parallels, new_sbp_parallels, /* check_meta */ false, /*copy=*/false));
new_sbp_parallels, new_sbp_parallels, /* check_meta */ false, /* sync_data */ true,
/*copy=*/false));
}
// op_broadcast_div_
const auto& predictions = JUST(OpInterpUtil::Dispatch<TensorTuple>(
3 changes: 2 additions & 1 deletion oneflow/core/functional/tensor_index.cpp
@@ -567,7 +567,8 @@ Maybe<void> UnifyInputAndIndicesOnDevice(const std::shared_ptr<Tensor>& x,
LazyMode::Guard lazy_mode_disabled_guard(/*is_enabled*/ false);
tensor_indices[i] = JUST(ToGlobal(tensor_index, placement,
std::vector<Symbol<SbpParallel>>(n, broadcast_sbp),
grad_sbp_tuple, /*check_meta=*/false, /*copy=*/false));
grad_sbp_tuple, /*check_meta=*/false, /*sync_data*/ true,
/*copy=*/false));
}
}
}
6 changes: 3 additions & 3 deletions python/oneflow/framework/docstr/tensor.py
@@ -395,7 +395,7 @@
add_docstr(
oneflow.Tensor.local_to_global,
"""
Tensor.local_to_global(placement=None, sbp=None, *, check_meta=True, copy=False) -> Tensor
Tensor.local_to_global(placement=None, sbp=None, *, check_meta=True, sync_data=True, copy=False) -> Tensor

Creates a global tensor from a local tensor.

@@ -426,7 +426,7 @@
>>> # Run on 2 ranks respectively
>>> import oneflow as flow
>>> input = flow.tensor([0., 1.], dtype=flow.float32) # doctest: +SKIP
>>> output = input.local_to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False) # doctest: +SKIP
>>> output = input.local_to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False, sync_data=True) # doctest: +SKIP
>>> print(output.size()) # doctest: +SKIP
>>> print(output) # doctest: +SKIP

@@ -539,7 +539,7 @@
>>> # Run on 2 ranks respectively
>>> import oneflow as flow
>>> input = flow.tensor([0., 1.], dtype=flow.float32) # doctest: +SKIP
>>> output = input.to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False) # doctest: +SKIP
>>> output = input.to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False, sync_data=True) # doctest: +SKIP
>>> print(output.size()) # doctest: +SKIP
>>> print(output) # doctest: +SKIP
