Fix Chakra Converter Handling of NCCL All-to-All, Logging Consistency, and HTA Dependency Issues #160

Open
wants to merge 9 commits into base: main
28 changes: 23 additions & 5 deletions src/converter/pytorch_converter.py
@@ -11,6 +11,7 @@
COMM_RECV_NODE,
COMM_SEND_NODE,
COMP_NODE,
METADATA_NODE,
REDUCE_SCATTER,
GlobalMetadata,
)
@@ -338,6 +339,8 @@ def get_protobuf_node_type_from_json_node(
Returns:
int: The corresponding Chakra node type.
"""
if json_node.is_metadata_op():
return METADATA_NODE
if json_node.is_gpu_op():
if "ncclDevKernel_SendRecv" in json_node.name:
parent_node = json_node_map[json_node.parent]
@@ -346,10 +349,17 @@
if parent_node.name == "record_param_comms"
else parent_node.name
)
if parent_node.name == "record_param_comms" and parent_node.pg_name != "":
json_node.pg_name = parent_node.pg_name
if "send" in keyword:
return COMM_SEND_NODE
if "recv" in keyword:
return COMM_RECV_NODE
# In NCCL, all-to-all communication is implemented using point-to-point
# communications. More details can be found here:
# https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/p2p.html
if "nccl:all_to_all" in keyword:
return COMM_COLL_NODE
if "ncclKernel" in json_node.name or "ncclDevKernel" in json_node.name:
return COMM_COLL_NODE
return COMP_NODE
@@ -379,6 +389,10 @@ def get_collective_comm_type(self, name: str) -> int:
for key in comm_type_mapping:
if key in normalized_name:
return comm_type_mapping[key]
# A node classified as COMM_COLL_NODE whose name contains ncclDevKernel_SendRecv is an
# nccl:all_to_all, since NCCL implements all-to-all with point-to-point send/recv kernels.
if "ncclDevKernel_SendRecv" in name:
return comm_type_mapping["alltoall"]

raise ValueError(
f"The name '{name}' does not correspond to a recognized collective communication type. "
"The converter determines collective communication types based on the node name of a GPU operator. "
@@ -460,11 +474,15 @@ def convert_ctrl_dep_to_data_dep(
if json_node.sync_dep:
for sync_dep in json_node.sync_dep:
if sync_dep not in current_node.data_deps:
current_node.data_deps.append(sync_dep)
logging.info(
f"Node ID {current_node.id} now has an synchonization dependency on Node ID {sync_dep}"
)

# HTA can encode false synchronization dependencies.
# Compare start times so a node never depends on one that starts after it.
prior_node = protobuf_node_map.get(sync_dep)
if prior_node is not None and prior_node.start_time_micros < current_node.start_time_micros:
current_node.data_deps.append(sync_dep)
logging.debug(
f"Node ID {current_node.id} now has a synchronization dependency on Node ID "
f"{sync_dep}"
)
# Add children to the stack
children_chakra_ids = [child.id for child in json_node.children]
for child_chakra_id in sorted(children_chakra_ids, reverse=True):
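To illustrate the converter change above, below is a minimal, self-contained sketch of how a ncclDevKernel_SendRecv GPU kernel launched under an nccl:all_to_all parent now resolves to a collective node type. The helper and its simplified string arguments are hypothetical stand-ins rather than the converter's actual API:

# Minimal sketch of the classification rule; the integer constants stand in for
# the Chakra protobuf node-type enums used by the real converter.
COMP_NODE, COMM_SEND_NODE, COMM_RECV_NODE, COMM_COLL_NODE = range(4)

def classify_gpu_kernel(kernel_name: str, parent_name: str) -> int:
    """Classify a GPU kernel from its name and the name of its CPU launcher."""
    if "ncclDevKernel_SendRecv" in kernel_name:
        if "send" in parent_name:
            return COMM_SEND_NODE
        if "recv" in parent_name:
            return COMM_RECV_NODE
        # NCCL implements all-to-all with point-to-point transfers, so a
        # SendRecv kernel under nccl:all_to_all is treated as a collective.
        if "nccl:all_to_all" in parent_name:
            return COMM_COLL_NODE
    if "ncclKernel" in kernel_name or "ncclDevKernel" in kernel_name:
        return COMM_COLL_NODE
    return COMP_NODE

assert classify_gpu_kernel("ncclDevKernel_SendRecv_impl", "nccl:all_to_all") == COMM_COLL_NODE
assert classify_gpu_kernel("ncclDevKernel_AllReduce_impl", "nccl:all_reduce") == COMM_COLL_NODE

With this fallback in place, get_collective_comm_type can then map the SendRecv kernel name to the "alltoall" entry of its comm_type_mapping, as the second hunk above does.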
13 changes: 11 additions & 2 deletions src/converter/pytorch_node.py
@@ -47,7 +47,7 @@ class PyTorchNode:
pg_name (str): Process Group name for the inter-GPU communication.
"""

SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4"]
SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4", "1.1.1-chakra.0.0.4"]

def __init__(self, schema: str, node_data: Dict[str, Any]) -> None:
"""
@@ -86,7 +86,7 @@ def parse_data(self, node_data: Dict[str, Any]) -> None:
node_data (Dict[str, Any]): The node data to be parsed.
"""
if self.schema in self.SUPPORTED_VERSIONS:
if self.schema in ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4"]:
if self.schema in ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4", "1.1.1-chakra.0.0.4"]:
self._parse_data_1_0_3_chakra_0_0_4(node_data)
else:
raise ValueError(
@@ -137,6 +137,15 @@ def get_op_type(self) -> PyTorchNodeType:
else:
return PyTorchNodeType.LABEL

def is_metadata_op(self) -> bool:
"""
Check if the node is a METADATA operator.

Returns:
bool: True if the node is a METADATA operator, False otherwise.
"""
return self.get_op_type() == PyTorchNodeType.METADATA

def is_cpu_op(self) -> bool:
"""
Check if the node is a CPU operator.
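As a rough illustration of what the new is_metadata_op check enables, the sketch below shows the intended flow: an entry such as process_group:init (the name used in the updated converter tests) is recognized as metadata and can then be mapped to METADATA_NODE by the converter. The name-based check is a simplifying assumption; the real PyTorchNode.get_op_type derives the type from the trace fields rather than the name alone:

# Illustrative sketch only; the name-based metadata check is an assumption and
# a simplification of the real get_op_type logic in PyTorchNode.
from enum import Enum

class PyTorchNodeType(Enum):
    CPU_OP = 1
    GPU_OP = 2
    LABEL = 3
    METADATA = 4

def get_op_type(name: str, is_gpu: bool) -> PyTorchNodeType:
    if name.startswith("process_group:init"):
        return PyTorchNodeType.METADATA
    return PyTorchNodeType.GPU_OP if is_gpu else PyTorchNodeType.CPU_OP

def is_metadata_op(name: str, is_gpu: bool) -> bool:
    return get_op_type(name, is_gpu) == PyTorchNodeType.METADATA

assert is_metadata_op("process_group:init", is_gpu=False)
assert not is_metadata_op("aten::matmul", is_gpu=False)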
54 changes: 54 additions & 0 deletions src/feeder/et_feeder_node.cpp
@@ -10,6 +10,18 @@ ETFeederNode::ETFeederNode(std::shared_ptr<ChakraProtoMsg::Node> node) {
this->runtime_ = node->duration_micros();
this->is_cpu_op_ = 1;

if (node->has_inputs()) {
this->inputs_values_ = static_cast<string>(node->inputs().values());
this->inputs_shapes_ = static_cast<string>(node->inputs().shapes());
this->inputs_types_ = static_cast<string>(node->inputs().types());
}

if (node->has_outputs()) {
this->outputs_values_ = static_cast<string>(node->outputs().values());
this->outputs_shapes_ = static_cast<string>(node->outputs().shapes());
this->outputs_types_ = static_cast<string>(node->outputs().types());
}

for (const auto& attr : node->attr()) {
const string& attr_name = attr.name();

@@ -144,3 +156,45 @@ uint32_t ETFeederNode::comm_tag() {
string ETFeederNode::pg_name() {
return pg_name_;
}

string ETFeederNode::get_inputs_values() const {
if (node_->has_inputs()) {
return inputs_values_;
}
return "";
}

string ETFeederNode::get_inputs_shapes() const {
if (node_->has_inputs()) {
return inputs_shapes_;
}
return "";
}

string ETFeederNode::get_inputs_types() const {
if (node_->has_inputs()) {
return inputs_types_;
}
return "";
}

string ETFeederNode::get_outputs_values() const {
if (node_->has_outputs()) {
return outputs_values_;
}
return "";
}

string ETFeederNode::get_outputs_shapes() const {
if (node_->has_outputs()) {
return outputs_shapes_;
}
return "";
}

string ETFeederNode::get_outputs_types() const {
if (node_->has_outputs()) {
return outputs_types_;
}
return "";
}
12 changes: 12 additions & 0 deletions src/feeder/et_feeder_node.h
@@ -39,6 +39,12 @@ class ETFeederNode {
uint32_t comm_dst();
uint32_t comm_tag();
std::string pg_name();
std::string get_inputs_values() const;
std::string get_inputs_shapes() const;
std::string get_inputs_types() const;
std::string get_outputs_values() const;
std::string get_outputs_shapes() const;
std::string get_outputs_types() const;

private:
void assign_attr_val(
@@ -67,6 +73,12 @@
uint32_t comm_dst_;
uint32_t comm_tag_;
std::string pg_name_;
std::string inputs_values_;
std::string inputs_shapes_;
std::string inputs_types_;
std::string outputs_values_;
std::string outputs_shapes_;
std::string outputs_types_;
};

} // namespace Chakra
89 changes: 41 additions & 48 deletions src/trace_link/trace_linker.py
@@ -118,11 +118,17 @@ def load_sync_dependencies(
absolute_kineto_file = os.path.abspath(kineto_file)
trace_dir = os.path.dirname(absolute_kineto_file)
trace_analysis = TraceAnalysis(trace_dir=trace_dir, trace_files={rank: kineto_file})
cp_graph, success = trace_analysis.critical_path_analysis(
rank=rank, annotation=annotation, instance_id=instance_id
)
if not success:
logging.error("Failed to load Critical Path Graph")
try:
cp_graph, success = trace_analysis.critical_path_analysis(
rank=rank, annotation=annotation, instance_id=instance_id
)
if not success:
logging.error("Critical path analysis completed but failed to load Critical Path Graph.")
return sync_dependencies

except ValueError as e:
logging.error("Critical path analysis encountered an invalid graph structure: %s", e)
# Return without sync dependencies, since the critical path graph could not be built.
return sync_dependencies

raw_events = trace_analysis.t.get_raw_trace_for_one_rank(rank=rank)["traceEvents"]
@@ -541,7 +547,7 @@ def map_host_to_device_ops(
]:
"""Map Chakra host operators to corresponding device operators."""
logging.debug("Mapping Chakra host operators to corresponding device operators.")
cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers(
cpu_external_id_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers(
kineto_gpu_ops, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts
)

@@ -569,7 +575,7 @@
) = self.link_ops(
host_op,
kineto_op,
cpu_ev_idx_to_gpu_ops_map,
cpu_external_id_to_gpu_ops_map,
kineto_rf_id_to_device_op_map,
kineto_external_id_to_kineto_op_map,
)
@@ -593,7 +599,7 @@ def group_gpu_ops_by_cpu_launchers(
"""
Group GPU operators based on their corresponding CPU launchers.

This is determined by the 'ev_idx' which links GPU operators to their initiating CPU launcher events.
This is determined by the 'external_id' which links GPU operators to their initiating CPU launcher events.

Args:
kineto_gpu_ops (List[KinetoOperator]): List of Kineto GPU operators.
@@ -607,9 +613,9 @@
Dict[int, List[KinetoOperator]]: Mapping from CPU launcher external IDs to GPU operators.

Raises:
ValueError: If 'ev_idx' is missing for any GPU operator.
ValueError: If 'external_id' is missing for any GPU operator.
"""
cpu_ev_idx_to_gpu_ops_map = {}
cpu_external_id_to_gpu_ops_map = {}
for gpu_op in kineto_gpu_ops:
parent_cpu_op = self.find_parent_cpu_op(
gpu_op, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts
@@ -619,19 +625,19 @@
logging.warning(warning_msg)
continue

if parent_cpu_op.ev_idx == "":
if parent_cpu_op.external_id == "":
error_msg = (
f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. "
f"Missing 'external_id' for CPU operator {parent_cpu_op.name}. "
f"Cannot link GPU op {gpu_op.name} to {parent_cpu_op.name}."
)
logging.warning(error_msg)
continue

logging.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'")

cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op)
cpu_external_id_to_gpu_ops_map.setdefault(parent_cpu_op.external_id, []).append(gpu_op)

return cpu_ev_idx_to_gpu_ops_map
return cpu_external_id_to_gpu_ops_map

def find_parent_cpu_op(
self,
@@ -713,48 +719,35 @@ def find_closest_op(
Returns:
Optional[KinetoOperator]: The closest Kineto operator if found.
"""
# Searching for the closest timestamp index
# Step 1: Find the initial closest index
index = bisect.bisect_left(sorted_kineto_cpu_op_ts, ts)

if index == 0:
# All operators are later than the timestamp
return None
else:
# The operator immediately before the index is the closest one before the timestamp
closest_op = sorted_kineto_cpu_ops[index - 1]

# Check for NCCL specifics: if it's an NCCL operation and 'nccl:coalesced' should be skipped
if "nccl" in kineto_gpu_op.name.lower() and closest_op.name == "nccl:coalesced":
# Move back to find a non-'nccl:coalesced' operator, if available
for new_index in range(index - 2, -1, -1):
potential_op = sorted_kineto_cpu_ops[new_index]
if potential_op.tid == kineto_gpu_op.tid and potential_op.name != "nccl:coalesced":
return potential_op
# If no valid alternative found before 'nccl:coalesced', continue search forward
index = index - 1 # Adjust index to skip 'nccl:coalesced'

# After skipping 'nccl:coalesced', verify that the closest operation is on the same thread
# as the GPU operation
if closest_op.tid == kineto_gpu_op.tid:
return closest_op

# If the tids do not match, search forward to find the closest matching tid
for i in range(index - 1, -1, -1):
op = sorted_kineto_cpu_ops[i]
if op.tid == kineto_gpu_op.tid:
if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced":
continue # Skip 'nccl:coalesced' if it's an NCCL-related GPU operation
if op.timestamp <= ts:
return op

# If no matching tid is found going forward, return None
return None

# Step 2: Find the closest operator
tid_only_match = None # Track the best operator with matching tid
for i in range(index - 1, -1, -1):
op = sorted_kineto_cpu_ops[i]
# Skip 'nccl:coalesced' for NCCL-related GPU operations
if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced":
continue
# Return the operator matching both tid and external_id
if op.tid == kineto_gpu_op.tid and op.external_id == kineto_gpu_op.external_id:
return op
# Remember the first operator with a matching tid as a fallback if no full match is found
if tid_only_match is None and op.tid == kineto_gpu_op.tid:
tid_only_match = op

# Step 3: Return the best match or None if no match is found
return tid_only_match

def link_ops(
self,
host_op: PyTorchOperator,
kineto_op: KinetoOperator,
cpu_ev_idx_to_gpu_ops_map: Dict[int, List[KinetoOperator]],
cpu_external_id_to_gpu_ops_map: Dict[int, List[KinetoOperator]],
kineto_rf_id_to_device_op_map: Dict[int, KinetoOperator],
kineto_external_id_to_kineto_op_map: Dict[int, KinetoOperator],
) -> Tuple[List[KinetoOperator], int, int, int, Optional[int]]:
@@ -764,7 +757,7 @@
Args:
host_op (PyTorchOperator): Chakra host operator to link.
kineto_op (KinetoOperator): Corresponding Kineto operator.
cpu_ev_idx_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping.
cpu_external_id_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping.
kineto_rf_id_to_device_op_map (Dict[int, KinetoOperator]): Kineto operator mapping.
kineto_external_id_to_kineto_op_map (Dict[int, KinetoOperator]): Mapping from external id to
KinetoOperators.
@@ -779,7 +772,7 @@
- List[int]: List of synchronization dependency IDs.
"""
kineto_op.host_op = host_op
linked_gpu_ops = cpu_ev_idx_to_gpu_ops_map.get(kineto_op.ev_idx, [])
linked_gpu_ops = cpu_external_id_to_gpu_ops_map.get(kineto_op.external_id, [])
inclusive_dur = kineto_op.inclusive_dur
exclusive_dur = kineto_op.exclusive_dur
timestamp = kineto_op.timestamp
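The rewritten find_closest_op above prefers a launcher that matches both tid and external_id and falls back to a tid-only match, skipping nccl:coalesced entries for NCCL kernels. A condensed, self-contained sketch of that strategy, using plain tuples in place of KinetoOperator objects (illustrative only, not the linker's actual code):

import bisect
from typing import List, Optional, Tuple

# (timestamp, tid, external_id, name) tuples stand in for KinetoOperator objects.
Op = Tuple[int, int, int, str]

def find_closest_launcher(gpu_tid: int, gpu_external_id: int, gpu_name: str,
                          ops_sorted_by_ts: List[Op], ts: int) -> Optional[Op]:
    """Scan backward from ts, preferring a tid + external_id match over tid alone."""
    timestamps = [op[0] for op in ops_sorted_by_ts]
    index = bisect.bisect_left(timestamps, ts)
    tid_only_match = None
    for i in range(index - 1, -1, -1):
        _ts, tid, external_id, name = ops_sorted_by_ts[i]
        # Skip coalesced NCCL launchers when matching NCCL GPU kernels.
        if "nccl" in gpu_name.lower() and name == "nccl:coalesced":
            continue
        if tid == gpu_tid and external_id == gpu_external_id:
            return ops_sorted_by_ts[i]
        if tid_only_match is None and tid == gpu_tid:
            tid_only_match = ops_sorted_by_ts[i]
    return tid_only_match

ops = [(10, 1, 100, "aten::mm"), (20, 1, 101, "cudaLaunchKernel"), (30, 1, 0, "nccl:coalesced")]
best = find_closest_launcher(1, 101, "ncclDevKernel_SendRecv_impl", ops, ts=35)
assert best is not None and best[3] == "cudaLaunchKernel"

Because the same external_id also keys cpu_external_id_to_gpu_ops_map, a full match here is what ultimately lets link_ops attach the GPU operators to the right host operator.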
11 changes: 7 additions & 4 deletions tests/converter/test_pytorch_converter.py
@@ -10,6 +10,7 @@
BROADCAST,
COMM_COLL_NODE,
COMP_NODE,
METADATA_NODE,
REDUCE_SCATTER,
)
from chakra.schema.protobuf.et_def_pb2 import Node as ChakraNode
@@ -167,17 +168,19 @@ def test_write_chakra_et(mock_file: MagicMock, sample_pytorch_data: Dict) -> None:
@pytest.mark.parametrize(
"pytorch_node_data, expected_type",
[
({"name": "ncclKernel", "is_gpu_op": True}, COMM_COLL_NODE),
({"name": "ncclDevKernel", "is_gpu_op": True}, COMM_COLL_NODE),
({"name": "c10d::all_reduce", "is_gpu_op": True}, COMP_NODE),
({"name": "other_op", "is_gpu_op": False}, COMP_NODE),
({"name": "process_group:init", "is_gpu_op": False, "is_metadata_op": True}, METADATA_NODE),
({"name": "ncclKernel", "is_gpu_op": True, "is_metadata_op": False}, COMM_COLL_NODE),
({"name": "ncclDevKernel", "is_gpu_op": True, "is_metadata_op": False}, COMM_COLL_NODE),
({"name": "c10d::all_reduce", "is_gpu_op": True, "is_metadata_op": False}, COMP_NODE),
({"name": "other_op", "is_gpu_op": False, "is_metadata_op": False}, COMP_NODE),
],
)
def test_get_protobuf_node_type_from_json_node(pytorch_node_data: Dict, expected_type: int) -> None:
# Create a mock PyTorchNode with the required attributes
pytorch_node = MagicMock(spec=PyTorchNode)
pytorch_node.name = pytorch_node_data["name"]
pytorch_node.is_gpu_op = MagicMock(return_value=pytorch_node_data["is_gpu_op"])
pytorch_node.is_metadata_op = MagicMock(return_value=pytorch_node_data["is_metadata_op"])

# Create a mock json_node_map dictionary with actual PyTorchNode instances
mock_pytorch_node_data = {