Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure et_replay subpackage #124

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/python_lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,18 @@ jobs:

- name: Run Black
run: black . --check

- name: Run tests
  # Use a literal block scalar (`|`) so each command runs on its own line.
  # A plain multi-line scalar is folded by YAML into a single space-joined
  # string, which would hand all three commands to the shell as one command.
  run: |
    python -m pip install -r requirements.txt
    python -m pip install et_replay
    python et_replay/tests/test_execution_trace.py

- name: Validate imports
  # Smoke-test that the restructured package paths are importable.
  # The literal block scalar (`|`) preserves the newlines; a plain multi-line
  # scalar would be folded into one command line and the checks would not
  # run as separate commands.
  run: |
    python -m pip install fbgemm-gpu
    python -c 'from et_replay import ExecutionTrace'
    python -c 'from et_replay.comm import comms_utils'
    python -c 'from et_replay.tools.validate_trace import TraceValidator'
    python -c 'from et_replay.utils import trace_handler'

2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.venv/
__pycache__/
3 changes: 3 additions & 0 deletions et_replay/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from et_replay.execution_trace import ExecutionTrace

# Public API of the package: ExecutionTrace is re-exported here so callers can
# write `from et_replay import ExecutionTrace`.
__all__ = ["ExecutionTrace"]
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
from __future__ import annotations

import json

from typing import List, Tuple

from et_replay.lib.comm import comms_utils
from et_replay.lib.comm.comms_utils import commsArgs
from et_replay.lib.comm.pytorch_backend_utils import supportedP2pOps

from et_replay.lib.execution_trace import ExecutionTrace
from et_replay import ExecutionTrace
from et_replay.comm import comms_utils
from et_replay.comm.comms_utils import commsArgs
from et_replay.comm.pytorch_backend_utils import supportedP2pOps

