Output non-pipelined sharded modules from rewrite + enhance pipelinability test (pytorch#2579)

Summary:
Pull Request resolved: pytorch#2579

Adds the list of non-pipelined sharded modules to the `_rewrite_model` output, for testing and logging purposes.

Reviewed By: sarckk

Differential Revision: D63445036

fbshipit-source-id: aced9d92838b19d67e49d24d2edcf6dc5425fc3d
che-sh authored and facebook-github-bot committed Nov 29, 2024
1 parent 7e7819e commit 99162b5
Showing 2 changed files with 31 additions and 10 deletions.
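
For orientation, here is a hedged sketch of how a caller might consume the widened `_rewrite_model` return value after this change. The helper name `rewrite_and_report` and its parameters are illustrative only; the keyword arguments and the order of the returned tuple follow the diff below.

from typing import List

from torchrec.distributed.train_pipeline.utils import _rewrite_model


def rewrite_and_report(
    model, context, dist_stream, batch, apply_jit, pipelined_forward
) -> List[str]:
    """Hypothetical helper: run the rewrite and return the targets of sharded
    modules that could not be pipelined (the new fifth element of the tuple)."""
    (
        pipelined_forwards,     # forwards swapped in for pipelined sharded modules
        model,                  # the (possibly rewritten) input model
        original_forwards,      # originals, kept so the rewrite can be undone
        pipelined_preprocs,     # List[PipelinedPreproc]
        non_pipelined_modules,  # new in this commit: List[str] of module targets
    ) = _rewrite_model(
        model=model,
        context=context,
        dist_stream=dist_stream,
        batch=batch,
        apply_jit=apply_jit,
        pipelined_forward=pipelined_forward,
    )
    return non_pipelined_modules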
1 change: 1 addition & 0 deletions torchrec/distributed/train_pipeline/train_pipelines.py
@@ -531,6 +531,7 @@ def _pipeline_model(
self._model,
self._original_forwards,
self._pipelined_preprocs,
_,
) = _rewrite_model(
model=self._model,
context=context,
40 changes: 30 additions & 10 deletions torchrec/distributed/train_pipeline/utils.py
@@ -1196,6 +1196,7 @@ def _rewrite_model( # noqa C901
torch.nn.Module,
List[Callable[..., Any]],
List[PipelinedPreproc],
List[str],
]:
input_model = model
# Get underlying nn.Module
@@ -1235,6 +1236,7 @@ def _rewrite_model( # noqa C901
original_forwards = []

pipelined_preprocs: Set[PipelinedPreproc] = set()
non_pipelined_sharded_modules = []

for node in graph.nodes:
if node.op == "call_module" and node.target in sharded_modules:
@@ -1265,6 +1267,7 @@ def _rewrite_model( # noqa C901
logger.warning(
f"Module '{node.target}'' will not be pipelined, due to input modifications"
)
non_pipelined_sharded_modules.append(node.target)

# JIT script unsharded modules if applicable.
if apply_jit:
@@ -1273,7 +1276,20 @@ def _rewrite_model( # noqa C901
if isinstance(input_model, DistributedModelParallel):
input_model.module = graph_model

return pipelined_forwards, input_model, original_forwards, list(pipelined_preprocs)
if non_pipelined_sharded_modules:
logger.warning(
"Sharded modules were not pipelined: %s. "
+ "This should be fixed for pipelining to work to the full extent.",
", ".join(non_pipelined_sharded_modules),
)

return (
pipelined_forwards,
input_model,
original_forwards,
list(pipelined_preprocs),
non_pipelined_sharded_modules,
)


def _override_input_dist_forwards(
@@ -1559,15 +1575,19 @@ def start_sparse_data_dist(self, batch: In) -> In:
if not self.initialized:
# Step 1: Pipeline input dist in trec sharded modules
# TODO (yhshin): support preproc modules for `StagedTrainPipeline`
self._pipelined_modules, self.model, self._original_forwards, _ = (
_rewrite_model(
model=self.model,
context=self.context,
dist_stream=self.data_dist_stream,
batch=batch,
apply_jit=self.apply_jit,
pipelined_forward=self._pipelined_forward,
)
(
self._pipelined_modules,
self.model,
self._original_forwards,
_,
_,
) = _rewrite_model(
model=self.model,
context=self.context,
dist_stream=self.data_dist_stream,
batch=batch,
apply_jit=self.apply_jit,
pipelined_forward=self._pipelined_forward,
)
# initializes input dist, so we can override input dist forwards
_start_data_dist(self._pipelined_modules, batch, self.context)
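
The commit title also mentions enhancing a pipelinability test, but that test file is not among the two files shown above. The following is therefore only a hypothetical assertion pattern built on the new return value; `assert_fully_pipelined` is an assumed name, not code from the commit.

from typing import List


def assert_fully_pipelined(non_pipelined_sharded_modules: List[str]) -> None:
    """Hypothetical test helper: fail if _rewrite_model reported sharded modules
    that were skipped (e.g. due to input modifications)."""
    assert not non_pipelined_sharded_modules, (
        "Sharded modules were not pipelined: "
        + ", ".join(non_pipelined_sharded_modules)
    )

A test could call this on the fifth element returned by `_rewrite_model` to verify full pipelining directly, rather than scraping the warning emitted in the rewrite.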
