
Implement Pollux scheduler #9

Open
wants to merge 51 commits into base: main

Commits (51)
e88465d
create pollux.py pollux_scheduler.py framework
yuxianshi Apr 10, 2024
98fd651
Add code for tracking and using job_infos and allocations as in Pollu…
yuxianshi Apr 15, 2024
e3a3910
add cluster_state.allocations
yuxianshi Apr 15, 2024
9b8a395
wrap some code into function update_allocation_info
yuxianshi Apr 15, 2024
a0ef639
minor changes
yuxianshi Apr 15, 2024
ff209d5
minor changes
yuxianshi Apr 15, 2024
cdf3c9c
obtain to_suspend
yuxianshi Apr 15, 2024
c05ed2d
Convert node_info of Blox to nodes map of Pollux
manasdtrivedi Apr 15, 2024
322ebc0
Merge job_info and node_info changes
manasdtrivedi Apr 15, 2024
f1f710d
allow sending model name in workload.py
yuxianshi Apr 22, 2024
7488bb9
Fix imports, cluster state bug
manasdtrivedi Apr 23, 2024
90cfb88
add pollux trace to workload_pollux
yuxianshi Apr 23, 2024
895b4af
change default scheduler in simulator_simple.py to Pollux
yuxianshi Apr 23, 2024
5543c4a
Add requirements.txt
manasdtrivedi Apr 23, 2024
926deff
add calculations of "to_suspend" and "to_launch" in pollux.py
yuxianshi Apr 25, 2024
a2d9529
add simulation args to match Pollux simulation setting
yuxianshi Apr 26, 2024
ece995f
minor change
yuxianshi Apr 26, 2024
71890e3
Add workload files
manasdtrivedi Apr 27, 2024
bbf3873
debugging pollux.py
yuxianshi Apr 29, 2024
6660fa3
minor change
yuxianshi Apr 29, 2024
a80477c
debugging: job_ids_to_track & parse_philly_jobs
yuxianshi Apr 29, 2024
97dc150
set job completion by target epoch for pollux
yuxianshi Apr 29, 2024
232493c
minor change
yuxianshi Apr 29, 2024
df69651
Use numCPUcores parameter when initializing Pollux's nodes
manasdtrivedi Apr 29, 2024
95a80b1
run.sh
yuxianshi Apr 29, 2024
44737af
debugging: prune "allocations"
yuxianshi Apr 30, 2024
4fc7cde
debugging: solve keyerror in calculation of "to_launch" in pollux.py
yuxianshi Apr 30, 2024
b778378
debugging: address inifinite loop issue
yuxianshi Apr 30, 2024
a759d43
debugging: address infinit loop
yuxianshi Apr 30, 2024
c455a82
debugging: remove Job object from job_state.job_runtime_stats
yuxianshi Apr 30, 2024
a07e126
result: interval = 1min
yuxianshi Apr 30, 2024
ef2daa6
result: 2min and 8min
yuxianshi Apr 30, 2024
06d235b
result: 4 min
manasdtrivedi Apr 30, 2024
f05dbd0
result: 30s interval
yuxianshi Apr 30, 2024
83d2e49
Add adaptdl JCT data
manasdtrivedi May 1, 2024
d8789fe
Add plotting, update requirements
manasdtrivedi May 1, 2024
253689f
pollux reproduce results: 4m & 8m
yuxianshi May 1, 2024
5a39fe6
plot: using reproduced result of pollux-author version 4m and 8m
yuxianshi May 1, 2024
3a30ccc
plot: match color in poster
yuxianshi May 1, 2024
bf227fe
minor change
yuxianshi May 1, 2024
d07f94a
reprocuding pollux result: 1m & 2m
yuxianshi May 1, 2024
630c395
reprocuding pollux result: 30s
yuxianshi May 2, 2024
40ed3c0
minor change
yuxianshi May 7, 2024
95cb545
update plot to use reproduced Pollux author implementation result
yuxianshi May 7, 2024
a7e21b4
minor changes
yuxianshi May 12, 2024
af9d32e
debugging: fix current time in Pollux Job object (inside job_state)
yuxianshi May 12, 2024
9b21dfa
simulation: update result
yuxianshi May 13, 2024
9921089
minor change
yuxianshi May 13, 2024
3bda855
Remove debug comments
manasdtrivedi May 21, 2024
9aed55c
Refactoring: file names and directory structure
manasdtrivedi May 21, 2024
e69871b
Update installation and execution instructions
manasdtrivedi May 24, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -7,5 +7,8 @@ __pycache__/
*pb2*.py

*.pickle
*.DS_Store

.idea/
venv/

19 changes: 19 additions & 0 deletions INSTALL.md
@@ -0,0 +1,19 @@
We run Blox inside a virtual environment, and we recommend using [Anaconda](https://docs.anaconda.com/free/anaconda/install/index.html) to manage it. Once you've installed Anaconda, check that the conda command works by running ```conda -V```, which should print the conda version.

Next, we create a virtual environment called ```bloxenv```. Run the following command:

```
conda create -n bloxenv python=3.8
```

To activate the environment, run the following command:

```
conda activate bloxenv
```

Now, ```cd``` into the blox repository. You should see a ```requirements.txt``` file. We will use this file to install blox's dependencies by running the following command:

```
python -m pip install -r requirements.txt
```
28 changes: 28 additions & 0 deletions blox/blox_manager.py
@@ -115,6 +115,27 @@ def update_metrics(self, cluster_state, job_state):
ipaddress_to_fetch_from = list()
if_simulation = list()


# update Pollux specific metrics
if job_state.scheduler_name == "Pollux":
# find nodes shared by more than one multi-node job (these jobs interfere with each other)
interfere_nodes = set(idx for idx in range(cluster_state.node_counter)
if sum(len(set(val)) > 1 and idx in val
for key, val in cluster_state.allocations.items()) > 1)

# update job_state.active_jobs["tracked_metrics"]["pollux_metrics"]
for jid in job_state.active_jobs:
job = job_state.active_jobs[jid]["tracked_metrics"]["pollux_metrics"]
alloc_set = set(cluster_state.allocations.get(job.name, []))
interference = 0.0
if len(alloc_set) > 1 and any(idx in interfere_nodes for idx in alloc_set):
interference = job_state.interference
job.step(job_state.round_duration, interference=interference)

# this dict corresponds to self.allocations in the Pollux repo
cluster_state.allocations = {k: v for k, v in cluster_state.allocations.items() if
k in job_state.active_jobs}

for jid in job_state.active_jobs:
if job_state.active_jobs[jid]["is_running"] == True:
job_id_to_fetch.append(jid)
@@ -161,6 +182,13 @@ def update_metrics(self, cluster_state, job_state):
job_state.job_runtime_stats[jid] = copy.deepcopy(
job_state.active_jobs[jid]
)
# track completion_time and submission_time as maintained in the Pollux Job object
if job_state.scheduler_name == "Pollux":
del job_state.job_runtime_stats[jid]["tracked_metrics"]["pollux_metrics"]
job_state.job_runtime_stats[jid]["completion_time_pollux"] = \
job_state.active_jobs[jid]["tracked_metrics"]["pollux_metrics"].completion_time
job_state.job_runtime_stats[jid]["submission_time_pollux"] = \
job_state.active_jobs[jid]["tracked_metrics"]["pollux_metrics"].submission_time

jid_to_terminate.append(jid)
# delete GPU utilization
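For context on the first hunk in blox_manager.py above: interfere_nodes collects the nodes that host more than one multi-node job, and each affected job then has job_state.interference passed into Job.step(), which in the Pollux simulation models slowed progress for that round. A minimal standalone sketch of the node-level check, using invented allocation data (the job names and node count below are hypothetical, not from this PR):

```
# Illustrative sketch of the interfere_nodes computation (hypothetical data).
# A node is "interfering" when two or more jobs that each span multiple nodes share it.
allocations = {
    "job-a": [0, 1],  # spans nodes 0 and 1
    "job-b": [1, 2],  # spans nodes 1 and 2
    "job-c": [3],     # single-node job, never counted
}
node_counter = 4

interfere_nodes = set(
    idx for idx in range(node_counter)
    if sum(len(set(val)) > 1 and idx in val for val in allocations.values()) > 1
)
print(interfere_nodes)  # {1}: only node 1 hosts two multi-node jobs
```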
2 changes: 2 additions & 0 deletions blox/cluster_state.py
@@ -47,6 +47,8 @@ def __init__(self, args: argparse.ArgumentParser) -> None:
self.time = 0
self.cluster_stats = dict()

self.allocations = dict() # added for Pollux

# def get_new_nodes(self):
# """
# Fetch any new nodes which have arrived at the scheduler
8 changes: 5 additions & 3 deletions blox/deployment/Makefile
@@ -1,11 +1,11 @@
grpc:
grpc: setup
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/rm.proto
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/nm.proto
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/simulator.proto
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/frontend.proto
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/backend.proto
# I get confused between grpc and rcp
rpc:
# I get confused between grpc and rpc
rpc: setup
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/rm.proto
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/nm.proto
python -m grpc_tools.protoc -I./grpc_proto --python_out=./grpc_stubs --grpc_python_out=./grpc_stubs ./grpc_proto/simulator.proto
@@ -14,3 +14,5 @@ rpc:
clean:
rm ./grpc_stubs/*_pb2.py
rm ./grpc_stubs/*_pb2_grpc.py
setup:
mkdir -p grpc_stubs
5 changes: 4 additions & 1 deletion blox/deployment/grpc_client_rm.py
@@ -270,7 +270,10 @@ def get_metrics(
]
+ active_job_dict[job_id]["job_executed_iteration"]
)
if (
if os.environ["sched_policy"] == "Pollux":
if active_job_dict[job_id]["tracked_metrics"]["pollux_metrics"].completion_time is not None:
job_exit = True
elif (
total_iteration_achieved
>= active_job_dict[job_id]["job_total_iteration"]
):
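To spell out the control flow this hunk adds to get_metrics: under Pollux, a job is treated as finished once its pollux_lib Job records a completion_time, while every other scheduler keeps the original iteration-count check. A simplified sketch (the helper function and variable names below are illustrative, not part of the PR):

```
# Simplified sketch of the exit condition above (illustrative helper, not in the PR).
import os

def job_should_exit(active_job_dict, job_id, total_iteration_achieved):
    if os.environ.get("sched_policy") == "Pollux":
        # Pollux: exit once the pollux_lib Job has a recorded completion time.
        pollux_job = active_job_dict[job_id]["tracked_metrics"]["pollux_metrics"]
        return pollux_job.completion_time is not None
    # Other schedulers: exit when the planned iteration count has been reached.
    return total_iteration_achieved >= active_job_dict[job_id]["job_total_iteration"]
```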
39 changes: 38 additions & 1 deletion blox/job_state.py
@@ -10,6 +10,8 @@
from concurrent import futures

from typing import Tuple, List
from schedulers.pollux_lib.job import *
from schedulers.pollux_lib.applications import *

from blox_manager import BloxManager

@@ -26,16 +26,39 @@ def __init__(self, args: argparse.ArgumentParser):
# self.blr = blox_resource_manager
# dict of active jobs
self.active_jobs = dict()
"""
active_jobs is a dict that maps jid -> job info, which itself is a dict that includes:
"tracked_metrics": dict
"pollux_metrics": job.Job() object for pollux job # XY added
"per_iter_time": float - mean updated
"attained_service": float - additively updated
"iter num": int - additively updated
"attained_service_scheduler": - updated using "round_duration" * "numGPUs"
"job_exit": optional(boolean)
"time_since_scheduled": int
"job_priority": int
"previously_launched": boolean
"is_running": boolean
"suspended": int
"simulation": boolean
"running_ip_address":
"num_GPUs"
"submit_time"
"""
# count number of accepted jobs
self.job_counter = 0
self.job_completion_stats = dict()
self.job_completion_stats = dict() # used when job finishes
self.job_responsiveness_stats = dict()
self.cluster_stats = dict()
self.custom_metrics = dict()
self.job_runtime_stats = dict()
self.finished_job = dict() # keys are ids of the jobs which have finished
self.job_ids_to_track = list(range(args.start_id_track, args.stop_id_track + 1))
self.time = 0
self.scheduler_name = args.scheduler_name
if self.scheduler_name == "Pollux":
self.interference = args.interference
self.round_duration = args.round_duration

# def get_new_jobs(self):
# """
@@ -119,6 +144,18 @@ def add_new_jobs(self, new_jobs: List[dict]) -> None:
tracking_dict[p] = v
jobs["tracked_metrics"] = tracking_dict

if self.scheduler_name == "Pollux":
"""
Create pollux.job.Job object, decide how to refer to model name, the options of which include
"bert", "cifar10", "ncf", "imagenet", "deepspeech2", "yolov3"
"""
print(jobs)
job_temp = Job(self.job_counter, APPLICATIONS[jobs["application"]],
jobs["job_arrival_time"], self.time)
if job_temp.application.name == "ncf":
job_temp.target_batch_size = 32768
jobs["tracked_metrics"]["pollux_metrics"] = job_temp

jobs["time_since_scheduled"] = 0
jobs["job_priority"] = 999
jobs["previously_launched"] = False
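To make the add_new_jobs change above concrete, here is a minimal sketch of how an incoming job dict picks up its pollux_lib Job object; the dict values are invented for illustration, and the Job constructor signature is assumed to match the call in the hunk above:

```
# Illustrative sketch (hypothetical values) of attaching a pollux_lib Job to a new job.
from schedulers.pollux_lib.job import Job
from schedulers.pollux_lib.applications import APPLICATIONS

job_counter = 0        # stands in for self.job_counter
current_time = 0       # stands in for self.time
jobs = {
    "application": "cifar10",   # must be a key of APPLICATIONS
    "job_arrival_time": 0,
    "tracked_metrics": {},
}

job_temp = Job(job_counter, APPLICATIONS[jobs["application"]],
               jobs["job_arrival_time"], current_time)
if job_temp.application.name == "ncf":
    # NCF gets a fixed target batch size, as in the hunk above.
    job_temp.target_batch_size = 32768
jobs["tracked_metrics"]["pollux_metrics"] = job_temp
```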
4 changes: 4 additions & 0 deletions blox/utils.py
@@ -231,6 +231,10 @@ def prune_jobs(job_state: JobState, cluster_state: ClusterState, blr: BloxManage
job_state.job_runtime_stats[jid] = copy.deepcopy(
job_state.active_jobs[jid]
)
if job_state.scheduler_name == "Pollux":
del job_state.job_runtime_stats[jid]["tracked_metrics"]["pollux_metrics"]
job_state.job_runtime_stats[jid]["completion_time_pollux"] = job_state.active_jobs[jid]["tracked_metrics"]["pollux_metrics"].completion_time
job_state.job_runtime_stats[jid]["submission_time_pollux"] = job_state.active_jobs[jid]["tracked_metrics"]["pollux_metrics"].submission_time

jid_to_terminate.append(jid)
# delete GPU utilization
Expand Down
22 changes: 22 additions & 0 deletions jobs/job.py
@@ -37,6 +37,10 @@ def __init__(
job_queueing_delay=0,
cluster_id=0,
job_priority=0,
name=None,
application=None,
target_num_replicas=None,
target_batch_size=None,
iter_is_duration=False):
# logger handle
self.logger = logging.getLogger(__name__)
@@ -93,6 +97,24 @@
self.lease_extended = False
self.job_command = None

# Pollux Params
self.name = name
self.application = application
self.target_num_replicas = target_num_replicas
self.target_batch_size = target_batch_size
self.completion_time = None
self.current_time = 0
self.rescale_time = 0
self.placement = ()
self.atomic_bsz = 0
self.accum_steps = 0
self.profile = {}
self.perf_params = None
self.grad_params = None
self.best_metric = None
self.progress = 0.0
self.epoch = 0

self.cpu_val = {0:1, 1:2, 2:3, 3:4, 4:5, 5:6, 6:9, 7:12, 8:24}
self.mem_val = {0:20.83, 1:62.5, 2:125, 3:187.5, 4:250}