tensorDtypeMap = {
"Tensor(int)": "int",
Expand Down Expand Up @@ -63,7 +61,6 @@ def _parseBasicTrace(in_trace: List):
"""
newCommsTrace = []
for cnt, curComm in enumerate(in_trace):

newComm = commsArgs()
newComm.id = cnt
newComm.markerStack = curComm.get("markers")
Expand All @@ -84,7 +81,6 @@ def _parseBasicTrace(in_trace: List):


def _parseBasicTraceComms(curComm, newComm: commsArgs) -> None:

newComm.comms = comms_utils.paramToCommName(curComm["comms"].lower())
if newComm.markerStack is None:
newComm.markerStack = [newComm.comms]
Expand Down Expand Up @@ -165,7 +161,6 @@ def _parseKinetoUnitrace(in_trace: List, target_rank: int) -> List:
and entry["name"] == "record_param_comms"
and entry["args"]["rank"] == target_rank
):

newComm = commsArgs()
newComm.comms = comms_utils.paramToCommName(entry["args"]["comms"].lower())
newComm.id = commsCnt
Expand Down Expand Up @@ -244,9 +239,7 @@ def _parseExecutionTrace(
ranks = pg["ranks"]
groupCnt = pg["group_count"]
pgRanksMap[pgId] = (
ranks
if len(ranks) > 0
else list(range(pg["group_size"]))
ranks if len(ranks) > 0 else list(range(pg["group_size"]))
# rank list is empty when all ranks are in a pg
)
if ET_BACKENDID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@

import numpy as np
import torch
from et_replay.lib.comm.param_profile import paramTimer
from et_replay.lib.comm.pytorch_backend_utils import (
from torch._C._distributed_c10d import ProcessGroup # @manual

from et_replay.comm.param_profile import paramTimer
from et_replay.comm.pytorch_backend_utils import (
backendFunctions,
collectiveArgsHolder,
customized_backend,
supportedC10dBackends,
supportedDevices,
)
from torch._C._distributed_c10d import ProcessGroup # @manual

random.seed()

Expand Down Expand Up @@ -789,9 +790,7 @@ def __init__(self, args: Namespace) -> None:
self.quant_a2a_embedding_dim = args.quant_a2a_embedding_dim
self.quant_threshold = args.quant_threshold
self.dcheck = args.c
self.groupRanks = (
{}
) # record what ranks each process group will work on {pg_id, ranks}
self.groupRanks = {} # record what ranks each process group will work on {pg_id, ranks}
self.use_ext_dist = args.use_ext_dist
self.size_from_trace = False
self.init_method = args.init_method
Expand Down Expand Up @@ -1072,7 +1071,6 @@ def _prep_all_to_all_single(
scaleFactor: float,
allocate: bool = True,
) -> Tuple[torch.Tensor, torch.Tensor]:

ipTensor = None
opTensor = None
if allocate:
Expand Down Expand Up @@ -1193,7 +1191,6 @@ def _prep_all_gather_base(
scaleFactor: float,
allocate: bool = True,
) -> Tuple[torch.Tensor, torch.Tensor]:

opTensor = []
if not commsParams.size_from_trace:
numElementsOut = numElementsIn
Expand Down Expand Up @@ -1257,7 +1254,6 @@ def _prep_reduce_scatter(
scaleFactor: float,
allocate: bool = True,
) -> Tuple[torch.Tensor, torch.Tensor]:

ipTensor = []
opTensor = []
if not commsParams.size_from_trace:
Expand Down Expand Up @@ -1304,7 +1300,6 @@ def _prep_reduce_scatter_base(
scaleFactor: float,
allocate: bool = True,
) -> Tuple[torch.Tensor, torch.Tensor]:

ipTensor = []
opTensor = []
if not commsParams.size_from_trace:
Expand Down Expand Up @@ -1727,7 +1722,6 @@ def init_emb_lookup(collectiveArgs, commsParams, backendFuncs):
try:
# fbgemm_gpu can be downloaded from https://github.com/pytorch/FBGEMM/tree/main/fbgemm_gpu
from fbgemm_gpu.split_embedding_utils import generate_requests

from fbgemm_gpu.split_table_batched_embeddings_ops import (
ComputeDevice,
EmbeddingLocation,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
from typing import Dict, List, Optional

import torch

from et_replay.lib.comm.param_profile import paramTimer

from torch.distributed import ProcessGroup

from et_replay.comm.param_profile import paramTimer

logger = logging.getLogger(__name__)

supportedDevices = ["cpu", "cuda", "rocm", "tpu"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import torch
import torch.distributed as dist
import torch.nn as nn
from et_replay.lib.comm.param_profile import paramProfile
from et_replay.lib.comm.pytorch_backend_utils import (

from et_replay.comm.param_profile import paramProfile
from et_replay.comm.pytorch_backend_utils import (
backendFunctions,
collectiveArgsHolder,
)
Expand Down Expand Up @@ -634,7 +635,6 @@ def complete_single_op(self, collectiveArgs, retFlag=False):
self.device_sync(collectiveArgs)

def wait(self, collectiveArgs, retFlag=False):

# for backwards compatibility, use old wait functionality.
if len(collectiveArgs.waitObjIds) == 0:
self.complete_single_op(collectiveArgs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import torch_xla.core.xla_model as xm # @manual
import torch_xla.distributed.xla_multiprocessing as xmp # @manual

from .comms_utils import backendFunctions
from et_replay.comm.comms_utils import backendFunctions


class PyTorchTPUBackend(backendFunctions):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
import re

import torch
from et_replay.lib.execution_trace import NodeType
from fbgemm_gpu.split_table_batched_embeddings_ops import PoolingMode, WeightDecayMode

from param_bench.train.compute.python.lib.pytorch.config_util import create_op_args

from param_bench.train.compute.python.workloads.pytorch.split_table_batched_embeddings_ops import (
SplitTableBatchedEmbeddingBagsCodegenInputDataGenerator,
SplitTableBatchedEmbeddingBagsCodegenOp,
)

from et_replay.execution_trace import NodeType

# TODO: Add all torch dtypes to here
TORCH_DTYPES_RNG = {
Expand Down
File renamed without changes.
4 changes: 1 addition & 3 deletions et_replay/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ name = "et_replay"
version = "0.5.0"

[tool.setuptools.package-dir]
"et_replay.lib" = "lib"
"et_replay.lib.comm" = "lib/comm"
"et_replay.tools" = "tools"
"et_replay" = "."
"param_bench" = "../../param"

[project.scripts]
Expand Down
6 changes: 3 additions & 3 deletions et_replay/tests/test_execution_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import os
import unittest

from param_bench.train.compute.python.tools.execution_trace import ExecutionTrace
from param_bench.train.compute.python.tools.validate_trace import TraceValidator
from et_replay import ExecutionTrace
from et_replay.tools.validate_trace import TraceValidator

CURR_DIR = os.path.dirname(os.path.realpath(__file__))


class TestTraceLoadAndValidate(unittest.TestCase):
def setUp(self):
self.trace_base = os.path.join(CURR_DIR, "data")
self.trace_base = os.path.join(CURR_DIR, "inputs")

def _test_and_validate_trace(self, trace_file):
with (
Expand Down
3 changes: 1 addition & 2 deletions et_replay/tools/comm_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import numpy as np
import torch

from et_replay.comm import comms_utils
from et_replay.comm.comms_utils import (
bootstrap_info_holder,
Expand Down Expand Up @@ -294,7 +295,6 @@ def checkArgs(self, args: argparse.Namespace) -> None:
and not os.path.isfile(self.trace_file)
and not os.path.isdir(self.trace_file)
):

raise ValueError(
f"The specified trace path '{self.trace_file}' is neither a "
"file nor a directory. Please provide a valid path."
Expand Down Expand Up @@ -637,7 +637,6 @@ def prepComms(
# for all_to_allv, we can shrink the size if running on smaller scale
# this is for sanity test or debug purpose only since we don't always get to run very large scale
if self.shrink:

cur_world_size = self.collectiveArgs.world_size
real_world_size = cur_world_size

Expand Down
26 changes: 10 additions & 16 deletions et_replay/tools/et_replay.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
import gc
import json

import logging
import os
import sys
Expand All @@ -12,13 +11,6 @@

import numpy as np
import torch

from et_replay.lib.comm import comms_utils

from et_replay.lib.execution_trace import ExecutionTrace, NodeType

from et_replay.lib.utils import trace_handler

from param_bench.train.compute.python.lib import pytorch as lib_pytorch
from param_bench.train.compute.python.lib.init_helper import load_modules
from param_bench.train.compute.python.workloads import pytorch as workloads_pytorch
Expand All @@ -29,7 +21,11 @@
from torch._inductor.runtime.triton_heuristics import grid, split_scan_grid
from torch.profiler import ExecutionTraceObserver

from ..lib.et_replay_utils import (
from et_replay.comm import comms_utils
from et_replay.et_replay_utils import (
TORCH_DTYPES_BYTES,
TORCH_DTYPES_RNG,
TORCH_DTYPES_RNG_str,
build_fbgemm_func,
build_torchscript_func,
build_triton_func,
Expand All @@ -48,10 +44,9 @@
is_tensor,
is_tensor_list,
skip_op,
TORCH_DTYPES_BYTES,
TORCH_DTYPES_RNG,
TORCH_DTYPES_RNG_str,
)
from et_replay.execution_trace import ExecutionTrace, NodeType
from et_replay.utils import trace_handler


class ExgrReplayManager:
Expand Down Expand Up @@ -613,11 +608,11 @@ def allocate_tensors(self):
self.tensors_mapping[
(node.id, tuple(node.inputs[2][:5]), True)
]
][i] = (i * nnz)
][i] = i * nnz
else:
self.tensor_registry_permanent[
self.tensors_mapping[(node.id, tuple(node.inputs[2]), True)]
][i] = (i * nnz)
][i] = i * nnz
######

def build_func(self, node):
Expand Down Expand Up @@ -1188,8 +1183,7 @@ def run_op(self, node, iter):
not in self.unchangeable_intermediate_tensors
):
if (
self.tensors_mapping[(node.id, t_id, False)]
not in self.instantiate
self.tensors_mapping[(node.id, t_id, False)] not in self.instantiate
# and self.tensors_mapping[(node.id, t_id, False)]
# not in self.tensor_registry
):
Expand Down
2 changes: 1 addition & 1 deletion et_replay/tools/validate_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import gzip
import json

from et_replay.lib.execution_trace import ExecutionTrace
from et_replay.execution_trace import ExecutionTrace


class TraceValidator:
Expand Down
2 changes: 1 addition & 1 deletion et_replay/lib/utils.py → et_replay/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import uuid
from typing import Any, Dict

from et_replay.lib.execution_trace import ExecutionTrace
from et_replay.execution_trace import ExecutionTrace
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for the record, this could also be `from et_replay import ExecutionTrace`, but this works too. (y)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! This makes sense, updated.



def get_tmp_trace_filename() -> str:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
torch
future
numpy
pydot
2 changes: 1 addition & 1 deletion train/comms/pt/commsTraceParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing import List, Tuple

from et_replay.lib.execution_trace import ExecutionTrace
from et_replay import ExecutionTrace

from param_bench.train.comms.pt import comms_utils
from param_bench.train.comms.pt.comms_utils import commsArgs
Expand Down
Loading