Merge branch 'main' into qwen
xiezipeng-ML authored Sep 18, 2024
2 parents e42b8d0 + 8d35c08 commit 4c803dc
Showing 45 changed files with 2,330 additions and 147 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/py.yml
@@ -4,7 +4,7 @@ env:
ONEFLOW_SRC: oneflow-src
on:
pull_request:
types: [review_requested]
types: [opened, review_requested, ready_for_review, synchronize, unlocked]
branches:
- "*"
workflow_dispatch:
2 changes: 1 addition & 1 deletion docs/source/tutorials/basics/Distributed_Configuration.md
@@ -65,7 +65,7 @@ train.dist.pipeline_num_layers = model.cfg.hidden_layers
- `train.dist.pipeline_num_layers` must be set consistently with the number of model layers (see the config sketch below). If unset, it falls back to the default value `1000`,
which might trigger unexpected behavior.

- For models that have already been configured with pipeline parallelism (e.g., BERT, GPT-2, T5 and ViT), you can simply update the distributed config to run pipeline-parallel training on them. If you need to train your own model with a pipeline-parallel strategy, please refer to [Write Models](https://libai.readthedocs.io/en/latest/tutorials/basics/Write_Models.html) for more details about configuring your own model with pipeline parallelism.
- For models that have already been configured with pipeline parallelism (e.g., BERT, GPT-2, T5 and ViT), you can simply update the distributed config to run pipeline-parallel training on them. If you need to train your own model with a pipeline-parallel strategy, please refer to [Customize Parallelism](https://libai.readthedocs.io/en/latest/tutorials/advanced_tutorials/customize_parallel.html#write-your-own-pipeline-parallel-model) for more details about configuring your own model with pipeline parallelism.
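
To make this concrete, here is a minimal config sketch for pipeline-parallel training (illustrative values only; `train` and `model` are assumed to be the config objects used throughout LibAI config files):

```python
# Illustrative sketch only -- adapt the parallel sizes to your own model and cluster.
train.dist.data_parallel_size = 1
train.dist.tensor_parallel_size = 1
train.dist.pipeline_parallel_size = 4                      # number of pipeline stages
train.dist.pipeline_num_layers = model.cfg.hidden_layers   # keep consistent with the model depth
```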

#### **Data Parallel + Tensor Parallel for 2D Parallel Training on 8 GPUs**

4 changes: 2 additions & 2 deletions libai/engine/default.py
@@ -508,8 +508,8 @@ def get_batch(

if mixup_func is not None:
images, labels = mixup_func(
data.get("images").tensor.cuda(),
data.get("labels").tensor.cuda(),
data.get("images").tensor.to(input_placement_device),
data.get("labels").tensor.to(input_placement_device),
)
data.get("images").tensor = images
data.get("labels").tensor = labels
6 changes: 3 additions & 3 deletions libai/evaluation/utils.py
@@ -21,7 +21,7 @@
from libai.utils import distributed as dist


def pad_batch(x_dict, batch_size, last_batch_lack, is_last_batch):
def pad_batch(x_dict, batch_size, last_batch_lack, is_last_batch, device="cuda"):
x = list(x_dict.values())[0]
tensor_batch = x.shape[0]
assert tensor_batch <= batch_size
@@ -37,9 +37,9 @@ def pad_batch(x_dict, batch_size, last_batch_lack, is_last_batch):
for key, xi in x_dict.items():
pad_shape = (batch_size, *xi.shape[1:])
local_xi = xi.to_global(
sbp=flow.sbp.broadcast, placement=flow.env.all_device_placement("cuda")
sbp=flow.sbp.broadcast, placement=flow.env.all_device_placement(device)
).to_local()
padded_xi = flow.zeros(pad_shape, dtype=xi.dtype, device="cuda")
padded_xi = flow.zeros(pad_shape, dtype=xi.dtype, device=device)
padded_xi[:tensor_batch, ...] = padded_xi[:tensor_batch, ...] + local_xi
for i in range(last_batch_lack - 1):
start_idx = tensor_micro_batch_size * (data_parallel_size - i - 1) - 1
24 changes: 23 additions & 1 deletion libai/inference/basic.py
@@ -15,6 +15,7 @@

import logging
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Any, Dict

import oneflow as flow
@@ -43,6 +44,7 @@ def __init__(
pipeline_num_layers=None,
model_path=None,
mode="libai",
device="cuda",
**kwargs,
):
# init cfg
@@ -60,10 +62,21 @@ def __init__(
pipeline_stage_id,
pipeline_num_layers,
)
self.device = device
self.cfg.train.dist.device_type = device
dist.setup_dist_util(self.cfg.train.dist)
logger.info(self.cfg.train.dist)

# initial and load model
self.model_path = model_path
if self.model_path is not None:
# If a model_path is provided to BasePipeline,
# it takes priority and overwrites pretrained_model_path in the config
self.cfg.model.cfg.pretrained_model_path = self.model_path
else:
# If the model_path in BasePipeline is None, then use the one from the config
assert "pretrained_model_path" in self.cfg.model.cfg
self.model_path = self.cfg.model.cfg.pretrained_model_path

self.model = self.load_pretrain_weight(self.cfg.model, model_path, mode=mode)
self.model._apply(dist.convert_to_distributed_default_setting)
@@ -134,6 +147,13 @@ def load_pretrain_weight(
def build_tokenizer(self, cfg):
tokenizer = None
if try_get_key(cfg, "tokenization") is not None:
tokenizer_cfg = cfg.tokenization.tokenizer
if "pretrained_model_path" not in tokenizer_cfg:
# If "pretrained_model_path" does not exist in the tokenizer's config,
# set it to default as f"{model_path}/tokenizer.model"
tokenizer_cfg.pretrained_model_path = str(
Path(self.model_path).joinpath("tokenizer.model")
)
tokenizer = DefaultTrainer.build_tokenizer(cfg)
return tokenizer

@@ -167,7 +187,9 @@ def to_local(self, model_outputs_dict):
for key, value in model_outputs_dict.items():
if isinstance(value, flow.Tensor) and value.is_global:
model_outputs_dict[key] = dist.ttol(
value, ranks=[0] if value.placement.ranks.ndim == 1 else [[0]]
value,
device=self.device,
ranks=[0] if value.placement.ranks.ndim == 1 else [[0]],
)
if flow.cuda.is_available():
dist.synchronize()
13 changes: 8 additions & 5 deletions libai/inference/generator/generation_beam_search.py
@@ -96,6 +96,7 @@ def __init__(
do_early_stopping: Optional[bool] = False,
num_beam_hyps_to_keep: Optional[int] = 1,
num_beam_groups: Optional[int] = 1,
device: Optional[str] = "cuda",
**kwargs,
):
self.num_beams = num_beams
@@ -119,7 +120,7 @@ def __init__(
[False for _ in range(batch_size)],
dtype=flow.bool,
sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]),
placement=flow.placement("cuda", list(range(dist.get_world_size()))),
placement=flow.placement(device, list(range(dist.get_world_size()))),
)

if not isinstance(num_beams, int) or num_beams <= 1:
@@ -159,6 +160,7 @@ def process(
pad_token_id: Optional[int] = None,
eos_token_id: Optional[int] = None,
beam_indices: Optional[flow.Tensor] = None,
device: Optional[str] = "cuda",
) -> Tuple[flow.Tensor]:
cur_len = input_ids.shape[-1]
batch_size = len(self._beam_hyps)
@@ -177,19 +179,19 @@
(batch_size, self.group_size),
dtype=next_scores.dtype,
sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]),
placement=flow.placement("cuda", list(range(dist.get_world_size()))),
placement=flow.placement(device, list(range(dist.get_world_size()))),
)
next_beam_tokens = flow.zeros(
(batch_size, self.group_size),
dtype=next_tokens.dtype,
sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]),
placement=flow.placement("cuda", list(range(dist.get_world_size()))),
placement=flow.placement(device, list(range(dist.get_world_size()))),
)
next_beam_indices = flow.zeros(
(batch_size, self.group_size),
dtype=next_indices.dtype,
sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]),
placement=flow.placement("cuda", list(range(dist.get_world_size()))),
placement=flow.placement(device, list(range(dist.get_world_size()))),
)

for batch_idx, beam_hyp in enumerate(self._beam_hyps):
@@ -274,6 +276,7 @@ def finalize(
pad_token_id: Optional[int] = None,
eos_token_id: Optional[int] = None,
beam_indices: Optional[flow.Tensor] = None,
device: Optional[str] = "cuda",
):
batch_size = len(self._beam_hyps)
# finalize all open beam hypotheses and add to generated hypotheses
@@ -303,7 +306,7 @@ def finalize(
batch_size * self.num_beam_hyps_to_keep,
dtype=flow.float32,
sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]),
placement=flow.placement("cuda", list(range(dist.get_world_size()))),
placement=flow.placement(device, list(range(dist.get_world_size()))),
)

# retrieve best hypotheses
4 changes: 2 additions & 2 deletions libai/inference/generator/generation_utils.py
@@ -526,8 +526,8 @@ def greedy_search(

# if eos_token was found in one sentence, set sentence to finished
if eos_token_id is not None:
unfinished_sequences = flow.mul(
unfinished_sequences, (next_tokens != eos_token_id).long()
unfinished_sequences = unfinished_sequences.mul(
next_tokens.ne(eos_token_id).prod(dim=0)
)

if unfinished_sequences.max() == 0 or stopping_criteria(input_ids, scores):
4 changes: 3 additions & 1 deletion libai/models/gpt_model.py
@@ -244,7 +244,9 @@ def forward(self, input_ids, past_length=0):
bsz, seq_length = input_ids.size()

position_ids = self.position_ids[:, past_length : past_length + seq_length]
position_ids = position_ids.expand_as(input_ids).to_global(sbp=input_ids.sbp)
position_ids = position_ids.expand_as(input_ids).to_global(
sbp=input_ids.sbp, placement=input_ids.placement
)

token_embeds = self.token_embeddings(input_ids)
position_embeds = self.position_embeddings(position_ids)
4 changes: 3 additions & 1 deletion libai/models/utils/graph_base.py
@@ -39,12 +39,14 @@ def __init__(
is_train=True,
auto_parallel_conf=None,
global_mode=None,
device="cuda",
):
super().__init__()

self.model = model
self.is_train = is_train
self.global_mode = global_mode
self.device = device

if is_train:
self.add_optimizer(optimizer, lr_sch=lr_scheduler)
@@ -103,7 +105,7 @@ def build(self, **kwargs):
if self.is_train:
placement_sbp_dict = (
dict(
placement=flow.env.all_device_placement("cuda"),
placement=flow.env.all_device_placement(self.device),
sbp=flow.sbp.split(0),
)
if self.global_mode.enabled
6 changes: 6 additions & 0 deletions libai/models/utils/model_loader/base_loader.py
@@ -20,6 +20,7 @@

import omegaconf
import oneflow as flow
from safetensors import safe_open
from termcolor import colored
from safetensors import safe_open

@@ -384,6 +385,11 @@ def _convert_tensor(self, tensor):
Returns:
flow.Tensor: The target tensor.
"""
import torch

if tensor.dtype == torch.bfloat16:
data = tensor.detach().half().cpu().numpy()
return flow.Tensor(data)
return flow.Tensor(tensor.detach().cpu().numpy())

def _convert_tensors(self, torch_state_dict):
14 changes: 10 additions & 4 deletions libai/tokenizer/tokenization_base.py
@@ -774,7 +774,9 @@ def convert_tokens_to_ids(self, tokens: Union[str, List[str]]) -> Union[int, Lis
ids.append(self._convert_token_to_id_with_added_voc(token))
return ids

def convert_to_tensors(self, token_ids, return_tensors=None, is_global=False, **kwargs):
def convert_to_tensors(
self, token_ids, return_tensors=None, is_global=False, device="cuda", **kwargs
):
if return_tensors is None:
return_token_ids = token_ids
elif return_tensors == "of":
@@ -783,7 +785,7 @@ def convert_to_tensors(self, token_ids, return_tensors=None, is_global=False, **
elif is_global:
sbp = kwargs.get("sbp", dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]))
placement = kwargs.get(
"placement", flow.placement("cuda", list(range(dist.get_world_size())))
"placement", flow.placement(device, list(range(dist.get_world_size())))
)
return_token_ids = flow.tensor(
token_ids, sbp=sbp, placement=placement, dtype=flow.long
@@ -803,14 +805,18 @@ def _convert_token_to_id_with_added_voc(self, token):
def _convert_token_to_id(self, token):
raise NotImplementedError

def encode(self, text, return_tensors=None, is_global=False, **kwargs):
def encode(self, text, return_tensors=None, is_global=False, device="cuda", **kwargs):
if isinstance(text, str):
tokens = self.tokenize(text)
token_ids = self.convert_tokens_to_ids(tokens)
if hasattr(self, "build_inputs_with_special_tokens"):
token_ids = self.build_inputs_with_special_tokens(token_ids)
token_ids = self.convert_to_tensors(
token_ids, return_tensors=return_tensors, is_global=is_global, **kwargs
token_ids,
return_tensors=return_tensors,
is_global=is_global,
device=device,
**kwargs,
)
return token_ids
elif isinstance(text, (list, tuple)) and len(text) > 0 and isinstance(text[0], str):
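
A hedged usage sketch of the updated `encode()` signature (the new `device` argument; `tokenizer` stands for any tokenizer derived from this base class, and `"of"` requests OneFlow tensors as handled above):

```python
# Illustrative only: encode a prompt into a global OneFlow tensor on a chosen backend.
input_ids = tokenizer.encode(
    "Hello, world",
    return_tensors="of",
    is_global=True,
    device="xpu",  # defaults to "cuda"
)
```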
20 changes: 16 additions & 4 deletions libai/utils/distributed.py
@@ -72,6 +72,18 @@ def _init_distributed_env(self, cfg):

# Add set device type
self._device_type = try_get_key(cfg, "device_type", default="cuda")
if self._device_type == "npu":
try:
import oneflow_npu # noqa: F401
except ImportError:
raise ImportError("'oneflow_npu' is missing. Install it to use NPU devices.")
elif self._device_type == "xpu":
try:
import oneflow_xpu # noqa: F401
except ImportError:
raise ImportError("'oneflow_xpu' is missing. Install it to use XPU devices.")
elif self._device_type not in ("cuda", "npu", "xpu", "cpu"):
raise NotImplementedError(f"Unsupported device {self._device_type}")
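
For illustration (flag names taken from the Aquila README added later in this commit), a training run selects one of these backends through the distributed config, which is what `_init_distributed_env` receives here:

```python
# Illustrative only: config values for an XPU run.
train.dist.device_type = "xpu"          # triggers the oneflow_xpu import check above
train.input_placement_device = "xpu"    # keep input placement on the same backend
```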

def _init_parallel_size(self, cfg):

Expand Down Expand Up @@ -228,7 +240,7 @@ def device_type(self):
return self._device_type

def set_device_type(self, device_type):
assert device_type in ["cpu", "cuda"], f"not supported for {device_type}"
# assert device in ["cpu", "cuda"], f"not supported for device:{device}"
self._device_type = device_type

def get_layer_ranks(self, layer_idx):
@@ -435,10 +447,10 @@ def convert_to_distributed_default_setting(t):
return t.to_global(placement=flow.placement(device_type, ranks=t.placement.ranks))


def ttol(tensor, pure_local=False, ranks=None):
def ttol(tensor, pure_local=False, device="cuda", ranks=None):
"""Global tensor to local tensor."""
if tensor.is_global:
placement = tensor.placement if not ranks else flow.placement("cuda", ranks)
placement = tensor.placement if not ranks else flow.placement(device, ranks)
if pure_local:
tensor = tensor.to_global(placement=placement).to_local()
else:
@@ -459,7 +471,7 @@ def tton(tensor, local_only=False, ranks=None):

def tensor_to_rank0(tensor, device="cuda", to_local=False):
"""Global tensor to rank0."""
assert device in ["cpu", "cuda"], f"not supported for device:{device}"
# assert device in ["cpu", "cuda"], f"not supported for device:{device}"
if tensor.is_global:
# Consider if it's 2d mesh, ranks should be [[0]] instead of [0]
placement = flow.placement(device, ranks=[0] if tensor.placement.ranks.ndim == 1 else [[0]])
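
For reference, a small usage sketch of the updated `ttol` helper with its new `device` argument (`global_tensor` is a placeholder for any global tensor):

```python
# Illustrative only: bring a global tensor back to a local tensor on rank 0 of a non-CUDA backend.
from libai.utils import distributed as dist

local_tensor = dist.ttol(global_tensor, device="npu", ranks=[0])
```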
58 changes: 58 additions & 0 deletions projects/Aquila/README.md
@@ -0,0 +1,58 @@


## Aquila
### Inference
- cuda

```bash
python projects/Aquila/pipeline.py --model_path=/root/models/Aquila-7B --mode=huggingface
```

- npu

```bash
python projects/Aquila/pipeline.py --model_path=/data0/hf_models/Aquila-7B --mode=huggingface --device=npu
```

- xpu

```bash
python projects/Aquila/pipeline.py --model_path=/root/models/Aquila-7B --mode=huggingface --device=xpu
```
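
For reference, a rough Python sketch of what these commands do under the hood; the class name and config path below are assumptions based on this commit's `BasePipeline` changes, not copied from `pipeline.py`:

```python
# Hypothetical sketch -- class name and config path are assumed, not verbatim from the repo.
from projects.Aquila.pipeline import TextGenerationPipeline

pipeline = TextGenerationPipeline(
    "projects/Aquila/configs/aquila_config.py",  # placeholder config file
    model_path="/root/models/Aquila-7B",
    mode="huggingface",
    device="xpu",  # "cuda" (default), "npu", or "xpu"
)
print(pipeline("What is the capital of France?"))
```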

### Training
- data preparation
```bash
python projects/Aquila/utils/data_prepare.py
```
- cuda
```bash
export NUM_GPUS=4
python3 -m oneflow.distributed.launch \
--nproc_per_node ${NUM_GPUS} \
--nnodes 1 \
--node_rank 0 \
--master_addr 127.0.0.1 \
--master_port 12345 \
tools/train_net.py --config-file=projects/Aquila/configs/aquila_sft.py \
graph.enabled=True \
train.input_placement_device="cuda" \
train.dist.device_type="cuda" \
train.dist.pipeline_parallel_size=${NUM_GPUS}
```

- xpu
```bash
export NUM_GPUS=1
python3 -m oneflow.distributed.launch \
--nproc_per_node ${NUM_GPUS} \
--nnodes 1 \
--node_rank 0 \
--master_addr 127.0.0.1 \
--master_port 12345 \
tools/train_net.py --config-file=projects/Aquila/configs/aquila_sft.py \
graph.enabled=False \
train.input_placement_device="xpu" \
train.dist.device_type="xpu" \
train.dist.pipeline_parallel_size=${NUM_GPUS}
```