Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SmartSim to use Dragon V0.10 #753

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
124 changes: 53 additions & 71 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import collections
import functools
import itertools
Expand All @@ -38,7 +39,6 @@
# isort: off
import dragon.infrastructure.connection as dragon_connection
import dragon.infrastructure.policy as dragon_policy
import dragon.native.group_state as dragon_group_state
import dragon.native.process as dragon_process
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine
Expand Down Expand Up @@ -67,34 +67,48 @@
logger = get_logger(__name__)


class DragonStatus(str, Enum):
ERROR = str(dragon_group_state.Error())
RUNNING = str(dragon_group_state.Running())

def __str__(self) -> str:
return self.value


@dataclass
class ProcessGroupInfo:
status: SmartSimStatus
"""Status of step"""
process_group: t.Optional[dragon_process_group.ProcessGroup] = None
"""Internal Process Group object, None for finished or not started steps"""
puids: t.Optional[t.List[t.Optional[int]]] = None # puids can be None
"""List of Process UIDS belonging to the ProcessGroup"""
return_codes: t.Optional[t.List[int]] = None
"""List of return codes of completed processes"""
hosts: t.List[str] = field(default_factory=list)
"""List of hosts on which the Process Group """
redir_workers: t.Optional[dragon_process_group.ProcessGroup] = None
"""Workers used to redirect stdout and stderr to file"""

@property
def smartsim_info(self) -> t.Tuple[SmartSimStatus, t.Optional[t.List[int]]]:
def smartsim_info(self) -> t.Tuple[SmartSimStatus, t.List[int]]:
"""Information needed by SmartSim Launcher and Job Manager"""
return (self.status, self.return_codes)

@property
def puids(self) -> t.List[int]:
"""List of Process UIDS belonging to the ProcessGroup"""
return list(set(itertools.chain(self.active_puids, self.inactive_puids)))

@property
def active_puids(self) -> t.List[int]:
if self.process_group is None:
return []
return list(self.process_group.puids)

@property
def inactive_puids(self) -> t.List[int]:
if self.process_group is None:
return []
return [puid for puid, _ in self.process_group.inactive_puids]

@property
def return_codes(self) -> t.List[int]:
"""List of return codes of completed processes"""
if self.process_group is None:
return [-1]
if self.status == SmartSimStatus.STATUS_CANCELLED:
return [-9]
return [ret for _, ret in self.process_group.inactive_puids]

def __str__(self) -> str:
if self.process_group is not None and self.redir_workers is not None:
msg = [f"Active Group ({self.status})"]
Expand All @@ -105,7 +119,7 @@ def __str__(self) -> str:

if self.hosts is not None:
msg.append(f"Hosts: {','.join(self.hosts)}")
if self.return_codes is not None:
if self.return_codes:
msg.append(f"{self.return_codes}")

return ", ".join(msg)
Expand Down Expand Up @@ -404,10 +418,10 @@ def _stop_steps(self) -> None:
else:
# Technically we could just terminate, but what if
# the application intercepts that and ignores it?
proc_group = self._group_infos[step_id].process_group
group_info = self._group_infos[step_id]
if (
proc_group is not None
and proc_group.status == DragonStatus.RUNNING
group_info.active_puids
and (proc_group := group_info.process_group) is not None
):
try:
proc_group.kill()
Expand All @@ -416,7 +430,7 @@ def _stop_steps(self) -> None:
proc_group.stop()
except dragon_process_group.DragonProcessGroupError:
logger.error("Process group already stopped")
redir_group = self._group_infos[step_id].redir_workers
redir_group = group_info.redir_workers
if redir_group is not None:
try:
redir_group.join(0.1)
Expand All @@ -425,7 +439,6 @@ def _stop_steps(self) -> None:
logger.error(e)

self._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED
self._group_infos[step_id].return_codes = [-9]

