diff --git a/requirements-testing.txt b/requirements-testing.txt index 0e233c82..632c4761 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -7,3 +7,4 @@ black >= 23.7, == 23.* ruff >= 0.0.286, == 0.0.* mypy >= 1.5.1, == 1.5.* psutil >= 5.9.5, == 5.9.* +pywin32 == 306; platform_system == "Windows" diff --git a/src/openjd/sessions/_session_user.py b/src/openjd/sessions/_session_user.py index aa3439bc..1ef357eb 100644 --- a/src/openjd/sessions/_session_user.py +++ b/src/openjd/sessions/_session_user.py @@ -1,11 +1,15 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. import os -from ._os_checker import is_posix +from ._os_checker import is_posix, is_windows if is_posix(): import grp +if is_windows(): + import win32api + import win32security + from typing import Optional __all__ = ("PosixSessionUser", "SessionUser") @@ -38,7 +42,48 @@ def __init__(self, user: str, *, group: Optional[str] = None) -> None: group (Optional[str]): The group. Defaults to the name of this process' effective group. """ - if os.name != "posix": + if not is_posix(): raise RuntimeError("Only available on posix systems.") self.user = user self.group = group if group else grp.getgrgid(os.getegid()).gr_name # type: ignore + + +class WindowsSessionUser(SessionUser): + __slots__ = ("user", "group") + """Specific os-user identity to run a Session as under Windows.""" + + user: str + """ + User name of the identity to run the Session's subprocesses under. + This can be just a username for a local user, a domain user's UPN, or a domain user in down-level format. + ex: localUser, domainuser@domain.com, domain\\domainUser + """ + + group: str + """ + Group name of the identity to run the Session's subprocesses under. + This can be just a group name for a local group, or a domain group in down-level format. + ex: localGroup, domain\\domainGroup + """ + + @staticmethod + def upn_to_down_level(upn): + return win32security.TranslateName( + upn, win32api.NameUserPrincipal, win32api.NameSamCompatible + ) + + def __init__(self, user: str, *, group: str) -> None: + """ + Arguments: + user (str): The user + group (str): The group + """ + if not is_windows(): + raise RuntimeError("Only available on Windows systems.") + + self.group = group + + try: + self.user = self.upn_to_down_level(user) + except Exception: + raise RuntimeError("Unable to convert user {user} to down-level logon format.")