Skip to content

Commit

Permalink
Configure global logging, refactor classes to use direct logging calls
Browse files (browse the repository at this point in the history)
  • Loading branch information
TaekyungHeo committed Jul 2, 2024
1 parent 5143d92 commit fc6bfa8
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 127 deletions.
24 changes: 9 additions & 15 deletions src/converter/converter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,29 +2,23 @@
import logging
import sys
import traceback
from logging import FileHandler

from .pytorch_converter import PyTorchConverter
from .text_converter import TextConverter


def get_logger(log_filename: str) -> logging.Logger:
def setup_logging(log_filename: str) -> None:
formatter = logging.Formatter("%(levelname)s [%(asctime)s] %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p")

file_handler = FileHandler(log_filename, mode="w")
file_handler = logging.FileHandler(log_filename, mode="w")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.WARNING)
stream_handler.setFormatter(formatter)

logger = logging.getLogger(__file__)
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

return logger
logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, stream_handler])


def main() -> None:
Expand All @@ -45,26 +39,26 @@ def main() -> None:
parser.add_argument("--log_filename", type=str, default="debug.log", help="Log filename")
args = parser.parse_args()

logger = get_logger(args.log_filename)
logger.debug(" ".join(sys.argv))
setup_logging(args.log_filename)
logging.debug(" ".join(sys.argv))

