-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sdd user options and serverinfo and userinfo
- Loading branch information
Showing
2 changed files
with
383 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,6 +96,7 @@ async def _websocket_call(self, data: dict) -> dict: | |
|
||
async for message in websocket: | ||
response = json.loads(message) | ||
|
||
if "responseid" in response: | ||
if data["responseid"] == response["responseid"]: | ||
return response | ||
|
@@ -108,6 +109,78 @@ def _send(self, data: dict) -> dict: | |
|
||
return asyncio.run(self._websocket_call(data)) | ||
|
||
def server_info(self) -> dict: | ||
"""Gets MeshCentral server info. | ||
Returns: | ||
dict: | ||
Returns server info. | ||
Example: | ||
{ | ||
'domain': '', | ||
'name': 'mesh.example.com', | ||
'mpsname': 'mesh.example.com', | ||
'mpsport': 4433, | ||
'port': 4443, | ||
'emailcheck': True, | ||
'domainauth': False, | ||
'serverTime': 1645560067270, | ||
'features': 9607777, | ||
'features2': 16513, | ||
'languages': ['en', 'cs', 'da', 'de', 'es', 'fi', 'fr', 'hi', 'it', 'ja', 'ko', 'nl', 'nn', 'pl', 'pt-br', 'pt', 'ru', 'sv', 'tr', 'zh-chs', 'zh-cht'], | ||
'tlshash': '16D462CC0D306CFC7F242382A1606E2A57E6481B4EAAC7E5C6D91EFA306F9CABD0CD91566A8A35C3DA9580E1F51CF985', | ||
'agentCertHash': 'V7IZUeuuIWMCY8e1SIb8fKqM1RkS4fUmCbCZzi4cMMzHAi3EJPi9Y8CP5XQfz2tZ', | ||
'https': True, | ||
'redirport': 8080, | ||
'magenturl': 'mc://mesh.example.com:4443', | ||
'domainsuffix': '', | ||
'certExpire': 1652972190000 | ||
} | ||
""" | ||
|
||
data = { | ||
"action": "serverinfo" | ||
} | ||
|
||
return self._send(data)["serverinfo"] | ||
|
||
def user_info(self) -> dict: | ||
"""Gets logged on user info. | ||
Returns: | ||
dict: | ||
Returns current user info | ||
Example: | ||
{ | ||
'_id': 'user//username', | ||
'name': 'username', | ||
'creation': 1643754241, | ||
'links': { | ||
'mesh//oAUeYE3HCqUFXWCkqwqfW@ElJ7orX6hrNv$r$RyCEsVgtUQNxYC6dLs4jlfQNTPA': { | ||
'rights': 4294967295 | ||
}, | ||
'mesh//$lhtFH8ZYcVEZYSqLx1O2vxqgSdzX9bjZLAbmRMz3lJ@XLulbyhqeRUPF4MbaN64': { | ||
'rights': 4294967295 | ||
} | ||
}, | ||
'email': '[email protected]', | ||
'emailVerified': True, | ||
'siteadmin': 4294967295, | ||
'pastlogin': 1645505345, | ||
'access': 1645558617, | ||
'login': 1645505346 | ||
} | ||
""" | ||
|
||
data = { | ||
"action": "userinfo" | ||
} | ||
|
||
return self._send(data)["userinfo"] | ||
|
||
|
||
def get_device_group_id_by_name(self, group: str) -> Optional[str]: | ||
"""Get the device group id by group name. | ||
|
@@ -347,6 +420,261 @@ def edit_device_group( | |
|
||
return self._send(data) | ||
|
||
def get_user_id_by_name(self, username: str) -> Optional[str]: | ||
"""Get the user account id by username. | ||
Args: | ||
username (str): | ||
Used to search through users. | ||
Returns: | ||
str, None: | ||
Returns the user account _id if the username exists otherwise returns None. | ||
""" | ||
|
||
users = self.list_users() | ||
|
||
for user in users: | ||
if user["username"] == username: | ||
return user["_id"] | ||
|
||
return None | ||
|
||
def user_exists( | ||
self, username: Optional[str] = None, id: Optional[str] = None | ||
) -> bool: | ||
"""Check if a user account exists by username or id. | ||
This method needs either user or id arguments set. If both are set then name | ||
takes precedence. | ||
Args: | ||
username (str): | ||
Used to check if a device group with the same name exists. | ||
id (str): | ||
Used to check if a device group with the same id exists. | ||
Returns: | ||
bool: True or False depending on if the user account exists. | ||
""" | ||
|
||
if not username and not id: | ||
raise ValueError("Arguments username or id must be specified") | ||
|
||
users = self.list_users() | ||
|
||
for user in users: | ||
if user: | ||
if user["name"] == username: | ||
return True | ||
elif id: | ||
if user["_id"] == id: | ||
return True | ||
|
||
return False | ||
|
||
def list_users(self) -> list: | ||
"""List users | ||
Returns: | ||
list: Mesh user accounts. | ||
""" | ||
|
||
data = { | ||
"action": "users" | ||
} | ||
|
||
return self._send(data)["users"] | ||
|
||
def add_user( | ||
self, | ||
username: str, | ||
password: Optional[str] = None, | ||
random_pass: bool = False, | ||
domain: Optional[str] = None, | ||
email: Optional[str] = None, | ||
email_verfied: bool = False, | ||
reset_pass: bool = False, | ||
full_name: Optional[str] = None, | ||
phone: Optional[str] = None, | ||
rights: Optional[str] = None, | ||
) -> dict: | ||
"""Add User | ||
This method needs a username set and password is optional only is random_pass is true. random_pass | ||
will take precedence. | ||
Args: | ||
username (str): | ||
Username for the user that is used to login | ||
password (str): | ||
Password to set for the user. Not needed if random_pass is set to True | ||
random_pass (str, optional): | ||
Sets a random password for the user account. | ||
domain (str, optional): | ||
Account domain, only for cross-domain admins. | ||
email (str, optional): | ||
New account email address. | ||
email_verified (bool, optional): | ||
New account email is verified. | ||
reset_pass (bool, optional): | ||
Request password reset on next login. | ||
full_name (str, optional): | ||
Set the full name for this account. | ||
phone (str, optional): | ||
Set the account phone number. | ||
rights (str, optional): | ||
Server permissions for account. Can be none, full, or a comma separated | ||
list of these possible values: | ||
manageusers,backup,restore,update,fileaccess,locked,nonewgroups,notools,usergroups,recordings,locksettings,allevents | ||
Returns: | ||
dict: Returns a confirmation that the user was added | ||
Example: | ||
{ | ||
'action': 'adduser', | ||
'responseid': '31424b26-9539-400d-ab41-e406aeb337b2', | ||
'result': 'ok' | ||
} | ||
""" | ||
|
||
if not password and not random_pass: | ||
raise ValueError("Either password or random_pass must be set") | ||
|
||
data = { | ||
"action": "adduser", | ||
"username": username, | ||
"pass": utils.gen_password() if random_pass else password, | ||
"responseid": utils.gen_response_id(), | ||
} | ||
|
||
if email: | ||
data["email"] = email | ||
if email_verfied: | ||
data["emailVerified"] = True | ||
|
||
if reset_pass: | ||
data["resetNextLogin"] = True | ||
|
||
if domain: | ||
data["domain"] = domain | ||
|
||
if phone: | ||
data["phone"] = phone | ||
|
||
if full_name: | ||
data["realname"] = full_name | ||
|
||
if rights: | ||
data["siteadmin"] = utils.permissions_str_to_int(rights) | ||
|
||
return self._send(data) | ||
|
||
def edit_user( | ||
self, | ||
username: str, | ||
domain: str = "", | ||
email: Optional[str] = None, | ||
email_verfied: bool = False, | ||
reset_pass: bool = False, | ||
full_name: Optional[str] = None, | ||
phone: Optional[str] = None, | ||
rights: Optional[str] = None, | ||
) -> dict: | ||
"""Edit User | ||
This method needs a username set to identify the user to edit. | ||
Args: | ||
username (str): | ||
Username for the user that is used to login | ||
domain (str, optional): | ||
Account domain, only for cross-domain admins. (defaults to '') | ||
email (str, optional): | ||
New account email address. | ||
email_verified (bool, optional): | ||
New account email is verified. | ||
reset_pass (bool, optional): | ||
Request password reset on next login. | ||
full_name (str, optional): | ||
Set the full name for this account. | ||
phone (str, optional): | ||
Set the account phone number. | ||
rights (str, optional): | ||
Server permissions for account. Can be none, full, or a comma separated | ||
list of these possible values: | ||
manageusers,backup,restore,update,fileaccess,locked,nonewgroups,notools,usergroups,recordings,locksettings,allevents | ||
Returns: | ||
dict: Returns a confirmation that the user was edited | ||
Example: | ||
{ | ||
'action': 'edituser', | ||
'responseid': '1d508225-818d-444c-9a33-62c4ef76f652', | ||
'result': 'ok' | ||
} | ||
""" | ||
|
||
data = { | ||
"action": "edituser", | ||
"userid": utils.format_user_id(username, domain), | ||
"responseid": utils.gen_response_id(), | ||
} | ||
|
||
if email: | ||
data["email"] = email | ||
if email_verfied: | ||
data["emailVerified"] = True | ||
|
||
if reset_pass: | ||
data["resetNextLogin"] = True | ||
|
||
if domain: | ||
data["domain"] = domain | ||
|
||
if phone: | ||
data["phone"] = phone | ||
|
||
if full_name: | ||
data["realname"] = full_name | ||
|
||
if rights: | ||
data["siteadmin"] = utils.permissions_str_to_int(rights) | ||
|
||
return self._send(data) | ||
|
||
def remove_user(self, username: str, domain: str = "") -> dict: | ||
"""Delete User | ||
This method needs a username set to identify the user to delete. | ||
Args: | ||
username (str): | ||
Username for the user that is used to login | ||
domain (str, optional) | ||
Account domain, only for cross-domain admins. (defaults to '') | ||
Returns: | ||
dict: Returns a confirmation that the user was deleted. | ||
Example: | ||
{ | ||
'action': 'deleteuser', | ||
'responseid': '1d508225-818d-444c-9a33-62c4ef76f652', | ||
'result': 'ok' | ||
} | ||
""" | ||
|
||
data = { | ||
"action": "deleteuser", | ||
"userid": utils.format_user_id(username, domain), | ||
"responseid": utils.gen_response_id(), | ||
} | ||
|
||
return self._send(data) | ||
|
||
# run command on an agent | ||
def run_command(self, node_id: str, command: str, runAsUser: int = 0) -> dict: | ||
|
||
|
Oops, something went wrong.