Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ran black on the code #22

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 70 additions & 60 deletions check_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,89 @@

from duo_hmac import duo_hmac

CONFIG_FILE = 'duo.conf'
DUO_SECTION = 'duo'
IKEY_KEY = 'ikey'
SKEY_KEY = 'skey'
API_HOST_KEY = 'api_host'
CONFIG_FILE = "duo.conf"
DUO_SECTION = "duo"
IKEY_KEY = "ikey"
SKEY_KEY = "skey"
API_HOST_KEY = "api_host"


def main():
# Read from config or error out
ikey, skey, api_host = _read_config()
duo = duo_hmac.DuoHmac(ikey, skey, api_host)

# Try to call 'check' and look at return
status_code, json_content = _attempt_api_call(duo, "/auth/v2/check")

if (status_code == 200):
print("Your credentials successfully called the Auth API")
return

if not json_content:
print(f"API call failed with status {status_code}")
return

if json_content['code'] != 40301:
print(f"API call failed with status {status_code}: {json_content['message']}")
return

# if 40301, creds are probably admin; try settings endpoint
status_code, json_content = _attempt_api_call(duo, "/admin/v1/settings")

if status_code == 200:
print("Your credentials successfully called the Admin API")
elif json_content:
print(f"API call failed with status {status_code}: {json_content['message']}")
else:
print(f"API call failed with status code {status_code}")
# Read from config or error out
ikey, skey, api_host = _read_config()
duo = duo_hmac.DuoHmac(ikey, skey, api_host)

# Try to call 'check' and look at return
status_code, json_content = _attempt_api_call(duo, "/auth/v2/check")

def _attempt_api_call(duo, path):
uri, _, headers = duo.get_authentication_components("GET", path, {}, {})
if status_code == 200:
print("Your credentials successfully called the Auth API")
return

response = requests.get(f"https://{uri}", headers=headers)
if not json_content:
print(f"API call failed with status {status_code}")
return

status_code = response.status_code
json_content = json.loads(response.content)
if json_content["code"] != 40301:
print(f"API call failed with status {status_code}: {json_content['message']}")
return

return status_code, json_content

# if 40301, creds are probably admin; try settings endpoint
status_code, json_content = _attempt_api_call(duo, "/admin/v1/settings")

def _read_config():
cp = configparser.ConfigParser()
cp.read(CONFIG_FILE)
if status_code == 200:
print("Your credentials successfully called the Admin API")
elif json_content:
print(f"API call failed with status {status_code}: {json_content['message']}")
else:
print(f"API call failed with status code {status_code}")

if not cp.sections():
raise FileNotFoundError(f"Config file {CONFIG_FILE} seems to be missing or empty.")

if 'duo' not in cp.sections():
raise ValueError(f"Config file {CONFIG_FILE} seems to be missing a '{DUO_SECTION}' section.")

ikey = cp[DUO_SECTION][IKEY_KEY]
if not ikey:
raise ValueError(f"Missing entry for '{IKEY_KEY}' in {CONFIG_FILE} '{DUO_SECTION}'")
def _attempt_api_call(duo, path):
uri, _, headers = duo.get_authentication_components("GET", path, {}, {})

skey = cp[DUO_SECTION][SKEY_KEY]
if not skey:
raise ValueError(f"Missing entry for '{SKEY_KEY}' in {CONFIG_FILE} '{DUO_SECTION}'")
response = requests.get(f"https://{uri}", headers=headers)

api_host = cp[DUO_SECTION][API_HOST_KEY]
if not api_host:
raise ValueError(f"Missing entry for '{API_HOST_KEY}' in {CONFIG_FILE} '{DUO_SECTION}'")
status_code = response.status_code
json_content = json.loads(response.content)

return ikey, skey, api_host
return status_code, json_content


if __name__=="__main__":
main()
def _read_config():
cp = configparser.ConfigParser()
cp.read(CONFIG_FILE)

if not cp.sections():
raise FileNotFoundError(
f"Config file {CONFIG_FILE} seems to be missing or empty."
)

if "duo" not in cp.sections():
raise ValueError(
f"Config file {CONFIG_FILE} seems to be missing a '{DUO_SECTION}' section."
)

ikey = cp[DUO_SECTION][IKEY_KEY]
if not ikey:
raise ValueError(
f"Missing entry for '{IKEY_KEY}' in {CONFIG_FILE} '{DUO_SECTION}'"
)

skey = cp[DUO_SECTION][SKEY_KEY]
if not skey:
raise ValueError(
f"Missing entry for '{SKEY_KEY}' in {CONFIG_FILE} '{DUO_SECTION}'"
)

