From e5d76f60ace4c607dd98b8a929cb98bea4b58186 Mon Sep 17 00:00:00 2001 From: John Walstra <70371225+jsupun@users.noreply.github.com> Date: Thu, 24 Feb 2022 16:27:13 -0600 Subject: [PATCH] Add record framework (#225) * Add CLI change in that use the helper module * Added ability to block all apps Some Windows apps will block, some will not. For example, Visual Code is launched with code.cmd, which launches code.exe. Since code.cmd ends quickly, we don't get a chance to edit. So we have blocking that checks to see if the process name is actually running. MacOS UI apps have a different kind of blocking. The application needs to end. Also fixed the problem with the record type json not being included in the sdist. * Add github actions and handle parsing better * Add test for config command Add test for config and fixed a few problem. The color --enable/disabled did not work. * Allow user to recheck if record data file If the editor they are using is nonblocking, the CLI will complain about template markers being found before the user saves. This allows the user to finished editing and then typing 'r' to recheck the file. * Handle output and exceptions better * Add missing module to requirements.txt and setup.py * Tests are failing in GitHub action. Print the command results. * Fixed unit test and error message for config commands The unit test was failing because no keeper.ini file existed. Use the Mock config and the Export module too make a fake keeper.ini. Also fixed some of the error when a command fails. --- .github/workflows/publish.pypi.helper.yml | 45 ++ .github/workflows/test.cli.yml | 14 + .github/workflows/test.python.helper.yml | 46 ++ .../keeper_secrets_manager_cli/__init__.py | 31 +- .../keeper_secrets_manager_cli/__main__.py | 240 ++++++- .../keeper_secrets_manager_cli/common.py | 105 +++ .../keeper_secrets_manager_cli/exception.py | 4 + .../keeper_secrets_manager_cli/profile.py | 92 ++- .../keeper_secrets_manager_cli/secret.py | 206 +++++- .../requirements.txt | 4 +- .../keeper_secrets_manager_cli/setup.py | 3 +- .../tests/config_test.py | 108 +++ .../tests/secret_inflate_test.py | 4 + .../tests/secret_test.py | 93 +++ .../core/keeper_secrets_manager_core/mock.py | 7 +- sdk/python/helper/LICENSE | 0 sdk/python/helper/MANIFEST.in | 1 + sdk/python/helper/README.md | 5 + .../keeper_secrets_manager_helper/__init__.py | 0 .../keeper_secrets_manager_helper/common.py | 30 + .../exception.py | 3 + .../keeper_secrets_manager_helper/field.py | 94 +++ .../field_type.py | 20 + .../keeper_secrets_manager_helper/format.py | 20 + .../keeper_secrets_manager_helper/record.py | 42 ++ .../record_type.py | 50 ++ .../v3/__init__.py | 0 .../v3/default_record_types.yml | 152 +++++ .../keeper_secrets_manager_helper/v3/enum.py | 299 +++++++++ .../v3/field_type.py | 620 ++++++++++++++++++ .../v3/parser.py | 150 +++++ .../v3/record.py | 367 +++++++++++ .../v3/record_type.py | 209 ++++++ sdk/python/helper/requirements.txt | 3 + sdk/python/helper/setup.py | 52 ++ sdk/python/helper/tests/record_file_test.py | 125 ++++ sdk/python/helper/tests/record_test.py | 47 ++ .../tests/v3/v3_field_type_all_fields_test.py | 309 +++++++++ .../helper/tests/v3/v3_field_type_test.py | 52 ++ sdk/python/helper/tests/v3/v3_parser_test.py | 105 +++ sdk/python/helper/tests/v3/v3_record_test.py | 255 +++++++ .../helper/tests/v3/v3_record_type_test.py | 86 +++ 42 files changed, 4051 insertions(+), 47 deletions(-) create mode 100644 .github/workflows/publish.pypi.helper.yml create mode 100644 .github/workflows/test.python.helper.yml create mode 100644 integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/common.py create mode 100644 integration/keeper_secrets_manager_cli/tests/config_test.py create mode 100644 sdk/python/helper/LICENSE create mode 100644 sdk/python/helper/MANIFEST.in create mode 100644 sdk/python/helper/README.md create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/__init__.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/common.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/exception.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/field.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/field_type.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/format.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/record.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/record_type.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/__init__.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/default_record_types.yml create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/enum.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/field_type.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/parser.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/record.py create mode 100644 sdk/python/helper/keeper_secrets_manager_helper/v3/record_type.py create mode 100644 sdk/python/helper/requirements.txt create mode 100644 sdk/python/helper/setup.py create mode 100644 sdk/python/helper/tests/record_file_test.py create mode 100644 sdk/python/helper/tests/record_test.py create mode 100644 sdk/python/helper/tests/v3/v3_field_type_all_fields_test.py create mode 100644 sdk/python/helper/tests/v3/v3_field_type_test.py create mode 100644 sdk/python/helper/tests/v3/v3_parser_test.py create mode 100644 sdk/python/helper/tests/v3/v3_record_test.py create mode 100644 sdk/python/helper/tests/v3/v3_record_type_test.py diff --git a/.github/workflows/publish.pypi.helper.yml b/.github/workflows/publish.pypi.helper.yml new file mode 100644 index 00000000..45279ef1 --- /dev/null +++ b/.github/workflows/publish.pypi.helper.yml @@ -0,0 +1,45 @@ +name: Publish to PyPI (KSM SDK Helper) +on: + workflow_dispatch: + +jobs: + publish-pypi: + name: Publish KSM SDK Helper to PyPI + environment: prod + runs-on: ubuntu-latest + timeout-minutes: 10 # To keep builds from running too long + + defaults: + run: + working-directory: ./sdk/python/helper + + steps: + - name: Get the source code + uses: actions/checkout@v2 + + - name: Set up Python 3.9 + uses: actions/setup-python@v1 + with: + python-version: 3.9 + + - name: Retrieve secrets from KSM + id: ksmsecrets + uses: Keeper-Security/ksm-action@master + with: + keeper-secret-config: ${{ secrets.KSM_PYPI_PUBLISHER_PYPI_SDK_CONFIG }} + secrets: | + -aBWi3-yU_qvatNh0Eaqew/field/password > PYPI_API_TOKEN + + - name: Install dependencies + run: | + python3 -m pip install --upgrade setuptools pip wheel twine + python3 -m pip install -r requirements.txt + + - name: Build and Publish + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ steps.ksmsecrets.outputs.PYPI_API_TOKEN }} + run: | + python3 setup.py build + python3 setup.py sdist + python3 -m twine upload --verbose dist/* diff --git a/.github/workflows/test.cli.yml b/.github/workflows/test.cli.yml index 5cd5acf1..28e7cc45 100644 --- a/.github/workflows/test.cli.yml +++ b/.github/workflows/test.cli.yml @@ -32,6 +32,20 @@ jobs: run: | python3 setup.py build install + ########## KSM Python Helper (from source) + + - name: Install SDK Helper dependencies + working-directory: ./sdk/python/helper + run: | + python3 -m pip install --upgrade pip + python3 -m pip install -r requirements.txt + python3 -m pip install -e . + + - name: Install SDK Helper for integrations + working-directory: ./sdk/python/helper + run: | + python3 setup.py build install + ########## CLI - name: Install CLI dependencies diff --git a/.github/workflows/test.python.helper.yml b/.github/workflows/test.python.helper.yml new file mode 100644 index 00000000..d4055541 --- /dev/null +++ b/.github/workflows/test.python.helper.yml @@ -0,0 +1,46 @@ +name: Test-Python-Helper + +on: + pull_request: + branches: [ master ] + +jobs: + test-cli: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + + ########## KSM Python SDK (from source) + + - name: Install SDK dependencies + working-directory: ./sdk/python/core + run: | + python3 -m pip install --upgrade pip + python3 -m pip install -r requirements.txt + python3 -m pip install -e . + + - name: Install SDK for integrations + working-directory: ./sdk/python/core + run: | + python3 setup.py build install + + ########## PYTHON HELPER + + - name: Install CLI dependencies + working-directory: ./sdk/python/helper + run: | + python3 -m pip install -r requirements.txt + python3 -m pip install pytest pytest-cov + + - name: Run CLI tests + working-directory: ./sdk/python/helper + run: | + PYTHONPATH=$PWD pytest diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__init__.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__init__.py index 0e59c498..95bf2928 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__init__.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__init__.py @@ -14,6 +14,8 @@ from keeper_secrets_manager_core.core import KSMCache from keeper_secrets_manager_core.storage import InMemoryKeyValueStorage from keeper_secrets_manager_core.configkeys import ConfigKeys +from keeper_secrets_manager_cli.common import find_ksm_path +from keeper_secrets_manager_helper.record_type import RecordType from distutils.util import strtobool from .exception import KsmCliException from .profile import Profile @@ -27,13 +29,22 @@ class KeeperCli: def get_client(**kwargs): return SecretsManager(**kwargs) - def __init__(self, ini_file=None, profile_name=None, output=None, use_color=None, use_cache=None): + def __init__(self, ini_file=None, profile_name=None, output=None, use_color=None, use_cache=None, + record_type_dir=None, editor=None, editor_use_blocking=False, editor_process_name=None): self.profile = Profile(cli=self, ini_file=ini_file) self._client = None self.log_level = os.environ.get("KSM_DEBUG", None) self.use_color = use_color + self.record_type_dir = record_type_dir + + # The editor to launch ... however this might be a bat or cmd file, not the real application + self.editor = editor + # Some application don't block. To enabling blocking the CLI, set this to True + self.editor_use_blocking = editor_use_blocking + # Blocking might be waiting until a process in the task list goes away. This is that process. + self.editor_process_name = editor_process_name self.use_cache = use_cache @@ -56,6 +67,7 @@ def __init__(self, ini_file=None, profile_name=None, output=None, use_color=None config_storage.set(ConfigKeys.KEY_PRIVATE_KEY, self.config.get("privateKey")) config_storage.set(ConfigKeys.KEY_APP_KEY, self.config.get("appKey")) config_storage.set(ConfigKeys.KEY_HOSTNAME, self.config.get("hostname")) + config_storage.set(ConfigKeys.KEY_OWNER_PUBLIC_KEY, self.config.get("appOwnerPublicKey")) common_profile = self.profile.get_profile_config(Profile.config_profile) @@ -74,6 +86,23 @@ def __init__(self, ini_file=None, profile_name=None, output=None, use_color=None # By default, use colors. if self.use_color is None: self.use_color = bool(strtobool(common_profile.get(Profile.color_key, str(True)))) + + if self.record_type_dir is None: + self.record_type_dir = common_profile.get(Profile.record_type_dir_key, None) + if self.record_type_dir is None: + self.record_type_dir = find_ksm_path("record_type", is_file=False) + + # If the have a directory where record type schema files may exists, attempt to load + # them. + if self.record_type_dir is not None and os.path.exists(self.record_type_dir) is True: + RecordType.find_and_load_record_type_schema_files(self.record_type_dir) + + # Get the editor to use for visual editing a record + if self.editor is None: + self.editor = common_profile.get(Profile.editor_key, None) + self.editor_use_blocking = bool(strtobool(common_profile.get(Profile.editor_use_blocking_key, + str(editor_use_blocking)))) + self.editor_process_name = common_profile.get(Profile.editor_process_name_key, editor_process_name) else: # Set the log level. We don't have the client to set the level, so set it here. if use_color is None: diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py index 677ec97f..073e8a79 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py @@ -54,7 +54,15 @@ class AliasedGroup(HelpColorsGroup): "notation", "update", "version", - "password" + "password", + "template", + "add", + "editor", + "field", + "record", + "file", + "cache", + "record-type-dir" ] alias_commands = { @@ -542,13 +550,195 @@ def secret_password_command(ctx, length, lc, uc, d, sc): ) +# SECRET TEMPLATE COMMAND + + +@click.group( + name='template', + cls=AliasedGroup, + help_headers_color='yellow', + help_options_color='green' +) +def secret_template_command(): + """Record and field information""" + + +@click.command( + name='record', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.pass_context +@click.option('--show-list', '-l', is_flag=True, help='List available record types.') +@click.option('--output-format', '-o', type=click.Choice(['yaml', 'json'], case_sensitive=False), default='json', + help='File format to export.') +@click.option('--output-file', '-f', type=str, help='Write template to a file.') +@click.option('--version', type=click.Choice(['v3'], case_sensitive=False), default='v3', + help='Record version.') +@click.argument('record_type', type=str, nargs=-1) +def secret_template_record_command(ctx, show_list, output_format, output_file, version, record_type): + """Get a record type or list available record types""" + + if show_list is True: + print("", file=sys.stderr) + ctx.obj["secret"].get_record_type_list(version=version) + print("", file=sys.stderr) + else: + if record_type is None or len(record_type) == 0: + raise KsmCliException("A record type is required.") + + print("", file=sys.stderr) + ctx.obj["secret"].get_record_type_template( + record_type=record_type[0], + version=version, + output_format=output_format, + file=output_file + ) + print("", file=sys.stderr) + + +@click.command( + name='field', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.pass_context +@click.option('--show-list', '-l', is_flag=True, help='List available fields types.') +@click.option('--output-format', '-o', type=click.Choice(['yaml', 'json'], case_sensitive=False), default='json', + help='Display field schema in this format.') +@click.option('--version', type=click.Choice(['v3'], case_sensitive=False), default='v3', + help='Record version.') +@click.argument('field_type', type=str, nargs=-1) +def secret_template_field_command(ctx, show_list, output_format, version, field_type): + """List field types and field schemas""" + if show_list is True: + print("", file=sys.stderr) + ctx.obj["secret"].get_field_type_list(version=version) + else: + if field_type is None or len(field_type) == 0: + raise KsmCliException("A field type is required.") + + print("", file=sys.stderr) + ctx.obj["secret"].get_field_type_schema( + field_type=field_type[0], + output_format=output_format, + version=version + ) + print("", file=sys.stderr) + + +secret_template_command.add_command(secret_template_record_command) +secret_template_command.add_command(secret_template_field_command) + + +# SECRET ADD COMMAND + + +@click.group( + name='add', + cls=AliasedGroup, + help_headers_color='yellow', + help_options_color='green' +) +def secret_add_command(): + """Add a secret record to a folder""" + + +@click.command( + name='editor', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.pass_context +@click.option('--shared-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") +@click.option('--record-type', '--rt', required=True, type=str, help="Record type") +@click.option('--password-generate', '-p', is_flag=True, help='Generate passwords for empty password fields.') +@click.option('--title', '-t', type=str, help="Record title") +@click.option('--notes', '-n', type=str, help="Record simple note") +@click.option('--output-format', '-o', type=click.Choice(['yaml', 'json'], case_sensitive=False), default='json', + help='File format to display in editor.') +@click.option('--editor', '-e', type=str, help='Application to use to edit record data.') +@click.option('--version', type=click.Choice(['v3'], case_sensitive=False), default='v3', help='Record version.') +def secret_add_editor_command(ctx, shared_folder_uid, record_type, password_generate, title, notes, + output_format, editor, version): + """Add a secret record via a text editor""" + + ctx.obj["secret"].add_record_interactive( + version=version, + folder_uid=shared_folder_uid, + record_type=record_type, + output_format=output_format, + password_generate_flag=password_generate, + title=title, + notes=notes, + editor=editor + ) + print("", file=sys.stderr) + + +@click.command( + name='file', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.pass_context +@click.option('--shared-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") +@click.option('--file', '-f', required=True, type=str, help='Add records from record script file.') +@click.option('--password-generate', '-p', is_flag=True, help='Generate passwords for empty password fields.') +def secret_add_file_command(ctx, shared_folder_uid, file, password_generate): + """Add a secret record(s) from a file""" + + ctx.obj["secret"].add_record_from_file( + folder_uid=shared_folder_uid, + file=file, + password_generate_flag=password_generate, + ) + print("", file=sys.stderr) + + +@click.command( + name='field', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.pass_context +@click.option('--shared-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") +@click.option('--record-type', '--rt', required=True, type=str, help="Record type") +@click.option('--title', '-t', required=True, type=str, help="Record title") +@click.option('--password-generate', '-p', is_flag=True, help='Generate passwords for empty password fields.') +@click.option('--notes', '-n', type=str, help="Record simple note") +@click.option('--version', type=click.Choice(['v3'], case_sensitive=False), default='v3', help='Record version.') +@click.argument('field_args', type=str, nargs=-1) +def secret_add_field_command(ctx, shared_folder_uid, record_type, title, password_generate, notes, version, + field_args): + """Add a secret record from a command line field arguments""" + + ctx.obj["secret"].add_record_from_field_args( + version=version, + folder_uid=shared_folder_uid, + password_generate_flag=password_generate, + record_type=record_type, + title=title, + notes=notes, + field_args=list(field_args) + ) + print("", file=sys.stderr) + + +secret_add_command.add_command(secret_add_field_command) +secret_add_command.add_command(secret_add_file_command) +secret_add_command.add_command(secret_add_editor_command) + + secret_command.add_command(secret_list_command) secret_command.add_command(secret_get_command) secret_command.add_command(secret_notation_command) secret_command.add_command(secret_update_command) +secret_command.add_command(secret_add_command) secret_command.add_command(secret_download_command) secret_command.add_command(secret_totp_command) secret_command.add_command(secret_password_command) +secret_command.add_command(secret_template_command) # EXEC COMMAND @@ -607,20 +797,64 @@ def config_log_command(ctx, enable): @click.command( - name='record-cache', + name='cache', cls=HelpColorsCommand, help_options_color='blue' ) @click.option('--enable/--disable', required=True, help="Enable or disable cache.") @click.pass_context def config_cache_command(ctx, enable): - """Enable or disable cache""" + """Enable or disable record cache""" ctx.obj["profile"].set_cache(enable) +@click.command( + name='record-type-dir', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.option('--directory', "-d", type=str, help='Location of record type schema directory') +@click.option('--clear', is_flag=True, help='Clear location of record type schema directory') +@click.pass_context +def config_rt_dir_command(ctx, directory, clear): + """Set the directory that contains record type schemas""" + + if clear is True: + directory = None + elif directory is None: + raise KsmCliException("Either a --directory is required or the --clear flag set") + + ctx.obj["profile"].set_record_type_dir(directory) + + +@click.command( + name='editor', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.option('--application', "--app", type=str, help='Application path and name to use for editor.') +@click.option('--blocking', is_flag=True, help='Application requires blocking.') +@click.option('--process-name', type=str, help='Application process name.') +@click.option('--clear', is_flag=True, help='Clear location of record type schema directory') +@click.pass_context +def config_editor_command(ctx, application, blocking, process_name, clear): + """Set the editor to use for record editing""" + + if clear is True: + application = None + blocking = False + process_name = None + if clear is not True and application is None: + raise KsmCliException("Either a --application is required or the --clear flag set") + + ctx.obj["profile"].set_editor(editor=application, use_blocking=blocking, process_name=process_name) + + config_command.add_command(config_show_command) config_command.add_command(config_log_command) config_command.add_command(config_cache_command) +config_command.add_command(config_rt_dir_command) +config_command.add_command(config_editor_command) # REDEEM COMMAND diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/common.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/common.py new file mode 100644 index 00000000..d46472d3 --- /dev/null +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/common.py @@ -0,0 +1,105 @@ +import os +import shutil +import platform +import subprocess +import time +import psutil + + +def find_ksm_path(find_path, is_file=True): + + # Directories to scan for the keeper INI file. This both Linux and Windows paths. The os.path.join + # should create a path that the OS understands. The not_set stuff in case the environmental var is not set. + # The last entry is the current working directory. + not_set = "_NOTSET_" + dir_locations = [ + [os.environ.get("KSM_INI_DIR", not_set)], + [os.getcwd()], + + # Linux + [os.environ.get("HOME", not_set)], + + # This seems like where other applications like to store their configs. + [os.environ.get("HOME", not_set), ".config", "ksm"], + + [os.environ.get("HOME", not_set), ".keeper"], + ["/etc"], + ["/etc", "ksm"], + ["/etc", "keeper"], + + # Windows + [os.environ.get("USERPROFILE", not_set)], + [os.environ.get("APPDIR", not_set)], + [os.environ.get("PROGRAMDATA", not_set), "Keeper"], + [os.environ.get("PROGRAMFILES", not_set), "Keeper"], + ] + + for dir_location in dir_locations: + path = os.path.join(*dir_location, find_path) + if (is_file is True and os.path.exists(path) and os.path.isfile(path)) or os.path.exists(path): + return path + + return None + + +def launch_editor(file, editor=None, use_blocking=False, process_name=None): + + if editor is None: + + editor = os.environ.get("EDITOR") + if editor is None: + # If no editor is try to find one. + if platform.system() == "Windows": + # If someone installed Visual Code, use that first. It had a nice JSON and YAML syntax tester. Else + # call back to good old notepad + editor_list = [ + {"cmd": "code.cmd", "use_blocking": True, "process_name": "code.exe"}, + {"cmd": "notepad.exe"} + ] + else: + # MacOS and Linux use the same list. nano is the default command line editor for both MacOS and Linux. + editor_list = [ + {"cmd": "nano"}, + {"cmd": "vim"}, + {"cmd": "vi"}, + {"cmd": "emacs"} + ] + for editor_file in editor_list: + located = shutil.which(editor_file.get("cmd")) + if located is not None: + editor = located + use_blocking = editor_file.get("use_blocking") + process_name = editor_file.get("process_name", editor_file.get("cmd")) + break + if editor is None: + raise FileNotFoundError("Cannot find an editor. Please configure an editor in the CLI or set the " + "environmental variable 'EDITOR' with the name, and path if required, of a " + "text editor.") + + cmd = [editor, file] + + # Windows and MacOS may launch an application that doesn't block until the application exists. Or the application + # launches another application and exists. If we are using blocking we are going to either cause blocking on + # the way we launch the application (MacOS) or monitor the processes until the application exits. + if use_blocking is True: + # In MacOS, opening the application with -W will wait until the application exits before continuing. The + # application needs to completely exit to continue. I mean completely exit, not just that windows closed. + if platform.system() == "Darwin": + cmd = ["open", "-W", "-a"] + cmd + subprocess.call(cmd) + + # Check the task list to see if the application is running. Once it is not, break out the while loop. + elif platform.system() == "Windows": + subprocess.call(cmd) + + while True: + time.sleep(2) + process_found = False + for proc in psutil.process_iter(): + if proc.name().lower() in process_name.lower() and proc.status() == psutil.STATUS_RUNNING: + process_found = True + break + if process_found is False: + break + else: + subprocess.call(cmd) diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/exception.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/exception.py index da5a74a8..f71c2177 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/exception.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/exception.py @@ -17,3 +17,7 @@ def format_message(self): def __str__(self): return self.colorize() + + +class KsmRecordSyntaxException: + pass diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/profile.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/profile.py index 190e27bc..2f79c10c 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/profile.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/profile.py @@ -13,6 +13,7 @@ import os import configparser from keeper_secrets_manager_cli.exception import KsmCliException +from keeper_secrets_manager_cli.common import find_ksm_path from keeper_secrets_manager_core.storage import InMemoryKeyValueStorage from keeper_secrets_manager_core.configkeys import ConfigKeys from keeper_secrets_manager_core.exceptions import KeeperError, KeeperAccessDenied @@ -32,6 +33,10 @@ class Profile: default_ini_file = os.environ.get("KSM_INI_FILE", "keeper.ini") color_key = "color" cache_key = "cache" + record_type_dir_key = "record_type_dir" + editor_key = "editor" + editor_use_blocking_key = "editor_use_blocking" + editor_process_name_key = "editor_process_name" def __init__(self, cli, ini_file=None): @@ -109,34 +114,8 @@ def save(self): @staticmethod def find_ini_config(): - - # Directories to scan for the keeper INI file. This both Linux and Windows paths. The os.path.join - # should create a path that the OS understands. The not_set stuff in case the environmental var is not set. - # The last entry is the current working directory. - not_set = "_NOTSET_" - dir_locations = [ - [os.environ.get("KSM_INI_DIR", not_set)], - [os.getcwd()], - - # Linux - [os.environ.get("HOME", not_set)], - [os.environ.get("HOME", not_set), ".keeper"], - ["/etc"], - ["/etc", "keeper"], - - # Windows - [os.environ.get("USERPROFILE", not_set)], - [os.environ.get("APPDIR", not_set)], - [os.environ.get("PROGRAMDATA", not_set), "Keeper"], - [os.environ.get("PROGRAMFILES", not_set), "Keeper"], - ] - - for dir_location in dir_locations: - path = os.path.join(*dir_location, Profile.default_ini_file) - if os.path.exists(path) and os.path.isfile(path): - return path - - return None + file = find_ksm_path(Profile.default_ini_file, is_file=True) + return file def get_config(self): self._load_config() @@ -239,7 +218,8 @@ def init(token, ini_file=None, server=None, profile_name=None): "clientId": "", "privateKey": "", "appKey": "", - "hostname": "" + "hostname": "", + "appOwnerPublicKey": "" } for k, v in config_storage.config.items(): @@ -251,7 +231,10 @@ def init(token, ini_file=None, server=None, profile_name=None): print("Added profile {} to INI config file located at {}".format(profile_name, ini_file), file=sys.stderr) - def list_profiles(self, output='text', use_color=True): + def list_profiles(self, output='text', use_color=None): + + if use_color is None: + use_color = self.cli.use_color profiles = [] @@ -371,20 +354,59 @@ def import_config(config_base64, file=None, profile_name=None): print("Imported config saved to profile {} at {}.".format(profile_name, file), file=sys.stderr) def set_color(self, on_off): - common_config = self._get_common_config("Cannot set log level.") + common_config = self._get_common_config("Cannot set color settings.") common_config[Profile.color_key] = str(on_off) self.cli.use_color = on_off self.save() def set_cache(self, on_off): - common_config = self._get_common_config("Cannot set log level.") + common_config = self._get_common_config("Cannot set record cache.") common_config[Profile.cache_key] = str(on_off) self.cli.use_color = on_off self.save() + def set_record_type_dir(self, directory): + common_config = self._get_common_config("Cannot set the record type directory.") + if directory is None: + del common_config[Profile.record_type_dir_key] + else: + if os.path.exists(directory) is False: + raise FileNotFoundError(f"Cannot find the directory 'directory' for record type schemas.") + common_config[Profile.record_type_dir_key] = str(directory) + self.cli.record_type_dir = directory + self.save() + + def set_editor(self, editor, use_blocking=None, process_name=None): + common_config = self._get_common_config("Cannot set editor.") + if editor is None: + common_config.pop(Profile.editor_key, None) + common_config.pop(Profile.editor_use_blocking_key, None) + common_config.pop(Profile.editor_process_name_key, None) + else: + common_config[Profile.editor_key] = editor + if use_blocking is not None: + common_config[Profile.editor_use_blocking_key] = str(use_blocking) + if process_name is not None: + common_config[Profile.editor_process_name_key] = process_name + self.cli.editor = editor + self.cli.editor_use_blocking = use_blocking + self.save() + def show_config(self): common_config = self._get_common_config("Cannot show the config.") + + table = Table(use_color=self.cli.use_color) + table.add_column("Config Item", data_color=Fore.GREEN) + table.add_column("Value", data_color=Fore.YELLOW, allow_wrap=True) + not_set_text = "-NOT SET-" - print("Active Profile: {}".format(common_config.get(Profile.active_profile_key, not_set_text))) - print("Cache Enabled: {}".format(common_config.get(Profile.cache_key, not_set_text))) - print("Color Enabled: {}".format(common_config.get(Profile.color_key, not_set_text))) + table.add_row(["Active Profile", common_config.get(Profile.active_profile_key, not_set_text)]) + table.add_row(["Cache Enabled", common_config.get(Profile.cache_key, not_set_text)]) + table.add_row(["Color Enabled", common_config.get(Profile.color_key, not_set_text)]) + table.add_row(["Record Type Directory", common_config.get(Profile.record_type_dir_key, not_set_text)]) + table.add_row(["Editor", "{} ({})".format( + common_config.get(Profile.editor_key, not_set_text), + common_config.get(Profile.editor_process_name_key, "NA") + )]) + table.add_row(["Editor Blocking", common_config.get(Profile.editor_use_blocking_key, not_set_text)]) + self.cli.output(table.get_string()) diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py index d5c51684..e42c0f59 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py @@ -11,18 +11,25 @@ # import json +import yaml +import os +import re from jsonpath_rw_ext import parse import sys from colorama import Fore, Style from keeper_secrets_manager_cli.exception import KsmCliException +from keeper_secrets_manager_cli.common import launch_editor from keeper_secrets_manager_core.core import SecretsManager from keeper_secrets_manager_core.utils import get_totp_code, generate_password as sdk_generate_password +from keeper_secrets_manager_helper.record import Record +from keeper_secrets_manager_helper.field_type import FieldType +from keeper_secrets_manager_helper.exception import FileSyntaxException from .table import Table, ColumnAlign import uuid +import tempfile class Secret: - # Maps the type in a field to what it should pull in from the real record. There # might be multiple field that need to be pulled in. support_ref_types = { @@ -235,8 +242,8 @@ def _format_record(record_dict, use_color=True): ret += table.get_string() + "\n" if len(problems) > 0: - ret += " !! Found duplicate labels ({}). When accessing custom fields the first record found will be "\ - "returned.\n".format(",".join(problems)) + ret += " !! Found duplicate labels ({}). When accessing custom fields the first record found will" \ + "be returned.\n".format(",".join(problems)) if len(record_dict["files"]) > 0: ret += "\n" @@ -338,7 +345,10 @@ def _query_jsonpath(self, jsonpath_query, records, force_array): raise KsmCliException("JSONPath failed: {}".format(err)) def query(self, uids=None, titles=None, field=None, output_format='json', jsonpath_query=None, - force_array=False, load_references=False, unmask=False, use_color=True, inflate=True): + force_array=False, load_references=False, unmask=False, use_color=None, inflate=True): + + if use_color is None: + use_color = self.cli.use_color if uids is None: uids = [] @@ -404,7 +414,10 @@ def _format_list(record_dict, use_color=True): table.add_row([record["uid"], record["type"], record["title"]]) return "\n" + table.get_string() + "\n" - def secret_list(self, uids=None, output_format='json', use_color=True): + def secret_list(self, uids=None, output_format='json', use_color=None): + + if use_color is None: + use_color = self.cli.user_color record_dict = self.query(uids=uids, output_format='dict', unmask=True, use_color=use_color) if output_format == 'text': @@ -551,6 +564,149 @@ def update(self, uid, fields=None, custom_fields=None, fields_json=None, custom_ except Exception as err: raise KsmCliException("Could not save record: {}".format(err)) + def _check_if_can_add_records(self): + # Check to see if appOwnerPublicKey is in the keeper.ini. It's a newly added key and if the + # profile is too old we can't add a record. + profile_config = self.cli.profile.get_profile_config(self.cli.profile.get_active_profile_name()) + if profile_config.get("appOwnerPublicKey") is None: + raise KsmCliException("Your profile is out of date. It is missing the application order key. " + "To create a record you will need to init a profile with a new token.") + + def add_record_interactive(self, version, folder_uid, record_type, output_format, + password_generate_flag, title=None, notes=None, editor=None): + self._check_if_can_add_records() + + # If the editor was passed in, assume it doesn't need blocking. + editor_use_blocking = False + editor_process_name = None + + # If the editor was not passed in, use the editor set in the config. If not set, the code will + # attempt to find and editor later. + if editor is None: + editor = self.cli.editor + editor_use_blocking = self.cli.editor_use_blocking + editor_process_name = self.cli.editor_process_name + + # Build a templated record with placeholders <#ADD> + template = Record(version).get_template( + record_type=record_type, + output_format=output_format, + title=title, + notes=notes + ) + + temp_filename = None + try: + # Write the template file and close it. Windows doesn't like to share open files. The finally will handle + # deleting the file, so set delete=False so the tempfile doesn't delete it when closed. + tf = tempfile.NamedTemporaryFile("w+", suffix=f".{output_format}", delete=False) + temp_filename = tf.name + tf.write(template) + tf.close() + + launch_the_editor = True + + while True: + + if launch_the_editor is True: + + # Launch the editor + launch_editor( + file=temp_filename, + editor=editor, + use_blocking=editor_use_blocking, + process_name=editor_process_name + ) + + with open(temp_filename, 'r') as fh: + record_data = fh.read() + fh.close() + if re.search(r'<#ADD', record_data, re.MULTILINE) is not None: + print(Fore.RED + "Found template markers (#ADD) still in the record data. Either " + + "add a value or remove the line completely. Enter 'r' to recheck " + + "the file if the file was processed before you finished editing. " + Style.RESET_ALL) + ynq = input("Do you wish to edit? Y/n/r/q: ") + if ynq == "" or ynq[0].lower() == "y": + launch_the_editor = True + continue + if ynq[0].lower() == "r": + # If rechecking, don't launch the editor + launch_the_editor = False + continue + if ynq[0].lower() == "q": + print("Not adding record.") + return + + try: + # When saved, import the file + self.add_record_from_file( + folder_uid=folder_uid, + file=temp_filename, + password_generate_flag=password_generate_flag + ) + # All is good break out of the loop + break + except FileSyntaxException as err: + ynq = input(Fore.RED + str(err) + Style.RESET_ALL + + "Do you wish to edit and try again? Y/n/q: ") + except Exception as err: + ynq = input(Fore.RED + f"Could not create the record: {err}. " + Style.RESET_ALL + + "Do you wish to edit and try again? Y/n/q: ") + + if ynq == "" or ynq[0].lower() == "y": + launch_the_editor = True + continue + if ynq[0].lower() == "q": + print("Not adding record.") + return + + except Exception as err: + raise KsmCliException(f"Could not edit the record template file: {err}") + finally: + if temp_filename is not None: + os.unlink(temp_filename) + + def add_record_from_file(self, folder_uid, file, password_generate_flag): + + self._check_if_can_add_records() + + try: + records = Record.create_from_file(file, password_generate=password_generate_flag) + record_uids = [] + for record in records: + record_create_obj = record.get_record_create_obj() + record_uid = self.cli.client.create_secret(folder_uid, record_create_obj) + record_uids.append(record_uid) + except FileSyntaxException as err: + raise KsmCliException(str(err)) + except Exception as err: + raise KsmCliException(f"Could not load records from file {file}: {err}") + + print("The following is the new record UIDs in JSON ...", file=sys.stderr) + return self.cli.output(json.dumps(record_uids)) + + def add_record_from_field_args(self, version, folder_uid, password_generate_flag, record_type, + title, notes, field_args): + + self._check_if_can_add_records() + + try: + records = Record(version).create_from_field_args( + record_type=record_type, + title=title, + notes=notes, + field_args=field_args, + password_generate=password_generate_flag + ) + record = records[0] + record_create_obj = record.get_record_create_obj() + record_uid = self.cli.client.create_secret(folder_uid, record_create_obj) + except Exception as err: + raise KsmCliException(f"{err}") + + print("The following is the new record UID ...", file=sys.stderr) + return self.cli.output(record_uid) + def generate_password(self, length, lowercase, uppercase, digits, special_characters): new_password = sdk_generate_password( @@ -562,3 +718,43 @@ def generate_password(self, length, lowercase, uppercase, digits, special_charac ) return self.cli.output(new_password) + + def get_record_type_template(self, record_type, output_format, version, file): + + if file is not None: + self.cli.output_name = file + return self.cli.output(Record(version).get_template( + record_type=record_type, + output_format=output_format + )) + + def get_record_type_list(self, version): + + record_type_list = Record(version).get_template_list() + + table = Table(use_color=self.cli.use_color) + table.add_column("Record Type", allow_wrap=True, data_color=Fore.GREEN) + + for record_type in record_type_list: + table.add_row([record_type]) + + return self.cli.output(table.get_string()) + + def get_field_type_list(self, version): + field_type_list = FieldType.get_field_type_list(version) + + table = Table(use_color=self.cli.use_color) + table.add_column("Field Type", allow_wrap=True, data_color=Fore.GREEN) + + for field_type in field_type_list: + table.add_row([field_type]) + + return self.cli.output(table.get_string()) + + def get_field_type_schema(self, field_type, output_format, version): + schema = FieldType.get_field_type_schema(field_type, version) + + if output_format == "json": + return self.cli.output(json.dumps(schema, indent=4)) + + return self.cli.output(yaml.dump(schema)) diff --git a/integration/keeper_secrets_manager_cli/requirements.txt b/integration/keeper_secrets_manager_cli/requirements.txt index 0fc5420c..cdd3fe70 100644 --- a/integration/keeper_secrets_manager_cli/requirements.txt +++ b/integration/keeper_secrets_manager_cli/requirements.txt @@ -1,4 +1,5 @@ keeper-secrets-manager-core>=16.2.2 +keeper-secrets-manager-helper prompt-toolkit~=2.0 jsonpath-rw-ext colorama @@ -7,4 +8,5 @@ click click_help_colors click-repl pyyaml -update-checker \ No newline at end of file +update-checker +psutil \ No newline at end of file diff --git a/integration/keeper_secrets_manager_cli/setup.py b/integration/keeper_secrets_manager_cli/setup.py index 91fa6968..f763a63c 100644 --- a/integration/keeper_secrets_manager_cli/setup.py +++ b/integration/keeper_secrets_manager_cli/setup.py @@ -17,7 +17,8 @@ 'colorama', 'importlib_metadata', 'pyyaml', - 'update-checker' + 'update-checker', + 'psutil' ] # Version set in the keeper_secrets_manager_cli.version file. diff --git a/integration/keeper_secrets_manager_cli/tests/config_test.py b/integration/keeper_secrets_manager_cli/tests/config_test.py new file mode 100644 index 00000000..d21b7a58 --- /dev/null +++ b/integration/keeper_secrets_manager_cli/tests/config_test.py @@ -0,0 +1,108 @@ +import os +import unittest +from click.testing import CliRunner +import keeper_secrets_manager_cli +from keeper_secrets_manager_cli.__main__ import cli +from keeper_secrets_manager_cli.export import Export +from keeper_secrets_manager_core.mock import MockConfig +import tempfile +from colorama import Fore + + +class ConfigTest(unittest.TestCase): + + def setUp(self) -> None: + self.orig_dir = os.getcwd() + self.temp_dir = tempfile.TemporaryDirectory() + os.chdir(self.temp_dir.name) + + # Make a fake keeper.ini file. + export = Export(config=MockConfig().make_config(), file_format="ini", plain=True) + with open("keeper.ini", "w") as fh: + fh.write(export.run().decode()) + fh.close() + + def tearDown(self) -> None: + os.chdir(self.orig_dir) + + def test_config_color(self): + + runner = CliRunner() + result = runner.invoke(cli, ['config', 'color', "--enable"], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on color enable") + + result = runner.invoke(cli, ['profile', 'list'], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on profile list") + + assert(Fore.YELLOW in result.output) + + result = runner.invoke(cli, ['config', 'color', "--disable"], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on color disable") + + result = runner.invoke(cli, ['profile', 'list'], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on profile list") + + assert(Fore.YELLOW not in result.output) + + def test_config_cache(self): + + runner = CliRunner() + result = runner.invoke(cli, ['config', 'cache', "--enable"], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on record cache enable") + + client = keeper_secrets_manager_cli.KeeperCli() + self.assertEqual(True, client.use_cache, "did not get True value record cache enable") + + result = runner.invoke(cli, ['config', 'cache', "--disable"], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on record cache enable") + + client = keeper_secrets_manager_cli.KeeperCli() + self.assertEqual(False, client.use_cache, "did not get False value record cache disable") + + def test_config_record_type_directory(self): + + runner = CliRunner() + result = runner.invoke(cli, ['config', 'record-type-dir', '-d', self.temp_dir.name], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on record cache enable") + + client = keeper_secrets_manager_cli.KeeperCli() + self.assertEqual(self.temp_dir.name, client.record_type_dir, "did not get the record type directory") + + result = runner.invoke(cli, ['config', 'record-type-dir', "--clear"], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on record cache enable") + + client = keeper_secrets_manager_cli.KeeperCli() + self.assertNotEqual(self.temp_dir.name, client.record_type_dir, "record type directory is not the temp dir") + + def test_config_editor(self): + + runner = CliRunner() + result = runner.invoke(cli, ['config', 'editor', + '--app', 'TextMate', '--blocking'], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on editor set") + + client = keeper_secrets_manager_cli.KeeperCli() + self.assertEqual("TextMate", client.editor, "did not get the correct editor") + self.assertEqual(True, client.editor_use_blocking, "did not get the correct editor blocking") + + result = runner.invoke(cli, ['config', 'editor', '--clear'], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on editor clear") + + client = keeper_secrets_manager_cli.KeeperCli() + self.assertIsNone(client.editor, "editor is not None") + + result = runner.invoke(cli, ['config', 'editor', + '--app', 'code.cmd', '--process-name', "code.exe"], + catch_exceptions=False) + client = keeper_secrets_manager_cli.KeeperCli() + self.assertEqual(0, result.exit_code, "did not get a success on editor set") + self.assertEqual("code.cmd", client.editor, "did not get the correct editor") + self.assertEqual("code.exe", client.editor_process_name, "did not get the correct editor process") + + def test_config_show(self): + + # Just make sure no error is thrown + + runner = CliRunner() + result = runner.invoke(cli, ['config', 'show'], catch_exceptions=False) + self.assertEqual(0, result.exit_code, "did not get a success on editor set") diff --git a/integration/keeper_secrets_manager_cli/tests/secret_inflate_test.py b/integration/keeper_secrets_manager_cli/tests/secret_inflate_test.py index 581100c8..b6ae87bb 100644 --- a/integration/keeper_secrets_manager_cli/tests/secret_inflate_test.py +++ b/integration/keeper_secrets_manager_cli/tests/secret_inflate_test.py @@ -79,6 +79,10 @@ def test_get(self): runner = CliRunner() result = runner.invoke(cli, ['secret', 'get', '-u', main.uid, '--unmask', '--json'], catch_exceptions=False) + + print("-----------------") + print(result.output) + print("-----------------") result = json.loads(result.output) card_ref = next((item for item in result["fields"] if item["type"] == "cardRef"), None) diff --git a/integration/keeper_secrets_manager_cli/tests/secret_test.py b/integration/keeper_secrets_manager_cli/tests/secret_test.py index 1cf0c570..f1086059 100644 --- a/integration/keeper_secrets_manager_cli/tests/secret_test.py +++ b/integration/keeper_secrets_manager_cli/tests/secret_test.py @@ -802,6 +802,99 @@ def test_generate_password(self): print(result.stdout) tf.close() + def test_template_record_types(self): + + """Test the template record type command + """ + + # Get a list of record types, display to terminal + runner = CliRunner() + results = runner.invoke(cli, ['secret', 'template', 'record', '-l'], catch_exceptions=False) + output = results.output + self.assertRegex(output, r'Record Type', 'Did not find the table title') + self.assertRegex(output, r'login', 'found the login record type') + + # Get a record type as JSON and write it to a file + with tempfile.NamedTemporaryFile() as tf: + runner = CliRunner() + runner.invoke(cli, ['secret', 'template', 'record', '-f', tf.name, 'login'], catch_exceptions=False) + tf.seek(0) + schema = json.loads(tf.read().decode()) + print(schema) + self.assertEqual("v3", schema.get("version"), "did not get the correct version") + self.assertEqual("KeeperRecord", schema.get("kind"), "did not get the correct kind") + self.assertIsInstance(schema.get("data"), list, "data is not a list") + + data = schema.get("data")[0] + + self.assertEqual("login", data.get("recordType"), "record type is not login") + self.assertIsNotNone(data.get("title"), "title was None") + self.assertIsNotNone(data.get("notes"), "title was None") + + self.assertIsInstance(data.get("fields"), list, "fields is not a list") + + field = data.get("fields")[0] + self.assertEqual("login", field.get("type"), "field type is not login") + self.assertIsNotNone(field.get("value"), "value was None") + + tf.close() + + def test_template_field_types(self): + + """Test the template field type command + """ + + # Get a list of record types, display to terminal + runner = CliRunner() + results = runner.invoke(cli, ['secret', 'template', 'field', '-l'], catch_exceptions=False) + output = results.output + self.assertRegex(output, r'Field Type', 'Did not find the table title') + self.assertRegex(output, r'accountNumber', 'found the accountNumber field type') + + # Get a record type as JSON and write it to a file + runner = CliRunner() + results = runner.invoke(cli, ['secret', 'template', 'field', 'securityQuestion'], + catch_exceptions=False) + output = results.output + schema = json.loads(output) + + self.assertEqual("securityQuestion", schema.get("type"), "field type is not securityQuestion") + self.assertIsNotNone(schema.get("value"), "value was None") + + def test_add_record_via_field(self): + + mock_config = MockConfig.make_config() + secrets_manager = SecretsManager(config=InMemoryKeyValueStorage(mock_config)) + + profile_init_res = mock.Response() + profile_init_res.add_folder(uid="FAKEUID") + profile_init_res.add_record(title="Profile Init") + + queue = mock.ResponseQueue(client=secrets_manager) + queue.add_response(profile_init_res) + queue.add_response(profile_init_res) + queue.add_response(profile_init_res) + + with patch('keeper_secrets_manager_cli.KeeperCli.get_client') \ + as mock_client: + mock_client.return_value = secrets_manager + + Profile.init(token='MY_TOKEN') + + runner = CliRunner() + results = runner.invoke(cli, ['secret', 'add', 'field', + '--sf', 'FAKEUID', + '--rt', 'login', + '--title', 'My Title', + '-p', + 'login=jsmith', + 'url=http://localhost' + ], catch_exceptions=False) + output = results.output + # stderr and stdout are merged + output_line = output.split('\n') + self.assertRegex(output_line[1], r'^[\w_-]{22}$', "did not get back a record uid") + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/core/keeper_secrets_manager_core/mock.py b/sdk/python/core/keeper_secrets_manager_core/mock.py index c066b9ca..cf8a61ba 100644 --- a/sdk/python/core/keeper_secrets_manager_core/mock.py +++ b/sdk/python/core/keeper_secrets_manager_core/mock.py @@ -14,6 +14,7 @@ import time from collections import deque from requests import Response as RequestResponse +from .keeper_globals import keeper_public_keys import string import random @@ -444,7 +445,7 @@ def dump(self, secret, flags=None): class MockConfig: @staticmethod - def make_config(skip_list=None, token=None, app_key=None): + def make_config(skip_list=None, token=None, app_key=None, owner_key = None): if skip_list is None: skip_list = [] @@ -468,6 +469,10 @@ def make_config(skip_list=None, token=None, app_key=None): app_key = base64.b64encode(random_app_key.encode()).decode() sm_config.set(ConfigKeys.KEY_APP_KEY, app_key) + if owner_key is None: + owner_key = keeper_public_keys["7"] + sm_config.set(ConfigKeys.KEY_OWNER_PUBLIC_KEY, owner_key) + config = {} for key in ConfigKeys: if key.value in skip_list: diff --git a/sdk/python/helper/LICENSE b/sdk/python/helper/LICENSE new file mode 100644 index 00000000..e69de29b diff --git a/sdk/python/helper/MANIFEST.in b/sdk/python/helper/MANIFEST.in new file mode 100644 index 00000000..1720b230 --- /dev/null +++ b/sdk/python/helper/MANIFEST.in @@ -0,0 +1 @@ +include keeper_secrets_manager_helper/v3/default_record_types.yml diff --git a/sdk/python/helper/README.md b/sdk/python/helper/README.md new file mode 100644 index 00000000..24ea941a --- /dev/null +++ b/sdk/python/helper/README.md @@ -0,0 +1,5 @@ +# Keeper Secrets Manager Helper + +The Keeper Secrets Manager helper for creating and managing records. To be used with keeper-secrets-manager-core. + +For more information see our official documentation page https://docs.keeper.io/secrets-manager/secrets-manager diff --git a/sdk/python/helper/keeper_secrets_manager_helper/__init__.py b/sdk/python/helper/keeper_secrets_manager_helper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sdk/python/helper/keeper_secrets_manager_helper/common.py b/sdk/python/helper/keeper_secrets_manager_helper/common.py new file mode 100644 index 00000000..71ff065c --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/common.py @@ -0,0 +1,30 @@ +from .exception import FileSyntaxException +import yaml +import json +import re +import os + + +def load_file(file): + is_json = re.search("json$", file, re.IGNORECASE) is not None + if os.path.exists(file) is False: + raise Exception(f"Cannot find the file {file}") + with open(file, 'r') as fh: + if is_json is True: + try: + record_data = json.loads(fh.read()) + except json.JSONDecodeError as err: + raise FileSyntaxException(f"The JSON had problems: {err.msg} around row " + f"{err.lineno}, column {err.colno}") + else: + try: + record_data = yaml.load(fh.read(), Loader=yaml.BaseLoader) + except yaml.YAMLError as err: + if hasattr(err, 'problem_mark'): + mark = err.problem_mark + raise FileSyntaxException(f"The YAML has problems around row " + f"{mark.line + 1}, column {mark.column + 1}.") + raise FileSyntaxException("The YAML has problems.") + return record_data + + diff --git a/sdk/python/helper/keeper_secrets_manager_helper/exception.py b/sdk/python/helper/keeper_secrets_manager_helper/exception.py new file mode 100644 index 00000000..58b66b32 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/exception.py @@ -0,0 +1,3 @@ + +class FileSyntaxException(Exception): + pass diff --git a/sdk/python/helper/keeper_secrets_manager_helper/field.py b/sdk/python/helper/keeper_secrets_manager_helper/field.py new file mode 100644 index 00000000..0cafce97 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/field.py @@ -0,0 +1,94 @@ +from enum import Enum +import json + + +class FieldSectionEnum(Enum): + STANDARD = "f" + CUSTOM = "c" + + +class Field: + + complete_key = "_complete" + + def __init__(self, **kwargs): + + self.initial_value_was_json = None + + # If an array of values is passed in, then the field is complete. + self.is_complete = False + + self.type = kwargs.pop("type", None) + self.field_section = kwargs.pop("field_section", None) + self.label = kwargs.pop("label", None) + self.allow_multiple = kwargs.pop("allow_multiple", None) + self._value = None + self.value = kwargs.pop("value", None) + + value_key = kwargs.pop("value_key", None) + if value_key is not None and isinstance(self.value, dict) is False: + self.initial_value_was_json = False + self.value = { + value_key: self.value + } + self.value_key = value_key + + # Default at the way way way end. + self.index = kwargs.pop("index", 1_000_000) + + self.group_key = kwargs.pop("group_key", None) + + self.extra = kwargs + + @property + def value(self): + return self._value + + @value.setter + def value(self, val): + if val is not None: + # If the value JSON? Decode it if it is. + if isinstance(val, str) is True: + try: + val = json.loads(val) + + # If the initial value was JSON, then flag it was and mark field as complete. + if self.initial_value_was_json is None: + if isinstance(val, list) is True: + self.is_complete = True + # Is complete, no need to flag any dictionary as comelete + elif isinstance(val, dict) is True: + val[Field.complete_key] = True + self.initial_value_was_json = True + except json.JSONDecodeError: + self.initial_value_was_json = False + if val == "": + val = None + self._value = val + + def __str__(self): + return f'Field(type={self.type}, field_section={self.field_section}, label={self.label}, '\ + f'value_key={self.value_key}, value={self.value}, index={self.index}, extra={self.extra}, '\ + f'group_key={self.group_key}, initial_value_was_json={self.initial_value_was_json}' + + def add_extra(self, key, value): + if hasattr(self, key) is False: + self.extra[key] = value + + @staticmethod + def field_key(field_type, label): + return "{}/{}".format(field_type, label) + + def instance_field_key(self, label=None): + return self.field_key(self.type, label if self.label is None else self.label) + + def can_add_key_value(self): + return isinstance(self.value, dict) is True or self.group_key is not None + + def to_dict(self): + data = { + "type": self.type, + "label": self.label, + "value": self.value, + } + return {**data, **self.extra} diff --git a/sdk/python/helper/keeper_secrets_manager_helper/field_type.py b/sdk/python/helper/keeper_secrets_manager_helper/field_type.py new file mode 100644 index 00000000..d8e42039 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/field_type.py @@ -0,0 +1,20 @@ +from importlib import import_module + + +class FieldType: + + default_version = "v3" + + @staticmethod + def get_field_type_list(version=None): + if version is None: + version = FieldType.default_version + mod = import_module(f"keeper_secrets_manager_helper.{version}.field_type") + return getattr(mod, "get_field_type_list")() + + @staticmethod + def get_field_type_schema(field_type, version=None): + if version is None: + version = FieldType.default_version + mod = import_module(f"keeper_secrets_manager_helper.{version}.field_type") + return getattr(mod, "get_field_type_schema")(field_type) diff --git a/sdk/python/helper/keeper_secrets_manager_helper/format.py b/sdk/python/helper/keeper_secrets_manager_helper/format.py new file mode 100644 index 00000000..bfc6c765 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/format.py @@ -0,0 +1,20 @@ +import iso8601 +from datetime import datetime + + +def date_to_ms(value): + + try: + # Check if already an integer value + try: + value = int(value) + # Else try to parse the string into a date + except ValueError as _: + dt = iso8601.parse_date(value) + # Set the epoch timestamp to have the same timezone as the parsed value + epoch = datetime.utcfromtimestamp(0).replace(tzinfo=dt.tzinfo) + value = int((dt - epoch).total_seconds() * 1000.0) + except iso8601.iso8601.ParseError as err: + raise ValueError("Cannot format date/time as milliseconds: {}".format(err)) + + return value diff --git a/sdk/python/helper/keeper_secrets_manager_helper/record.py b/sdk/python/helper/keeper_secrets_manager_helper/record.py new file mode 100644 index 00000000..d4cc3021 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/record.py @@ -0,0 +1,42 @@ +from .common import load_file +from importlib import import_module + + +class Record: + + @staticmethod + def create_from_file(file, password_generate=False): + record_data = load_file(file) + + if record_data.get("kind") != "KeeperRecord": + raise ValueError(".kind is not 'KeeperRecord'") + + version = record_data.get("version") + if version is None or version == "": + raise ValueError(".version is missing or blank") + + mod = import_module(f"keeper_secrets_manager_helper.{version}.record") + return getattr(mod, "Record").create_from_data(record_data, password_generate=password_generate) + + def __init__(self, version): + + self.version = version + + try: + self.record_mod = import_module(f"keeper_secrets_manager_helper.{self.version}.record") + self.parser_mod = import_module(f"keeper_secrets_manager_helper.{self.version}.parser") + self.record_type_mod = import_module(f"keeper_secrets_manager_helper.{self.version}.record_type") + except ImportError as err: + raise Exception(f"Version {self.version} is not supported: " + str(err)) + + def create_from_field_args(self, **kwargs): + kwargs["fields"] = getattr(self.parser_mod, "Parser")().parse_field(kwargs.get("field_args")) + return [getattr(self.record_mod, "Record")(**kwargs)] + + def get_template(self, record_type, output_format, title=None, notes=None): + get_class_by_type = getattr(self.record_type_mod, "get_class_by_type") + record_type = get_class_by_type(record_type)() + return record_type.generate_template(output_format=output_format, title=title, notes=notes) + + def get_template_list(self): + return getattr(self.record_type_mod, "get_record_type_list")() diff --git a/sdk/python/helper/keeper_secrets_manager_helper/record_type.py b/sdk/python/helper/keeper_secrets_manager_helper/record_type.py new file mode 100644 index 00000000..171b029e --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/record_type.py @@ -0,0 +1,50 @@ +from .common import load_file +from importlib import import_module +import os +import re + + +class RecordType: + + default_version = "v3" + + @staticmethod + def load_record_types(file, version=None): + data = load_file(file) + + data_test = data + if isinstance(data, list) is True: + data_test = data[0] + + # Check if a structured file + if data_test.get("kind") is not None: + return RecordType.load_helper_record_types(data) + else: + if data_test.get("recordTypeId") is not None: + return RecordType.load_commander_record_types(data, version) + else: + raise Exception("Cannot determine the type of record type file.") + + @staticmethod + def load_helper_record_types(data): + version = data.get("version") + mod = import_module(f"keeper_secrets_manager_helper.{version}.record_type") + return getattr(mod, "load_record_type_from_data")(data) + + @staticmethod + def load_commander_record_types(data, version=None): + if version is None: + version = RecordType.default_version + mod = import_module(f"keeper_secrets_manager_helper.{version}.record_type") + return getattr(mod, "load_commander_record_type_from_data")(data) + + @staticmethod + def find_and_load_record_type_schema_files(path): + + for root, dirs, files in os.walk(path): + for file in files: + if re.search("(json|ya*ml)$", file, re.IGNORECASE) is not None: + try: + RecordType.load_record_types(os.path.join(root, file)) + except (Exception,): + pass diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/__init__.py b/sdk/python/helper/keeper_secrets_manager_helper/v3/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/default_record_types.yml b/sdk/python/helper/keeper_secrets_manager_helper/v3/default_record_types.yml new file mode 100644 index 00000000..af29d66e --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/v3/default_record_types.yml @@ -0,0 +1,152 @@ +--- +version: v3 +kind: KeeperRecordType +data: + - class: Login + name: login + fields: + - type: login + - type: password + - type: url + - type: fileRef + - type: oneTimeCode + - class: BankAccount + name: bankAccount + fields: + - type: bankAccount + required: true + - type: name + - type: login + - type: password + - type: url + - type: cardRef + - type: fileRef + allow_multiple: True + - type: oneTimeCode + - class: Address + name: address + fields: + - type: address + - type: fileRef + - class: BankCard + name: bankCard + fields: + - type: paymentCard + - type: text + label: "Cardholder Name" + - type: pinCode + - type: addressRef + - type: fileRef + - class: BirthCertificate + name: birthCertificate + fields: + - type: name + - type: birthDate + - type: fileRef + - class: Contact + name: contact + fields: + - type: name + required: true + - type: text + label: "Company" + - type: email + - type: phone + - type: addressRef + - type: fileRef + - class: DriverLicense + name: driverLicense + fields: + - type: accountNumber + label: "Driver's License Number" + - type: name + - type: birthDate + - type: addressRef + - type: expirationDate + - type: fileRef + - class: EncryptedNotes + name: encryptedNotes + fields: + - type: note + - type: date + - type: fileRef + - class: File + name: file + fields: + - type: fileRef + - class: HealthInsurance + name: healthInsurance + fields: + - type: accountNumber + - type: name + label: "Insured's Name" + - type: login + - type: password + - type: url + - type: fileRef + - type: securityQuestion + - class: Membership + name: membership + fields: + - type: accountNumber + - type: name + - type: password + - type: fileRef + - type: securityQuestion + - class: Passport + name: passport + fields: + - type: accountNumber + label: "Passport Number" + - type: name + - type: birthDate + - type: addressRef + - type: expirationDate + - type: date + label: "Date Issued" + - type: password + - type: fileRef + - class: Photo + name: photo + fields: + - type: fileRef + - class: ServerCredentials + name: serverCredentials + fields: + - type: host + - type: login + - type: password + - type: fileRef + - class: SoftwareLicense + name: softwareLicense + fields: + - type: licenseNumber + - type: expirationDate + - type: date + label: "Date Active" + - type: fileRef + - type: securityQuestion + - class: SsnCard + name: ssnCard + fields: + - type: accountNumber + label: "Identity Number" + - type: name + - type: fileRef + - class: SshKeys + name: sshKeys + fields: + - type: login + - type: keyPair + - type: password + - type: host + - type: fileRef + - class: DatabaseCredentials + name: databaseCredentials + fields: + - type: text + label: "type" + - type: host + - type: login + - type: password + - type: fileRef \ No newline at end of file diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/enum.py b/sdk/python/helper/keeper_secrets_manager_helper/v3/enum.py new file mode 100644 index 00000000..3ae81d0b --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/v3/enum.py @@ -0,0 +1,299 @@ +import enum + + +class BaseEnum(enum.Enum): + + @classmethod + def enum_exists(cls, value): + for e in cls: + if e == value or (type(value) is str and e.value.lower() == value.lower()): + return True + return False + + @classmethod + def get_value(cls, value): + for e in cls: + if e == value or (type(value) is str and e.value.lower() == value.lower()): + return e.value + raise ValueError(f"The value {value} is valid choice.") + + @classmethod + def build_example(cls): + values = [] + for e in cls: + values.append(e.value) + return " | ".join(sorted(values)) + + +class PhoneTypeEnum(BaseEnum): + MOBILE = "Mobile" + HOME = "Home" + WORK = "Work" + + +class AccountTypeEnum(BaseEnum): + CHECKING = "Checking" + SAVINGS = "Savings" + OTHER = "Other" + + +# from iso3166 import countries +# for c in countries: +# print("\t{} = \"{}\"".format(c.alpha2, c.alpha2)) + +# Was trying to dynamically create this enum, however too much work and it might not match +# the list on the Vault UI. So just list, so we can add missing one or remove ones that don't match +# the Vault + + +class CountryEnum(BaseEnum): + AF = "AF" + AX = "AX" + AL = "AL" + DZ = "DZ" + AS = "AS" + AD = "AD" + AO = "AO" + AI = "AI" + AQ = "AQ" + AG = "AG" + AR = "AR" + AM = "AM" + AW = "AW" + AU = "AU" + AT = "AT" + AZ = "AZ" + BS = "BS" + BH = "BH" + BD = "BD" + BB = "BB" + BY = "BY" + BE = "BE" + BZ = "BZ" + BJ = "BJ" + BM = "BM" + BT = "BT" + BO = "BO" + BQ = "BQ" + BA = "BA" + BW = "BW" + BV = "BV" + BR = "BR" + IO = "IO" + BN = "BN" + BG = "BG" + BF = "BF" + BI = "BI" + KH = "KH" + CM = "CM" + CA = "CA" + CV = "CV" + KY = "KY" + CF = "CF" + TD = "TD" + CL = "CL" + CN = "CN" + CX = "CX" + CC = "CC" + CO = "CO" + KM = "KM" + CG = "CG" + CD = "CD" + CK = "CK" + CR = "CR" + CI = "CI" + HR = "HR" + CU = "CU" + CW = "CW" + CY = "CY" + CZ = "CZ" + DK = "DK" + DJ = "DJ" + DM = "DM" + DO = "DO" + EC = "EC" + EG = "EG" + SV = "SV" + GQ = "GQ" + ER = "ER" + EE = "EE" + ET = "ET" + FK = "FK" + FO = "FO" + FJ = "FJ" + FI = "FI" + FR = "FR" + GF = "GF" + PF = "PF" + TF = "TF" + GA = "GA" + GM = "GM" + GE = "GE" + DE = "DE" + GH = "GH" + GI = "GI" + GR = "GR" + GL = "GL" + GD = "GD" + GP = "GP" + GU = "GU" + GT = "GT" + GG = "GG" + GN = "GN" + GW = "GW" + GY = "GY" + HT = "HT" + HM = "HM" + VA = "VA" + HN = "HN" + HK = "HK" + HU = "HU" + IS = "IS" + IN = "IN" + ID = "ID" + IR = "IR" + IQ = "IQ" + IE = "IE" + IM = "IM" + IL = "IL" + IT = "IT" + JM = "JM" + JP = "JP" + JE = "JE" + JO = "JO" + KZ = "KZ" + KE = "KE" + KI = "KI" + KP = "KP" + KR = "KR" + XK = "XK" + KW = "KW" + KG = "KG" + LA = "LA" + LV = "LV" + LB = "LB" + LS = "LS" + LR = "LR" + LY = "LY" + LI = "LI" + LT = "LT" + LU = "LU" + MO = "MO" + MK = "MK" + MG = "MG" + MW = "MW" + MY = "MY" + MV = "MV" + ML = "ML" + MT = "MT" + MH = "MH" + MQ = "MQ" + MR = "MR" + MU = "MU" + YT = "YT" + MX = "MX" + FM = "FM" + MD = "MD" + MC = "MC" + MN = "MN" + ME = "ME" + MS = "MS" + MA = "MA" + MZ = "MZ" + MM = "MM" + NA = "NA" + NR = "NR" + NP = "NP" + NL = "NL" + NC = "NC" + NZ = "NZ" + NI = "NI" + NE = "NE" + NG = "NG" + NU = "NU" + NF = "NF" + MP = "MP" + NO = "NO" + OM = "OM" + PK = "PK" + PW = "PW" + PS = "PS" + PA = "PA" + PG = "PG" + PY = "PY" + PE = "PE" + PH = "PH" + PN = "PN" + PL = "PL" + PT = "PT" + PR = "PR" + QA = "QA" + RE = "RE" + RO = "RO" + RU = "RU" + RW = "RW" + BL = "BL" + SH = "SH" + KN = "KN" + LC = "LC" + MF = "MF" + PM = "PM" + VC = "VC" + WS = "WS" + SM = "SM" + ST = "ST" + SA = "SA" + SN = "SN" + RS = "RS" + SC = "SC" + SL = "SL" + SG = "SG" + SX = "SX" + SK = "SK" + SI = "SI" + SB = "SB" + SO = "SO" + ZA = "ZA" + GS = "GS" + SS = "SS" + ES = "ES" + LK = "LK" + SD = "SD" + SR = "SR" + SJ = "SJ" + SZ = "SZ" + SE = "SE" + CH = "CH" + SY = "SY" + TW = "TW" + TJ = "TJ" + TZ = "TZ" + TH = "TH" + TL = "TL" + TG = "TG" + TK = "TK" + TO = "TO" + TT = "TT" + TN = "TN" + TR = "TR" + TM = "TM" + TC = "TC" + TV = "TV" + UG = "UG" + UA = "UA" + AE = "AE" + GB = "GB" + US = "US" + UM = "UM" + UY = "UY" + UZ = "UZ" + VU = "VU" + VE = "VE" + VN = "VN" + VG = "VG" + VI = "VI" + WF = "WF" + EH = "EH" + YE = "YE" + ZM = "ZM" + ZW = "ZW" diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/field_type.py b/sdk/python/helper/keeper_secrets_manager_helper/v3/field_type.py new file mode 100644 index 00000000..83e4d007 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/v3/field_type.py @@ -0,0 +1,620 @@ +from keeper_secrets_manager_helper.v3.enum import BaseEnum, PhoneTypeEnum, CountryEnum, AccountTypeEnum +import keeper_secrets_manager_helper.format +import re +import json +from importlib import import_module +import inspect +import sys + + +UID_REGEX = r'^[a-zA-Z0-9\-_]{22}$' + +field_map = {} + + +# Find all the field type classes and create a map from the the camelcase name to the FieldType class +def get_field_type_map(): + global field_map + if len(field_map) == 0: + for item in inspect.getmembers(sys.modules[__name__], inspect.isclass): + mod_class = getattr(sys.modules[__name__], item[0]) + if mod_class is not None: + field_type_class = getattr(sys.modules[__name__], "FieldType") + if issubclass(mod_class, field_type_class) is True: + class_name = getattr(mod_class, "name") + if class_name is None: + continue + field_map[class_name] = mod_class + field_map = field_map + + return field_map + + +def get_class_by_type(class_name): + get_field_type_map() + if class_name in field_map: + return field_map[class_name] + raise ImportError("Field type class {} does not exists.".format(class_name)) + + +def get_field_type_list(): + return list(get_field_type_map().keys()) + + +def get_field_type_schema(field_type): + get_field_type_map() + + def _expand_value_type(schema): + + allow_multiple = schema.get("allow_multiple", False) + value_type = schema.get("value_type") + # If the record doesn't set allow_multiple to True/False, allow the field to set the value. + if issubclass(value_type, FieldType) is True: + new_schema = value_type.schema + return _expand_value_type(new_schema) + elif issubclass(value_type, BaseEnum) is True: + value = "<#ADD: " + value_type.build_example() + ">" + return value + elif issubclass(value_type, dict): + value_block = {} + for key, info in schema.get("schema").items(): + value_block[key] = _expand_value_type(info) + return value_block + else: + value = "<#ADD: " + schema.get("desc", "Insert a {}".format(value_type.__name__)) + ">" + if allow_multiple is True: + value = [value] + return value + + data = { + "type": field_type + } + + if field_type not in field_map: + raise ValueError("Field type '{}' does not exists.".format(field_type)) + field_type_obj = field_map[field_type] + field_schema = field_type_obj.schema + data["value"] = _expand_value_type(field_schema) + data["privacyScreen"] = field_schema.get("privacy_screen", False) + field_type_obj.add_template_specifics(data) + + return data + + +class PasswordComplexity: + + def __init__(self, value=None, length=64, caps=0, lowercase=0, digits=0, special=0): + + # If a dictionary is passed in, use that get the attributes. + if value is not None and type(value) is dict: + length = value.get("length", length) + caps = value.get("caps", caps) + lowercase = value.get("lowercase", lowercase) + digits = value.get("digits", digits) + special = value.get("special", special) + + self.length = length + self.caps = caps + self.lowercase = lowercase + self.digits = digits + self.special = special + + def to_dict(self): + + return { + "length": self.length, + "caps": self.caps, + "lowercase": self.lowercase, + "digits": self.digits, + "special": self.special + } + + def generate_password(self): + try: + mod = import_module("keeper_secrets_manager_core.utils") + password = getattr(mod, "generate_password")( + length=self.length, + lowercase=self.lowercase, + uppercase=self.caps, + digits=self.digits, + special_characters=self.special + ) + except ImportError as _: + raise Exception("Cannot generate a random password. Requires keeper-secrets-manager-core module.") + + return password + + +class FieldType: + schema = {"value_type": str} + name = None + + # In a field that allows multiple FieldType, is there a "primary key" that makes a dictionary entry unique. + # This use for Phones, which can many Phone entries, and if we need to figure out what is unique. + group_key = None + + # Are multiple values allowed + allow_multiple = False + + def __init__(self, *args, **kwargs): + self._value = None + + # If the value is passed in, set it/overwrite the value in kwargs. + if len(args) > 0: + kwargs["value"] = args[0] + + # Get and remove the common keys + value = kwargs.pop("value", None) + label = kwargs.pop("label", None) + required = kwargs.pop("required", None) + privacy_screen = kwargs.pop("privacy_screen", None) + + # This will validate and set the value. If the value passed in is invalid, a ValueError exception is thrown. + if value is not None: + self.value = value + + self.label = label + self.required = required + self.privacy_screen = privacy_screen + + schema = self.get_schema() + if schema.get("value_type") is dict: + + # Default to the passed in args for attribute variables. However if the value args exists, then use that + # to get the attribute values. + attr_dict = kwargs + if self.value is not None: + attr_dict = self.value[0] + + # This will create and set the attribute variables. + for key in schema.get("schema", {}): + setattr(self, key, attr_dict.get(key, None)) + + def __str__(self): + return f'{self.__class__.__name__}(label={self.label}, value={self.value}, required={self.required}, '\ + f'privacy_screen={self.privacy_screen})' + + @property + def value(self): + return self._value + + @value.setter + def value(self, val): + schema = self.get_schema() + value_type = schema.get("value_type") + + if isinstance(val, list) is False: + val = [val] + + # If the value of the field is another FieldType, convert raw value into an instance of that FieldType. + if issubclass(value_type, FieldType) is True: + for index in range(len(val)): + if isinstance(val[index], value_type) is False: + # Create an instance, pass in value as the value and let the FieldType + val[index] = value_type(val[index]) + + if self._is_valid(val) is True: + self._value = val + + def add_value(self, new_value): + val = self.value + if val is None: + val = [] + val.append(new_value) + self.value = val + + def get_schema(self): + schema = FieldType.schema + if hasattr(self.__class__, "schema") is True: + schema = self.__class__.schema + + return schema + + def get_type(self): + return self.__class__.name + + @staticmethod + def is_value_valid(value, schema, field_class): + validator = schema.get("validate") + value_type = schema.get("value_type") + + # The field value contains a dictionary of values. + if value_type is dict: + if type(value) is not dict: + raise ValueError("The value is not a dictionary.") + dict_schema = schema.get("schema") + for key, info in dict_schema.items(): + # If the value is required, then make sure it exists and is not blank. + if info.get("required", False) is True and (value.get(key) is None or value.get(key) == ""): + raise ValueError(f"The value key '{key}' is missing and it is required.") + + FieldType.is_value_valid(value.get(key), dict_schema.get(key), field_class) + + # The field value has an enumeration. Make sure the value is a valid value of the enumeration. + elif value is not None and issubclass(value_type, BaseEnum): + + if value_type.enum_exists(value) is False: + raise ValueError(f"The value of '{value}' for '{field_class}' is not valid.") + + # If the field value is a string, then it doesn't matter if an int, float, etc is passed in. However + # we can do some validation on the value, so treat as string. + elif value_type is str: + if value is not None and validator is not None: + value = str(value) + validator_params = schema.get("validate_params", re.IGNORECASE) + if re.match(str(validator), value, validator_params) is None: + raise ValueError(f"The value for' {field_class}' is not valid.") + + # Check if the value type is another FieldType, check the field type + elif value is not None and isinstance(value_type, FieldType) is True: + value_type.is_value_valid(value, schema, field_class) + + def _is_valid(self, value): + schema = self.get_schema() + for item in value: + self.is_value_valid(item, schema, self.__class__.__name__) + return True + + def build_value(self, schema, value=None): + + value_type = schema.get("value_type") + + # If the value is an instance of FieldType + if issubclass(value_type, FieldType) is True: + return value.build_value(value_type.schema, None) + + # If the value type is an enumeration, get the value of the enumeration. The actual value may be + # the enum or possible value. Find the right enum and get that enums value. + elif issubclass(value_type, BaseEnum) is True: + if value is None: + return None + return value_type.get_value(value) + + # If the value is a dictionary, then we are getting the values from the instance's attributes. + elif value_type is dict: + + new_schema = schema.get("schema") + value_dict = {} + # Get the key and that's keys schema. + for key, info in new_schema.items(): + # Get the value from the instance's attribute. + dict_value = self.build_value(info, getattr(self, key)) + if dict_value is not None: + value_dict[key] = dict_value + return value_dict + + # At this point, we are a simple data type (99.999% we are str). + else: + if value is not None: + if schema.get("format") is not None: + # This will take the value from the attribute and format, and validate it, into the desired + # format + value = self.format_value( + format_type=schema.get("format"), + value=value + ) + return value + + def to_dict(self): + schema = self.get_schema() + + # Add the field camel case name of the class. + field_dict = { + "type": self.get_type() + } + # Add additional properties only if they are not blank. This is based on Vault UI behavior. + if self.label is not None and self.label != "": + field_dict["label"] = self.label + if self.required is not None: + field_dict["required"] = self.required + if self.privacy_screen is not None: + field_dict["privacyScreen"] = self.privacy_screen + + new_values = [] + + # self.value is a list of values. We need to check them all. + values = self.value + if values is None: + values = [] + + # If the value is a dictionary, there is no value in self.value since the value comes from the + # attributes. We need to fake vales, so set it one item of None. It won't be used, but a for loop will need + # it. + if schema.get("value_type") is dict: + values = [None] + + # Build add the values. + for item in values: + new_value = self.build_value(schema, item) + self.is_value_valid(new_value, schema, self.__class__.__name__) + new_values.append(new_value) + + field_dict["value"] = new_values + + return field_dict + + def to_json(self): + return json.dumps(self.to_dict()) + + @staticmethod + def format_value(format_type, value): + if hasattr(keeper_secrets_manager_helper.format, format_type) is True: + return getattr(keeper_secrets_manager_helper.format, format_type)(value) + raise ValueError("Could not find formatter {}".format(format_type)) + + @staticmethod + def add_template_specifics(data): + pass + +# ------------------------------------------------------------------------------------------------------------------- + + +class Text(FieldType): + name = "text" + + +class Url(FieldType): + name = "url" + + +class PinCode(FieldType): + name = "pinCode" + + +class Multiline(FieldType): + name = "multiline" + + +class FileRef(FieldType): + name = "fileRef" + # The validation checks to see if value is a Record UID. + schema = {"value_type": str, "validate": UID_REGEX, "desc": "Record UID of File record."} + + +class Email(FieldType): + name = "email" + + +class Phone(FieldType): + name = "phoneItem" + schema = { + "value_type": dict, + "schema": { + "region": {"value_type": str, "desc": "Region"}, + "number": {"value_type": str, "desc": "Number"}, + "ext": {"value_type": str, "desc": "Extension"}, + "type": {"value_type": PhoneTypeEnum} + } + } + + +class Phones(FieldType): + name = "phone" + group_key = "type" + allow_multiple = True + schema = {"value_type": Phone} + + +class Name(FieldType): + name = "name" + schema = { + "value_type": dict, + "schema": { + "first": {"value_type": str, "desc": "First Name"}, + "middle": {"value_type": str, "desc": "Middle name"}, + "last": {"value_type": str, "desc": "Last Name"} + } + } + + +class Address(FieldType): + name = "address" + schema = { + "value_type": dict, + "schema": { + "street1": {"value_type": str, "desc": "Street"}, + "street2": {"value_type": str, "desc": "Street 2"}, + "city": {"value_type": str, "desc": "City"}, + "zip": {"value_type": str, "desc": "Zip/Postal Code"}, + "country": {"value_type": CountryEnum, "desc": "ISO3166 Alpha-2 Country Code"}, + } + } + + +class AddressRef(FieldType): + name = "addressRef" + # The validation checks to see if value is a Record UID. + schema = {"value_type": str, "validate": UID_REGEX, "desc": "Record UID for Address record."} + + +class AccountNumber(FieldType): + name = "accountNumber" + + +class Login(FieldType): + name = "login" + + +class HiddenField(FieldType): + name = "secret" + + +class Password(FieldType): + name = "password" + schema = {"value_type": str, "desc": "Password or Remove If Generating"} + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._enforce_generation = None + self._complexity = None + + self.enforce_generation = kwargs.pop("enforce_generation", None) + self.complexity = kwargs.pop("complexity", None) + self.password_generate = kwargs.pop("password_generate", None) + + def __str__(self): + return f'{self.__class__.__name__}(label={self.label}, value={self.value}, required={self.required}, '\ + f'privacy_screen={self.privacy_screen}, enforce_generation={self.enforce_generation}' + + @property + def enforce_generation(self): + return self._enforce_generation + + @enforce_generation.setter + def enforce_generation(self, value): + if value is not None and type(value) is not bool: + raise ValueError("enforce_generation needs to be a boolean value.") + self._enforce_generation = value + + @property + def complexity(self): + return self._complexity + + @complexity.setter + def complexity(self, value): + if value is not None: + if type(value) is dict: + value = PasswordComplexity(value) + if isinstance(value, PasswordComplexity) is not True: + raise ValueError("complexity needs to be a PasswordComplexity instance.") + self._complexity = value + + @staticmethod + def add_template_specifics(data): + data["enforceGeneration"] = False + data["complexity"] = PasswordComplexity().to_dict() + + def to_dict(self): + + # If enforceGeneration is enabled in the record type or password_generate is True and the password has not + # been set, then generate a password, ignore the value that is already there. + if self.enforce_generation is True or (self.value is None and self.password_generate is True): + self.generate_password() + + field_dict = super().to_dict() + if self.enforce_generation is not None: + field_dict["enforceGeneration"] = self.enforce_generation + if self.complexity is not None: + field_dict["complexity"] = self.complexity.to_dict() + return field_dict + + def generate_password(self): + if self.complexity is None: + self.complexity = PasswordComplexity() + self.value = [self.complexity.generate_password()] + + +class SecurityQuestions(FieldType): + name = "securityQuestion" + schema = { + "value_type": dict, + "schema": { + "question": {"value_type": str, "desc": "Security Question"}, + "answer": {"value_type": str, "desc": "Answer To The Question"} + } + } + + +class OneTimePassword(FieldType): + name = "otp" + schema = {"value_type": str, "validate": r'^otpauth://', "desc": "URL starting with otpauth://"} + + +class OneTimeCode(FieldType): + name = "oneTimeCode" + schema = {"value_type": str, "validate": r'^otpauth://', "desc": "URL starting with otpauth://"} + + +class CardRef(FieldType): + name = "cardRef" + # The validation checks to see if value is a Record UID. + schema = {"value_type": str, "validate": UID_REGEX, "desc": "Record UID of PaymentCard record."} + + +class PaymentCard(FieldType): + name = "paymentCardItem" + schema = { + "value_type": dict, + "schema": { + "cardNumber": {"value_type": str, "desc": "Card Number"}, + "cardExpirationDate": {"value_type": str, "validate": r'^\d{2}\/\d{4}$', + "desc": "Expiration Date as MM/YYYY"}, + "cardSecurityCode": {"value_type": str} + } + } + + +class PaymentCards(FieldType): + name = "paymentCard" + schema = {"value_type": PaymentCard} + + +class Date(FieldType): + name = "date" + schema = {"value_type": str, "format": "date_to_ms", "desc": "Date in ISO8601 Format or Epoch Milliseconds"} + + +class BirthDate(FieldType): + name = "birthDate" + schema = {"value_type": str, "format": "date_to_ms", "desc": "Birth Date in ISO8601 Format or Epoch Milliseconds"} + + +class ExpirationDate(FieldType): + name = "expirationDate" + schema = {"value_type": str, "format": "date_to_ms", + "desc": "Expiration Date in ISO8601 Format or Epoch Milliseconds"} + + +class BankAccount(FieldType): + name = "bankAccountItem" + schema = { + "value_type": dict, + "schema": { + "accountType": {"value_type": AccountTypeEnum}, + "otherType": {"value_type": str, "desc": "Other Type Description"}, + "routingNumber": {"value_type": str, "desc": "Routing Number"}, + "accountNumber": {"value_type": str, "desc": "Account Number"}, + } + } + + +class BankAccounts(FieldType): + name = "bankAccount" + schema = {"value_type": BankAccount} + + +class KeyPair(FieldType): + name = "keyPair" + schema = { + "value_type": dict, + "schema": { + "publicKey": {"value_type": str, "desc": "Public Key"}, + "privateKey": {"value_type": str, "desc": "Private Key. Normally a PEM file."}, + } + } + + +class Host(FieldType): + name = "host" + schema = { + "value_type": dict, + "schema": { + "hostName": {"value_type": str, "desc": "Hostname or IP"}, + "port": {"value_type": str, "desc": "Port"}, + } + } + +# AppFiller? + + +class LicenseNumber(FieldType): + name = "licenseNumber" + schema = {"value_type": str, "desc": "License Number"} + + +# privateKey? + +class SecureNote(FieldType): + name = "note" + schema = {"value_type": str, "desc": "Secret Note"} diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/parser.py b/sdk/python/helper/keeper_secrets_manager_helper/v3/parser.py new file mode 100644 index 00000000..d7a6cf96 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/v3/parser.py @@ -0,0 +1,150 @@ +from keeper_secrets_manager_helper.v3.field_type import FieldType, get_field_type_map +from keeper_secrets_manager_helper.field import Field, FieldSectionEnum +import re + + +class Parser: + + def __init__(self): + # Find all the field type classes and create a map from the the camelcase name to the FieldType class + self.field_map = get_field_type_map() + + def parse_field(self, field_args): + + field_objs = [] + + # Might look like a list, but it could be a set, convert it + if isinstance(field_args, set) is True: + field_args = list(field_args) + + if isinstance(field_args, list) is False: + field_args = [field_args] + + for arg in field_args: + + # First check if the field section is defined in the arg. The default is the standard fields, or 'f' + field_section = FieldSectionEnum.STANDARD + field_section_match = re.match(r'^(?P.)\.', arg) + if field_section_match is not None: + field_section_value = field_section_match.group('field_section') + if field_section_value == "f": + field_section = FieldSectionEnum.STANDARD + elif field_section_value == "c": + field_section = FieldSectionEnum.CUSTOM + else: + raise ValueError(f"Field section can only be 'f' or 'c'. The value '{field_section_value}' is not" + "not valid.") + # Remove the f. or c. from the arg + arg = arg[2:] + + # At this point a camelcase field type name should be at the front of the arg, followed by a "[" or "=" + field_type_match = re.match(r'^(?P\w+)[.\[=]', arg) + if field_type_match is None: + raise ValueError("Cannot find the field type name.") + field_type = field_type_match.group("field_type") + if field_type not in self.field_map: + raise ValueError("Field type '{}' does not exists.".format(field_type)) + + # This key designating what would make a complete field record unique. It used in grouping data into + # a field record. It's like a primary key. See the Phones/Phone field type. + group_key = self.field_map[field_type].group_key + allow_multiple = self.field_map[field_type].allow_multiple + + # Remove the field type from the arg + arg = arg[len(field_type):] + + field_label = None + value_key = None + + while True: + next_char = arg[0] + arg = arg[1:] + + # Is the next thing a label for the field. + if next_char == "[": + + # Labels are weird. We need to character nibble this because the label contains a ] + # ie f.phone[\[BRACKETS\]]=... label = [BRACKETS] + # So we need to handle [, ], \ with special care. + index = 0 + found_end = False + escape_mode = False + buffer = "" + while index < len(arg): + c = arg[index] + index += 1 + # If we get the end ] and we are not in escape mode, we are done + if c == "]" and escape_mode is False: + found_end = True + break + # If we get a \ and we are not in escape mode, then turn on escape mode and disregard this char. + # This basically prevent use completing if we get a ] + elif c == "\\" and escape_mode is False: + escape_mode = True + else: + buffer += c + # Turn off escape mode since we got a character. + if escape_mode is True: + escape_mode = False + + if found_end is False: + raise ValueError("Could not find the end of the label.") + + if buffer != "": + field_label = buffer + + # The + 1 is the end ] + arg = arg[index:] + + # If the next thing is a sub value of the value (ie street1 of Address) + elif next_char == ".": + + # If we have already gotten the sub value, then there is an error in the argument being parsed. + if value_key is not None: + raise ValueError(f"The value key '{value_key}' has already been found.") + + # Get the text left of the =. Including the [ too. + sub_value_match = re.match(r'^(?P[\w_]+)[\[=.]', arg) + if sub_value_match is not None: + value_key = sub_value_match.group("sub_value_key") + schema = self.field_map[field_type].schema + value_type = schema.get("value_type") + + # The value type might be another FieldType, get that FieldType's schema and value type + if issubclass(value_type, FieldType): + schema = value_type.schema + value_type = schema.get("value_type") + + # If the value type is not a dictionary then we couldn't be using a value key. Throw + # an exception. + if value_type is not dict: + raise ValueError("The field type '{}' does not have value keys. " + "Cannot set the value key {}.".format(field_type, value_key)) + + # If the value key is not in the dictionary, throw an exception. + schema = schema.get("schema") + if value_key not in schema: + raise ValueError("The field type '{}' does not have the value key '{}'. ".format( + field_type, value_key)) + + # The value key was found, use it. + arg = arg[len(value_key):] + + # Else we have the value + else: + value = arg + break + + field_objs.append( + Field( + field_section=field_section, + type=field_type, + label=field_label, + value_key=value_key, + group_key=group_key, + value=value, + allow_multiple=allow_multiple + ) + ) + + return field_objs diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/record.py b/sdk/python/helper/keeper_secrets_manager_helper/v3/record.py new file mode 100644 index 00000000..a474ade9 --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/v3/record.py @@ -0,0 +1,367 @@ +from keeper_secrets_manager_helper.field import Field, FieldSectionEnum +from keeper_secrets_manager_helper.common import load_file +from keeper_secrets_manager_helper.v3.record_type import get_class_by_type as get_record_type_class +from keeper_secrets_manager_helper.v3.field_type import get_class_by_type as get_field_type_class +from importlib import import_module + + +class Record: + + @staticmethod + def create_from_file(file, password_generate=False): + record_data = load_file(file) + return Record.create_from_data(record_data, password_generate=password_generate) + + @staticmethod + def create_from_data(record_data, password_generate=False): + + records = [] + + if record_data.get("version") != "v3": + raise ValueError(".version is not 'v3'") + if record_data.get("kind") != "KeeperRecord": + raise ValueError(".kind is not 'KeeperRecord'") + data = record_data.get("data") + if data is None: + raise ValueError(".data[] is missing") + if isinstance(data, list) is False: + raise ValueError(".data[] is not an array") + + record_count = 0 + for record_item in data: + record_type = record_item.get("recordType", record_item.get("record_type")) + if record_type is None or record_type == "": + raise ValueError(f".data[{record_count}].recordType is missing or blank") + title = record_item.get("title") + if title is None or title == "": + raise ValueError(f".data[{record_count}].title is missing or blank") + + record = Record( + record_type=record_type, + title=title, + notes=record_item.get("notes"), + password_generate=password_generate + ) + + all_fields = [] + + fields = record_item.get("fields") + if fields is None: + raise ValueError(f".data[{record_count}].fields[] is missing") + if isinstance(fields, list) is False: + raise ValueError(f".data[{record_count}].fields[] is not an array") + + for field_item in fields: + field = Field( + type=field_item.get("type"), + field_section=FieldSectionEnum.STANDARD, + label=field_item.get("label"), + value=field_item.get("value"), + ) + all_fields.append(field) + + custom_fields = record_item.get("customFields", record_item.get("custom_fields")) + if custom_fields is not None: + if isinstance(custom_fields, list) is False: + raise ValueError(f".data[{record_count}].fields[] is not an array") + + for field_item in custom_fields: + field = Field( + type=field_item.get("type"), + field_section=FieldSectionEnum.CUSTOM, + label=field_item.get("label"), + value=field_item.get("value"), + ) + all_fields.append(field) + + record.add_fields(all_fields) + record.build_record() + records.append(record) + + return records + + def __init__(self, *args, **kwargs): + + # If there is an arg, then assume it's a dictionary with record data. + if len(args) > 0: + pass + + self.record_type = kwargs.get("record_type") + self.title = kwargs.get("title") + self.notes = kwargs.get("notes") + self.fields = [] + self.custom_fields = [] + + if self.record_type is None or self.record_type == "": + raise ValueError("record_type is missing or blank.") + + try: + record_type = get_record_type_class(self.record_type)() + + # Make a quick lookup for the standard fields. + self._valid_fields = [{"type": x.get("type"), "label": x.get("label"), "has_value": False} + for x in record_type.get_standard_fields()] + except ImportError as err: + raise ValueError(err) + + if self.title is None or self.title == "": + raise ValueError("title is missing or blank.") + + # The fields are mapped here in an attempt to make unique fields. + self._fields = { + FieldSectionEnum.STANDARD: {}, + FieldSectionEnum.CUSTOM: {} + } + + self.password_generate = kwargs.get("password_generate", False) + + self.valid_fields = [] + + # All the fields (standard/custom) to be passed in with the constructor. + fields = kwargs.get("fields") + if fields is not None: + self.add_fields(fields) + self.build_record() + + def _add_new_field(self, field, field_key, group_key): + # Count the number of keys in the dictionary and use that for an index. That will be used determine + # the order. + field.index = len(self._fields[field.field_section]) + + # If the group key is not None, then convert the value to an array. + if group_key is not None and isinstance(field.value, list) is False: + field.value = [field.value] + + self._fields[field.field_section][field_key] = field + + def _is_valid_standard_field(self, field_type): + for item in self._valid_fields: + if item.get("type") == field_type and item.get("has_value") is False: + return True + return False + + def _flag_standard_field_used(self, field_type): + for item in self._valid_fields: + if item.get("type") == field_type and item.get("has_value") is False: + item["has_value"] = True + break + + def _get_label_for_standard_field(self, field_type): + for item in self._valid_fields: + if item.get("type") == field_type and item.get("has_value") is False: + return item.get("label") + return None + + def add_fields(self, fields): + if isinstance(fields, list) is False: + fields = [fields] + + for field in fields: + + if isinstance(field, Field) is False: + raise ValueError("The method add_field requires instance(s) of Field") + + # + label = None + if field.field_section == FieldSectionEnum.STANDARD: + label = self._get_label_for_standard_field(field.type) + + field_key = field.instance_field_key(label=label) + group_key = field.group_key + + # Does this key already exists? And can we add values to the dictionary value? + if field_key in self._fields[field.field_section] and field.can_add_key_value(): + + # If out value is a string we should not be in here. + if isinstance(field.value, str) is True: + raise ValueError(f"The {field.type} is a string. If JSON check to see if JSON is valid.") + + # Get the existing field and copy any values in it's dictionary into the existing. + existing_field = self._fields[field.field_section][field_key] + + # If the field is completely set + if existing_field.is_complete is True and existing_field.field_section == FieldSectionEnum.STANDARD: + raise ValueError("Attempting to set a standard field that has already been set.") + + # The existing field is complete and a custom field, so add + if existing_field.is_complete is True: + raise ValueError("Cannot add this field due to it not being unique. To make unique add a label to " + "the field or make sure the label is not being duplicated.") + + # If the existing_field is JSON and the current field is JSON, then add to existing. This allows + # the value to be set with multiple objects. + if existing_field.initial_value_was_json and field.initial_value_was_json: + if isinstance(existing_field.value, dict) is True: + existing_field.value = [existing_field.value] + if isinstance(field.value, list) is True: + for item in field.value: + existing_field.value.append(item) + else: + existing_field.value.append(field.value) + continue + + for k, v in field.value.items(): + + # If tke group key is set. The value can be multiple dictionaries that have a specific key + # which indicates its uniqueness. If that key does not exist, values can be inserted into the + # last dictionary in the list. If does exists, then a new dictionary is created. + if group_key is not None: + found_a_place = False + for item in existing_field.value: + if group_key not in item and item.get(Field.complete_key) is not True: + item[k] = v + found_a_place = True + else: + item[Field.complete_key] = True + if found_a_place is False and isinstance(existing_field.value, list) is True: + new_object = {k: v} + existing_field.value.append(new_object) + elif isinstance(existing_field.value, dict) is True: + existing_field.value[k] = v + + # Else we are creating a new entry. + else: + # Standard fields are defined. Don't insert a field that doesn't belong. + if field.field_section == FieldSectionEnum.STANDARD: + if self._is_valid_standard_field(field.type): + self._flag_standard_field_used(field.type) + else: + raise ValueError(f"The standard fields do not have a '{field.type}' " + "field type or they all have values.") + + self._add_new_field(field, field_key, group_key) + + @staticmethod + def _copy_record_type_settings(field_obj, standard_field): + # Copy extra values from the record type schema to the field. These are unique field type params like + # required, enforce_generation and complexity. + for key, value in standard_field.items(): + field_obj.add_extra(key, value) + + def _get_standard_fields(self, record_type): + + # Add the standard fields in the order defined by record type schema. + + fields_list = [] + # Get a list of standard fields in the Record Type. + for standard_field in record_type.get_standard_fields(): + # First check if we have a key with a label, if it exists, and then use that. + field_key = Field.field_key(standard_field.get("type"), standard_field.get("label")) + if field_key in self._fields[FieldSectionEnum.STANDARD]: + field_obj = self._fields[FieldSectionEnum.STANDARD][field_key] + self._copy_record_type_settings(field_obj, standard_field) + fields_list.append(field_obj) + else: + # Find the field by it's field type. + field_key = Field.field_key(standard_field.get("type"), None) + if field_key in self._fields[FieldSectionEnum.STANDARD]: + field_obj = self._fields[FieldSectionEnum.STANDARD][field_key] + self._copy_record_type_settings(field_obj, standard_field) + fields_list.append(field_obj) + else: + # If nothing exists, make an empty field for the field type + field_obj = Field( + type=standard_field.get("type"), + field_section=FieldSectionEnum.STANDARD, + value=None + ) + self._copy_record_type_settings(field_obj, standard_field) + fields_list.append(field_obj) + + return fields_list + + def _get_custom_fields(self): + + def get_index_key(obj): + return obj.index + + # Add the custom fields in the order they were added. + fields_list = [self._fields[FieldSectionEnum.CUSTOM][x] for x in self._fields[FieldSectionEnum.CUSTOM]] + fields_list.sort(key=get_index_key) + return fields_list + + @staticmethod + def _remove_private_keys(obj): + """ + The value might contain dictionaries what contain private key. This will remove any that exists. Right + now it's just one. + """ + if isinstance(obj, list): + for item in obj: + Record._remove_private_keys(item) + elif isinstance(obj, dict): + obj.pop(Field.complete_key, None) + + def build_record(self): + + record_type = get_record_type_class(self.record_type)() + + # Take all the standard fields from the user's input and populate the field type to validate it. Then + # the dictionary used in the V3 records for a field to the list. + self.fields = [] + for field in self._get_standard_fields(record_type): + field_type_kwargs = field.to_dict() + self._remove_private_keys(field_type_kwargs.get("value")) + field_type_kwargs["password_generate"] = self.password_generate + field_type_obj = get_field_type_class(field.type)(**field_type_kwargs) + self.fields.append(field_type_obj.to_dict()) + + # Do the same with the custom fields. + self.custom_fields = [] + for field in self._get_custom_fields(): + field_type_kwargs = field.to_dict() + self._remove_private_keys(field_type_kwargs.get("value")) + field_type_kwargs["password_generate"] = self.password_generate + field_type_obj = get_field_type_class(field.type)(**field_type_kwargs) + self.custom_fields.append(field_type_obj.to_dict()) + + def get_record_create_obj(self): + try: + # Make sure the classes we need are in the KSM Python SDK. + mod = import_module("keeper_secrets_manager_core.dto.dtos") + if hasattr(mod, "RecordCreate") is False: + raise ImportError("Cannot find the RecordCreate in the KSM Python SDK. Please update the SDK.") + record_field_class = getattr(mod, "RecordField") + if record_field_class is None: + raise ImportError("Cannot find the RecordField in the KSM Python SDK. Please update the SDK.") + + # Make an instance of the SDK's RecordCreate + new_record = getattr(mod, "RecordCreate")( + record_type=self.record_type, + title=self.title + ) + + # Add the standard fields thru RecordField constructor + record_field = [] + for field in self.fields: + + # Translate dictionary to RecordField + field["field_type"] = field.pop("type") + + # V3 does take complexity or enforceGeneration + field.pop("complexity", None) + field.pop("enforceGeneration", None) + + record_field.append(record_field_class(**field)) + new_record.fields = record_field + + # Add the custom fields thru RecordField constructor + record_field = [] + for field in self.custom_fields: + # Translate dictionary to RecordField + field["field_type"] = field.pop("type") + + # V3 does take complexity or enforceGeneration + field.pop("complexity", None) + field.pop("enforceGeneration", None) + + record_field.append(record_field_class(**field)) + new_record.custom = record_field + + # Add the notes + new_record.notes = self.notes + + except ImportError as _: + raise Exception("Cannot build a CreateRecord instance. Cannot find the KSM Python SDK.") + + return new_record diff --git a/sdk/python/helper/keeper_secrets_manager_helper/v3/record_type.py b/sdk/python/helper/keeper_secrets_manager_helper/v3/record_type.py new file mode 100644 index 00000000..4d66987a --- /dev/null +++ b/sdk/python/helper/keeper_secrets_manager_helper/v3/record_type.py @@ -0,0 +1,209 @@ +from keeper_secrets_manager_helper.v3.field_type import FieldType, get_field_type_map +from keeper_secrets_manager_helper.v3.enum import BaseEnum +from keeper_secrets_manager_helper.common import load_file +import os +import inspect +import yaml +import json +import re + + +# Make a base class for all our record types +class RecordType: + + def __init__(self): + self.field_type_map = get_field_type_map() + + # self.schema and self.name works, however the dynamic class generation will add it. To make PyCharm happy, + # get the schema and name using getattr. + + def get_schema(self): + return getattr(self, "schema") + + def get_name(self): + return getattr(self, "name") + + def get_standard_fields(self): + return self.get_schema().get("fields", []) + + def get_custom_fields(self): + return self.get_schema().get("custom_fields", []) + + def _expand_value_type(self, schema, allow_multiple=None, is_required=False): + + value_type = schema.get("value_type") + # If the record doesn't set allow_multiple to True/False, allow the field to set the value. + if allow_multiple is None: + allow_multiple = schema.get("allow_multiple", False) + if issubclass(value_type, FieldType) is True: + new_schema = value_type.schema + return self._expand_value_type(new_schema, is_required=is_required) + elif issubclass(value_type, BaseEnum) is True: + value = "<#ADD: " + value_type.build_example() + ">" + return value + elif issubclass(value_type, dict): + value_block = {} + for key, info in schema.get("schema").items(): + value_block[key] = self._expand_value_type(info, is_required=is_required) + return value_block + else: + value = "<#ADD: " + schema.get("desc", "Insert a {}".format(value_type.__name__)) + ">" + if allow_multiple is True: + value = [value] + return value + + def generate_template_dict(self, title=None, notes=None): + fields = [] + + for field in self.get_schema().get("fields"): + + field_type = field.get("type") + + data = { + "type": field_type + } + + if field_type not in self.field_type_map: + raise ValueError("Field type '{}' does not exists.".format(field_type)) + field_type_obj = self.field_type_map[field_type] + field_schema = field_type_obj.schema + data["value"] = self._expand_value_type(field_schema, + allow_multiple=field.get("allow_multiple", None), + is_required=field.get("required", False)) + data["privacyScreen"] = field_schema.get("privacy_screen", False) + field_type_obj.add_template_specifics(data) + fields.append(data) + + template = { + "version": "v3", + "kind": "KeeperRecord", + "data": [{ + "recordType": self.get_name(), + "title": title if title is not None else "<#ADD: The title of record here. This is required.>", + "notes": notes if notes is not None else "<#ADD: Add some notes or remove.>", + "fields": fields, + }] + } + + return template + + @staticmethod + def generate_yaml_template(template_dict): + return yaml.dump(template_dict, sort_keys=False) + + @staticmethod + def generate_json_template(template_dict): + return json.dumps(template_dict, indent=4, sort_keys=False) + + def generate_template(self, output_format, title=None, notes=None): + template_dict = self.generate_template_dict(title=title, notes=notes) + if output_format == "json": + return self.generate_json_template(template_dict) + return self.generate_yaml_template(template_dict) + + +# This maps a record type/name to a type class +class_map_by_type = {} + + +def load_record_type_from_file(file): + load_record_type_from_data(load_file(file)) + + +def load_record_type_from_data(record_types): + + for item in record_types.get("data", []): + + # Make sure we haven't loaded this class already. Cannot overwrite classes. + if item.get("name") in class_map_by_type: + raise ValueError("Cannot overwrite class {}".format(item.get("class"))) + record_type_class = type(item.get("class"), (RecordType,), { + "name": item.get("name"), + "schema": { + "fields": item.get("fields", []) + } + }) + globals()[record_type_class.__name__] = record_type_class + class_map_by_type[item.get("name")] = record_type_class + + +# We are going to make some classes based off a YAML file. +default_record_type_file = "default_record_types.yml" +module_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +load_record_type_from_file(os.path.join(module_dir, default_record_type_file)) + + +def get_class_by_type(class_name): + if class_name in class_map_by_type: + return class_map_by_type[class_name] + raise ImportError("Record type class {} is not loaded.".format(class_name)) + + +def get_record_type_list(): + return class_map_by_type.keys() + + +def make_class_name(name): + name = name.lower() + name = re.sub(r'[^a-zA-Z0-9\s]', '', name) + name_parts = re.split(' +', name) + name = "" + for item in name_parts: + if item is None or item == "": + continue + name += item[0].upper() + item[1:] + return name + + +def camel_case_split(value): + matches = re.finditer('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)', value) + return "_".join([m.group(0) for m in matches]).lower() + + +def load_commander_record_type_from_file(file): + load_commander_record_type_from_data(load_file(file)) + + +def load_commander_record_type_from_data(data): + + if isinstance(data, dict) is True: + data = [data] + + record_schema = { + "data": [] + } + + index = 0 + for item in data: + if item.get("recordTypeId") is None: + raise ValueError(f"Missing recordTypeId for record index {index}. Is this an export of a Keeper " + f"Commander record type information.") + if item.get("content") is None: + raise ValueError(f"Missing content for record index {index}. Is this an export of a Keeper " + f"Commander record type information.") + + content = json.loads(item.get("content")) + + name = content.get('$id') + class_name = make_class_name(name) + + record_data = { + "class": class_name, + "name": name, + "fields": [] + } + + for field in content.get("fields"): + field_data = { + "type": field.get("$ref") + } + for param in ["label", "required", "privacyScreen", "enforceGeneration", "privacyScreen", "complexity"]: + value = field.get(param) + if value is not None: + field_data[camel_case_split(param)] = value + + record_data["fields"].append(field_data) + + record_schema["data"].append(record_data) + + load_record_type_from_data(record_schema) diff --git a/sdk/python/helper/requirements.txt b/sdk/python/helper/requirements.txt new file mode 100644 index 00000000..2a2c75f3 --- /dev/null +++ b/sdk/python/helper/requirements.txt @@ -0,0 +1,3 @@ +keeper-secrets-manager-core>=16.2.2 +pyyaml +iso8601 \ No newline at end of file diff --git a/sdk/python/helper/setup.py b/sdk/python/helper/setup.py new file mode 100644 index 00000000..6e6d1d1b --- /dev/null +++ b/sdk/python/helper/setup.py @@ -0,0 +1,52 @@ +from setuptools import setup, find_packages +import os + +here = os.path.abspath(os.path.dirname(__file__)) + +# Get the long description from the README.md file +with open(os.path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = f.read() + +install_requires = [ + 'keeper-secrets-manager-core>=16.2.2', + 'pyyaml', + 'iso8601' +] + +setup( + name="keeper-secrets-manager-helper", + version="1.0.1", + description="Keeper Secrets Manager SDK helper for managing records.", + long_description=long_description, + long_description_content_type="text/markdown", + author="Keeper Security", + author_email="ops@keepersecurity.com", + url="https://github.com/Keeper-Security/secrets-manager", + license="MIT", + keywords="Keeper Password Secrets Manager Helper Record", + packages=find_packages(exclude=["tests", "tests.*"]), + zip_safe=False, + package_data={}, + include_package_data=True, + install_requires=install_requires, + python_requires='>=3.6', + project_urls={ + "Bug Tracker": "https://github.com/Keeper-Security/secrets-manager/issues", + "Documentation": "https://app.gitbook.com/" + "@keeper-security/s/secrets-manager/secrets-manager", + "Source Code": "https://github.com/Keeper-Security/secrets-manager", + }, + classifiers=[ + "Development Status :: 1 - Planning", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Topic :: Security", + ] +) diff --git a/sdk/python/helper/tests/record_file_test.py b/sdk/python/helper/tests/record_file_test.py new file mode 100644 index 00000000..80878c89 --- /dev/null +++ b/sdk/python/helper/tests/record_file_test.py @@ -0,0 +1,125 @@ +import unittest +from keeper_secrets_manager_helper.record import Record +from keeper_secrets_manager_helper.exception import FileSyntaxException +import yaml +import json +import tempfile + + +class RecordFileTest(unittest.TestCase): + + @staticmethod + def _make_record_data(): + + return { + "version": "v3", + "kind": "KeeperRecord", + "data": [ + { + "recordType": "bankAccount", + "title": "Bank Account", + "fields": [ + { + "type": "bankAccount", + "value": { + "accountType": "Checking", + "routingNumber": "ROUTING", + "accountNumber": "ACCOUNT" + } + }, + { + "type": "name", + "value": { + "first": "John", + "last": "Smith" + } + }, + { + "type": "login", + "value": "my_login" + } + ] + }, + { + "recordType": "login", + "title": "Login", + "fields": [ + { + "type": "login", + "value": "my_login" + } + ] + } + ] + } + + def test_load_template_files(self): + + data = self._make_record_data() + + with tempfile.NamedTemporaryFile("w", suffix=".yaml") as fh: + fh.write(yaml.dump(data)) + fh.seek(0) + records = Record.create_from_file(fh.name) + self.assertEqual(2, len(records), "did not get 2 records") + fh.close() + + with tempfile.NamedTemporaryFile("w", suffix=".json") as fh: + fh.write(json.dumps(data)) + fh.seek(0) + records = Record.create_from_file(fh.name) + self.assertEqual(2, len(records), "did not get 2 records") + fh.close() + + def test_bad_yaml_template_file(self): + + # Bad spot is the tab + bad_yaml = """ + version: v3 + kind: KeeperRecord + data: + - recordType: Login + \ttitle: My Title + fields: + - type: login + """ + + try: + with tempfile.NamedTemporaryFile("w", suffix=".yaml") as fh: + fh.write(bad_yaml) + fh.seek(0) + Record.create_from_file(fh.name) + fh.close() + except FileSyntaxException as err: + self.assertRegex(str(err), r'The YAML has problems around row 6, column 9') + except Exception as err: + self.fail("Got an exception: " + str(err)) + + def test_bad_json_template_file(self): + + # Bad spot is missing quote for title key + bad_json = """ + { + "version": "v3", + "kind": "KeeperRecord", + "data": [{ + "recordType": "Login", + title": "My Title", + "fields": [{ + "type": "login" + }] + }] + } + """ + + try: + with tempfile.NamedTemporaryFile("w", suffix=".json") as fh: + fh.write(bad_json) + fh.seek(0) + Record.create_from_file(fh.name) + fh.close() + except FileSyntaxException as err: + self.assertRegex(str(err), r'The JSON had problems: Expecting property name enclosed in double quotes' + r' around row 7, column 13') + except Exception as err: + self.fail("Got an exception: " + str(err)) diff --git a/sdk/python/helper/tests/record_test.py b/sdk/python/helper/tests/record_test.py new file mode 100644 index 00000000..78e41322 --- /dev/null +++ b/sdk/python/helper/tests/record_test.py @@ -0,0 +1,47 @@ +import unittest +from keeper_secrets_manager_helper.record import Record +from keeper_secrets_manager_core.dto.dtos import RecordCreate + + +class RecordTest(unittest.TestCase): + + def test_build_record_simple(self): + + """Test the ingress in the helper create record + + Instead of calling the version modules directory, this will call them. Basically one entry + point for all versions. + + """ + + contact_data = [ + "f.name.first=John", + "f.name.last=Smith", + "f.text=ACME", + "f.email=admin@localhost", + 'f.phone={"number": "5552223333", "type": "Work"}', + 'f.phone={"number": "5551111111", "type": "Home"}', + 'f.addressRef=PpR0AKIZAtUiyvq1r2BC1w', + 'c.password=' + ] + + kwargs = dict( + record_type='contact', + title="Contact Record", + notes="My Note", + field_args=contact_data, + password_generate=True + ) + + r = Record(version="v3").create_from_field_args(**kwargs) + record_create_obj = r[0].get_record_create_obj() + self.assertIsInstance(record_create_obj, RecordCreate) + self.assertEqual("Contact Record", record_create_obj.title) + self.assertEqual("My Note", record_create_obj.notes) + self.assertListEqual(record_create_obj.fields[0].value, [{'first': 'John', 'last': 'Smith'}]) + self.assertListEqual(record_create_obj.fields[1].value, ["ACME"]) + self.assertListEqual(record_create_obj.fields[2].value, ["admin@localhost"]) + self.assertListEqual(record_create_obj.fields[3].value, [{"number": "5552223333", "type": "Work"}, + {"number": "5551111111", "type": "Home"}]) + self.assertListEqual(record_create_obj.fields[4].value, ["PpR0AKIZAtUiyvq1r2BC1w"]) + self.assertNotEqual(0, len(record_create_obj.custom[0].value)) diff --git a/sdk/python/helper/tests/v3/v3_field_type_all_fields_test.py b/sdk/python/helper/tests/v3/v3_field_type_all_fields_test.py new file mode 100644 index 00000000..dfce627c --- /dev/null +++ b/sdk/python/helper/tests/v3/v3_field_type_all_fields_test.py @@ -0,0 +1,309 @@ +import unittest +from keeper_secrets_manager_helper.v3.field_type import * +from keeper_secrets_manager_helper.v3.enum import CountryEnum, AccountTypeEnum + + +class FieldTypeTest(unittest.TestCase): + + def _check_dict(self, field_type, value=None, label=None, extra_params=None, check_value=True): + field_dict = field_type.to_dict() + self.assertEqual(field_type.name, field_dict.get("type"), "type is not correct") + if check_value is True: + if isinstance(value, dict) is True: + self.assertDictEqual(field_dict.get("value")[0], value, "value (dict) is not correct") + elif isinstance(value, list) is True: + self.assertListEqual(field_dict.get("value"), value, "value (list) is not correct") + else: + self.assertEqual(field_dict.get("value")[0], value, "value (str) is not correct") + else: + self.assertIsNotNone(field_dict.get("value"), "value doesn't exist") + if label is not None: + self.assertEqual(label, field_dict.get("label"), "label is not correct") + if extra_params is not None: + for k, v in extra_params.items(): + self.assertEqual(v, field_dict.get(k), f"{k} is not correct") + + def test_text(self): + # Set as str + text = Text("My Text", label="MY LABEL") + self._check_dict(text, value="My Text", label="MY LABEL") + + def test_url(self): + # Set using attributes + url = Url() + url.value = "http://localhost" + self._check_dict(url, value="http://localhost") + + def test_pin_code(self): + pc = PinCode() + pc.value = "111111" + self._check_dict(pc, value="111111") + + def test_multiline(self): + # Set as array value + ml = Multiline(["this\nhas\ntext"]) + self._check_dict(ml, value="this\nhas\ntext") + + def test_file_ref(self): + f = FileRef() + f.value = "OlLZ6JLjnyMOS3CiIPHBjw" + self._check_dict(f, value="OlLZ6JLjnyMOS3CiIPHBjw") + + def test_email(self): + # Set as array value + e = Email("smith@localhost") + self._check_dict(e, value="smith@localhost") + + def test_phone(self): + + # Set Phone via attributes + p = Phone() + p.number = "5555551234" + p.ext = "7777" + p.type = "Mobile" + p.region = "US" + self._check_dict(p, value={"number": "5555551234", "ext": "7777", "type": "Mobile", "region": "US"}) + + # Set phone via constructor args + p = Phone(number="123456", type="Work", region="US", ext="1234") + self.assertEqual("123456", p.number, "Phone number is not correct") + self.assertEqual("Work", p.type, "Phone type is not correct") + self.assertEqual("US", p.region, "Phone region is not correct") + self.assertEqual("1234", p.ext, "Phone region is not correct") + self._check_dict(p, value={"number": "123456", "ext": "1234", "type": "Work", "region": "US"}) + + # Set Phone via constructor value + p = Phone({"number": "1234567890", "type": "Home"}) + self.assertEqual("1234567890", p.number, "Phone number is not correct") + self.assertEqual("Home", p.type, "Phone type is not correct") + self._check_dict(p, value={"number": "1234567890", "type": "Home"}) + + # Test bad enum via constructor + try: + Phone({"number": "1234567890", "type": "Bad"}) + raise Exception("Should have failed due to bad Enum") + except ValueError as _: + pass + except Exception as err: + self.fail(str(err)) + + def test_phones(self): + p1 = Phone() + p1.number = "5555551234" + + p2 = Phone() + p2.number = "6666661234" + + # Add value via add_value method. Appends a value. + ps = Phones() + ps.add_value(p1) + ps.add_value(p2) + self._check_dict(ps, value=[{"number": "5555551234"}, {"number": "6666661234"}]) + + # Set value via constructor + ps = Phones([p2, p1]) + self._check_dict(ps, value=[{"number": "6666661234"}, {"number": "5555551234"}]) + + ps = Phones([ + {"number": "1234567890", "type": "Home"}, + {"number": "5555555555", "type": "Work", "region": "US"} + ]) + self._check_dict(ps, value=[ + {"number": "1234567890", "type": "Home"}, + {"number": "5555555555", "type": "Work", "region": "US"} + ]) + + def test_name(self): + # Set as array value + n = Name({"first": "John", "middle": "X", "last": "Doe"}, label="A LABEL") + self._check_dict(n, value={"first": "John", "middle": "X", "last": "Doe"}, label="A LABEL") + + def test_address(self): + a = Address() + a.street1 = "North Main Street" + a.street2 = "Apt B" + a.city = "Gotham" + a.zip = "11111-2222" + a.country = "CA" + self._check_dict(a, value={ + "street1": "North Main Street", + "street2": "Apt B", + "city": "Gotham", + "zip": "11111-2222", + "country": "CA" + }) + + def test_address_ref(self): + a = AddressRef() + a.value = "OlLZ6JLjnyMOS3CiIPHBjw" + self._check_dict(a, value="OlLZ6JLjnyMOS3CiIPHBjw") + + def test_account_number(self): + a = Email("111111") + self._check_dict(a, value="111111") + + def test_login(self): + ml = Login("my_login") + self._check_dict(ml, value="my_login") + + def test_hidden_field(self): + ft = HiddenField("HIDDEN") + self._check_dict(ft, value="HIDDEN") + + def test_password(self): + + p = Password("MY PASSWORD") + self._check_dict(p, value="MY PASSWORD") + + p = Password() + p.enforce_generation = True + self._check_dict(p, check_value=False, extra_params={ + "enforceGeneration": True, + "complexity": PasswordComplexity().to_dict() + }) + + def test_security_question(self): + ft = SecurityQuestions() + ft.question = "Question" + ft.answer = "Answer" + self._check_dict(ft, {"question": "Question", "answer": "Answer"}) + + def test_one_time_password(self): + ft = OneTimePassword("otpauth://localhost") + self._check_dict(ft, value="otpauth://localhost") + + def test_one_time_code(self): + ft = OneTimeCode("otpauth://localhost") + self._check_dict(ft, value="otpauth://localhost") + + def test_card_ref(self): + ft = CardRef() + ft.value = "OlLZ6JLjnyMOS3CiIPHBjw" + self._check_dict(ft, value="OlLZ6JLjnyMOS3CiIPHBjw") + + def test_payment_card(self): + + ft = PaymentCard() + ft.cardNumber = "5555 5555 5555 5555" + ft.cardExpirationDate = "01/2007" + ft.cardSecurityCode = "555" + self._check_dict(ft, value={"cardNumber": "5555 5555 5555 5555", "cardExpirationDate": "01/2007", + "cardSecurityCode": "555"}) + + # Test bad field format + try: + ft = PaymentCard() + ft.cardNumber = "5555 5555 5555 5555" + ft.cardExpirationDate = "BAD" + ft.to_dict() + raise Exception("Should have failed due to bad cardExpirationDate format") + except ValueError as _: + pass + except Exception as err: + self.fail(str(err)) + + def test_payment_cards(self): + + pc = PaymentCard() + pc.cardNumber = "5555 5555 5555 5555" + pc.cardExpirationDate = "01/2007" + pc.cardSecurityCode = "555" + + pcs = PaymentCards(pc) + self._check_dict(pcs, value={"cardNumber": "5555 5555 5555 5555", "cardExpirationDate": "01/2007", + "cardSecurityCode": "555"}) + + pcs = PaymentCards([{"cardNumber": "5555 5555 5555 5555"}]) + self._check_dict(pcs, value={"cardNumber": "5555 5555 5555 5555"}) + + def test_date(self): + + d = Date("2021-07-01 12:00:00") + self._check_dict(d, value=1625140800000) + + d = Date("2021-07-01T12:34:56.1234+06:00") + self._check_dict(d, value=1625142896123) + + d = Date(1625140800000) + self._check_dict(d, value=1625140800000) + + d = Date(["1625140800000"]) + self._check_dict(d, value=1625140800000) + + def test_birth_date(self): + + d = BirthDate("2021-07-01 12:00:00") + self._check_dict(d, value=1625140800000) + + d = Date("2021-07-01T12:34:56.1234+06:00") + self._check_dict(d, value=1625142896123) + + d = Date(1625140800000) + self._check_dict(d, value=1625140800000) + + d = Date(["1625140800000"]) + self._check_dict(d, value=1625140800000) + + def test_expiration_date(self): + + d = ExpirationDate("2021-07-01 12:00:00") + self._check_dict(d, value=1625140800000) + + d = Date("2021-07-01T12:34:56.1234+06:00") + self._check_dict(d, value=1625142896123) + + d = Date(1625140800000) + self._check_dict(d, value=1625140800000) + + d = Date(["1625140800000"]) + self._check_dict(d, value=1625140800000) + + def test_bank_account(self): + + b = BankAccount() + b.accountType = AccountTypeEnum.CHECKING + b.routingNumber = "12345" + b.accountNumber = "ABCDE" + self._check_dict(b, value={"accountType": "Checking", "routingNumber": "12345", "accountNumber": "ABCDE"}) + + # Test bad field format + try: + b = BankAccount() + b.accountType = "BAD" + b.routingNumber = "12345" + b.accountNumber = "ABCDE" + b.to_dict() + raise Exception("Should have failed due to bad enum for account type") + except ValueError as _: + pass + except Exception as err: + self.fail(str(err)) + + def test_bank_accounts(self): + + b = BankAccount({"accountType": "SAVINGS", "routingNumber": "12345", "accountNumber": "ABCDE"}) + ba = BankAccounts() + ba.add_value(b) + self._check_dict(b, value={"accountType": "Savings", "routingNumber": "12345", "accountNumber": "ABCDE"}) + + def test_key_pair(self): + + ft = KeyPair() + ft.publicKey = "PUBLIC KEY" + ft.privateKey = "PRIVATE KEY" + self._check_dict(ft, value={"publicKey": "PUBLIC KEY", "privateKey": "PRIVATE KEY"}) + + def test_host(self): + ft = Host() + ft.hostName = "localhost" + ft.port = "22" + self._check_dict(ft, value={"hostName": "localhost", "port": "22"}) + + def test_license_number(self): + ft = LicenseNumber("LIC123") + self._check_dict(ft, value="LIC123") + + def test_secret_note(self): + ft = SecureNote("Secret Note") + self._check_dict(ft, value="Secret Note") + diff --git a/sdk/python/helper/tests/v3/v3_field_type_test.py b/sdk/python/helper/tests/v3/v3_field_type_test.py new file mode 100644 index 00000000..438057a8 --- /dev/null +++ b/sdk/python/helper/tests/v3/v3_field_type_test.py @@ -0,0 +1,52 @@ +import unittest +from keeper_secrets_manager_helper.v3.field_type import * + + +class FieldTypeTest(unittest.TestCase): + + def test_password_complexity(self): + + pc = PasswordComplexity() + d = pc.to_dict() + self.assertEqual(64, d.get("length")) + self.assertEqual(0, d.get("caps")) + self.assertEqual(0, d.get("lowercase")) + self.assertEqual(0, d.get("digits")) + self.assertEqual(0, d.get("special")) + + pc = PasswordComplexity(length=32, caps=5, lowercase=5, digits=5, special=5) + d = pc.to_dict() + self.assertEqual(32, d.get("length")) + self.assertEqual(5, d.get("caps")) + self.assertEqual(5, d.get("lowercase")) + self.assertEqual(5, d.get("digits")) + self.assertEqual(5, d.get("special")) + + pc = PasswordComplexity({"length": 16, "caps": 2, "lowercase": 3, "digits": 4, "special": 0}) + d = pc.to_dict() + self.assertEqual(16, d.get("length")) + self.assertEqual(2, d.get("caps")) + self.assertEqual(3, d.get("lowercase")) + self.assertEqual(4, d.get("digits")) + self.assertEqual(0, d.get("special")) + + # Uhm, so how a password is generate is up in the air. This is based on the Python SDK, which will use the + # counts over the length. So the password is going to be 9 characters, not 16 :/ + pc = PasswordComplexity({"length": 16, "caps": 2, "lowercase": 3, "digits": 4, "special": 0}) + password = pc.generate_password() + self.assertEqual(9, len(password), "password is too short") + + def test_load_map(self): + get_field_type_map() + + # Nice test to make sure we loaded all the fields, if we add more fields this will fail ... but in a good way. + self.assertEqual(30, len(field_map.keys())) + + # Check if we get a Login class + self.assertEqual(get_class_by_type("login"), Login) + + try: + get_class_by_type("BAD BAD BAD") + self.fail("Should have gotten an exception get bad field class") + except ImportError as _: + pass diff --git a/sdk/python/helper/tests/v3/v3_parser_test.py b/sdk/python/helper/tests/v3/v3_parser_test.py new file mode 100644 index 00000000..2ec02e08 --- /dev/null +++ b/sdk/python/helper/tests/v3/v3_parser_test.py @@ -0,0 +1,105 @@ +import unittest +from keeper_secrets_manager_helper.v3.parser import Parser +from keeper_secrets_manager_helper.field import FieldSectionEnum + + +class ParserTest(unittest.TestCase): + + def test_parser_field(self): + + p = Parser() + field = p.parse_field("login=My Login")[0] + self.assertEqual(field.field_section, FieldSectionEnum.STANDARD) + self.assertEqual(field.type, "login") + self.assertEqual(field.value, "My Login") + self.assertIsNone(field.label) + self.assertIsNone(field.value_key) + + field = p.parse_field("f.password[My Password]=****")[0] + self.assertEqual(field.field_section, FieldSectionEnum.STANDARD) + self.assertEqual(field.type, "password") + self.assertEqual(field.value, "****") + self.assertEqual(field.label, "My Password") + self.assertIsNone(field.value_key) + + field = p.parse_field('c.phone[My Phone Numbers]=' + '{"region": "US", "number": "55555512324", "ext": "7777", "type":"Mobile"}')[0] + self.assertEqual(field.field_section, FieldSectionEnum.CUSTOM) + self.assertEqual(field.type, "phone") + self.assertDictEqual(field.value, {"ext": "7777", "number": "55555512324", "region": "US", "type": "Mobile", + "_complete": True}) + self.assertEqual(field.label, "My Phone Numbers") + self.assertIsNone(field.value_key) + + field = p.parse_field("name[My Name].first=John")[0] + self.assertEqual(field.field_section, FieldSectionEnum.STANDARD) + self.assertEqual(field.type, "name") + self.assertDictEqual(field.value, {'first': 'John'}) + self.assertEqual(field.label, "My Name") + self.assertEqual(field.value_key, "first") + + # Test crazy [] in the label, and test the escape of the escape character + field = p.parse_field(r"name[\[\[\[My\\ Name\]\[\]].first=John")[0] + self.assertEqual(field.field_section, FieldSectionEnum.STANDARD) + self.assertEqual(field.type, "name") + self.assertDictEqual(field.value, {'first': 'John'}) + self.assertEqual(field.label, r"[[[My\ Name][]") + self.assertEqual(field.value_key, "first") + + # JSON Value + field = p.parse_field('f.name={"first": "John", "last": "name"}')[0] + self.assertEqual(field.field_section, FieldSectionEnum.STANDARD) + self.assertEqual(field.type, "name") + self.assertDictEqual(field.value, {"first": "John", "last": "name", "_complete": True}) + self.assertIsNone(field.label) + self.assertIsNone(field.value_key) + + def test_bad_syntax(self): + p = Parser() + + # Is this allowed? It's bad JSON ... but is it JSON. Maybe a warning? + p.parse_field('f.text={"first": "John", "last": "name"') + + # Label is not terminated + try: + p.parse_field('f.text[BLAH=OK') + self.fail("Bad label should have failed") + except ValueError as err: + self.assertRegex(str(err), r'Could not find the end of the label') + + # Bad field section + try: + p.parse_field('k.text=OK') + self.fail("Bad label should have failed") + except ValueError as err: + self.assertRegex(str(err), r"Field section can only be 'f' or 'c'") + + # Text is not a dictionary. The key 'value' doesn't exist. + try: + p.parse_field('c.text.value=OK') + self.fail("Bad key should have failed") + except ValueError as err: + self.assertRegex(str(err), r"does not have value keys") + + # Double value keys + try: + p.parse_field('c.phone.number.number=5551234567') + self.fail("Duplicate value keys should have failed") + except ValueError as err: + self.assertRegex(str(err), r"has already been found") + + # Bad field type + try: + p.parse_field('c.aaaaa=BAD') + self.fail("Bad field type should have failed") + except ValueError as err: + self.assertRegex(str(err), r"does not exists") + + # Bad value type + try: + p.parse_field('c.phone.i_dont_exists=5551234567') + self.fail("Bad value keys should have failed") + except ValueError as err: + self.assertRegex(str(err), r"does not have the value key") + + diff --git a/sdk/python/helper/tests/v3/v3_record_test.py b/sdk/python/helper/tests/v3/v3_record_test.py new file mode 100644 index 00000000..7fd8f23a --- /dev/null +++ b/sdk/python/helper/tests/v3/v3_record_test.py @@ -0,0 +1,255 @@ +import unittest +from keeper_secrets_manager_helper.record_type import RecordType +from keeper_secrets_manager_helper.v3.record import Record +from keeper_secrets_manager_helper.v3.parser import Parser +from keeper_secrets_manager_helper.v3.field_type import PasswordComplexity +import tempfile +import json + + +class ParserTest(unittest.TestCase): + + def test_build_record_simple(self): + + p = Parser() + + login_data = [ + "login=My Login", + "password=My Password", + "url=http://localhost:80" + ] + + fields = p.parse_field(login_data) + + r = Record( + record_type='login', + title="My Login Record", + notes="This is my note", + fields=fields + ) + # Passing fields in on the constructor will automatically call build_record. + # r.build_record() + self.assertEqual("login", r.record_type, "helper type is not correct") + self.assertEqual("My Login Record", r.title, "title is not correct") + self.assertEqual("This is my note", r.notes, "notes is not correct") + + fields = r.fields + # There is 5 fields per helper type. We only set 3 in the test, but there should be 5 + self.assertEqual(5, len(fields), "got 5 fields") + index = 0 + for field_type in ["login", "password", "url", "fileRef", "oneTimeCode"]: + self.assertEqual(field_type, fields[index].get("type"), "first helper is the wrong type") + index += 1 + + def test_build_record_complex(self): + + bank_account_data = [ + # Build in pieces + "f.bankAccount.accountType=sAvInGs", + "f.bankAccount.routingNumber=Routing", + "f.bankAccount.accountNumber=Account", + + # Build using JSON object + 'f.name={"first": "John", "last": "Doe"}', + + # The standard + "url=https://mybank.localhost.com", + + # Build using JSON array + 'cardRef=["PpR0AKIZAtUiyvq1r2BC1w"]', + + # Custom with label + 'c.name[NAME].first=John', + 'c.name[NAME].middle=X', + 'c.name[NAME].last=Smith', + + # Phone with 4 numbers :) + 'c.phone[Phone]={"region": "CA", "number": "ONE", "type": "Work"}', + 'c.phone[Phone]={"region": "US", "number": "TWO", "type": "Home"}', + + 'c.phone[Phone].number=THREE', + 'c.phone[Phone].type=Home', + + 'c.phone[Phone].number=FOUR', + ] + + p = Parser() + fields = p.parse_field(bank_account_data) + + r = Record( + record_type='bankAccount', + title="Bank", + fields=fields, + password_generate=True + ) + # No need to call this since passing in fields in the constructor will do build_record. But run it to make + # sure we don't get dups. + r.build_record() + + # This includes the fields not set + self.assertEqual(8, len(r.fields), "there were not 4 fields") + + self.assertEqual(2, len(r.custom_fields), "there were not 2 custom fields") + + self.assertEqual("password", r.fields[3]["type"], "password is not the 4th field") + + # password_generate will cause the password to be set + pc = PasswordComplexity() + self.assertEqual(pc.length, len(r.fields[3]["value"][0]), "password is not set correctly") + + self.assertEqual(4, len(r.custom_fields[1]["value"]), "custom field phone does have 4 values") + self.assertEqual("ONE", r.custom_fields[1]["value"][0]["number"], "first number is not ONE") + self.assertEqual("TWO", r.custom_fields[1]["value"][1]["number"], "second number is not TWO") + self.assertEqual("THREE", r.custom_fields[1]["value"][2]["number"], "third number is not THREE") + self.assertEqual("FOUR", r.custom_fields[1]["value"][3]["number"], "fourth number is not FOUR") + + def test_value_key_rule_label_grouping(self): + + # Make sure the labels make the field unique + custom_fields = [ + "c.name[My Doctor].first=Jane", + "c.name[My Doctor].last=Smith", + "c.name[My Lawyer].first=John", + "c.name[My Lawyer].last=Doe" + ] + p = Parser() + fields = p.parse_field(custom_fields) + + r = Record( + record_type='login', + title="Custom Fields", + fields=fields, + password_generate=True + ) + + self.assertEqual(2, len(r.custom_fields), "there were not 2 custom fields") + field = r.custom_fields[0] + self.assertEqual("name", field.get("type"), "field type is not name") + self.assertEqual("My Doctor", field.get("label"), "field label is not My Doctor") + value = field.get("value") + self.assertEqual(1, len(value), "there is not 1 value in the first field's value") + self.assertDictEqual(value[0], {'first': 'Jane', 'last': 'Smith'}, "first field's value is not correct") + + field = r.custom_fields[1] + self.assertEqual("name", field.get("type"), "field type is not name") + self.assertEqual("My Lawyer", field.get("label"), "field label is not My Lawyer") + value = field.get("value") + self.assertEqual(1, len(value), "there is not 1 value in the second field's value") + self.assertDictEqual(value[0], {'first': 'John', 'last': 'Doe'}, "second field's value is not correct") + + def test_value_key_rule_no_label_grouping(self): + + # Make sure an initial JSON value completes the field. We should get two fields + custom_fields = [ + # Since this is JSON, this is value one in the field + 'c.phone={"number": "5551231234"}', + # Since this is JSON, this is value two in the field + 'c.phone={"number": "5559999999"}', + # Since the first record are considered complete due to being set by JSON, this creates a third value + "c.phone.number=5551111111" + ] + p = Parser() + fields = p.parse_field(custom_fields) + + r = Record( + record_type='login', + title="Custom Fields", + fields=fields, + password_generate=True + ) + + self.assertEqual(1, len(r.custom_fields), "there were not 1 custom fields") + self.assertEqual(3, len(r.custom_fields[0].get("value")), "did not find three phone numbers") + + def test_adding_to_complete_field(self): + + # Since the phone is being set with a list, the field is considered complete. Nothing else can be added to it. + custom_fields = [ + 'c.phone=[{"number": "5551231234"}]', + "c.phone.number=5551111111" + ] + p = Parser() + fields = p.parse_field(custom_fields) + + try: + Record( + record_type='login', + title="Custom Fields", + fields=fields, + password_generate=True + ) + self.fail("This should have failed due to the field not being unique") + except ValueError as err: + self.assertRegex(str(err), r'Cannot add this field due to it not being unique') + + def test_custom_record_with_duplicate_fields(self): + + data = { + "version": "v3", + "kind": "KeeperRecordType", + "data": [ + { + "class": "MyCustom2", + "name": "myCustom2", + "fields": [ + {"type": "text", "label": "Text One"}, + {"type": "text", "label": "Text Two"}, + {"type": "text", "label": "Text Three"} + ] + } + ] + } + + custom_fields = [ + 'text=ONE', + 'text=TWO', + 'text=THREE', + ] + + with tempfile.NamedTemporaryFile("w", suffix=".json") as fh: + fh.write(json.dumps(data)) + fh.seek(0) + RecordType.load_record_types(fh.name) + fh.close() + + p = Parser() + fields = p.parse_field(custom_fields) + + r = Record( + record_type='myCustom2', + title="Custom Record", + fields=fields, + password_generate=True + ) + self.assertEqual(3, len(r.fields), "did not find 3 text fields") + self.assertEqual("ONE", r.fields[0].get("value")[0], "first field is ONE") + self.assertEqual("TWO", r.fields[1].get("value")[0], "first field is TWO") + self.assertEqual("THREE", r.fields[2].get("value")[0], "first field is THREE") + + def test_invalid_field(self): + + """Attempt to add field that doesn't exist in the record type schema""" + + p = Parser() + + login_data = [ + "login=My Login", + "password=My Password", + # Bad Field + "text=RANDOM TEXT" + ] + + fields = p.parse_field(login_data) + + r = Record( + record_type='login', + title="Bad Record", + ) + # Add field using method instead of constructor + try: + for field in fields: + r.add_fields(field) + r.build_record() + self.fail("Should have failed due to text not being in standard fields.") + except ValueError as err: + self.assertRegex(str(err), 'The standard fields do not have a ') diff --git a/sdk/python/helper/tests/v3/v3_record_type_test.py b/sdk/python/helper/tests/v3/v3_record_type_test.py new file mode 100644 index 00000000..c74ea22c --- /dev/null +++ b/sdk/python/helper/tests/v3/v3_record_type_test.py @@ -0,0 +1,86 @@ +import unittest +from keeper_secrets_manager_helper.record_type import RecordType +from keeper_secrets_manager_helper.v3.record_type import get_class_by_type, make_class_name +import json +import tempfile + + +class ParserTest(unittest.TestCase): + + # def test_did_i_work(self): + # p = rt.Phone() + # print(p.field_map) + # print(p.generate_template()) + + def test_get_class(self): + b = get_class_by_type("bankAccount")() + print(b.generate_template("json")) + print(b.generate_template("yaml")) + + def test_custom_class_name(self): + + name = make_class_name("Keeper VPN - Azure Login") + self.assertEqual("KeeperVpnAzureLogin", name) + + name = make_class_name("Record 123 !!!! #####") + self.assertEqual("Record123", name) + + name = make_class_name("#####") + self.assertEqual("", name) + + def test_load_record_type_file(self): + + data = { + "version": "v3", + "kind": "KeeperRecordType", + "data": [ + { + "class": "MyCustom", + "name": "myCustom", + "fields": [ + {"type": "text", "label": "Text One"}, + {"type": "text", "label": "Text Two"} + ] + } + ] + } + with tempfile.NamedTemporaryFile("w", suffix=".json") as fh: + fh.write(json.dumps(data)) + fh.seek(0) + RecordType.load_record_types(fh.name) + fh.close() + + try: + get_class_by_type("myCustom") + except ImportError as err: + self.fail("Could not find class MyCustom: " + str(err)) + + def test_load_commander_record_type_file(self): + + data = [ + { + "recordTypeId": 35, + "content": + "{\"$id\":\"Azure Login\",\"fields\":" + "[{\"$ref\":\"fileRef\",\"label\":\"File or Photo\"}," + "{\"$ref\":\"login\",\"label\":\"Login\"}," + "{\"$ref\":\"password\",\"label\":\"Password\",\"required\":true," + "\"enforceGeneration\":false,\"privacyScreen\":false," + "\"complexity\":{\"length\":8,\"caps\":0,\"lowercase\":0,\"digits\":0,\"special\":0}}," + "{\"$ref\":\"text\",\"label\":\"System Login\",\"required\":true}," + "{\"$ref\":\"secret\",\"label\":\"System Password / Pin Code\"}," + "{\"$ref\":\"url\",\"label\":\"Keeper VPN Wiki\",\"required\":true}," + "{\"$ref\":\"url\",\"label\":\"Password Best Practices FAQ's and Tips\",\"required\":true}]}" + } + ] + + with tempfile.NamedTemporaryFile("w", suffix=".json") as fh: + fh.write(json.dumps(data)) + fh.seek(0) + RecordType.load_record_types(fh.name) + fh.close() + + try: + get_class_by_type("Azure Login") + except ImportError as err: + self.fail("Could not find class Azure Login: " + str(err))