Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SmartSim to use Dragon V0.10 #753

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
7 changes: 7 additions & 0 deletions doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ To be released at some point in the future

Description

- Update the `DragonBackend` to use
[Dragon V0.10](https://github.com/DragonHPC/dragon/releases/tag/v0.10-beta)
- Implement workaround for Tensorflow that allows RedisAI to build with GCC-14
- Add instructions for installing SmartSim on PML's Scylla

Detailed Notes

- Dragon V0.10 introduced support for infiniband networks and largely
overhauled the ``ProcessGroup`` API, used widely throughout SmartSim's
``DragonBackend``, for better readability and debugging. SmartSim has has
adopted this new version of Dragon to take advantage of these improvements.
([SmartSim-PR753](https://github.com/CrayLabs/SmartSim/pull/753))
- In libtensorflow, the input argument to TF_SessionRun seems to be mistyped to
TF_Output instead of TF_Input. These two types differ only in name. GCC-14
catches this and throws an error, even though earlier versions allow this. To
Expand Down
5 changes: 2 additions & 3 deletions smartsim/_core/_cli/scripts/dragon_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def python_version() -> str:
def dragon_pin() -> str:
"""Return a string indicating the pinned major/minor version of the dragon
package to install"""
return "0.9"
return "0.10"


def _platform_filter(asset_name: str) -> bool:
Expand All @@ -60,7 +60,7 @@ def _platform_filter(asset_name: str) -> bool:

:param asset_name: A value to inspect for keywords indicating a Cray EX asset
:returns: True if supplied value is correct for current platform"""
key = "crayex"
key = "hsn"
is_cray = key in asset_name.lower()
if is_crayex_platform():
return is_cray
Expand Down Expand Up @@ -132,7 +132,6 @@ def filter_assets(assets: t.Collection[GitReleaseAsset]) -> t.Optional[GitReleas
def retrieve_asset_info() -> GitReleaseAsset:
"""Find a release asset that meets all necessary filtering criteria

:param dragon_pin: identify the dragon version to install (e.g. dragon-0.8)
:returns: A GitHub release asset"""
assets = _get_release_assets()
asset = filter_assets(assets)
Expand Down
125 changes: 53 additions & 72 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import collections
import functools
import itertools
import time
import typing as t
from dataclasses import dataclass, field
from enum import Enum
from threading import RLock

from tabulate import tabulate
Expand All @@ -38,7 +38,6 @@
# isort: off
import dragon.infrastructure.connection as dragon_connection
import dragon.infrastructure.policy as dragon_policy
import dragon.native.group_state as dragon_group_state
import dragon.native.process as dragon_process
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine
Expand Down Expand Up @@ -67,34 +66,48 @@
logger = get_logger(__name__)


class DragonStatus(str, Enum):
ERROR = str(dragon_group_state.Error())
RUNNING = str(dragon_group_state.Running())

def __str__(self) -> str:
return self.value


@dataclass
class ProcessGroupInfo:
status: SmartSimStatus
"""Status of step"""
process_group: t.Optional[dragon_process_group.ProcessGroup] = None
"""Internal Process Group object, None for finished or not started steps"""
puids: t.Optional[t.List[t.Optional[int]]] = None # puids can be None
"""List of Process UIDS belonging to the ProcessGroup"""
return_codes: t.Optional[t.List[int]] = None
"""List of return codes of completed processes"""
hosts: t.List[str] = field(default_factory=list)
"""List of hosts on which the Process Group """
redir_workers: t.Optional[dragon_process_group.ProcessGroup] = None
"""Workers used to redirect stdout and stderr to file"""

@property
def smartsim_info(self) -> t.Tuple[SmartSimStatus, t.Optional[t.List[int]]]:
def smartsim_info(self) -> t.Tuple[SmartSimStatus, t.List[int]]:
"""Information needed by SmartSim Launcher and Job Manager"""
return (self.status, self.return_codes)

@property
def puids(self) -> t.List[int]:
"""List of Process UIDS belonging to the ProcessGroup"""
return list(set(itertools.chain(self.active_puids, self.inactive_puids)))

Check warning on line 88 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L88

Added line #L88 was not covered by tests

@property
def active_puids(self) -> t.List[int]:
if self.process_group is None:
return []
return list(self.process_group.puids)

Check warning on line 94 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L92-L94

Added lines #L92 - L94 were not covered by tests

@property
def inactive_puids(self) -> t.List[int]:
if self.process_group is None:
return []
return [puid for puid, _ in self.process_group.inactive_puids]

Check warning on line 100 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L98-L100

Added lines #L98 - L100 were not covered by tests

@property
def return_codes(self) -> t.List[int]:
"""List of return codes of completed processes"""
if self.process_group is None:
return [-1]
if self.status == SmartSimStatus.STATUS_CANCELLED:
return [-9]
return [ret for _, ret in self.process_group.inactive_puids]

Check warning on line 109 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L105-L109

Added lines #L105 - L109 were not covered by tests

def __str__(self) -> str:
if self.process_group is not None and self.redir_workers is not None:
msg = [f"Active Group ({self.status})"]
Expand All @@ -105,7 +118,7 @@

if self.hosts is not None:
msg.append(f"Hosts: {','.join(self.hosts)}")
if self.return_codes is not None:
if self.return_codes:

Check warning on line 121 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L121

Added line #L121 was not covered by tests
msg.append(f"{self.return_codes}")

return ", ".join(msg)
Expand Down Expand Up @@ -404,10 +417,10 @@
else:
# Technically we could just terminate, but what if
# the application intercepts that and ignores it?
proc_group = self._group_infos[step_id].process_group
group_info = self._group_infos[step_id]

Check warning on line 420 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L420

Added line #L420 was not covered by tests
if (
proc_group is not None
and proc_group.status == DragonStatus.RUNNING
group_info.active_puids
and (proc_group := group_info.process_group) is not None
):
try:
proc_group.kill()
Expand All @@ -416,7 +429,7 @@
proc_group.stop()
except dragon_process_group.DragonProcessGroupError:
logger.error("Process group already stopped")
redir_group = self._group_infos[step_id].redir_workers
redir_group = group_info.redir_workers

Check warning on line 432 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L432

Added line #L432 was not covered by tests
if redir_group is not None:
try:
redir_group.join(0.1)
Expand All @@ -425,7 +438,6 @@
logger.error(e)

self._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED
self._group_infos[step_id].return_codes = [-9]

@staticmethod
def create_run_policy(
Expand All @@ -438,33 +450,27 @@
if isinstance(request, DragonRunRequest):
run_request: DragonRunRequest = request

affinity = dragon_policy.Policy.Affinity.DEFAULT
cpu_affinity: t.List[int] = []
gpu_affinity: t.List[int] = []

# Customize policy only if the client requested it, otherwise use default
if run_request.policy is not None:
# Affinities are not mutually exclusive. If specified, both are used
if run_request.policy.cpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
cpu_affinity = run_request.policy.cpu_affinity

if run_request.policy.gpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
gpu_affinity = run_request.policy.gpu_affinity
logger.debug(
f"Affinity strategy: {affinity}, "
f"CPU affinity mask: {cpu_affinity}, "
f"GPU affinity mask: {gpu_affinity}"
)
if affinity != dragon_policy.Policy.Affinity.DEFAULT:
return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
affinity=affinity,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
return dragon_policy.Policy(

Check warning on line 468 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L468

Added line #L468 was not covered by tests
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)

return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
Expand Down Expand Up @@ -513,22 +519,19 @@
logger.error(e)
grp_status = SmartSimStatus.STATUS_FAILED

puids = None
try:
puids = list(
set(grp.puids + [puid for puid, retcode in grp.inactive_puids])
)
self._group_infos[step_id] = ProcessGroupInfo(
grp_info = ProcessGroupInfo(

Check warning on line 523 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L523

Added line #L523 was not covered by tests
process_group=grp,
puids=puids,
return_codes=[],
status=grp_status,
hosts=hosts,
)
puids = grp_info.puids
self._group_infos[step_id] = grp_info

Check warning on line 529 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L528-L529

Added lines #L528 - L529 were not covered by tests
self._running_steps.append(step_id)
started.append(step_id)
except Exception as e:
logger.error(e)
puids = None

Check warning on line 534 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L534

Added line #L534 was not covered by tests

if (
puids is not None
Expand Down Expand Up @@ -575,32 +578,15 @@
grp = group_info.process_group
if grp is None:
group_info.status = SmartSimStatus.STATUS_FAILED
group_info.return_codes = [-1]
elif group_info.status not in TERMINAL_STATUSES:
if grp.status == str(DragonStatus.RUNNING):
if group_info.active_puids:

Check warning on line 582 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L582

Added line #L582 was not covered by tests
group_info.status = SmartSimStatus.STATUS_RUNNING
else:
puids = group_info.puids
if puids is not None and all(
puid is not None for puid in puids
):
try:
group_info.return_codes = [
dragon_process.Process(None, ident=puid).returncode
for puid in puids
]
except (ValueError, TypeError) as e:
logger.error(e)
group_info.return_codes = [-1 for _ in puids]
else:
group_info.return_codes = [0]
if not group_info.status == SmartSimStatus.STATUS_CANCELLED:
group_info.status = (
SmartSimStatus.STATUS_FAILED
if any(group_info.return_codes)
or grp.status == DragonStatus.ERROR
else SmartSimStatus.STATUS_COMPLETED
)
elif group_info.status != SmartSimStatus.STATUS_CANCELLED:
group_info.status = (

Check warning on line 585 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L584-L585

Added lines #L584 - L585 were not covered by tests
SmartSimStatus.STATUS_FAILED
if any(group_info.return_codes)
else SmartSimStatus.STATUS_COMPLETED
)

if group_info.status in TERMINAL_STATUSES:
terminated.append(step_id)
Expand All @@ -620,7 +606,7 @@
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
self._free_hosts.append(host)
group_info.process_group = None
# group_info.process_group = None
group_info.redir_workers = None

def _update_shutdown_status(self) -> None:
Expand Down Expand Up @@ -685,7 +671,7 @@
honorable, err = self._can_honor(request)
if not honorable:
self._group_infos[step_id] = ProcessGroupInfo(
status=SmartSimStatus.STATUS_FAILED, return_codes=[-1]
status=SmartSimStatus.STATUS_FAILED
)
else:
self._queued_steps[step_id] = request
Expand Down Expand Up @@ -751,12 +737,7 @@
else:
table_line.append("")

if proc_group_info.return_codes is not None:
table_line.append(
f"{','.join(str(ret) for ret in proc_group_info.return_codes)}"
)
else:
table_line.append("")
table_line.append(",".join(str(ret) for ret in proc_group_info.return_codes))

Check warning on line 740 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L740

Added line #L740 was not covered by tests

if proc_group_info.puids is not None:
table_line.append(f"{len(proc_group_info.puids)}")
Expand Down
Loading
Loading