try:
if args.input_type == "Text":
converter = TextConverter(args.input_filename, args.output_filename, args.num_npus, args.num_passes, logger)
converter = TextConverter(args.input_filename, args.output_filename, args.num_npus, args.num_passes)
converter.convert()
elif args.input_type == "PyTorch":
converter = PyTorchConverter(args.input_filename, args.output_filename, logger)
converter = PyTorchConverter(args.input_filename, args.output_filename)
converter.convert()
else:
supported_types = ["Text", "PyTorch"]
logger.error(
logging.error(
f"The input type '{args.input_type}' is not supported. "
f"Supported types are: {', '.join(supported_types)}."
)
sys.exit(1)
except Exception:
traceback.print_exc()
logger.debug(traceback.format_exc())
logging.debug(traceback.format_exc())
sys.exit(1)


Expand Down
55 changes: 26 additions & 29 deletions src/converter/pytorch_converter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -31,21 +31,18 @@ class PyTorchConverter:
Attributes
input_filename (str): Input file name containing PyTorch execution trace.
output_filename (str): Output file name for the converted Chakra trace.
logger (logging.Logger): Logger for logging information during conversion.
"""

def __init__(self, input_filename: str, output_filename: str, logger: logging.Logger) -> None:
def __init__(self, input_filename: str, output_filename: str) -> None:
"""
Initialize the PyTorch to Chakra converter. It sets up necessary attributes and prepares the environment.
Args:
input_filename (str): Name of the input file containing PyTorch execution trace.
output_filename (str): Name of the output file for the converted Chakra trace.
logger (logging.Logger): Logger for logging information during the conversion.
"""
self.input_filename = input_filename
self.output_filename = output_filename
self.logger = logger

def convert(self) -> None:
"""Convert PyTorch execution traces into the Chakra format."""
Expand Down Expand Up @@ -92,12 +89,12 @@ def load_pytorch_execution_traces(self) -> Dict:
Returns
Dict: The loaded PyTorch execution trace data.
"""
self.logger.info("Loading PyTorch execution traces from file.")
logging.info("Loading PyTorch execution traces from file.")
try:
with open(self.input_filename, "r") as pytorch_et:
return json.load(pytorch_et)
except IOError as e:
self.logger.error(f"Error opening file {self.input_filename}: {e}")
logging.error(f"Error opening file {self.input_filename}: {e}")
raise Exception(f"Could not open file {self.input_filename}") from e

def _parse_and_instantiate_nodes(
Expand All @@ -115,7 +112,7 @@ def _parse_and_instantiate_nodes(
Tuple: A tuple containing PyTorch schema, PID, time, start timestamp, finish timestamp, and dictionary of
PyTorch node objects.
"""
self.logger.info("Extracting and processing node data from execution trace.")
logging.info("Extracting and processing node data from execution trace.")
pytorch_schema = pytorch_et_data["schema"]
pytorch_pid = pytorch_et_data["pid"]
pytorch_time = pytorch_et_data["time"]
Expand Down Expand Up @@ -155,7 +152,7 @@ def _establish_parent_child_relationships(
self._update_node_type_counts(node_type_counts, pytorch_node)

for node_type, count in node_type_counts.items():
self.logger.info(f"{node_type}: {count}")
logging.info(f"{node_type}: {count}")

return pytorch_node_objects

Expand Down Expand Up @@ -244,13 +241,13 @@ def open_chakra_execution_trace(self, output_filename: str) -> IO[bytes]:
Returns:
IO[bytes]: File handle for the Chakra execution trace output file.
"""
self.logger.info(f"Opening Chakra execution trace file: {output_filename}")
logging.info(f"Opening Chakra execution trace file: {output_filename}")
try:
chakra_et = open(output_filename, "wb") # noqa: SIM115
return chakra_et
except IOError as e:
err_msg = f"Error opening file {output_filename}: {e}"
self.logger.error(err_msg)
logging.error(err_msg)
raise Exception(err_msg) from e

def convert_nodes(self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dict[int, ChakraNode]) -> None:
Expand Down Expand Up @@ -302,7 +299,7 @@ def convert_to_chakra_node(
Returns:
ChakraNode: The converted Chakra node.
"""
self.logger.debug(f"Converting PyTorch node ID {pytorch_node.id} to Chakra node.")
logging.debug(f"Converting PyTorch node ID {pytorch_node.id} to Chakra node.")
chakra_node = ChakraNode()
chakra_node.id = pytorch_node.id
chakra_node.name = pytorch_node.name
Expand Down Expand Up @@ -483,7 +480,7 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901
if node_op_type == PyTorchNodeType.GPU_OP:
if last_visited_any and last_visited_any.id not in current_node.data_deps:
current_node.data_deps.append(last_visited_any.id)
self.logger.debug(
logging.debug(
f"GPU Node ID {current_node.id} now has a data dependency on Node ID {last_visited_any.id}"
)
last_visited_any = last_visited_non_gpu
Expand All @@ -492,12 +489,12 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901
dep_id = pytorch_node.inter_thread_dep
if dep_id not in current_node.data_deps:
current_node.data_deps.append(dep_id)
self.logger.debug(
logging.debug(
f"CPU Node ID {current_node.id} now has an inter-thread data dependency on Node ID {dep_id}"
)
if last_visited_non_gpu and last_visited_non_gpu.id not in current_node.data_deps:
current_node.data_deps.append(last_visited_non_gpu.id)
self.logger.debug(
logging.debug(
f"CPU Node ID {current_node.id} now has a data dependency on non-GPU Node ID "
f"{last_visited_non_gpu.id}"
)
Expand Down Expand Up @@ -533,9 +530,9 @@ def remove_dangling_nodes(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int
del chakra_nodes[node_id]

if dangling_nodes:
self.logger.info(f"Identified and removed {len(dangling_nodes)} dangling nodes:")
logging.info(f"Identified and removed {len(dangling_nodes)} dangling nodes:")
for node_id in dangling_nodes:
self.logger.info(f" - Node ID {node_id}")
logging.info(f" - Node ID {node_id}")

return chakra_nodes

Expand Down Expand Up @@ -580,7 +577,7 @@ def dfs(node_id: int, path: List[int]) -> bool:
"""
if node_id in stack:
cycle_nodes = " -> ".join([chakra_nodes[n].name for n in path + [node_id]])
self.logger.error(f"Cyclic dependency detected: {cycle_nodes}")
logging.error(f"Cyclic dependency detected: {cycle_nodes}")
return True
if node_id in visited:
return False
Expand Down Expand Up @@ -615,12 +612,12 @@ def write_chakra_et(
Encode and write both the metadata and individual nodes to create a
complete execution trace.
"""
self.logger.info("Writing Chakra execution trace.")
logging.info("Writing Chakra execution trace.")
self._write_global_metadata(
chakra_et, pytorch_schema, pytorch_pid, pytorch_time, pytorch_start_ts, pytorch_finish_ts
)
self._encode_and_write_nodes(chakra_et, chakra_nodes)
self.logger.info("Chakra execution trace writing completed.")
logging.info("Chakra execution trace writing completed.")

def _write_global_metadata(
self,
Expand All @@ -637,7 +634,7 @@ def _write_global_metadata(
This process includes encoding metadata like schema, process ID, timestamps,
and other relevant information for the Chakra execution trace.
"""
self.logger.info("Encoding global metadata for Chakra execution trace.")
logging.info("Encoding global metadata for Chakra execution trace.")
global_metadata = GlobalMetadata(
attr=[
ChakraAttr(name="schema", string_val=pytorch_schema),
Expand All @@ -656,12 +653,12 @@ def _encode_and_write_nodes(self, chakra_et: IO[bytes], chakra_nodes: Dict[int,
Each node from the PyTorch execution trace is encoded and written into the Chakra format. This includes node
IDs, names, types, dependencies, and other attributes.
"""
self.logger.info("Encoding and writing nodes for Chakra execution trace.")
logging.info("Encoding and writing nodes for Chakra execution trace.")
seen_nids = set()
for nid in sorted(chakra_nodes.keys()):
if nid in seen_nids:
err_msg = f"Duplicate NID {nid} detected in Chakra nodes."
self.logger.error(err_msg)
logging.error(err_msg)
raise ValueError(err_msg)
seen_nids.add(nid)
chakra_node = chakra_nodes[nid]
Expand All @@ -676,7 +673,7 @@ def close_chakra_execution_trace(self, chakra_et: IO[bytes]) -> None:
Args:
chakra_et (IO[bytes]): File handle for the Chakra execution trace output file.
"""
self.logger.info("Closing Chakra execution trace file.")
logging.info("Closing Chakra execution trace file.")
if chakra_et and not chakra_et.closed:
chakra_et.close()

Expand All @@ -697,7 +694,7 @@ def simulate_execution(
pytorch_nodes (Dict[int, PyTorchNode]): The PyTorch nodes to reference for additional information.
parent_to_children_map (Dict[int, List[int]]): Mapping from parent node IDs to their child node IDs.
"""
self.logger.info("Simulating execution of Chakra nodes based on data dependencies.")
logging.info("Simulating execution of Chakra nodes based on data dependencies.")

ready_cpu_nodes = [
(node_id, chakra_nodes[node_id])
Expand All @@ -723,7 +720,7 @@ def simulate_execution(
cpu_node_id, cpu_node = ready_cpu_nodes.pop(0)
current_cpu_node = (cpu_node_id, current_time)
issued_nodes.add(cpu_node_id)
self.logger.info(
logging.info(
f"Issuing CPU Node ID {cpu_node_id} ({cpu_node.name}) at {current_time}us with duration "
f"{cpu_node.duration_micros}us"
)
Expand All @@ -732,7 +729,7 @@ def simulate_execution(
gpu_node_id, gpu_node = ready_gpu_nodes.pop(0)
current_gpu_node = (gpu_node_id, current_time)
issued_nodes.add(gpu_node_id)
self.logger.info(
logging.info(
f"Issuing GPU Node ID {gpu_node_id} ({gpu_node.name}) at {current_time}us with duration "
f"{gpu_node.duration_micros}us"
)
Expand All @@ -743,14 +740,14 @@ def simulate_execution(
current_cpu_node
and current_time - current_cpu_node[1] >= chakra_nodes[current_cpu_node[0]].duration_micros
):
self.logger.info(f"CPU Node ID {current_cpu_node[0]} completed at {current_time}us")
logging.info(f"CPU Node ID {current_cpu_node[0]} completed at {current_time}us")
current_cpu_node = None

if (
current_gpu_node
and current_time - current_gpu_node[1] >= chakra_nodes[current_gpu_node[0]].duration_micros
):
self.logger.info(f"GPU Node ID {current_gpu_node[0]} completed at {current_time}us")
logging.info(f"GPU Node ID {current_gpu_node[0]} completed at {current_time}us")
current_gpu_node = None

for node_id in list(issued_nodes):
Expand All @@ -766,4 +763,4 @@ def simulate_execution(

issued_nodes.clear()

self.logger.info("Simulation of Chakra node execution completed.")
logging.info("Simulation of Chakra node execution completed.")
3 changes: 1 addition & 2 deletions src/converter/text_converter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -52,13 +52,12 @@ def __init__(self, line: str) -> None:

class TextConverter:
def __init__(
self, input_filename: str, output_filename: str, num_npus: int, num_passes: int, logger: logging.Logger
self, input_filename: str, output_filename: str, num_npus: int, num_passes: int
) -> None:
self.input_filename = input_filename
self.output_filename = output_filename
self.num_npus = num_npus
self.num_passes = num_passes
self.logger = logger
self.next_node_id = 0

def get_global_metadata(self):
Expand Down
Loading

0 comments on commit fc6bfa8

Please sign in to comment.