Reduce the resource if the pending time is too long. #555

6 changes: 6 additions & 0 deletions dlrover/python/common/global_context.py
@@ -27,6 +27,7 @@ class ConfigKeys(object):
SECONDS_FOR_STABLE_WORKER_COUNT = "seconds_for_stable_worker_count"
SECONDS_INTERVAL_TO_OPTIMIZE = "seconds_interval_to_optimize"
FACTOR_TO_CUT_PENDING_CPU = "factor_to_cut_pending_cpu"
FACTOR_TO_CUT_PENDING_MEM = "factor_to_cut_pending_mem"
SECONDS_TO_WAIT_PENDING_POD = "seconds_to_wait_pending_pod"
SECONDS_HUGE_TRAINING_THRESHOLD = "seconds_huge_training_threshold"
GLOBAL_STEP_COUNT_TO_AUTO_WORKER = "global_step_count_to_auto_worker"
@@ -42,6 +43,7 @@ class DefaultConfigValues(object):
DEFAULT_SECONDS_FOR_STABLE_WORKER_COUNT = 60
DEFAULT_SECONDS_INTERVAL_TO_OPTIMIZE = 300
DEFAULT_FACTOR_TO_CUT_PENDING_CPU = 2
DEFAULT_FACTOR_TO_CUT_PENDING_MEM = 2
DEFAULT_SECONDS_TO_WAIT_PENDING_POD = 900 # 15min
DEFAULT_SECONDS_HUGE_TRAINING_THRESHOLD = 1800 # 30min
DEFALUT_GLOBAL_STEP_COUNT_TO_AUTO_WORKER = 5
@@ -81,6 +83,10 @@ def __init__(self):
ConfigKeys.FACTOR_TO_CUT_PENDING_CPU,
DefaultConfigValues.DEFAULT_FACTOR_TO_CUT_PENDING_CPU,
)
self.factor_to_cut_pending_mem = self.get_param_value_from_brain(
ConfigKeys.FACTOR_TO_CUT_PENDING_MEM,
DefaultConfigValues.DEFAULT_FACTOR_TO_CUT_PENDING_MEM,
)
self.seconds_to_wait_pending_pod = self.get_param_value_from_brain(
ConfigKeys.SECONDS_TO_WAIT_PENDING_POD,
DefaultConfigValues.DEFAULT_SECONDS_TO_WAIT_PENDING_POD,
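The new FACTOR_TO_CUT_PENDING_MEM key follows the same lookup-with-default pattern as the existing CPU factor. A minimal, hypothetical sketch of that pattern (the BrainLikeConfig class below is illustrative and not part of DLRover; the real lookup is get_param_value_from_brain(key, default)):

class BrainLikeConfig:
    """Illustrative stand-in for the context object: resolve a tunable by key,
    falling back to its built-in default when the brain service has no value."""

    def __init__(self, remote_params=None):
        self._remote_params = remote_params or {}

    def get_param_value(self, key, default):
        return self._remote_params.get(key, default)


cfg = BrainLikeConfig({"factor_to_cut_pending_mem": 4})
print(cfg.get_param_value("factor_to_cut_pending_mem", 2))  # 4, supplied by the "brain"
print(cfg.get_param_value("factor_to_cut_pending_cpu", 2))  # 2, the built-in default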
8 changes: 5 additions & 3 deletions dlrover/python/common/grpc.py
@@ -54,11 +54,13 @@ def find_free_port(port=0):


def find_free_port_in_range(start, end):
for i in range(start, end):
for port in range(start, end):
try:
return find_free_port(i)
return find_free_port(port)
except OSError as e:
logger.info("Socket creation attempt failed.", exc_info=e)
logger.info(
f"Socket creation attempt failed with {port}.", exc_info=e
)
raise RuntimeError(f"Failed to find a free port in [{start}, {end})")


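A self-contained usage sketch of the retry loop above, using only the standard library. find_free_port here is a stand-in for the helper defined earlier in dlrover/python/common/grpc.py, and the port range is arbitrary:

import logging
import socket

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def find_free_port(port=0):
    # Stand-in: bind to the requested port and return the port actually assigned.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", port))
        return s.getsockname()[1]


def find_free_port_in_range(start, end):
    # Same shape as the patched loop: try each port, log the failing port,
    # and raise only after the whole range is exhausted.
    for port in range(start, end):
        try:
            return find_free_port(port)
        except OSError as e:
            logger.info("Socket creation attempt failed on port %s.", port, exc_info=e)
    raise RuntimeError(f"Failed to find a free port in [{start}, {end})")


print(find_free_port_in_range(20000, 20010))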
2 changes: 2 additions & 0 deletions dlrover/python/common/node.py
@@ -234,6 +234,8 @@ def get_relaunch_node_info(self, new_id):
new_node.name = None
new_node.status = NodeStatus.INITIAL
new_node.start_time = None
new_node.create_time = None
new_node.finish_time = None
new_node.is_released = False
new_node.relaunchable = True
new_node.init_time = time.time()
28 changes: 23 additions & 5 deletions dlrover/python/master/node/job_auto_scaler.py
@@ -111,6 +111,20 @@ def __init__(
self._scaler = node_scaler
self._job_nodes = job_nodes
self._autoscaling_started = False
threading.Thread(
target=self.monitor_pending_node_at_beginning,
name="monitor_pending_nodes",
daemon=True,
).start()

def monitor_pending_node_at_beginning(self):
logger.info("Start monitoring pending nodes.")
while True:
if self._autoscaling_started:
logger.info("Stop mointoring pending nodes.")
break
self._reduce_timeout_pending_node_resource()
time.sleep(60)

def start_auto_scaling(self):
"""Start to auto-scale nodes to improve the training throughput."""
@@ -145,7 +159,7 @@ def _periodic_optimize_running_resource(self):
opt_interval = _dlrover_context.seconds_interval_to_optimize
while True:
if self._stop_autoscaling:
logger.info("Stop auto-scaling PS Trainign.")
logger.info("Stop auto-scaling PS Training.")
break
if (
self._speed_monitor.worker_adjustment_finished()
@@ -157,7 +171,6 @@
if plan:
last_plan_time = time.time()
self.execute_job_optimization_plan(plan)
self._cut_timeout_pending_node_cpu()
time.sleep(30)

def execute_job_optimization_plan(self, plan: ResourcePlan):
@@ -217,14 +230,19 @@ def _migrate_nodes(self, node_resources: Dict[str, NodeResource]):
logger.info("Migration plan = %s", scale_plan.toJSON())
return scale_plan

def _cut_timeout_pending_node_cpu(self):
def _reduce_timeout_pending_node_resource(self):
"""Cut down CPU cores of pending pod at the job starts"""
if self._autoscaling_started:
return
scale_plan = ScalePlan()
if _dlrover_context.auto_ps_enabled:
self._ps_manager.cut_pending_node_cpu()
plan = self._ps_manager.reduce_pending_node_resource()
scale_plan.merge(plan)
if _dlrover_context.auto_worker_enabled:
self._worker_manager.cut_pending_node_cpu()
plan = self._worker_manager.reduce_pending_node_resource()
scale_plan.merge(plan)
self._scaler.scale(scale_plan)
return scale_plan


class AllreduceTrainingAutoScaler(JobAutoScaler):
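A minimal sketch of the background-monitor pattern the constructor now starts: a daemon thread polls on an interval and exits once a flag flips, the flag playing the role of self._autoscaling_started. The PendingMonitor class and its method names are illustrative, not DLRover APIs:

import threading
import time


class PendingMonitor:
    def __init__(self, interval=60):
        self._interval = interval
        self._autoscaling_started = False
        threading.Thread(
            target=self._monitor_pending_nodes,
            name="monitor_pending_nodes",
            daemon=True,
        ).start()

    def _monitor_pending_nodes(self):
        # Poll until auto-scaling takes over, then stop.
        while not self._autoscaling_started:
            self._reduce_timeout_pending_node_resource()
            time.sleep(self._interval)

    def _reduce_timeout_pending_node_resource(self):
        # Placeholder for building and applying a scale plan.
        print("checking pending nodes ...")

    def start_auto_scaling(self):
        self._autoscaling_started = True


monitor = PendingMonitor(interval=1)
time.sleep(3)  # let a few polling rounds run
monitor.start_auto_scaling()  # flips the flag; the monitor thread exits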
36 changes: 26 additions & 10 deletions dlrover/python/master/node/training_node.py
@@ -103,8 +103,8 @@ def get_critical_worker_index(params: JobArgs):
return critical_worker_index


def cut_timeout_pending_node_cpu(node: Node):
"""Cut down CPU cores and relaunch it if the pending
def reduce_timeout_pending_node_resource(node: Node):
"""Reduce CPU cores or memroy and relaunch it if the pending
time is too long"""
now = time.time()
if node.is_released or not node.create_time:
@@ -117,6 +117,7 @@ def cut_timeout_pending_node_cpu(node: Node):
new_cpu = math.ceil(
original_cpu / _dlrover_context.factor_to_cut_pending_cpu
)
reduced = False
if new_cpu > NodeResourceLimit.MIN_CPU_CORES:
node.config_resource.cpu = new_cpu
logger.info(
@@ -127,9 +128,23 @@
_dlrover_context.seconds_to_wait_pending_pod,
new_cpu,
)
return True
else:
return False
reduced = True
original_memory = node.config_resource.memory
new_memory = math.ceil(
original_memory / _dlrover_context.factor_to_cut_pending_mem
)
if new_memory > NodeResourceLimit.MIN_MEMORY:
node.config_resource.memory = new_memory
logger.info(
"Pod %s pending time %s beyonds %s."
"Delete and relaunch it with memory %s",
node.name,
pending_time,
_dlrover_context.seconds_to_wait_pending_pod,
new_memory,
)
reduced = True
return reduced


class TrainingNodeManager(object):
@@ -193,16 +208,17 @@ def relaunch_node(self, node: Node):
)
return plan

def cut_pending_node_cpu(self):
def reduce_pending_node_resource(self):
"""Cut down CPU cores of pendding PS Pods"""
plan = ScalePlan()
nodes = copy.deepcopy(self._nodes)
for node in nodes.values():
cur_nodes = list(self._nodes.values())
for node in cur_nodes:
if node.status == NodeStatus.PENDING:
cut_cpu = cut_timeout_pending_node_cpu(node)
if cut_cpu:
reduced = reduce_timeout_pending_node_resource(node)
if reduced:
node.relaunchable = False
node_plan = self.relaunch_node(node)
plan.remove_nodes.append(node)
plan.merge(node_plan)
return plan

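A worked sketch of the reduction rule above, under assumed values: factor 2 for both CPU and memory, and hypothetical floors of 1 core and 1024 MB (the real floors come from NodeResourceLimit). A 16-core, 32768 MB pod that stays pending past the timeout is relaunched with 8 cores and 16384 MB; repeated timeouts keep halving each dimension until its floor is reached, after which that dimension is left unchanged:

import math

# Assumed values for illustration; the real ones come from the DLRover
# context and NodeResourceLimit.
FACTOR_TO_CUT_PENDING_CPU = 2
FACTOR_TO_CUT_PENDING_MEM = 2
MIN_CPU_CORES = 1
MIN_MEMORY = 1024  # MB


def reduce_resource(cpu, memory):
    """Return the (cpu, memory, reduced) a timed-out pending pod would get."""
    reduced = False
    new_cpu = math.ceil(cpu / FACTOR_TO_CUT_PENDING_CPU)
    if new_cpu > MIN_CPU_CORES:
        cpu = new_cpu
        reduced = True
    new_memory = math.ceil(memory / FACTOR_TO_CUT_PENDING_MEM)
    if new_memory > MIN_MEMORY:
        memory = new_memory
        reduced = True
    return cpu, memory, reduced


print(reduce_resource(16, 32768))  # (8, 16384, True)
print(reduce_resource(1, 1024))    # (1, 1024, False): both floors reached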
6 changes: 5 additions & 1 deletion dlrover/python/master/resource/job.py
@@ -232,7 +232,7 @@ def _init_job_resource_by_optimizer(self):
plan = self._resource_optimizer.generate_opt_plan(self._job_stage)
if not plan or plan.empty():
logger.info("Use the default plan to start the job")
plan = ResourcePlan.new_default_plan()
plan = self._gen_default_resource_plan()
self._job_stage = JobOptStage.WORKER_INITIAL

if (
Expand Down Expand Up @@ -260,6 +260,10 @@ def _init_job_resource_by_optimizer(self):
ps_resource.node_resource.memory,
)

def _gen_default_resource_plan(self):
plan = ResourcePlan.new_default_plan()
return plan

def init_job_resource(self, job_resource: JobResource):
"""Adjust the initial resource of typed pods by EasyDL.
Args:
1 change: 1 addition & 0 deletions dlrover/python/master/watcher/k8s_watcher.py
@@ -106,6 +106,7 @@ def _convert_pod_event_to_node_event(event):
host_name=host_name,
host_ip=host_ip,
)
node.create_time = evt_obj.metadata.creation_timestamp
node.set_exit_reason(_get_pod_exit_reason(evt_obj))
node_event = NodeEvent(event_type=evt_type, node=node)
return node_event
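The create_time recorded here is what the pending-time check compares against. A small sketch of that comparison, assuming the Kubernetes client reports metadata.creation_timestamp as a timezone-aware datetime and using the 900-second default wait:

from datetime import datetime, timedelta, timezone

# creation_timestamp roughly as the Kubernetes client would report it.
create_time = datetime.now(timezone.utc) - timedelta(minutes=20)

pending_seconds = (datetime.now(timezone.utc) - create_time).total_seconds()
seconds_to_wait_pending_pod = 900  # DEFAULT_SECONDS_TO_WAIT_PENDING_POD (15 min)

if pending_seconds > seconds_to_wait_pending_pod:
    print("Pod has been pending too long; reduce its resources and relaunch it.")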
2 changes: 1 addition & 1 deletion dlrover/python/tests/test_ps_manager.py
@@ -63,7 +63,7 @@ def test_cut_pending_ps_cpu(self):
node.status = NodeStatus.PENDING
node.create_time = datetime.now() + timedelta(days=-1)

plan = self._ps_manager.cut_pending_node_cpu()
plan = self._ps_manager.reduce_pending_node_resource()
self.assertEqual(len(plan.launch_nodes), 2)
self.assertEqual(plan.launch_nodes[0].config_resource.cpu, 8)
self.assertEqual(plan.launch_nodes[0].config_resource.memory, 2048)
4 changes: 2 additions & 2 deletions dlrover/python/tests/test_worker_manager.py
@@ -123,7 +123,7 @@ def test_relaunch_chief_node(self):
self.assertEqual(plan.launch_nodes[0].config_resource.cpu, 16)
self.assertEqual(manager._nodes[1].id, 1)

def test_cut_pending_node_cpu(self):
def test_reduce_pending_node_resource(self):
worker_manager = WorkerManager(
self._job_nodes[NodeType.WORKER],
self._job_resource,
@@ -134,7 +134,7 @@ def test_cut_pending_node_cpu(self):
for node in worker_manager._nodes.values():
node.status = NodeStatus.PENDING
node.create_time = datetime.now() + timedelta(days=-1)
plan = worker_manager.cut_pending_node_cpu()
plan = worker_manager.reduce_pending_node_resource()
self.assertEqual(len(plan.launch_nodes), 5)

def test_pending_without_workers(self):