Skip to content

Commit

Permalink
Record min/max of integral tensor in ET (pytorch#143088)
Browse files Browse the repository at this point in the history
Summary:
X-link: facebookresearch/param#191


In et-replay, random data is used to run the operators. However, it does not work well for the op that uses index to access tensor. For example, embedding ops, which use the indices to look up the embedding table. If random data is used for these index ops, et-replay usually runs into invalid memory access issue.

To fix it, ET provides an environment variable "ENABLE_PYTORCH_EXECUTION_TRACE_INTEGRAL_TENSOR_RANGE", if it is set, ET will capture the min/max value of the flattened integral tensor. Then in et_replay, the min/max is used to generate the random tensor within that range. It fixed invalid memory access issue.

Test Plan: buck2 run mode/opt caffe2/test:test_profiler_cuda -- profiler.test_execution_trace.TestExecutionTraceCUDA.test_execution_trace_record_integral_tensor_range_cuda

Differential Revision: D66666931
  • Loading branch information
shengfukevin authored and facebook-github-bot committed Dec 13, 2024
1 parent bf711a9 commit 5359852
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 14 deletions.
33 changes: 33 additions & 0 deletions test/profiler/test_execution_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
pass

import json
import os
import sys
import tempfile
import unittest
Expand Down Expand Up @@ -411,6 +412,38 @@ def fn(nt):
found_cos = True
assert found_cos

@unittest.skipIf(
not TEST_CUDA,
"need CUDA device availability to run",
)
def test_execution_trace_record_integral_tensor_range(self):
fp = tempfile.NamedTemporaryFile("w+t", suffix=".et.json", delete=False)
fp.close()

os.environ["ENABLE_PYTORCH_EXECUTION_TRACE_INTEGRAL_TENSOR_RANGE"] = "1"
t1 = torch.tensor([[1, 2], [3, 4]]).cuda()
t2 = torch.tensor([[0, 0], [1, 0]]).cuda()
with profile(
activities=supported_activities(),
schedule=torch.profiler.schedule(
skip_first=0, wait=0, warmup=0, active=1, repeat=1
),
record_shapes=True,
execution_trace_observer=(
ExecutionTraceObserver().register_callback(fp.name)
),
) as p:
torch.gather(t1, 1, t2)
p.step()

nodes = self.get_execution_trace_root(fp.name)
for n in nodes:
assert "name" in n
if "aten::gather" in n["name"]:
for attr in n["attrs"]:
if attr["name"] == "tensor_range":
assert attr["value"] == '{"0":[1,4],"1":[0,1]}'


devices = ["cpu", "cuda"]
if TEST_XPU:
Expand Down
104 changes: 90 additions & 14 deletions torch/csrc/profiler/standalone/execution_trace_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ struct TORCH_API ExecutionTraceObserver { // NOLINT
state_ = newState;
}

bool record_integral_tensor_range{false};

private:
static bool callbackShouldBeEnabled(RunState run_state) {
return run_state == ExecutionTraceObserver::RunState::enabled;
Expand Down Expand Up @@ -189,6 +191,28 @@ struct FunctionCallContext : public ObserverContext { // NOLINT
std::vector<std::string> inputShapes;
std::vector<std::string> inputStrides;
std::vector<std::string> inputValues;
std::map<int, std::pair<long, long>> tensor_index_min_max_map;

std::string get_string_for_tensor_range() {
if (tensor_index_min_max_map.empty()) {
return "";
}

std::string result = "{";
unsigned int i = 0;
for (auto const& [key, value] : tensor_index_min_max_map) {
if (i == tensor_index_min_max_map.size() - 1) {
result += json_str_escape(
fmt::format("\"{}\":[{},{}]", key, value.first, value.second));
} else {
result += json_str_escape(
fmt::format("\"{}\":[{},{}],", key, value.first, value.second));
}
i++;
}
result += "}";
return result;
}
};

// Opens the json file to write the execution trace.
Expand Down Expand Up @@ -240,14 +264,15 @@ static void writeJsonNode(
const std::string& operator_schema = "",
const std::string& kernelBackend = "",
const std::string& kernelFile = "",
const std::string& tensor_range = "",
const std::string& additiona_attrs = "") {
out << fmt::format(
R"JSON(
{{
"id": {}, "name": "{}", "ctrl_deps": {},
"inputs": {{"values": {}, "shapes": {}, "types": {}, "strides": {}}},
"outputs": {{"values": {}, "shapes": {}, "types": {}, "strides": {}}},
"attrs": [{{"name": "rf_id", "type": "uint64", "value": {}}},{{"name": "fw_parent", "type": "uint64", "value": {}}},{{"name": "seq_id", "type": "int64", "value": {}}},{{"name": "scope", "type": "uint64", "value": {}}},{{"name": "tid", "type": "uint64", "value": {}}},{{"name": "fw_tid", "type": "uint64", "value": {}}},{{"name": "op_schema", "type": "string", "value": "{}"}},{{"name": "kernel_backend", "type": "string", "value": "{}"}},{{"name": "kernel_file", "type": "string", "value": "{}"}}{}]
"attrs": [{{"name": "rf_id", "type": "uint64", "value": {}}},{{"name": "fw_parent", "type": "uint64", "value": {}}},{{"name": "seq_id", "type": "int64", "value": {}}},{{"name": "scope", "type": "uint64", "value": {}}},{{"name": "tid", "type": "uint64", "value": {}}},{{"name": "fw_tid", "type": "uint64", "value": {}}},{{"name": "op_schema", "type": "string", "value": "{}"}},{{"name": "kernel_backend", "type": "string", "value": "{}"}},{{"name": "kernel_file", "type": "string", "value": "{}"}},{{"name": "tensor_range", "type": "string", "value": "{}"}}{}]
}})JSON",
id,
name,
Expand All @@ -269,6 +294,7 @@ static void writeJsonNode(
operator_schema,
kernelBackend,
kernelFile,
tensor_range,
additiona_attrs);
}

Expand Down Expand Up @@ -354,6 +380,9 @@ static ExecutionTraceObserver::ID getObjectID(
static std::tuple<std::string, std::string, std::string, std::string>
convertIValue(
ExecutionTraceObserver& ob,
int& tensorIndex,
std::map<int, std::pair<long, long>>& tensor_index_min_max_map,
bool isInput,
const c10::IValue& val,
const bool baseType = true,
const size_t maxArrayLen = kMaxNumElements) {
Expand Down Expand Up @@ -391,7 +420,18 @@ convertIValue(
numel = tensor_impl->numel();
itemsize = tensor_impl->itemsize();
device_str = tensor_impl->device().str();

if (ob.record_integral_tensor_range && isInput &&
at::isIntegralType(tensor.scalar_type(), false) &&
tensor.numel() != 0) {
enableRecordFunction(false);
long min = tensor.min().item().toLong();
long max = tensor.max().item().toLong();
enableRecordFunction(true);
tensor_index_min_max_map[tensorIndex] = std::make_pair(min, max);
}
}
tensorIndex++;
tensor_value = fmt::format(
"[{},{},{},{},{},\"{}\"]",
tensor_id,
Expand All @@ -410,7 +450,14 @@ convertIValue(
std::vector<std::string> type_array;
std::vector<std::string> value_array;
for (const auto j : c10::irange(tuple_size)) {
auto tuple = convertIValue(ob, val_tuple[j], false, maxArrayLen);
auto tuple = convertIValue(
ob,
tensorIndex,
tensor_index_min_max_map,
isInput,
val_tuple[j],
false,
maxArrayLen);
shape_array.push_back(std::get<0>(tuple));
stride_array.push_back(std::get<1>(tuple));
type_array.push_back(std::get<2>(tuple));
Expand All @@ -431,7 +478,14 @@ convertIValue(
std::vector<std::string> type_array;
std::vector<std::string> value_array;
for (const auto j : c10::irange(list_size)) {
auto tuple = convertIValue(ob, val_list.get(j), false, maxArrayLen);
auto tuple = convertIValue(
ob,
tensorIndex,
tensor_index_min_max_map,
isInput,
val_list.get(j),
false,
maxArrayLen);
shape_array.push_back(std::get<0>(tuple));
stride_array.push_back(std::get<1>(tuple));
type_array.push_back(std::get<2>(tuple));
Expand Down Expand Up @@ -462,13 +516,16 @@ convertIValue(

static void appendValueInfo(
ExecutionTraceObserver& ob,
int& tensorIndex,
std::map<int, std::pair<long, long>>& tensor_index_min_max_map,
bool isInput,
const c10::IValue& val,
std::vector<std::string>& shapes,
std::vector<std::string>& strides,
std::vector<std::string>& types,
std::vector<std::string>& values) {
auto tuple = convertIValue(ob, val, true);

auto tuple = convertIValue(
ob, tensorIndex, tensor_index_min_max_map, isInput, val, true);
shapes.push_back(std::get<0>(tuple));
strides.push_back(std::get<1>(tuple));
types.push_back(std::get<2>(tuple));
Expand Down Expand Up @@ -529,9 +586,10 @@ inline std::string getCommsNodeAttrs(const RecordFunction& fn) { // NOLINT
}

// get NcclMeta from record function, this used ParamCommsDebugInfo above
// since we currently have this read called in onFunctionExit flow, we should
// only introspect output tensors to prevent an INTERNAL ASSERT FAILED in
// RecordFunction when we try to read input in RecordFunction exit methods.
// since we currently have this read called in onFunctionExit flow, we
// should only introspect output tensors to prevent an INTERNAL ASSERT
// FAILED in RecordFunction when we try to read input in RecordFunction exit
// methods.
auto meta = saveNcclMeta(fn, SaveNcclMetaConfig(false, true, false, true));

auto addAttr =
Expand Down Expand Up @@ -577,7 +635,8 @@ static void recordOperatorStart(
{
const std::lock_guard<std::recursive_mutex> lock(ob.gMutex);

// if current thread stack is empty, push the root node to the stack first
// if current thread stack is empty, push the root node to the stack
// first
if (ob.opStack[tid].empty()) {
auto thread_node_id = ob.getNewID();
ob.opStack[tid].push(thread_node_id);
Expand Down Expand Up @@ -605,10 +664,15 @@ static void recordOperatorStart(
const auto inputs = fn.inputs();
// need to account for Stack mode where the inputs are at the end.
size_t input_start = inputs.size() - num_inputs;

// tensor_index is the index of the flattened tensor list for all input
// tensors
int tensor_index = 0;
for (const auto i : c10::irange(input_start, inputs.size())) {
appendValueInfo(
ob,
tensor_index,
fc.tensor_index_min_max_map,
true,
inputs[i],
fc.inputShapes,
fc.inputStrides,
Expand All @@ -623,8 +687,8 @@ static void recordOperatorStart(

fc.parentId = ob.opStack[tid].top();
// get parent id from the forward stack, this can be different for
// autograd ops, which may execute on a different thread than the original
// thread (which should have the parent op on the stack).
// autograd ops, which may execute on a different thread than the
// original thread (which should have the parent op on the stack).
auto fw_tid = fn.forwardThreadId();
if (fw_tid != 0) {
fc.fwParentId = ob.opStack[fw_tid].top();
Expand Down Expand Up @@ -706,9 +770,13 @@ static void onFunctionExit(const RecordFunction& fn, ObserverContext* ctx_ptr) {
std::vector<std::string> output_shapes;
std::vector<std::string> output_values;
try {
int tensor_index = 0;
for (const auto i : c10::irange(output_start, outputs.size())) {
appendValueInfo(
*ob,
tensor_index,
fc.tensor_index_min_max_map,
false,
outputs.at(i),
output_shapes,
output_strides,
Expand Down Expand Up @@ -752,6 +820,7 @@ static void onFunctionExit(const RecordFunction& fn, ObserverContext* ctx_ptr) {
op_schema_str,
fc.kernelBackend,
fc.kernelFile,
fc.get_string_for_tensor_range(),
additiona_attrs);
ob->out << ",";
}
Expand All @@ -762,8 +831,8 @@ static void onFunctionExit(const RecordFunction& fn, ObserverContext* ctx_ptr) {
}
}

// Add execution trace observer callback functions to the RecordFunction global
// observers.
// Add execution trace observer callback functions to the RecordFunction
// global observers.
bool addExecutionTraceObserver(const std::string& output_file_path) {
// Check if the observer is already initialized.
if (ObserverManager::get() == nullptr) {
Expand All @@ -776,6 +845,13 @@ bool addExecutionTraceObserver(const std::string& output_file_path) {
return false;
}

// check if the environment variable is set to force recording integer
// tensors
auto env_variable =
getenv("ENABLE_PYTORCH_EXECUTION_TRACE_INTEGRAL_TENSOR_RANGE");
if (env_variable != nullptr) {
ob.record_integral_tensor_range = true;
}
ob.cbHandle = addGlobalCallback(
RecordFunctionCallback(&onFunctionEnter, &onFunctionExit)
.needsInputs(true)
Expand Down

0 comments on commit 5359852

Please sign in to comment.