From 4a1cb98647a41427ec68097a2d76284eb8cd094e Mon Sep 17 00:00:00 2001 From: Joongun Park <8554137+JoongunPark@users.noreply.github.com> Date: Tue, 8 Oct 2024 17:48:08 -0400 Subject: [PATCH 1/7] Fix mishandling All-to-All communication --- src/converter/pytorch_converter.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index ea383a5..01498b1 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -350,6 +350,11 @@ def get_protobuf_node_type_from_json_node( return COMM_SEND_NODE if "recv" in keyword: return COMM_RECV_NODE + # In NCCL, all-to-all communication is implemented using point-to-point + # communications. More details can be found here: + # https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/p2p.html + if "nccl:all_to_all" in keyword: + return COMM_COLL_NODE if "ncclKernel" in json_node.name or "ncclDevKernel" in json_node.name: return COMM_COLL_NODE return COMP_NODE @@ -379,6 +384,10 @@ def get_collective_comm_type(self, name: str) -> int: for key in comm_type_mapping: if key in normalized_name: return comm_type_mapping[key] + # If both COMM_COLL_NAME and ncclDevKernel_SendRecv are present, this is nccl:all_to_all. + if "ncclDevKernel_SendRecv" in name: + return comm_type_mapping["alltoall"] + raise ValueError( f"The name '{name}' does not correspond to a recognized collective communication type. " "The converter determines collective communication types based on the node name of a GPU operator. " From 087adff34b24e48728cd872abbf8f80f4eae6ae7 Mon Sep 17 00:00:00 2001 From: Joongun Park <8554137+JoongunPark@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:15:42 -0400 Subject: [PATCH 2/7] Update logging.info to logging.debug to make it consistent --- src/converter/pytorch_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index 01498b1..400d94a 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -470,7 +470,7 @@ def convert_ctrl_dep_to_data_dep( for sync_dep in json_node.sync_dep: if sync_dep not in current_node.data_deps: current_node.data_deps.append(sync_dep) - logging.info( + logging.debug( f"Node ID {current_node.id} now has an synchonization dependency on Node ID {sync_dep}" ) From a8692299bfb31ec5387c57081a3b256ee631f61b Mon Sep 17 00:00:00 2001 From: Joongun Park <8554137+JoongunPark@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:16:23 -0400 Subject: [PATCH 3/7] Eliminate false positive sync dependency --- src/converter/pytorch_converter.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index 400d94a..b9ade2b 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -469,11 +469,15 @@ def convert_ctrl_dep_to_data_dep( if json_node.sync_dep: for sync_dep in json_node.sync_dep: if sync_dep not in current_node.data_deps: - current_node.data_deps.append(sync_dep) - logging.debug( - f"Node ID {current_node.id} now has an synchonization dependency on Node ID {sync_dep}" - ) - + # Found a bug encoding false dependency HTA. + # Compare start_time to eliminate false sync dependency. 
+ prior_node = protobuf_node_map.get(sync_dep) + if prior_node is not None and prior_node.start_time_micros < current_node.start_time_micros: + current_node.data_deps.append(sync_dep) + logging.debug( + f"Node ID {current_node.id} now has an synchonization dependency on Node ID " + f"{sync_dep}" + ) # Add children to the stack children_chakra_ids = [child.id for child in json_node.children] for child_chakra_id in sorted(children_chakra_ids, reverse=True): From 2343ab9934d415fed1ecac85e1211642716aa156 Mon Sep 17 00:00:00 2001 From: JoongunPark <8554137+JoongunPark@users.noreply.github.com> Date: Thu, 14 Nov 2024 01:23:07 -0500 Subject: [PATCH 4/7] Get pg_name from record_param_comms for collectives --- src/converter/pytorch_converter.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index b9ade2b..48f307c 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -346,6 +346,8 @@ def get_protobuf_node_type_from_json_node( if parent_node.name == "record_param_comms" else parent_node.name ) + if parent_node.name == "record_param_comms" and parent_node.pg_name != "": + json_node.pg_name = parent_node.pg_name if "send" in keyword: return COMM_SEND_NODE if "recv" in keyword: From 3fdb977fbb4365a398567dcece056b88028e057c Mon Sep 17 00:00:00 2001 From: JoongunPark <8554137+JoongunPark@users.noreply.github.com> Date: Thu, 14 Nov 2024 01:25:34 -0500 Subject: [PATCH 5/7] Update trace_linker to use external_id for finding GPU op's parent CPU op --- src/trace_link/trace_linker.py | 73 +++++++++++---------------- tests/trace_link/test_trace_linker.py | 18 +++---- 2 files changed, 39 insertions(+), 52 deletions(-) diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py index 1123e45..2f423a9 100644 --- a/src/trace_link/trace_linker.py +++ b/src/trace_link/trace_linker.py @@ -541,7 +541,7 @@ def map_host_to_device_ops( ]: """Map Chakra host operators to corresponding device operators.""" logging.debug("Mapping Charka host operators to corresponding device operators.") - cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers( + cpu_external_id_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers( kineto_gpu_ops, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts ) @@ -569,7 +569,7 @@ def map_host_to_device_ops( ) = self.link_ops( host_op, kineto_op, - cpu_ev_idx_to_gpu_ops_map, + cpu_external_id_to_gpu_ops_map, kineto_rf_id_to_device_op_map, kineto_external_id_to_kineto_op_map, ) @@ -593,7 +593,7 @@ def group_gpu_ops_by_cpu_launchers( """ Group GPU operators based on their corresponding CPU launchers. - This is determined by the 'ev_idx' which links GPU operators to their initiating CPU launcher events. + This is determined by the 'external_id' which links GPU operators to their initiating CPU launcher events. Args: kineto_gpu_ops (List[KinetoOperator]): List of Kineto GPU operators. @@ -607,9 +607,9 @@ def group_gpu_ops_by_cpu_launchers( Dict[int, List[KinetoOperator]]: Mapping from CPU launch event indices to GPU operators. Raises: - ValueError: If 'ev_idx' is missing for any GPU operator. + ValueError: If 'external_id' is missing for any GPU operator. 
""" - cpu_ev_idx_to_gpu_ops_map = {} + cpu_external_id_to_gpu_ops_map = {} for gpu_op in kineto_gpu_ops: parent_cpu_op = self.find_parent_cpu_op( gpu_op, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts @@ -619,9 +619,9 @@ def group_gpu_ops_by_cpu_launchers( logging.warning(warning_msg) continue - if parent_cpu_op.ev_idx == "": + if parent_cpu_op.external_id == "": error_msg = ( - f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. " + f"Missing 'external_id' for CPU operator {parent_cpu_op.name}. " f"Cannot link GPU op {gpu_op.name} to {parent_cpu_op.name}." ) logging.warning(error_msg) @@ -629,9 +629,9 @@ def group_gpu_ops_by_cpu_launchers( logging.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'") - cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op) + cpu_external_id_to_gpu_ops_map.setdefault(parent_cpu_op.external_id, []).append(gpu_op) - return cpu_ev_idx_to_gpu_ops_map + return cpu_external_id_to_gpu_ops_map def find_parent_cpu_op( self, @@ -713,48 +713,35 @@ def find_closest_op( Returns: Optional[KinetoOperator]: The closest Kineto operator if found. """ - # Searching for the closest timestamp index + # Step 1: Find the initial closest index index = bisect.bisect_left(sorted_kineto_cpu_op_ts, ts) if index == 0: # All operators are later than the timestamp return None - else: - # The operator immediately before the index is the closest one before the timestamp - closest_op = sorted_kineto_cpu_ops[index - 1] - - # Check for NCCL specifics: if it's an NCCL operation and 'nccl:coalesced' should be skipped - if "nccl" in kineto_gpu_op.name.lower() and closest_op.name == "nccl:coalesced": - # Move back to find a non-'nccl:coalesced' operator, if available - for new_index in range(index - 2, -1, -1): - potential_op = sorted_kineto_cpu_ops[new_index] - if potential_op.tid == kineto_gpu_op.tid and potential_op.name != "nccl:coalesced": - return potential_op - # If no valid alternative found before 'nccl:coalesced', continue search forward - index = index - 1 # Adjust index to skip 'nccl:coalesced' - - # After skipping 'nccl:coalesced', verify that the closest operation is on the same thread - # as the GPU operation - if closest_op.tid == kineto_gpu_op.tid: - return closest_op - - # If the tids do not match, search forward to find the closest matching tid - for i in range(index - 1, -1, -1): - op = sorted_kineto_cpu_ops[i] - if op.tid == kineto_gpu_op.tid: - if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced": - continue # Skip 'nccl:coalesced' if it's an NCCL-related GPU operation - if op.timestamp <= ts: - return op - - # If no matching tid is found going forward, return None - return None + + # Step 2: Find the closest operator + tid_only_match = None # Track the best operator with matching tid + for i in range(index - 1, -1, -1): + op = sorted_kineto_cpu_ops[i] + # Skip 'nccl:coalesced' for NCCL-related GPU operations + if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced": + continue + # Return the operator matching both tid and external_id + if op.tid == kineto_gpu_op.tid and op.external_id == kineto_gpu_op.external_id: + return op + # Track the tid_only_match operator with matching tid if no full match is found + if tid_only_match is None and op.tid == kineto_gpu_op.tid: + tid_only_match = op + + # Step 3: Return the best match or None if no match is found + return tid_only_match def link_ops( self, host_op: PyTorchOperator, kineto_op: 
KinetoOperator, - cpu_ev_idx_to_gpu_ops_map: Dict[int, List[KinetoOperator]], + cpu_external_id_to_gpu_ops_map: Dict[int, List[KinetoOperator]], kineto_rf_id_to_device_op_map: Dict[int, KinetoOperator], kineto_external_id_to_kineto_op_map: Dict[int, KinetoOperator], ) -> Tuple[List[KinetoOperator], int, int, int, Optional[int]]: @@ -764,7 +751,7 @@ def link_ops( Args: host_op (PyTorchOperator): Chakra host operator to link. kineto_op (KinetoOperator): Corresponding Kineto operator. - cpu_ev_idx_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping. + cpu_external_id_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping. kineto_rf_id_to_device_op_map (Dict[int, KinetoOperator]): Kineto operator mapping. kineto_external_id_to_kineto_op_map (Dict[int, KinetoOperator]): Mapping from external id to KinetoOperators. @@ -779,7 +766,7 @@ def link_ops( - List[int]: List of synchronization dependency IDs. """ kineto_op.host_op = host_op - linked_gpu_ops = cpu_ev_idx_to_gpu_ops_map.get(kineto_op.ev_idx, []) + linked_gpu_ops = cpu_external_id_to_gpu_ops_map.get(kineto_op.external_id, []) inclusive_dur = kineto_op.inclusive_dur exclusive_dur = kineto_op.exclusive_dur timestamp = kineto_op.timestamp diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py index a0441ae..8430867 100644 --- a/tests/trace_link/test_trace_linker.py +++ b/tests/trace_link/test_trace_linker.py @@ -381,14 +381,14 @@ def test_group_gpu_ops_by_cpu_launchers(trace_linker): kineto_gpu_op2.tid = 2 kineto_runtime_op1 = MagicMock(spec=KinetoOperator) - kineto_runtime_op1.ev_idx = "cpu_op1" + kineto_runtime_op1.external_id = "cpu_op1" kineto_runtime_op1.timestamp = 100 kineto_runtime_op1.tid = 1 kineto_runtime_op1.name = "runtime_op1" kineto_runtime_op1.correlation = 123 kineto_runtime_op2 = MagicMock(spec=KinetoOperator) - kineto_runtime_op2.ev_idx = "cpu_op2" + kineto_runtime_op2.external_id = "cpu_op2" kineto_runtime_op2.timestamp = 200 kineto_runtime_op2.tid = 2 kineto_runtime_op2.name = "runtime_op2" @@ -445,7 +445,7 @@ def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): MagicMock(spec=PyTorchOperator, id=1), MagicMock( spec=KinetoOperator, - ev_idx="1", + external_id="1", inclusive_dur=100, exclusive_dur=50, timestamp=123456, @@ -461,7 +461,7 @@ def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): MagicMock(spec=PyTorchOperator, id=2), MagicMock( spec=KinetoOperator, - ev_idx="2", + external_id="2", inclusive_dur=200, exclusive_dur=150, timestamp=223456, @@ -491,7 +491,7 @@ def test_link_ops( ): mock_get_inter_thread_dep.return_value = expected_inter_thread_dep - cpu_ev_idx_to_gpu_ops_map = {kineto_op.ev_idx: expected_linked_gpu_ops} + cpu_external_id_to_gpu_ops_map = {kineto_op.external_id: expected_linked_gpu_ops} kineto_rf_id_to_kineto_op_map = {1: MagicMock(spec=KinetoOperator, host_op=MagicMock(id=42))} kineto_external_id_to_kineto_op_map = { 2: MagicMock(spec=KinetoOperator, host_op=MagicMock(id=3)), @@ -501,7 +501,7 @@ def test_link_ops( result = trace_linker.link_ops( host_op, kineto_op, - cpu_ev_idx_to_gpu_ops_map, + cpu_external_id_to_gpu_ops_map, kineto_rf_id_to_kineto_op_map, kineto_external_id_to_kineto_op_map, ) @@ -520,7 +520,7 @@ def test_link_ops_with_no_gpu_ops(trace_linker): host_op = MagicMock(spec=PyTorchOperator, id=1) kineto_op = MagicMock( spec=KinetoOperator, - ev_idx="1", + external_id="1", inclusive_dur=100, exclusive_dur=50, timestamp=123456, @@ -529,14 +529,14 @@ def test_link_ops_with_no_gpu_ops(trace_linker): 
sync_dep=[], ) - cpu_ev_idx_to_gpu_ops_map = {} + cpu_external_id_to_gpu_ops_map = {} kineto_rf_id_to_kineto_op_map = {} kineto_external_id_to_kineto_op_map = {} result = trace_linker.link_ops( host_op, kineto_op, - cpu_ev_idx_to_gpu_ops_map, + cpu_external_id_to_gpu_ops_map, kineto_rf_id_to_kineto_op_map, kineto_external_id_to_kineto_op_map, ) From 31b25d4e1192d1cd754e7533d538018391d23783 Mon Sep 17 00:00:00 2001 From: JoongunPark <8554137+JoongunPark@users.noreply.github.com> Date: Fri, 15 Nov 2024 23:34:56 -0500 Subject: [PATCH 6/7] Handling HTA Errors in Chakra --- src/trace_link/trace_linker.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py index 2f423a9..27827c7 100644 --- a/src/trace_link/trace_linker.py +++ b/src/trace_link/trace_linker.py @@ -118,11 +118,17 @@ def load_sync_dependencies( absolute_kineto_file = os.path.abspath(kineto_file) trace_dir = os.path.dirname(absolute_kineto_file) trace_analysis = TraceAnalysis(trace_dir=trace_dir, trace_files={rank: kineto_file}) - cp_graph, success = trace_analysis.critical_path_analysis( - rank=rank, annotation=annotation, instance_id=instance_id - ) - if not success: - logging.error("Failed to load Critical Path Graph") + try: + cp_graph, success = trace_analysis.critical_path_analysis( + rank=rank, annotation=annotation, instance_id=instance_id + ) + if not success: + logging.error("Critical path analysis completed but failed to load Critical Path Graph.") + return sync_dependencies + + except ValueError as e: + logging.error("Critical path analysis encountered an invalid graph structure: %s", e) + # Optionally, you could log more details or include rank-specific information if relevant return sync_dependencies raw_events = trace_analysis.t.get_raw_trace_for_one_rank(rank=rank)["traceEvents"] From a01ce81b73f0e276eb67ae719227c891f73c5df3 Mon Sep 17 00:00:00 2001 From: JoongunPark <8554137+JoongunPark@users.noreply.github.com> Date: Sun, 1 Dec 2024 16:16:12 -0500 Subject: [PATCH 7/7] Fix error encoding METADATA node --- src/converter/pytorch_converter.py | 3 +++ src/converter/pytorch_node.py | 9 +++++++++ tests/converter/test_pytorch_converter.py | 11 +++++++---- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index 48f307c..b5ace29 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -11,6 +11,7 @@ COMM_RECV_NODE, COMM_SEND_NODE, COMP_NODE, + METADATA_NODE, REDUCE_SCATTER, GlobalMetadata, ) @@ -338,6 +339,8 @@ def get_protobuf_node_type_from_json_node( Returns: int: The corresponding Chakra node type. """ + if json_node.is_metadata_op(): + return METADATA_NODE if json_node.is_gpu_op(): if "ncclDevKernel_SendRecv" in json_node.name: parent_node = json_node_map[json_node.parent] diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py index 86b59ac..0729b6b 100644 --- a/src/converter/pytorch_node.py +++ b/src/converter/pytorch_node.py @@ -137,6 +137,15 @@ def get_op_type(self) -> PyTorchNodeType: else: return PyTorchNodeType.LABEL + def is_metadata_op(self) -> bool: + """ + Check if the node is a METADATA operator. + + Returns + bool: True if the node is a METADATA operator, False otherwise. + """ + return self.get_op_type() == PyTorchNodeType.METADATA + def is_cpu_op(self) -> bool: """ Check if the node is a CPU operator. 
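[Editor's note] Before the test changes below, here is an illustrative sketch (not part of the patch) of the node-type decision order that get_protobuf_node_type_from_json_node follows once PATCH 1/7 and PATCH 7/7 are applied. The integer constants and the plain string/boolean arguments are simplified stand-ins for the real Chakra protobuf constants and PyTorchNode fields, and the keyword resolved through a record_param_comms parent is collapsed into a single argument.

# Simplified stand-ins for the Chakra protobuf node-type constants.
METADATA_NODE, COMM_SEND_NODE, COMM_RECV_NODE, COMM_COLL_NODE, COMP_NODE = range(5)

def classify_node(name: str, keyword: str, is_metadata: bool, is_gpu: bool) -> int:
    """Sketch of the classification order after this series (editor's illustration)."""
    if is_metadata:                          # PATCH 7/7: metadata ops (e.g. process_group:init)
        return METADATA_NODE
    if is_gpu:
        if "ncclDevKernel_SendRecv" in name:
            # `keyword` stands in for the name resolved via the record_param_comms parent.
            if "send" in keyword:
                return COMM_SEND_NODE
            if "recv" in keyword:
                return COMM_RECV_NODE
            if "nccl:all_to_all" in keyword:  # PATCH 1/7: all-to-all is built on p2p sends/recvs
                return COMM_COLL_NODE
        if "ncclKernel" in name or "ncclDevKernel" in name:
            return COMM_COLL_NODE
    return COMP_NODE

# Example: an all-to-all implemented as SendRecv kernels is classified as a collective.
assert classify_node("ncclDevKernel_SendRecv", "nccl:all_to_all", False, True) == COMM_COLL_NODE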
diff --git a/tests/converter/test_pytorch_converter.py b/tests/converter/test_pytorch_converter.py index 88f2abf..c01c756 100644 --- a/tests/converter/test_pytorch_converter.py +++ b/tests/converter/test_pytorch_converter.py @@ -10,6 +10,7 @@ BROADCAST, COMM_COLL_NODE, COMP_NODE, + METADATA_NODE, REDUCE_SCATTER, ) from chakra.schema.protobuf.et_def_pb2 import Node as ChakraNode @@ -167,10 +168,11 @@ def test_write_chakra_et(mock_file: MagicMock, sample_pytorch_data: Dict) -> Non @pytest.mark.parametrize( "pytorch_node_data, expected_type", [ - ({"name": "ncclKernel", "is_gpu_op": True}, COMM_COLL_NODE), - ({"name": "ncclDevKernel", "is_gpu_op": True}, COMM_COLL_NODE), - ({"name": "c10d::all_reduce", "is_gpu_op": True}, COMP_NODE), - ({"name": "other_op", "is_gpu_op": False}, COMP_NODE), + ({"name": "process_group:init", "is_gpu_op": False, "is_metadata_op": True}, METADATA_NODE), + ({"name": "ncclKernel", "is_gpu_op": True, "is_metadata_op": False}, COMM_COLL_NODE), + ({"name": "ncclDevKernel", "is_gpu_op": True, "is_metadata_op": False}, COMM_COLL_NODE), + ({"name": "c10d::all_reduce", "is_gpu_op": True, "is_metadata_op": False}, COMP_NODE), + ({"name": "other_op", "is_gpu_op": False, "is_metadata_op": False}, COMP_NODE), ], ) def test_get_protobuf_node_type_from_json_node(pytorch_node_data: Dict, expected_type: int) -> None: @@ -178,6 +180,7 @@ def test_get_protobuf_node_type_from_json_node(pytorch_node_data: Dict, expected pytorch_node = MagicMock(spec=PyTorchNode) pytorch_node.name = pytorch_node_data["name"] pytorch_node.is_gpu_op = MagicMock(return_value=pytorch_node_data["is_gpu_op"]) + pytorch_node.is_metadata_op = MagicMock(return_value=pytorch_node_data["is_metadata_op"]) # Create a mock json_node_map dictionary with actual PyTorchNode instances mock_pytorch_node_data = {
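[Editor's note] For reference, the sketch below isolates the start_time comparison that PATCH 3/7 applies before accepting an HTA-reported synchronization dependency. The standalone helper and its name are hypothetical; the field names (start_time_micros, the protobuf node map) mirror the diff. HTA can report a dependency on a node that starts later than the current node, which the converter treats as a false edge and drops.

def accept_sync_dep(current_node, sync_dep_id, protobuf_node_map) -> bool:
    """Accept a sync dependency only if the prior node actually started earlier (editor's sketch)."""
    prior = protobuf_node_map.get(sync_dep_id)
    return prior is not None and prior.start_time_micros < current_node.start_time_micros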