Draft: Megatron (updated) #317
Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions examples/configs/grpo_math_1B.yaml
@@ -28,6 +28,7 @@ checkpointing:
   save_period: 10

 policy:
+  training_backend: "hf"
   # Qwen/Qwen2.5-1.5B has tied weights which are only supported with dtensor policy with tp size 1 (https://github.com/NVIDIA/nemo-rl/issues/227)
   model_name: "Qwen/Qwen2.5-1.5B"
   tokenizer:
132 changes: 132 additions & 0 deletions examples/configs/grpo_math_1B_megatron.yaml
@@ -0,0 +1,132 @@
# GRPO Algorithm Configuration
defaults: "grpo_math_1B.yaml"

grpo:
  num_prompts_per_step: 32
  num_generations_per_prompt: 16
  max_num_steps: 1000000
  normalize_rewards: true
  use_leave_one_out_baseline: true
  val_period: 10
  val_at_start: false
  max_val_samples: 256
  val_batch_size: 256

loss_fn:
  reference_policy_kl_penalty: 0.01
  ratio_eps_min: 0.2
  ratio_eps_max: 0.2
  # (default off) loss formulation improvements (docs/guides/grpo.md#loss)
  use_on_policy_kl_approximation: false
  use_importance_sampling_correction: false

checkpointing:
  enabled: false
  checkpoint_dir: "results/grpo_megatron"
  metric_name: "val_reward"
  higher_is_better: true
  keep_top_k: 3
  save_period: 10

policy:
  training_backend: "megatron"
  model_name: "Qwen/Qwen2.5-1.5B-Instruct"
  tokenizer:
    name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default
  train_global_batch_size: 512
  train_micro_batch_size: 2
  generation_batch_size: 64 # Only used when generating using megatron backend
  logprob_batch_size: 4
  max_total_sequence_length: 512
  precision: "bfloat16"
  tensor_model_parallel_size: 1
  pipeline_model_parallel_size: 1
  pipeline_dtype: "float32"
  context_parallel_size: 1
  refit_buffer_size_gb: 4 # used for refitting inference engine, the unit is GB

  dtensor_cfg:
    enabled: false

  max_grad_norm: 1.0
  # makes the training sequence length divisible by the tensor parallel size
  # this is useful for sequence parallel training
  make_sequence_length_divisible_by: ${policy.tensor_model_parallel_size}

  optimizer: null # remove default FSDP optimizer

  megatron_cfg:
    enabled: true
    empty_unused_memory_level: 1
    converter_type: "Qwen2ForCausalLM"

    optimizer:
      optimizer: "adam"
      lr: 5.0e-6
      min_lr: 5.0e-7
      weight_decay: 0.01
      bf16: false
      fp16: false
      params_dtype: "float32"

      # adam
      adam_beta1: 0.9
      adam_beta2: 0.999
      adam_eps: 1e-8

      # sgd
      sgd_momentum: 0.9

      # distributed optimizer
      use_distributed_optimizer: true
      use_precision_aware_optimizer: true

      clip_grad: ${policy.max_grad_norm}

    scheduler:
      start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
      end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
      weight_decay_incr_style: "constant"
      lr_decay_style: "constant"
      lr_decay_iters: null
      lr_warmup_iters: 50
      lr_warmup_init: 5.0e-7

  generation:
    backend: "vllm"
    max_new_tokens: ${policy.max_total_sequence_length}
    temperature: 1.0
    top_p: 1.0
    top_k: null
    vllm_cfg:
      tensor_parallel_size: 1
      gpu_memory_utilization: 0.6
      max_model_len: ${policy.max_total_sequence_length}

data:
  max_input_seq_length: ${policy.max_total_sequence_length} # upper bound, real truncation occurs at vllm.max_model_len
  prompt_file: "examples/prompts/cot.txt"
  system_prompt_file: null
  dataset_name: "OpenMathInstruct-2"

env:
  math:
    num_workers: 8

logger:
  log_dir: "logs" # Base directory for all logs
  num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal
  wandb_enabled: false
  tensorboard_enabled: false
  monitor_gpus: false # If true, will monitor GPU usage and log to wandb and/or tensorboard
  wandb:
    project: "grpo-dev"
    name: "sj_megatron_1B"
  tensorboard: {}
  gpu_monitoring:
    collection_interval: 10 # How often to collect GPU usage metrics (in seconds)
    flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds)

cluster:
  gpus_per_node: 1
  num_nodes: 1
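Note on how this file composes with the base config: `defaults: "grpo_math_1B.yaml"` means the file only carries overrides, and the `${...}` entries (for example `${policy.model_name}` and `${policy.max_grad_norm}`) resolve after the merge. The sketch below shows that merge-and-resolve flow with OmegaConf; it is an illustration under the assumption that nemo-rl resolves `defaults:` this way, and `load_config_with_defaults` is a hypothetical helper, not the repo's actual loader.

```python
# Minimal sketch, assuming `defaults:` is resolved by merging onto the named base YAML
# and then resolving ${...} interpolations. The repo's real config loader may differ.
from pathlib import Path

from omegaconf import OmegaConf


def load_config_with_defaults(path: str):
    """Hypothetical helper: merge a config onto the file named by its `defaults:` key."""
    cfg = OmegaConf.load(path)
    base_name = cfg.pop("defaults", None)
    if base_name is not None:
        base = OmegaConf.load(Path(path).parent / base_name)
        cfg = OmegaConf.merge(base, cfg)  # override values win over the base values
    OmegaConf.resolve(cfg)  # expand ${policy.model_name}-style references in place
    return cfg


if __name__ == "__main__":
    cfg = load_config_with_defaults("examples/configs/grpo_math_1B_megatron.yaml")
    print(cfg.policy.training_backend)           # "megatron"
    print(cfg.policy.megatron_cfg.optimizer.lr)  # 5e-06
    print(cfg.policy.generation.backend)         # "vllm"
```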
49 changes: 31 additions & 18 deletions nemo_rl/algorithms/grpo.py
@@ -49,6 +49,7 @@
 from nemo_rl.models.interfaces import PolicyInterface
 from nemo_rl.models.policy import PolicyConfig
 from nemo_rl.models.policy.hf_policy import HfPolicy
+from nemo_rl.models.policy.megatron_policy import MegatronPolicy
 from nemo_rl.utils.checkpoint import CheckpointingConfig, CheckpointManager
 from nemo_rl.utils.logger import (
     Logger,
@@ -198,7 +199,7 @@ def setup(
     # Cluster
     # ==========================
     print("\n▶ Setting up compute cluster...")
-    colocated_inference = generation_config["backend"] != "hf"
+    colocated_inference = generation_config["backend"] not in ["hf", "megatron"]
     cluster = RayVirtualCluster(
         name="grpo_policy_cluster",
         bundle_ct_per_node_list=[cluster_config["gpus_per_node"]]
@@ -218,30 +219,42 @@
     backend = generation_config["backend"]
     generation_config["model_name"] = policy_config["model_name"]  # Needed for vLLM

-    if backend == "hf":
+    if backend in ["hf", "megatron"]:
         policy_generation = None
-        print(f" ✓ Using HF backend for generation with {policy_config['model_name']}")
     elif backend == "vllm":
         policy_generation = VllmGeneration(cluster=cluster, config=generation_config)
         # Worker groups are not initialized until the first call to run something on workergroups.
         # vllm 0.8 fails in initialization if its called in the first training step since it has no clean view of the GPU memory (HF is sharing the same memory).
         policy_generation.finish_generation()
-        print(
-            f" ✓ Using vLLM backend for generation with {policy_config['model_name']}"
-        )
+    else:
+        raise ValueError(f"Unknown generation backend: {backend}")
+    print(f" ✓ Using {backend} for generation with {policy_config['model_name']}")

+    if policy_config["training_backend"] == "hf":
+        policy = HfPolicy(
+            cluster=cluster,
+            config=policy_config,
+            tokenizer=tokenizer,
+            weights_path=Path(last_checkpoint_path) / "policy" / "weights"
+            if last_checkpoint_path
+            else None,
+            optimizer_path=Path(last_checkpoint_path) / "policy" / "optimizer"
+            if last_checkpoint_path
+            else None,
+            init_optimizer=True,
+        )
+    elif policy_config["training_backend"] == "megatron":
+        policy = MegatronPolicy(
+            cluster=cluster,
+            config=policy_config,
+            tokenizer=tokenizer,
+            init_optimizer=True,
+            init_reference_model=True,
+        )
+    else:
+        raise ValueError(
+            f"Unknown training backend: {policy_config['training_backend']}"
+        )

-    policy = HfPolicy(
-        cluster=cluster,
-        config=policy_config,
-        tokenizer=tokenizer,
-        weights_path=Path(last_checkpoint_path) / "policy" / "weights"
-        if last_checkpoint_path
-        else None,
-        optimizer_path=Path(last_checkpoint_path) / "policy" / "optimizer"
-        if last_checkpoint_path
-        else None,
-        init_optimizer=True,
-    )

     loss_fn = ClippedPGLossFn(loss_config)

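Summary of the new control flow above, for review: the generation backend (`policy.generation.backend`: "hf", "megatron", or "vllm") and the training backend (`policy.training_backend`: "hf" or "megatron") are now selected independently, and only the HF path wires up checkpoint resume. Below is a condensed sketch of that dispatch using only names that appear in this diff; argument handling is trimmed, so treat it as an illustration rather than a drop-in replacement for `setup()`.

```python
# Condensed sketch of the training-backend dispatch introduced above (illustrative only).
from pathlib import Path

from nemo_rl.models.policy.hf_policy import HfPolicy
from nemo_rl.models.policy.megatron_policy import MegatronPolicy


def build_policy(policy_config, cluster, tokenizer, last_checkpoint_path=None):
    backend = policy_config["training_backend"]
    if backend == "hf":
        # HF/DTensor path: can resume weights and optimizer state from a checkpoint.
        ckpt = Path(last_checkpoint_path) if last_checkpoint_path else None
        return HfPolicy(
            cluster=cluster,
            config=policy_config,
            tokenizer=tokenizer,
            weights_path=ckpt / "policy" / "weights" if ckpt else None,
            optimizer_path=ckpt / "policy" / "optimizer" if ckpt else None,
            init_optimizer=True,
        )
    if backend == "megatron":
        # Megatron path: builds its own optimizer and reference model;
        # checkpoint resume is not wired up in this diff.
        return MegatronPolicy(
            cluster=cluster,
            config=policy_config,
            tokenizer=tokenizer,
            init_optimizer=True,
            init_reference_model=True,
        )
    raise ValueError(f"Unknown training backend: {backend}")
```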
16 changes: 14 additions & 2 deletions nemo_rl/algorithms/loss_functions.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Tuple, TypedDict
+from typing import Any, Optional, Tuple, TypedDict

 import torch

@@ -21,6 +21,7 @@
     masked_mean,
 )
 from nemo_rl.distributed.batched_data_dict import BatchedDataDict
+from nemo_rl.distributed.model_utils import from_parallel_logits_to_logprobs
 from nemo_rl.models.dtensor.parallelize import (
     get_logprobs_from_vocab_parallel_logits,
 )
@@ -90,6 +91,8 @@ def __call__(
         self,
         next_token_logits: torch.Tensor,
         data: BatchedDataDict[ClippedPGLossDataDict],
+        vocab_parallel_rank: Optional[int] = None,
+        vocab_parallel_group: Optional[torch.distributed.ProcessGroup] = None,
     ) -> Tuple[torch.Tensor, dict]:
         """Clipped Policy Gradient RL loss function."""
         token_mask = data["token_mask"][:, 1:]
@@ -109,7 +112,16 @@

         next_token_logits = next_token_logits.to(torch.float32)

-        if isinstance(next_token_logits, torch.distributed.tensor.DTensor):
+        if vocab_parallel_group is not None:
+            curr_logprobs = from_parallel_logits_to_logprobs(
+                next_token_logits,
+                data["input_ids"],
+                vocab_start_index=vocab_parallel_rank * next_token_logits.shape[-1],
+                vocab_end_index=(vocab_parallel_rank + 1) * next_token_logits.shape[-1],
+                group=vocab_parallel_group,
+                inference_only=False,
+            )
+        elif isinstance(next_token_logits, torch.distributed.tensor.DTensor):
             curr_logprobs = get_logprobs_from_vocab_parallel_logits(
                 next_token_logits, data["input_ids"]
             )
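Background for the new `vocab_parallel_*` branch: with Megatron tensor parallelism, each rank holds only a vocab shard of `next_token_logits` (the call site derives the shard bounds from `vocab_parallel_rank` and the local vocab width), so per-token log-probs require a reduction over the tensor-parallel group instead of a local `log_softmax`. The sketch below shows the standard form of that computation (a globally stable log-sum-exp plus a masked gather of the target token's logit). It illustrates the technique only; it is not the actual `from_parallel_logits_to_logprobs` from `nemo_rl.distributed.model_utils`, and `logits_local`, `targets`, and `tp_group` are hypothetical names.

```python
# Illustrative sketch of vocab-parallel log-prob computation; the real helper in
# nemo_rl.distributed.model_utils may differ in signature and details.
import torch
import torch.distributed as dist


def vocab_parallel_logprobs(
    logits_local: torch.Tensor,   # [batch, seq, vocab_shard]: this rank's slice of the vocab
    targets: torch.Tensor,        # [batch, seq]: global token ids, already aligned to the logits
    vocab_start_index: int,       # first global vocab id owned by this rank
    vocab_end_index: int,         # one past the last global vocab id owned by this rank
    tp_group: dist.ProcessGroup,  # tensor-(vocab-)parallel process group
) -> torch.Tensor:
    # 1) Global max over the full vocabulary for a numerically stable log-sum-exp.
    global_max = logits_local.max(dim=-1, keepdim=True).values
    dist.all_reduce(global_max, op=dist.ReduceOp.MAX, group=tp_group)
    shifted = logits_local - global_max

    # 2) Global normalizer: sum of exp(logit - max) over every rank's shard.
    sum_exp = shifted.exp().sum(dim=-1, keepdim=True)
    dist.all_reduce(sum_exp, op=dist.ReduceOp.SUM, group=tp_group)
    log_z = sum_exp.log().squeeze(-1)

    # 3) Only the rank that owns a target id contributes its (shifted) logit; others add zero.
    in_shard = (targets >= vocab_start_index) & (targets < vocab_end_index)
    local_idx = (targets - vocab_start_index).clamp(min=0, max=logits_local.shape[-1] - 1)
    picked = shifted.gather(-1, local_idx.unsqueeze(-1)).squeeze(-1)
    picked = torch.where(in_shard, picked, torch.zeros_like(picked))
    dist.all_reduce(picked, op=dist.ReduceOp.SUM, group=tp_group)

    # log p(target) = (logit_target - max) - log(sum exp(logit - max))
    return picked - log_z
```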