Merge pull request #555 from workingloong/reduce_pending_resource
Reduce the resource if the pending time is too long.
samplise authored Aug 1, 2023
2 parents 00cfa3b + 5431e3e commit fe945e5
Showing 9 changed files with 71 additions and 22 deletions.
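
At a high level, the new behavior halves the CPU and memory request of a pod that has been pending longer than a threshold and relaunches it. A minimal sketch of that logic, simplified from the diff below (the resource floors are assumptions, since NodeResourceLimit is not part of this diff):

    import math
    import time

    # Defaults mirroring DefaultConfigValues in global_context.py below.
    SECONDS_TO_WAIT_PENDING_POD = 900  # 15 min
    FACTOR_TO_CUT_PENDING_CPU = 2
    FACTOR_TO_CUT_PENDING_MEM = 2
    MIN_CPU_CORES = 1   # assumed floors; the real limits come from
    MIN_MEMORY = 1024   # NodeResourceLimit, which is not shown in this diff.


    def reduce_if_pending_too_long(cpu, memory, create_ts):
        """Halve the CPU/memory request of a pod that has pended too long.

        create_ts is a Unix timestamp here for simplicity; the real code
        uses the pod's Kubernetes creation timestamp.
        """
        pending_time = time.time() - create_ts
        if pending_time < SECONDS_TO_WAIT_PENDING_POD:
            return cpu, memory, False
        reduced = False
        new_cpu = math.ceil(cpu / FACTOR_TO_CUT_PENDING_CPU)
        if new_cpu > MIN_CPU_CORES:
            cpu, reduced = new_cpu, True
        new_memory = math.ceil(memory / FACTOR_TO_CUT_PENDING_MEM)
        if new_memory > MIN_MEMORY:
            memory, reduced = new_memory, True
        return cpu, memory, reduced

When a reduction happens, the node managers in the diff mark the node as non-relaunchable, add it to the scale plan's remove_nodes, and relaunch it with the smaller request.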
6 changes: 6 additions & 0 deletions dlrover/python/common/global_context.py
@@ -27,6 +27,7 @@ class ConfigKeys(object):
SECONDS_FOR_STABLE_WORKER_COUNT = "seconds_for_stable_worker_count"
SECONDS_INTERVAL_TO_OPTIMIZE = "seconds_interval_to_optimize"
FACTOR_TO_CUT_PENDING_CPU = "factor_to_cut_pending_cpu"
FACTOR_TO_CUT_PENDING_MEM = "factor_to_cut_pending_mem"
SECONDS_TO_WAIT_PENDING_POD = "seconds_to_wait_pending_pod"
SECONDS_HUGE_TRAINING_THRESHOLD = "seconds_huge_training_threshold"
GLOBAL_STEP_COUNT_TO_AUTO_WORKER = "global_step_count_to_auto_worker"
@@ -42,6 +43,7 @@ class DefaultConfigValues(object):
DEFAULT_SECONDS_FOR_STABLE_WORKER_COUNT = 60
DEFAULT_SECONDS_INTERVAL_TO_OPTIMIZE = 300
DEFAULT_FACTOR_TO_CUT_PENDING_CPU = 2
DEFAULT_FACTOR_TO_CUT_PENDING_MEM = 2
DEFAULT_SECONDS_TO_WAIT_PENDING_POD = 900 # 15min
DEFAULT_SECONDS_HUGE_TRAINING_THRESHOLD = 1800 # 30min
DEFALUT_GLOBAL_STEP_COUNT_TO_AUTO_WORKER = 5
@@ -81,6 +83,10 @@ def __init__(self):
ConfigKeys.FACTOR_TO_CUT_PENDING_CPU,
DefaultConfigValues.DEFAULT_FACTOR_TO_CUT_PENDING_CPU,
)
self.factor_to_cut_pending_mem = self.get_param_value_from_brain(
ConfigKeys.FACTOR_TO_CUT_PENDING_MEM,
DefaultConfigValues.DEFAULT_FACTOR_TO_CUT_PENDING_MEM,
)
self.seconds_to_wait_pending_pod = self.get_param_value_from_brain(
ConfigKeys.SECONDS_TO_WAIT_PENDING_POD,
DefaultConfigValues.DEFAULT_SECONDS_TO_WAIT_PENDING_POD,
8 changes: 5 additions & 3 deletions dlrover/python/common/grpc.py
@@ -54,11 +54,13 @@ def find_free_port(port=0):


def find_free_port_in_range(start, end):
for i in range(start, end):
for port in range(start, end):
try:
return find_free_port(i)
return find_free_port(port)
except OSError as e:
logger.info("Socket creation attempt failed.", exc_info=e)
logger.info(
f"Socket creation attempt failed with {port}.", exc_info=e
)
raise RuntimeError(f"Failed to find a free port in [{start}, {end})")


2 changes: 2 additions & 0 deletions dlrover/python/common/node.py
@@ -234,6 +234,8 @@ def get_relaunch_node_info(self, new_id):
new_node.name = None
new_node.status = NodeStatus.INITIAL
new_node.start_time = None
new_node.create_time = None
new_node.finish_time = None
new_node.is_released = False
new_node.relaunchable = True
new_node.init_time = time.time()
28 changes: 23 additions & 5 deletions dlrover/python/master/node/job_auto_scaler.py
@@ -111,6 +111,20 @@ def __init__(
self._scaler = node_scaler
self._job_nodes = job_nodes
self._autoscaling_started = False
threading.Thread(
target=self.monitor_pending_node_at_begining,
name="monitor_pending_nodes",
daemon=True,
).start()

def monitor_pending_node_at_begining(self):
logger.info("Start monitoring pending nodes.")
while True:
if self._autoscaling_started:
logger.info("Stop mointoring pending nodes.")
break
self._reduce_timeout_pending_node_resource()
time.sleep(60)

def start_auto_scaling(self):
"""Start to auto-scale nodes to improve the training throughput."""
@@ -145,7 +159,7 @@ def _periodic_optimize_running_resource(self):
opt_interval = _dlrover_context.seconds_interval_to_optimize
while True:
if self._stop_autoscaling:
logger.info("Stop auto-scaling PS Trainign.")
logger.info("Stop auto-scaling PS Training.")
break
if (
self._speed_monitor.worker_adjustment_finished()
@@ -157,7 +171,6 @@
if plan:
last_plan_time = time.time()
self.execute_job_optimization_plan(plan)
self._cut_timeout_pending_node_cpu()
time.sleep(30)

def execute_job_optimization_plan(self, plan: ResourcePlan):
@@ -217,14 +230,19 @@ def _migrate_nodes(self, node_resources: Dict[str, NodeResource]):
logger.info("Migration plan = %s", scale_plan.toJSON())
return scale_plan

def _cut_timeout_pending_node_cpu(self):
def _reduce_timeout_pending_node_resource(self):
"""Cut down CPU cores of pending pod at the job starts"""
if self._autoscaling_started:
return
scale_plan = ScalePlan()
if _dlrover_context.auto_ps_enabled:
self._ps_manager.cut_pending_node_cpu()
plan = self._ps_manager.reduce_pending_node_resource()
scale_plan.merge(plan)
if _dlrover_context.auto_worker_enabled:
self._worker_manager.cut_pending_node_cpu()
plan = self._worker_manager.reduce_pending_node_resource()
scale_plan.merge(plan)
self._scaler.scale(scale_plan)
return scale_plan


class AllreduceTrainingAutoScaler(JobAutoScaler):
36 changes: 26 additions & 10 deletions dlrover/python/master/node/training_node.py
@@ -103,8 +103,8 @@ def get_critical_worker_index(params: JobArgs):
return critical_worker_index


def cut_timeout_pending_node_cpu(node: Node):
"""Cut down CPU cores and relaunch it if the pending
def reduce_timeout_pending_node_resource(node: Node):
"""Reduce CPU cores or memroy and relaunch it if the pending
time is too long"""
now = time.time()
if node.is_released or not node.create_time:
@@ -117,6 +117,7 @@ def cut_timeout_pending_node_cpu(node: Node):
new_cpu = math.ceil(
original_cpu / _dlrover_context.factor_to_cut_pending_cpu
)
reduced = False
if new_cpu > NodeResourceLimit.MIN_CPU_CORES:
node.config_resource.cpu = new_cpu
logger.info(
@@ -127,9 +128,23 @@
_dlrover_context.seconds_to_wait_pending_pod,
new_cpu,
)
return True
else:
return False
reduced = True
original_memory = node.config_resource.memory
new_memory = math.ceil(
original_memory / _dlrover_context.factor_to_cut_pending_mem
)
if new_memory > NodeResourceLimit.MIN_MEMORY:
node.config_resource.memory = new_memory
logger.info(
"Pod %s pending time %s beyonds %s."
"Delete and relaunch it with memory %s",
node.name,
pending_time,
_dlrover_context.seconds_to_wait_pending_pod,
new_memory,
)
reduced = True
return reduced


class TrainingNodeManager(object):
@@ -193,16 +208,17 @@ def relaunch_node(self, node: Node):
)
return plan

def cut_pending_node_cpu(self):
def reduce_pending_node_resource(self):
"""Cut down CPU cores of pendding PS Pods"""
plan = ScalePlan()
nodes = copy.deepcopy(self._nodes)
for node in nodes.values():
cur_nodes = list(self._nodes.values())
for node in cur_nodes:
if node.status == NodeStatus.PENDING:
cut_cpu = cut_timeout_pending_node_cpu(node)
if cut_cpu:
reduced = reduce_timeout_pending_node_resource(node)
if reduced:
node.relaunchable = False
node_plan = self.relaunch_node(node)
plan.remove_nodes.append(node)
plan.merge(node_plan)
return plan

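With both factors at their default value of 2, a pending pod is relaunched at half of its requested CPU and memory once it has waited longer than 900 seconds. A small worked example (the original request values are assumptions chosen to be consistent with the expectations in test_ps_manager.py further down):

    import math

    factor_cpu = factor_mem = 2               # DEFAULT_FACTOR_TO_CUT_PENDING_CPU / _MEM
    requested_cpu, requested_mem = 16, 4096   # assumed original request (cores, MiB)

    new_cpu = math.ceil(requested_cpu / factor_cpu)  # 8
    new_mem = math.ceil(requested_mem / factor_mem)  # 2048

So a pod that has been pending for more than 900 seconds would be relaunched with 8 cores and 2048 MiB, matching the plan values asserted in the tests.
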
6 changes: 5 additions & 1 deletion dlrover/python/master/resource/job.py
@@ -232,7 +232,7 @@ def _init_job_resource_by_optimizer(self):
plan = self._resource_optimizer.generate_opt_plan(self._job_stage)
if not plan or plan.empty():
logger.info("Use the default plan to start the job")
plan = ResourcePlan.new_default_plan()
plan = self._gen_default_resource_plan()
self._job_stage = JobOptStage.WORKER_INITIAL

if (
@@ -260,6 +260,10 @@ def _init_job_resource_by_optimizer(self):
ps_resource.node_resource.memory,
)

def _gen_default_resource_plan(self):
plan = ResourcePlan.new_default_plan()
return plan

def init_job_resource(self, job_resource: JobResource):
"""Adjust the initial resource of typed pods by EasyDL.
Args:
1 change: 1 addition & 0 deletions dlrover/python/master/watcher/k8s_watcher.py
@@ -106,6 +106,7 @@ def _convert_pod_event_to_node_event(event):
host_name=host_name,
host_ip=host_ip,
)
node.create_time = evt_obj.metadata.creation_timestamp
node.set_exit_reason(_get_pod_exit_reason(evt_obj))
node_event = NodeEvent(event_type=evt_type, node=node)
return node_event
2 changes: 1 addition & 1 deletion dlrover/python/tests/test_ps_manager.py
@@ -63,7 +63,7 @@ def test_cut_pending_ps_cpu(self):
node.status = NodeStatus.PENDING
node.create_time = datetime.now() + timedelta(days=-1)

plan = self._ps_manager.cut_pending_node_cpu()
plan = self._ps_manager.reduce_pending_node_resource()
self.assertEqual(len(plan.launch_nodes), 2)
self.assertEqual(plan.launch_nodes[0].config_resource.cpu, 8)
self.assertEqual(plan.launch_nodes[0].config_resource.memory, 2048)
4 changes: 2 additions & 2 deletions dlrover/python/tests/test_worker_manager.py
@@ -123,7 +123,7 @@ def test_relaunch_chief_node(self):
self.assertEqual(plan.launch_nodes[0].config_resource.cpu, 16)
self.assertEqual(manager._nodes[1].id, 1)

def test_cut_pending_node_cpu(self):
def test_reduce_pending_node_resource(self):
worker_manager = WorkerManager(
self._job_nodes[NodeType.WORKER],
self._job_resource,
@@ -134,7 +134,7 @@ def test_cut_pending_node_cpu(self):
for node in worker_manager._nodes.values():
node.status = NodeStatus.PENDING
node.create_time = datetime.now() + timedelta(days=-1)
plan = worker_manager.cut_pending_node_cpu()
plan = worker_manager.reduce_pending_node_resource()
self.assertEqual(len(plan.launch_nodes), 5)

def test_pending_without_workers(self):
