From 53598522b5a075a704cee8f9c0eded58010992e9 Mon Sep 17 00:00:00 2001 From: Sheng Fu Date: Fri, 13 Dec 2024 11:09:27 -0800 Subject: [PATCH] Record min/max of integral tensor in ET (#143088) Summary: X-link: https://github.com/facebookresearch/param/pull/191 In et-replay, random data is used to run the operators. However, it does not work well for the op that uses index to access tensor. For example, embedding ops, which use the indices to look up the embedding table. If random data is used for these index ops, et-replay usually runs into invalid memory access issue. To fix it, ET provides an environment variable "ENABLE_PYTORCH_EXECUTION_TRACE_INTEGRAL_TENSOR_RANGE", if it is set, ET will capture the min/max value of the flattened integral tensor. Then in et_replay, the min/max is used to generate the random tensor within that range. It fixed invalid memory access issue. Test Plan: buck2 run mode/opt caffe2/test:test_profiler_cuda -- profiler.test_execution_trace.TestExecutionTraceCUDA.test_execution_trace_record_integral_tensor_range_cuda Differential Revision: D66666931 --- test/profiler/test_execution_trace.py | 33 ++++++ .../standalone/execution_trace_observer.cpp | 104 +++++++++++++++--- 2 files changed, 123 insertions(+), 14 deletions(-) diff --git a/test/profiler/test_execution_trace.py b/test/profiler/test_execution_trace.py index 427bc9e27f4f97..2f8981742a7f33 100644 --- a/test/profiler/test_execution_trace.py +++ b/test/profiler/test_execution_trace.py @@ -14,6 +14,7 @@ pass import json +import os import sys import tempfile import unittest @@ -411,6 +412,38 @@ def fn(nt): found_cos = True assert found_cos + @unittest.skipIf( + not TEST_CUDA, + "need CUDA device availability to run", + ) + def test_execution_trace_record_integral_tensor_range(self): + fp = tempfile.NamedTemporaryFile("w+t", suffix=".et.json", delete=False) + fp.close() + + os.environ["ENABLE_PYTORCH_EXECUTION_TRACE_INTEGRAL_TENSOR_RANGE"] = "1" + t1 = torch.tensor([[1, 2], [3, 4]]).cuda() + t2 = torch.tensor([[0, 0], [1, 0]]).cuda() + with profile( + activities=supported_activities(), + schedule=torch.profiler.schedule( + skip_first=0, wait=0, warmup=0, active=1, repeat=1 + ), + record_shapes=True, + execution_trace_observer=( + ExecutionTraceObserver().register_callback(fp.name) + ), + ) as p: + torch.gather(t1, 1, t2) + p.step() + + nodes = self.get_execution_trace_root(fp.name) + for n in nodes: + assert "name" in n + if "aten::gather" in n["name"]: + for attr in n["attrs"]: + if attr["name"] == "tensor_range": + assert attr["value"] == '{"0":[1,4],"1":[0,1]}' + devices = ["cpu", "cuda"] if TEST_XPU: diff --git a/torch/csrc/profiler/standalone/execution_trace_observer.cpp b/torch/csrc/profiler/standalone/execution_trace_observer.cpp index 4d1fc62857063f..1f6953667c99ca 100644 --- a/torch/csrc/profiler/standalone/execution_trace_observer.cpp +++ b/torch/csrc/profiler/standalone/execution_trace_observer.cpp @@ -153,6 +153,8 @@ struct TORCH_API ExecutionTraceObserver { // NOLINT state_ = newState; } + bool record_integral_tensor_range{false}; + private: static bool callbackShouldBeEnabled(RunState run_state) { return run_state == ExecutionTraceObserver::RunState::enabled; @@ -189,6 +191,28 @@ struct FunctionCallContext : public ObserverContext { // NOLINT std::vector inputShapes; std::vector inputStrides; std::vector inputValues; + std::map> tensor_index_min_max_map; + + std::string get_string_for_tensor_range() { + if (tensor_index_min_max_map.empty()) { + return ""; + } + + std::string result = "{"; + unsigned int i = 0; + for (auto const& [key, value] : tensor_index_min_max_map) { + if (i == tensor_index_min_max_map.size() - 1) { + result += json_str_escape( + fmt::format("\"{}\":[{},{}]", key, value.first, value.second)); + } else { + result += json_str_escape( + fmt::format("\"{}\":[{},{}],", key, value.first, value.second)); + } + i++; + } + result += "}"; + return result; + } }; // Opens the json file to write the execution trace. @@ -240,6 +264,7 @@ static void writeJsonNode( const std::string& operator_schema = "", const std::string& kernelBackend = "", const std::string& kernelFile = "", + const std::string& tensor_range = "", const std::string& additiona_attrs = "") { out << fmt::format( R"JSON( @@ -247,7 +272,7 @@ static void writeJsonNode( "id": {}, "name": "{}", "ctrl_deps": {}, "inputs": {{"values": {}, "shapes": {}, "types": {}, "strides": {}}}, "outputs": {{"values": {}, "shapes": {}, "types": {}, "strides": {}}}, - "attrs": [{{"name": "rf_id", "type": "uint64", "value": {}}},{{"name": "fw_parent", "type": "uint64", "value": {}}},{{"name": "seq_id", "type": "int64", "value": {}}},{{"name": "scope", "type": "uint64", "value": {}}},{{"name": "tid", "type": "uint64", "value": {}}},{{"name": "fw_tid", "type": "uint64", "value": {}}},{{"name": "op_schema", "type": "string", "value": "{}"}},{{"name": "kernel_backend", "type": "string", "value": "{}"}},{{"name": "kernel_file", "type": "string", "value": "{}"}}{}] + "attrs": [{{"name": "rf_id", "type": "uint64", "value": {}}},{{"name": "fw_parent", "type": "uint64", "value": {}}},{{"name": "seq_id", "type": "int64", "value": {}}},{{"name": "scope", "type": "uint64", "value": {}}},{{"name": "tid", "type": "uint64", "value": {}}},{{"name": "fw_tid", "type": "uint64", "value": {}}},{{"name": "op_schema", "type": "string", "value": "{}"}},{{"name": "kernel_backend", "type": "string", "value": "{}"}},{{"name": "kernel_file", "type": "string", "value": "{}"}},{{"name": "tensor_range", "type": "string", "value": "{}"}}{}] }})JSON", id, name, @@ -269,6 +294,7 @@ static void writeJsonNode( operator_schema, kernelBackend, kernelFile, + tensor_range, additiona_attrs); } @@ -354,6 +380,9 @@ static ExecutionTraceObserver::ID getObjectID( static std::tuple convertIValue( ExecutionTraceObserver& ob, + int& tensorIndex, + std::map>& tensor_index_min_max_map, + bool isInput, const c10::IValue& val, const bool baseType = true, const size_t maxArrayLen = kMaxNumElements) { @@ -391,7 +420,18 @@ convertIValue( numel = tensor_impl->numel(); itemsize = tensor_impl->itemsize(); device_str = tensor_impl->device().str(); + + if (ob.record_integral_tensor_range && isInput && + at::isIntegralType(tensor.scalar_type(), false) && + tensor.numel() != 0) { + enableRecordFunction(false); + long min = tensor.min().item().toLong(); + long max = tensor.max().item().toLong(); + enableRecordFunction(true); + tensor_index_min_max_map[tensorIndex] = std::make_pair(min, max); + } } + tensorIndex++; tensor_value = fmt::format( "[{},{},{},{},{},\"{}\"]", tensor_id, @@ -410,7 +450,14 @@ convertIValue( std::vector type_array; std::vector value_array; for (const auto j : c10::irange(tuple_size)) { - auto tuple = convertIValue(ob, val_tuple[j], false, maxArrayLen); + auto tuple = convertIValue( + ob, + tensorIndex, + tensor_index_min_max_map, + isInput, + val_tuple[j], + false, + maxArrayLen); shape_array.push_back(std::get<0>(tuple)); stride_array.push_back(std::get<1>(tuple)); type_array.push_back(std::get<2>(tuple)); @@ -431,7 +478,14 @@ convertIValue( std::vector type_array; std::vector value_array; for (const auto j : c10::irange(list_size)) { - auto tuple = convertIValue(ob, val_list.get(j), false, maxArrayLen); + auto tuple = convertIValue( + ob, + tensorIndex, + tensor_index_min_max_map, + isInput, + val_list.get(j), + false, + maxArrayLen); shape_array.push_back(std::get<0>(tuple)); stride_array.push_back(std::get<1>(tuple)); type_array.push_back(std::get<2>(tuple)); @@ -462,13 +516,16 @@ convertIValue( static void appendValueInfo( ExecutionTraceObserver& ob, + int& tensorIndex, + std::map>& tensor_index_min_max_map, + bool isInput, const c10::IValue& val, std::vector& shapes, std::vector& strides, std::vector& types, std::vector& values) { - auto tuple = convertIValue(ob, val, true); - + auto tuple = convertIValue( + ob, tensorIndex, tensor_index_min_max_map, isInput, val, true); shapes.push_back(std::get<0>(tuple)); strides.push_back(std::get<1>(tuple)); types.push_back(std::get<2>(tuple)); @@ -529,9 +586,10 @@ inline std::string getCommsNodeAttrs(const RecordFunction& fn) { // NOLINT } // get NcclMeta from record function, this used ParamCommsDebugInfo above - // since we currently have this read called in onFunctionExit flow, we should - // only introspect output tensors to prevent an INTERNAL ASSERT FAILED in - // RecordFunction when we try to read input in RecordFunction exit methods. + // since we currently have this read called in onFunctionExit flow, we + // should only introspect output tensors to prevent an INTERNAL ASSERT + // FAILED in RecordFunction when we try to read input in RecordFunction exit + // methods. auto meta = saveNcclMeta(fn, SaveNcclMetaConfig(false, true, false, true)); auto addAttr = @@ -577,7 +635,8 @@ static void recordOperatorStart( { const std::lock_guard lock(ob.gMutex); - // if current thread stack is empty, push the root node to the stack first + // if current thread stack is empty, push the root node to the stack + // first if (ob.opStack[tid].empty()) { auto thread_node_id = ob.getNewID(); ob.opStack[tid].push(thread_node_id); @@ -605,10 +664,15 @@ static void recordOperatorStart( const auto inputs = fn.inputs(); // need to account for Stack mode where the inputs are at the end. size_t input_start = inputs.size() - num_inputs; - + // tensor_index is the index of the flattened tensor list for all input + // tensors + int tensor_index = 0; for (const auto i : c10::irange(input_start, inputs.size())) { appendValueInfo( ob, + tensor_index, + fc.tensor_index_min_max_map, + true, inputs[i], fc.inputShapes, fc.inputStrides, @@ -623,8 +687,8 @@ static void recordOperatorStart( fc.parentId = ob.opStack[tid].top(); // get parent id from the forward stack, this can be different for - // autograd ops, which may execute on a different thread than the original - // thread (which should have the parent op on the stack). + // autograd ops, which may execute on a different thread than the + // original thread (which should have the parent op on the stack). auto fw_tid = fn.forwardThreadId(); if (fw_tid != 0) { fc.fwParentId = ob.opStack[fw_tid].top(); @@ -706,9 +770,13 @@ static void onFunctionExit(const RecordFunction& fn, ObserverContext* ctx_ptr) { std::vector output_shapes; std::vector output_values; try { + int tensor_index = 0; for (const auto i : c10::irange(output_start, outputs.size())) { appendValueInfo( *ob, + tensor_index, + fc.tensor_index_min_max_map, + false, outputs.at(i), output_shapes, output_strides, @@ -752,6 +820,7 @@ static void onFunctionExit(const RecordFunction& fn, ObserverContext* ctx_ptr) { op_schema_str, fc.kernelBackend, fc.kernelFile, + fc.get_string_for_tensor_range(), additiona_attrs); ob->out << ","; } @@ -762,8 +831,8 @@ static void onFunctionExit(const RecordFunction& fn, ObserverContext* ctx_ptr) { } } -// Add execution trace observer callback functions to the RecordFunction global -// observers. +// Add execution trace observer callback functions to the RecordFunction +// global observers. bool addExecutionTraceObserver(const std::string& output_file_path) { // Check if the observer is already initialized. if (ObserverManager::get() == nullptr) { @@ -776,6 +845,13 @@ bool addExecutionTraceObserver(const std::string& output_file_path) { return false; } + // check if the environment variable is set to force recording integer + // tensors + auto env_variable = + getenv("ENABLE_PYTORCH_EXECUTION_TRACE_INTEGRAL_TENSOR_RANGE"); + if (env_variable != nullptr) { + ob.record_integral_tensor_range = true; + } ob.cbHandle = addGlobalCallback( RecordFunctionCallback(&onFunctionEnter, &onFunctionExit) .needsInputs(true)