@staticmethod
def create_run_policy(
Expand All @@ -438,33 +451,27 @@ def create_run_policy(
if isinstance(request, DragonRunRequest):
run_request: DragonRunRequest = request

affinity = dragon_policy.Policy.Affinity.DEFAULT
cpu_affinity: t.List[int] = []
gpu_affinity: t.List[int] = []

# Customize policy only if the client requested it, otherwise use default
if run_request.policy is not None:
# Affinities are not mutually exclusive. If specified, both are used
if run_request.policy.cpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
cpu_affinity = run_request.policy.cpu_affinity

if run_request.policy.gpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
gpu_affinity = run_request.policy.gpu_affinity
logger.debug(
f"Affinity strategy: {affinity}, "
f"CPU affinity mask: {cpu_affinity}, "
f"GPU affinity mask: {gpu_affinity}"
)
if affinity != dragon_policy.Policy.Affinity.DEFAULT:
return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
affinity=affinity,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)

return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
Expand Down Expand Up @@ -513,22 +520,19 @@ def _start_steps(self) -> None:
logger.error(e)
grp_status = SmartSimStatus.STATUS_FAILED

puids = None
try:
puids = list(
set(grp.puids + [puid for puid, retcode in grp.inactive_puids])
)
self._group_infos[step_id] = ProcessGroupInfo(
grp_info = ProcessGroupInfo(
process_group=grp,
puids=puids,
return_codes=[],
status=grp_status,
hosts=hosts,
)
puids = grp_info.puids
self._group_infos[step_id] = grp_info
self._running_steps.append(step_id)
started.append(step_id)
except Exception as e:
logger.error(e)
puids = None

if (
puids is not None
Expand Down Expand Up @@ -575,32 +579,15 @@ def _refresh_statuses(self) -> None:
grp = group_info.process_group
if grp is None:
group_info.status = SmartSimStatus.STATUS_FAILED
group_info.return_codes = [-1]
elif group_info.status not in TERMINAL_STATUSES:
if grp.status == str(DragonStatus.RUNNING):
if group_info.active_puids:
group_info.status = SmartSimStatus.STATUS_RUNNING
else:
puids = group_info.puids
if puids is not None and all(
puid is not None for puid in puids
):
try:
group_info.return_codes = [
dragon_process.Process(None, ident=puid).returncode
for puid in puids
]
except (ValueError, TypeError) as e:
logger.error(e)
group_info.return_codes = [-1 for _ in puids]
else:
group_info.return_codes = [0]
if not group_info.status == SmartSimStatus.STATUS_CANCELLED:
group_info.status = (
SmartSimStatus.STATUS_FAILED
if any(group_info.return_codes)
or grp.status == DragonStatus.ERROR
else SmartSimStatus.STATUS_COMPLETED
)
elif group_info.status != SmartSimStatus.STATUS_CANCELLED:
group_info.status = (
SmartSimStatus.STATUS_FAILED
if any(group_info.return_codes)
else SmartSimStatus.STATUS_COMPLETED
)

if group_info.status in TERMINAL_STATUSES:
terminated.append(step_id)
Expand All @@ -620,7 +607,7 @@ def _refresh_statuses(self) -> None:
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
self._free_hosts.append(host)
group_info.process_group = None
# group_info.process_group = None
group_info.redir_workers = None

def _update_shutdown_status(self) -> None:
Expand Down Expand Up @@ -685,7 +672,7 @@ def _(self, request: DragonRunRequest) -> DragonRunResponse:
honorable, err = self._can_honor(request)
if not honorable:
self._group_infos[step_id] = ProcessGroupInfo(
status=SmartSimStatus.STATUS_FAILED, return_codes=[-1]
status=SmartSimStatus.STATUS_FAILED
)
else:
self._queued_steps[step_id] = request
Expand Down Expand Up @@ -751,12 +738,7 @@ def _proc_group_info_table_line(
else:
table_line.append("")

if proc_group_info.return_codes is not None:
table_line.append(
f"{','.join(str(ret) for ret in proc_group_info.return_codes)}"
)
else:
table_line.append("")
table_line.append(",".join(str(ret) for ret in proc_group_info.return_codes))

if proc_group_info.puids is not None:
table_line.append(f"{len(proc_group_info.puids)}")
Expand Down