feat: use async vllm engine (only used in unit tests) #418

Merged
merged 15 commits on May 29, 2025
1 change: 1 addition & 0 deletions examples/configs/eval.yaml
@@ -15,6 +15,7 @@ generation:
   stop_token_ids: null
   stop_strings: null
   vllm_cfg:
+    async_engine: false
     precision: "bfloat16"
     tensor_parallel_size: 1
     gpu_memory_utilization: 0.9
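For reference, the `generation.vllm_cfg` section of `examples/configs/eval.yaml` after this change would read as follows (assembled from the hunk above; indentation reconstructed, keys outside the hunk elided):

```yaml
generation:
  stop_token_ids: null
  stop_strings: null
  vllm_cfg:
    async_engine: false   # flag introduced by this PR; off by default
    precision: "bfloat16"
    tensor_parallel_size: 1
    gpu_memory_utilization: 0.9
```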
1 change: 1 addition & 0 deletions examples/configs/grpo-deepscaler-1.5b-8K.yaml
@@ -90,6 +90,7 @@ policy:
     stop_token_ids: null
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
1 change: 1 addition & 0 deletions examples/configs/grpo_math_1B.yaml
@@ -101,6 +101,7 @@ policy:
     stop_token_ids: null
     stop_strings: null
     vllm_cfg:
+      async_engine: false # Only for internal testing, will be enabled by https://github.com/NVIDIA/NeMo-RL/issues/447.
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
1 change: 1 addition & 0 deletions examples/configs/grpo_sliding_puzzle.yaml
@@ -30,6 +30,7 @@ policy:
     stop_token_ids: null
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
       max_model_len: ${policy.max_total_sequence_length}
@@ -83,6 +83,7 @@ policy:
     stop_token_ids: null
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
     stop_token_ids: null
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 4
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
       - 128009
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
       - 128009
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
       - 151643
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 4
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
       - 151643
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 4
       gpu_memory_utilization: 0.6
@@ -81,6 +81,7 @@ policy:
       - 151645
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
       - 151645
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 4
       gpu_memory_utilization: 0.6
@@ -84,6 +84,7 @@ policy:
       - 151645
     stop_strings: null
     vllm_cfg:
+      async_engine: false
       precision: ${policy.precision}
       tensor_parallel_size: 1
       gpu_memory_utilization: 0.6
29 changes: 26 additions & 3 deletions nemo_rl/distributed/worker_groups.py
@@ -49,13 +49,36 @@ def get_results(self, worker_group: "RayWorkerGroup") -> list[Any]:
        worker group each worker belongs to, then selects only the first result from each group.

        Args:
            worker_group: The RayWorkerGroup that spawned the futures. The
                mapping contained in worker_group.worker_to_tied_group_index
                is required for the deduplication path.

        Returns:
            List of results, deduplicated by tied workers if respect_tied_workers is True.
        """
        from ray._raylet import ObjectRef, ObjectRefGenerator

        # Flatten futures into a flat list of ObjectRefs.
        object_refs: list[ObjectRef] = []

        has_generator = False

        for fut in self.futures:
            if isinstance(fut, ObjectRefGenerator):
                # ray.get cannot be called directly on the generator object; it
                # must be iterated to obtain the individual ObjectRef instances.
                for generated_ref in fut:
                    object_refs.append(generated_ref)
                has_generator = True
            else:
                object_refs.append(fut)

        # Retrieve the concrete results.
        all_results = ray.get(object_refs)

        # If an expanded generator was present, we are in streaming mode:
        # every ObjectRef now corresponds to a unique, ordered chunk of data.
        if has_generator:
            return all_results

        if self.return_from_workers is not None:
            if self.called_workers is not None:
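The generator-flattening logic added to `get_results` can be sketched in isolation. In this standalone sketch, plain Python generators stand in for Ray's `ObjectRefGenerator` and strings stand in for `ObjectRef`s; `flatten_futures` and `stream` are hypothetical names for illustration, not part of the PR:

```python
from types import GeneratorType


def flatten_futures(futures):
    """Flatten a mix of plain refs and generators into one ordered list.

    Mirrors the shape of the PR's loop: generators are iterated (their
    items are appended in order), plain refs are appended as-is, and a
    flag records whether any generator was seen (streaming mode).
    """
    refs = []
    has_generator = False
    for fut in futures:
        if isinstance(fut, GeneratorType):
            # A generator cannot be resolved directly; iterate it to
            # obtain the individual items first.
            refs.extend(fut)
            has_generator = True
        else:
            refs.append(fut)
    return refs, has_generator


def stream():
    yield "chunk-0"
    yield "chunk-1"


refs, streaming = flatten_futures(["ref-a", stream(), "ref-b"])
print(refs)       # ['ref-a', 'chunk-0', 'chunk-1', 'ref-b']
print(streaming)  # True
```

Note that ordering is preserved: items produced by a generator slot into the flat list exactly where the generator appeared, which is what lets the streaming path return the flattened results directly.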