api_host = cp[DUO_SECTION][API_HOST_KEY]
if not api_host:
raise ValueError(
f"Missing entry for '{API_HOST_KEY}' in {CONFIG_FILE} '{DUO_SECTION}'"
)

return ikey, skey, api_host


if __name__ == "__main__":
main()
121 changes: 62 additions & 59 deletions duo_hmac/duo_canonicalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,79 @@
import urllib.parse


def generate_canonical_string(date_string: str,
http_method: str,
api_host: str,
api_path: str,
qs_parameters: dict[bytes, list[bytes]],
body: str,
duo_headers: dict[str, str]) -> str:
"""
Create the "canonical string" of the request:
Date string in RFC 2822 format
HTTP method in uppercase
API host in lowercase
API path
Canonicalized string of query string parameters, or empty line if none
Hash of body as JSON string of body parameters, or hash of empty string if none
Hash of 'x-duo' headers, or hash of empty string if none
"""
canon_parts = [
date_string,
http_method.upper(),
api_host.lower(),
api_path,
canonicalize_parameters(qs_parameters),
canonicalize_body(body),
canonicalize_x_duo_headers(duo_headers)
]
return '\n'.join(canon_parts)
def generate_canonical_string(
date_string: str,
http_method: str,
api_host: str,
api_path: str,
qs_parameters: dict[bytes, list[bytes]],
body: str,
duo_headers: dict[str, str],
) -> str:
"""
Create the "canonical string" of the request:
Date string in RFC 2822 format
HTTP method in uppercase
API host in lowercase
API path
Canonicalized string of query string parameters, or empty line if none
Hash of body as JSON string of body parameters, or hash of empty string if none
Hash of 'x-duo' headers, or hash of empty string if none
"""
canon_parts = [
date_string,
http_method.upper(),
api_host.lower(),
api_path,
canonicalize_parameters(qs_parameters),
canonicalize_body(body),
canonicalize_x_duo_headers(duo_headers),
]
return "\n".join(canon_parts)


def canonicalize_parameters(parameters: dict[bytes, list[bytes]]) -> str:
""" Canonicalize the parameters by sorting and formatting them """
if parameters is None:
return ''
"""Canonicalize the parameters by sorting and formatting them"""
if parameters is None:
return ""

# This is normalized the same as for OAuth 1.0,
# http://tools.ietf.org/html/rfc5849#section-3.4.1.3.2
args = []
for (key, vals) in sorted(
(urllib.parse.quote(key, '~'), vals) for (key, vals) in list(parameters.items())):
for val in sorted(urllib.parse.quote(val, '~') for val in vals):
args.append(f'{key}={val}')
return '&'.join(args)
# This is normalized the same as for OAuth 1.0,
# http://tools.ietf.org/html/rfc5849#section-3.4.1.3.2
args = []
for key, vals in sorted(
(urllib.parse.quote(key, "~"), vals) for (key, vals) in list(parameters.items())
):
for val in sorted(urllib.parse.quote(val, "~") for val in vals):
args.append(f"{key}={val}")
return "&".join(args)


def canonicalize_body(body: str) -> str:
""" Canonicalize the body by encoding and hashing it """
if body is None:
body = ''
return hashlib.sha512(body.encode('utf-8')).hexdigest()
"""Canonicalize the body by encoding and hashing it"""
if body is None:
body = ""
return hashlib.sha512(body.encode("utf-8")).hexdigest()


def canonicalize_x_duo_headers(duo_headers: dict[str, str]) -> str:
""" Canonicalize the x-duo headers by joining everything together and hashing it """
if duo_headers is None:
duo_headers = {}

# Lower the headers before sorting them
lowered_headers = {}
for header_name, header_value in duo_headers.items():
header_name = header_name.lower() if header_name is not None else None
lowered_headers[header_name] = header_value
"""Canonicalize the x-duo headers by joining everything together and hashing it"""
if duo_headers is None:
duo_headers = {}

canon_list = []
# Lower the headers before sorting them
lowered_headers = {}
for header_name, header_value in duo_headers.items():
header_name = header_name.lower() if header_name is not None else None
lowered_headers[header_name] = header_value

for header_name in sorted(lowered_headers.keys()):
# Extract header value and set key to lower case from now on.
value = lowered_headers[header_name]
canon_list = []

# Add to the list of values to canonicalize:
canon_list.extend([header_name, value])
for header_name in sorted(lowered_headers.keys()):
# Extract header value and set key to lower case from now on.
value = lowered_headers[header_name]

canon = '\x00'.join(canon_list)
return hashlib.sha512(canon.encode('utf-8')).hexdigest()
# Add to the list of values to canonicalize:
canon_list.extend([header_name, value])

canon = "\x00".join(canon_list)
return hashlib.sha512(canon.encode("utf-8")).hexdigest()
Loading