Update trace_linker to use external_id for finding GPU op's parent CPU op
JoongunPark committed Nov 16, 2024
1 parent 9baffed commit 474b99f
Showing 2 changed files with 39 additions and 52 deletions.
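In short: the linker previously grouped Kineto GPU operators under their launching CPU operator by the launcher's `ev_idx`; this commit switches the grouping key (and the matching lookup in `link_ops`) to `external_id`, the field that ties a GPU kernel back to the CPU event that enqueued it. A minimal sketch of the keying pattern, under assumed simplified types (`KinetoOp` and `group_by_launcher` are illustrative stand-ins, not the repository's classes; the real logic is in the diff below):

# Minimal sketch of the new keying, assuming simplified stand-in types.
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class KinetoOp:
    name: str
    tid: int
    external_id: int  # ties a GPU kernel to the CPU event that launched it


def group_by_launcher(
    gpu_ops: List[KinetoOp], launchers_by_external_id: Dict[int, KinetoOp]
) -> Dict[int, List[KinetoOp]]:
    """Group GPU ops under their parent CPU launcher's external_id (previously keyed by ev_idx)."""
    grouped: Dict[int, List[KinetoOp]] = {}
    for gpu_op in gpu_ops:
        parent = launchers_by_external_id.get(gpu_op.external_id)
        if parent is None:
            continue  # orphaned GPU op: no CPU launcher shares its external_id
        grouped.setdefault(parent.external_id, []).append(gpu_op)
    return grouped


# Example: one launcher (external_id=7) collects its kernel.
ops = [KinetoOp("gemm_kernel", 1, 7)]
launchers = {7: KinetoOp("cudaLaunchKernel", 1, 7)}
assert group_by_launcher(ops, launchers) == {7: ops}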
73 changes: 30 additions & 43 deletions src/trace_link/trace_linker.py
@@ -541,7 +541,7 @@ def map_host_to_device_ops(
     ]:
         """Map Chakra host operators to corresponding device operators."""
         logging.debug("Mapping Chakra host operators to corresponding device operators.")
-        cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers(
+        cpu_external_id_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers(
             kineto_gpu_ops, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts
         )

@@ -569,7 +569,7 @@ def map_host_to_device_ops(
                 ) = self.link_ops(
                     host_op,
                     kineto_op,
-                    cpu_ev_idx_to_gpu_ops_map,
+                    cpu_external_id_to_gpu_ops_map,
                     kineto_rf_id_to_device_op_map,
                     kineto_external_id_to_kineto_op_map,
                 )
@@ -593,7 +593,7 @@ def group_gpu_ops_by_cpu_launchers(
         """
         Group GPU operators based on their corresponding CPU launchers.
 
-        This is determined by the 'ev_idx' which links GPU operators to their initiating CPU launcher events.
+        This is determined by the 'external_id' which links GPU operators to their initiating CPU launcher events.
 
         Args:
             kineto_gpu_ops (List[KinetoOperator]): List of Kineto GPU operators.
@@ -607,9 +607,9 @@
             Dict[int, List[KinetoOperator]]: Mapping from CPU launcher external IDs to GPU operators.
 
         Raises:
-            ValueError: If 'ev_idx' is missing for any GPU operator.
+            ValueError: If 'external_id' is missing for any GPU operator.
         """
-        cpu_ev_idx_to_gpu_ops_map = {}
+        cpu_external_id_to_gpu_ops_map = {}
         for gpu_op in kineto_gpu_ops:
             parent_cpu_op = self.find_parent_cpu_op(
                 gpu_op, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts
@@ -619,19 +619,19 @@
                 logging.warning(warning_msg)
                 continue
 
-            if parent_cpu_op.ev_idx == "":
+            if parent_cpu_op.external_id == "":
                 error_msg = (
-                    f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. "
+                    f"Missing 'external_id' for CPU operator {parent_cpu_op.name}. "
                     f"Cannot link GPU op {gpu_op.name} to {parent_cpu_op.name}."
                 )
                 logging.warning(error_msg)
                 continue
 
             logging.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'")
 
-            cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op)
+            cpu_external_id_to_gpu_ops_map.setdefault(parent_cpu_op.external_id, []).append(gpu_op)
 
-        return cpu_ev_idx_to_gpu_ops_map
+        return cpu_external_id_to_gpu_ops_map
 
     def find_parent_cpu_op(
         self,
@@ -713,48 +713,35 @@ def find_closest_op(
         Returns:
             Optional[KinetoOperator]: The closest Kineto operator if found.
         """
-        # Searching for the closest timestamp index
+        # Step 1: Find the initial closest index
        index = bisect.bisect_left(sorted_kineto_cpu_op_ts, ts)

-        if index == 0:
-            # All operators are later than the timestamp
-            return None
-        else:
-            # The operator immediately before the index is the closest one before the timestamp
-            closest_op = sorted_kineto_cpu_ops[index - 1]
-
-            # Check for NCCL specifics: if it's an NCCL operation and 'nccl:coalesced' should be skipped
-            if "nccl" in kineto_gpu_op.name.lower() and closest_op.name == "nccl:coalesced":
-                # Move back to find a non-'nccl:coalesced' operator, if available
-                for new_index in range(index - 2, -1, -1):
-                    potential_op = sorted_kineto_cpu_ops[new_index]
-                    if potential_op.tid == kineto_gpu_op.tid and potential_op.name != "nccl:coalesced":
-                        return potential_op
-                # If no valid alternative found before 'nccl:coalesced', continue search forward
-                index = index - 1  # Adjust index to skip 'nccl:coalesced'
-
-            # After skipping 'nccl:coalesced', verify that the closest operation is on the same thread
-            # as the GPU operation
-            if closest_op.tid == kineto_gpu_op.tid:
-                return closest_op
-
-            # If the tids do not match, search forward to find the closest matching tid
-            for i in range(index - 1, -1, -1):
-                op = sorted_kineto_cpu_ops[i]
-                if op.tid == kineto_gpu_op.tid:
-                    if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced":
-                        continue  # Skip 'nccl:coalesced' if it's an NCCL-related GPU operation
-                    if op.timestamp <= ts:
-                        return op
-
-            # If no matching tid is found going forward, return None
-            return None

+        # Step 2: Find the closest operator
+        tid_only_match = None  # Track the best operator with matching tid
+        for i in range(index - 1, -1, -1):
+            op = sorted_kineto_cpu_ops[i]
+            # Skip 'nccl:coalesced' for NCCL-related GPU operations
+            if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced":
+                continue
+            # Return the operator matching both tid and external_id
+            if op.tid == kineto_gpu_op.tid and op.external_id == kineto_gpu_op.external_id:
+                return op
+            # Track the first operator with a matching tid as a fallback if no full match is found
+            if tid_only_match is None and op.tid == kineto_gpu_op.tid:
+                tid_only_match = op
+
+        # Step 3: Return the best match or None if no match is found
+        return tid_only_match

     def link_ops(
         self,
         host_op: PyTorchOperator,
         kineto_op: KinetoOperator,
-        cpu_ev_idx_to_gpu_ops_map: Dict[int, List[KinetoOperator]],
+        cpu_external_id_to_gpu_ops_map: Dict[int, List[KinetoOperator]],
         kineto_rf_id_to_device_op_map: Dict[int, KinetoOperator],
         kineto_external_id_to_kineto_op_map: Dict[int, KinetoOperator],
     ) -> Tuple[List[KinetoOperator], int, int, int, Optional[int]]:
@@ -764,7 +751,7 @@
         Args:
             host_op (PyTorchOperator): Chakra host operator to link.
             kineto_op (KinetoOperator): Corresponding Kineto operator.
-            cpu_ev_idx_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping.
+            cpu_external_id_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping.
             kineto_rf_id_to_device_op_map (Dict[int, KinetoOperator]): Kineto operator mapping.
             kineto_external_id_to_kineto_op_map (Dict[int, KinetoOperator]): Mapping from external id to
                 KinetoOperators.
@@ -779,7 +766,7 @@
             - List[int]: List of synchronization dependency IDs.
         """
         kineto_op.host_op = host_op
-        linked_gpu_ops = cpu_ev_idx_to_gpu_ops_map.get(kineto_op.ev_idx, [])
+        linked_gpu_ops = cpu_external_id_to_gpu_ops_map.get(kineto_op.external_id, [])
         inclusive_dur = kineto_op.inclusive_dur
         exclusive_dur = kineto_op.exclusive_dur
         timestamp = kineto_op.timestamp
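The behavioral core of the change is in `find_closest_op`: instead of settling for the nearest same-thread CPU op, the search now walks backwards from the bisect insertion point, prefers a CPU op matching both `tid` and `external_id`, and keeps the nearest `tid`-only hit as a fallback. A condensed, self-contained sketch of that search; the `Op` type and sample trace are illustrative assumptions, not the repository's classes:

# Condensed sketch of the backward search in find_closest_op,
# under assumed simplified types.
import bisect
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Op:
    name: str
    tid: int
    external_id: int
    timestamp: int


def find_closest_op(gpu_op: Op, cpu_ops: List[Op], cpu_ts: List[int], ts: int) -> Optional[Op]:
    # Step 1: bisect to the first CPU op at or after ts, then walk backwards in time.
    index = bisect.bisect_left(cpu_ts, ts)
    tid_only_match: Optional[Op] = None
    for i in range(index - 1, -1, -1):
        op = cpu_ops[i]
        if "nccl" in gpu_op.name.lower() and op.name == "nccl:coalesced":
            continue  # coalesced NCCL markers are never treated as launchers
        if op.tid == gpu_op.tid and op.external_id == gpu_op.external_id:
            return op  # exact match: same thread and same external_id
        if tid_only_match is None and op.tid == gpu_op.tid:
            tid_only_match = op  # nearest same-thread op, kept as fallback
    return tid_only_match


# Example: the launcher sharing external_id=7 wins over a closer tid-only op.
cpu_ops = [Op("aten::mm", 1, 5, 100), Op("cudaLaunchKernel", 1, 7, 110), Op("aten::add", 1, 6, 120)]
gpu_op = Op("gemm_kernel", 1, 7, 150)
closest = find_closest_op(gpu_op, cpu_ops, [op.timestamp for op in cpu_ops], gpu_op.timestamp)
assert closest is not None and closest.external_id == 7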
18 changes: 9 additions & 9 deletions tests/trace_link/test_trace_linker.py
@@ -381,14 +381,14 @@ def test_group_gpu_ops_by_cpu_launchers(trace_linker):
     kineto_gpu_op2.tid = 2
 
     kineto_runtime_op1 = MagicMock(spec=KinetoOperator)
-    kineto_runtime_op1.ev_idx = "cpu_op1"
+    kineto_runtime_op1.external_id = "cpu_op1"
     kineto_runtime_op1.timestamp = 100
     kineto_runtime_op1.tid = 1
     kineto_runtime_op1.name = "runtime_op1"
     kineto_runtime_op1.correlation = 123
 
     kineto_runtime_op2 = MagicMock(spec=KinetoOperator)
-    kineto_runtime_op2.ev_idx = "cpu_op2"
+    kineto_runtime_op2.external_id = "cpu_op2"
     kineto_runtime_op2.timestamp = 200
     kineto_runtime_op2.tid = 2
     kineto_runtime_op2.name = "runtime_op2"
@@ -445,7 +445,7 @@ def test_find_parent_cpu_op(mock_find_closest_op, trace_linker):
         MagicMock(spec=PyTorchOperator, id=1),
         MagicMock(
             spec=KinetoOperator,
-            ev_idx="1",
+            external_id="1",
             inclusive_dur=100,
             exclusive_dur=50,
             timestamp=123456,
@@ -461,7 +461,7 @@
         MagicMock(spec=PyTorchOperator, id=2),
         MagicMock(
             spec=KinetoOperator,
-            ev_idx="2",
+            external_id="2",
             inclusive_dur=200,
             exclusive_dur=150,
             timestamp=223456,
@@ -491,7 +491,7 @@ def test_link_ops(
 ):
     mock_get_inter_thread_dep.return_value = expected_inter_thread_dep
 
-    cpu_ev_idx_to_gpu_ops_map = {kineto_op.ev_idx: expected_linked_gpu_ops}
+    cpu_external_id_to_gpu_ops_map = {kineto_op.external_id: expected_linked_gpu_ops}
     kineto_rf_id_to_kineto_op_map = {1: MagicMock(spec=KinetoOperator, host_op=MagicMock(id=42))}
     kineto_external_id_to_kineto_op_map = {
         2: MagicMock(spec=KinetoOperator, host_op=MagicMock(id=3)),
@@ -501,7 +501,7 @@
     result = trace_linker.link_ops(
         host_op,
         kineto_op,
-        cpu_ev_idx_to_gpu_ops_map,
+        cpu_external_id_to_gpu_ops_map,
         kineto_rf_id_to_kineto_op_map,
         kineto_external_id_to_kineto_op_map,
     )
@@ -520,7 +520,7 @@ def test_link_ops_with_no_gpu_ops(trace_linker):
     host_op = MagicMock(spec=PyTorchOperator, id=1)
     kineto_op = MagicMock(
         spec=KinetoOperator,
-        ev_idx="1",
+        external_id="1",
         inclusive_dur=100,
         exclusive_dur=50,
         timestamp=123456,
@@ -529,14 +529,14 @@
         sync_dep=[],
     )
 
-    cpu_ev_idx_to_gpu_ops_map = {}
+    cpu_external_id_to_gpu_ops_map = {}
     kineto_rf_id_to_kineto_op_map = {}
     kineto_external_id_to_kineto_op_map = {}
 
     result = trace_linker.link_ops(
         host_op,
         kineto_op,
-        cpu_ev_idx_to_gpu_ops_map,
+        cpu_external_id_to_gpu_ops_map,
         kineto_rf_id_to_kineto_op_map,
         kineto_external_id_to_kineto_op_map,
     )
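One detail worth noting in `test_link_ops_with_no_gpu_ops` above: because `link_ops` fetches linked GPU ops with `dict.get` and an empty-list default, a CPU op whose `external_id` was never grouped simply links to no GPU ops rather than raising. A tiny stand-alone illustration (the `FakeOp` type is a hypothetical stand-in, not the suite's fixtures):

# Stand-alone illustration of the empty-map path exercised by
# test_link_ops_with_no_gpu_ops.
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeOp:
    external_id: str


cpu_external_id_to_gpu_ops_map: Dict[str, List[FakeOp]] = {}
kineto_op = FakeOp(external_id="1")

# Mirrors link_ops: an unmapped external_id links to no GPU ops, not an error.
linked_gpu_ops = cpu_external_id_to_gpu_ops_map.get(kineto_op.external_id, [])
assert linked_gpu_ops == []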
