add llama pipeline parallel #59

Open · wants to merge 2 commits into base: master
72 changes: 72 additions & 0 deletions model_zoo/ModelParallel.py
@@ -24,6 +24,78 @@ def setup(use_cpu: bool = True) -> Tuple[int, int]:
torch.manual_seed(1)
return local_rank, world_size

class DistMapping(object):
'''
A node with 8 GPUs, tp_size = 4, pp_size = 2

2 tp groups:

- [0, 1, 2, 3]
- [4, 5, 6, 7]

4 pp groups:

- [0, 4]
- [1, 5]
- [2, 6]
- [3, 7]
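
e.g. rank 5 maps to pp_rank = 1, tp_rank = 1,
tp_group = [4, 5, 6, 7], pp_group = [1, 5]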
'''
def __init__(self,
world_size=1,
rank=0,
tp_size=1,
pp_size=1):
self.tp_size = tp_size
self.pp_size = pp_size
self.world_size = world_size
self.rank = rank

if pp_size * tp_size != world_size:
raise ValueError("world_size must equal pp_size * tp_size")
self.pp_groups = []
self.tp_groups = []

# init pp groups: ranks that share the same tp index across all pipeline stages form one pp group
for i in range(tp_size):
ranks = range(i, world_size, tp_size)
self.pp_groups.append(list(ranks))

# init tp groups: consecutive ranks within one pipeline stage form one tp group
for i in range(pp_size):
ranks = range(i * tp_size, (i + 1) * tp_size)
self.tp_groups.append(list(ranks))

self.pp_rank = self.rank // self.tp_size
self.tp_rank = self.rank % self.tp_size

self.tp_group = self.tp_groups[self.pp_rank]
self.pp_group = self.pp_groups[self.tp_rank]

self.tp_proc_group = None
self.pp_proc_group = None

print(f"rank: {self.rank}, tp_groups: {self.tp_groups}, pp_groups: {self.pp_groups}")

def is_last_pp_rank(self):
return self.pp_rank == self.pp_size - 1

def is_first_pp_rank(self):
return self.pp_rank == 0

def has_pp(self):
return self.pp_size > 1

def prev_pp_rank(self):
p = self.rank - self.tp_size
if p < 0:
p = p + self.world_size
return p

def next_pp_rank(self):
p = self.rank + self.tp_size
if p >= self.world_size:
p = p - self.world_size
return p

class ParallelEmbedding(torch.nn.Module):
def __init__(
86 changes: 86 additions & 0 deletions model_zoo/llama_pp/README.md
@@ -0,0 +1,86 @@
# Usage example

## Download model from Hugging Face

Download the model files from [Hugging Face](https://huggingface.co/models).

## Convert model params

Because our RotaryPositionEmbedding implementation differs from Hugging Face's, the weight parameters need to be converted.

```bash
cd ppl.pmx/model_zoo/llama/huggingface
python ConvertWeightToPmx.py --input_dir <hf_model_dir> --output_dir <pmx_model_dir>
```

After the conversion, you can find the pmx model files in `<pmx_model_dir>`.

## Splitting model

The model is split for tensor parallelism and pipeline parallelism at runtime, so no extra model-splitting script is needed.
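
The sketch below is only an illustration (not code from this repo) of how a pipeline stage could pick its share of transformer layers at load time; the actual partitioning is handled by the runtime loader.

```python
# Illustrative only: evenly assign num_layers transformer blocks to pp_size stages.
def layers_for_stage(num_layers: int, pp_size: int, pp_rank: int) -> range:
    per_stage = (num_layers + pp_size - 1) // pp_size  # ceiling division
    start = pp_rank * per_stage
    end = min(start + per_stage, num_layers)
    return range(start, end)

# e.g. 40 layers, pp_size = 2 -> stage 0 owns layers 0-19, stage 1 owns layers 20-39
```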

## Testing Model

The `Demo.py` script provides functionality to test the model for correctness before exporting.

```bash
OMP_NUM_THREADS=1 python Demo.py --nproc_per_node $pp_size \
--ckpt_dir <llama_dir> \
--tokenizer_path <llama_tokenizer_dir>/tokenizer.model \
--temperature 0 \
--top_p 0.95 \
--batch 4 \
--seqlen_scale_up 1 \
--unaligned_batch 0 \
--max_gen_len 16 \
--friendly_gqa 0 \
--fused_qkv 1 \
--fused_kvcache 0 \
--fused_ffn_glu 1 \
--auto_causal 1 \
--quantized_cache 1 \
--cache_layout 3 \
--cache_mode 0 \
--dynamic_batching 1 \
--pp_size $pp_size
```

- `OMP_NUM_THREADS`: Caps the number of OpenMP threads per process. Each PyTorch process opens its own OpenMP thread pool, so setting this to 1 prevents the processes from occupying too many CPU cores.
- `--nproc_per_node`: Specifies the number of model slices (worker processes) per node; for pipeline parallelism it should match `$pp_size`.
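
For example, a minimal run across two pipeline stages (all other options keep their defaults; the placeholder paths are the same as above):

```bash
pp_size=2
OMP_NUM_THREADS=1 python Demo.py --nproc_per_node $pp_size \
    --ckpt_dir <llama_dir> \
    --tokenizer_path <llama_tokenizer_dir>/tokenizer.model \
    --pp_size $pp_size
```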

## Exporting Model

To export a model, use the provided `Export.py` script. Here's an example command for exporting a model across `$pp_size` pipeline stages (one process per GPU):

```bash
OMP_NUM_THREADS=1 torchrun --nproc_per_node $pp_size Export.py \
--ckpt_dir <llama_dir> \
--export_path <export_dir> \
--friendly_gqa 1 \
--fused_qkv 1 \
--fused_kvcache 1 \
--fused_ffn_glu 1 \
--auto_causal 1 \
--quantized_cache 1 \
--cache_layout 3 \
--cache_mode 0 \
--dynamic_batching 1 \
--pp_size $pp_size
```

Make sure to replace `$pp_size` with the actual number of GPUs you want to use.

## Generating Test Data

The command below generates test data dumps at steps 0, 1, and 255:

```bash
OMP_NUM_THREADS=1 torchrun --nproc_per_node $num_gpu Demo.py --ckpt_dir <llama_dir> --tokenizer_path <llama_tokenizer_dir>/tokenizer.model --fused_qkv 1 --fused_kvcache 1 --auto_causal 1 --quantized_cache 1 --dynamic_batching 1 --seqlen_scale_up 1 --max_gen_len 256 --dump_steps 0,1,255 --dump_tensor_path <dump_dir> --batch 1
```

- `seqlen_scale_up`: Scale factor for the input sequence length; the test prompt is repeated this many times.
- `max_gen_len`: Specifies the maximum number of generated tokens.
- `dump_steps`: Steps at which to dump the test data.
- `dump_tensor_path`: Path to store the dumped test data.
- `batch`: Specifies the batch size for data processing.

Make sure to replace `<llama_dir>`, `<llama_tokenizer_dir>` and `<dump_dir>` with the actual directory paths in your environment.
174 changes: 174 additions & 0 deletions model_zoo/llama_pp/huggingface/Demo.py
@@ -0,0 +1,174 @@
import sys
import os
import json

from pathlib import Path
from typing import List

sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../..")

import llama_pp.modeling.Loader as Loader
from llama.huggingface.Tokenizer import Tokenizer
from ModelParams import ModelParams

import torch.multiprocessing as mp
import argparse

import signal
import traceback
import time

def ParseCommandLineArgs():
parser = argparse.ArgumentParser()
# basic command
parser.add_argument(
"--nnodes",
type=int,
default=1,
help="Number of nodes, or the range of nodes in form <minimum_nodes>:<maximum_nodes>.",
)
parser.add_argument(
"--nproc_per_node",
type=int,
default=1,
help="Number of workers per node;"
)

parser.add_argument(
"--master_port",
type=str,
default="29500"
)

parser.add_argument(
"--local_addr",
type=str,
default="localhost"
)

# llm param
parser.add_argument("--ckpt_dir", type=str)
parser.add_argument("--tokenizer_path", type=str)
parser.add_argument("--temperature", type=float, default=0.0)
parser.add_argument("--top_p", type=float, default=0.95)
parser.add_argument("--batch", type=int, default=4)

parser.add_argument("--seqlen_scale_up", type=int, default=1)
parser.add_argument("--unaligned_batch", type=int, default=False)
parser.add_argument("--max_gen_len", type=int, default=256)
parser.add_argument("--friendly_gqa", type=int, default=False)
parser.add_argument("--fused_qkv", type=int, default=False)

parser.add_argument("--fused_kvcache", type=int, default=True)
parser.add_argument("--fused_ffn_glu", type=int, default=True)
parser.add_argument("--auto_causal", type=int, default=True)
parser.add_argument("--quantized_cache", type=int, default=True)
parser.add_argument("--cache_layout", type=int, default=0)

parser.add_argument("--cache_mode", type=int, default=0)
parser.add_argument("--dynamic_batching", type=int, default=True)
parser.add_argument("--pp_size", type=int, default=1)
# parser.add_argument("--tp_size", type=int, default=1)
parser.add_argument("--dump_tensor_path", type=str, default=None)
parser.add_argument("--dump_steps", type=str, default=None)

args = parser.parse_args()
if args.dump_steps:
args.dump_steps = [int(s) for s in args.dump_steps.split(",")]

args.unaligned_batch = bool(args.unaligned_batch)
args.friendly_gqa = bool(args.friendly_gqa)
args.fused_qkv = bool(args.fused_qkv)
args.fused_kvcache = bool(args.fused_kvcache)
args.fused_ffn_glu = bool(args.fused_ffn_glu)
args.auto_causal = bool(args.auto_causal)
args.quantized_cache = bool(args.quantized_cache)
args.dynamic_batching = bool(args.dynamic_batching)

args.world_size = args.nproc_per_node * args.nnodes
return args


def set_dist_env_var(rank: int, world_size: int, local_addr: str, master_port: str):
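# mp.spawn passes each worker only its rank, so every process fills in the
# torch.distributed environment variables that torchrun would otherwise provide.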
os.environ["RANK"] = str(rank)
os.environ["LOCAL_RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["MASTER_ADDR"] = local_addr
os.environ["MASTER_PORT"] = master_port

def main(rank: int, args: argparse.Namespace, queue: mp.Queue, global_start=None):
set_dist_env_var(rank, args.world_size, args.local_addr, args.master_port)

tokenizer = Tokenizer(model_path=args.tokenizer_path)

with open(Path(args.ckpt_dir) / "pmx_params.json", "r") as f:
params = json.loads(f.read())
params: ModelParams = ModelParams(**params)

generator = Loader.load(
args.ckpt_dir, params, args.friendly_gqa,
args.fused_qkv, args.fused_kvcache, args.fused_ffn_glu,
args.auto_causal, args.quantized_cache, args.cache_layout,
args.cache_mode, args.dynamic_batching,
False, False, False, False,
0, pp_size=args.pp_size,
dump_tensor_path=args.dump_tensor_path, dump_steps=args.dump_steps
)

if args.unaligned_batch:
test_prompt = [ # For these prompts, the expected answer is the natural continuation of the prompt
"I believe the meaning of life is",
"Simply put, the theory of relativity states that ",
"""A brief message congratulating the team on the launch:

Hi everyone,

I just """,
# Few shot prompt (providing a few examples before asking model to complete more);
"""Translate English to French:

sea otter => loutre de mer
peppermint => menthe poivrée
plush girafe => girafe peluche
cheese =>""",
]
test_prompt = [tokenizer.encode(t, bos=True, eos=False) for t in test_prompt]

prompt_tokens = test_prompt.copy()
for _ in range((args.batch - 1) // len(test_prompt)):
prompt_tokens.extend(test_prompt)
else:
test_prompt = "I believe the meaning of life is"
test_prompt = tokenizer.encode(test_prompt, bos=True, eos=False)

_scale_up_prompt = []
for _ in range(args.seqlen_scale_up):
_scale_up_prompt.extend(test_prompt)
test_prompt = _scale_up_prompt

prompt_tokens = [test_prompt for _ in range(args.batch)]

print(f"prepared {len(prompt_tokens)} prompts")
results = generator.generate(
prompt_tokens[:args.batch], tokenizer.get_eos_id(), tokenizer.get_pad_id(),
max_gen_len=args.max_gen_len, temperature=args.temperature, top_p=args.top_p, top_k=0,
queue=queue, global_start=global_start
)
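# Only the last pipeline stage produces the final tokens, so only that rank
# decodes and prints the results.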
if generator.model.dist_mapping.is_last_pp_rank():
for result in results:
print(result)
print(tokenizer.decode(result))
print("\n==================================\n")


if __name__ == "__main__":
args = ParseCommandLineArgs()
print(args)
mp.set_start_method('spawn')
queue = mp.Queue()
global_start = time.time()
# tid_dict = mp.Manager().dict()
# lock = mp.Lock()

mp.spawn(main, nprocs=args.world_size, args=(args, queue, global_start), join=True)
43 changes: 43 additions & 0 deletions model_zoo/llama_pp/huggingface/Export.py
@@ -0,0 +1,43 @@
import fire
import sys
import os
import json

from pathlib import Path

sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../..")

import llama_pp.modeling.Loader as Loader
from ModelParams import ModelParams

def main(
ckpt_dir: str,
export_path: str,
friendly_gqa: bool = False, # perform GQA by repeating key and value via the key_value_cache op
fused_qkv: bool = True, # fuse qkv linear
fused_kvcache: bool = True, # fuse key_value_cache and multi_head_attention
fused_ffn_glu: bool = True, # fuse feed forward gate linear unit
auto_causal: bool = True, # causal mask is applied automatically by the attention op, no need to pass an additional mask to the model
quantized_cache: bool = True, # 8bit kv cache quantization
cache_layout: int = 3, # change kv cache layout to be hardware performance friendly
cache_mode: int = 0, # change kv cache indexing mode to be memory management friendly; only takes effect when dynamic_batching == True
dynamic_batching: bool = True, # use dynamic batching scheduling
pp_size: int = 1,
):
with open(Path(ckpt_dir) / "pmx_params.json", "r") as f:
params = json.loads(f.read())
params: ModelParams = ModelParams(**params)

generator = Loader.load(
ckpt_dir, params, friendly_gqa,
fused_qkv, fused_kvcache, fused_ffn_glu,
auto_causal, quantized_cache, cache_layout,
cache_mode, dynamic_batching,
False, False, False, True,
0, pp_size=pp_size
)

generator.export(export_path)

if __name__ == "__main__":
fire.Fire(main)