diff --git a/ascmhl/commands.py b/ascmhl/commands.py index 47ad7db..43cf4fe 100644 --- a/ascmhl/commands.py +++ b/ascmhl/commands.py @@ -27,10 +27,12 @@ ascmhl_default_hashformat, ) from .generator import MHLGenerationCreationSession -from .hasher import hash_file, DirectoryHashContext +from .hasher import hash_file, DirectoryHashContext, multiple_format_hash_file from .hashlist import MHLMediaHash, MHLCreatorInfo, MHLProcessInfo, MHLTool, MHLProcess, MHLAuthor from .history import MHLHistory from .traverse import post_order_lexicographic +from typing import Dict +from collections import namedtuple @click.command() @@ -43,12 +45,13 @@ is_flag=True, help="Verbose output", ) +# FIXME: refactor to allow for multiple hash formats @click.option( "--hash_format", "-h", type=click.Choice(ascmhl_supported_hashformats), - multiple=False, - default=ascmhl_default_hashformat, + multiple=True, + default=[ascmhl_default_hashformat], help="Algorithm", ) @click.option( @@ -171,7 +174,7 @@ def create( def create_for_folder_subcommand( root_path, verbose, - hash_format, + hash_formats, no_directory_hashes, author_name, author_email, @@ -214,39 +217,76 @@ def create_for_folder_subcommand( num_failed_verifications = 0 # store the directory hashes of sub folders so we can use it when calculating the hash of the parent folder - dir_content_hash_mappings = {} - dir_structure_hash_mappings = {} + # the mapping lookups will follow the dictionary format of [string: [hash_format: hash_value]] where string + # is a file sub-path + dir_content_hash_mapping_lookup = {} + dir_structure_hash_mapping_lookup = {} + hash_format_list = sorted(hash_formats) + for folder_path, children in post_order_lexicographic(root_path, session.ignore_spec.get_path_spec()): # generate directory hashes - dir_hash_context = None + dir_hash_context_lookup = {} + if not no_directory_hashes: - dir_hash_context = DirectoryHashContext(hash_format) + # Create a DirectoryHashContext for each hash format and store in the lookup + for hash_format in hash_format_list: + dir_hash_context_lookup[hash_format] = DirectoryHashContext(hash_format) for item_name, is_dir in children: file_path = os.path.join(folder_path, item_name) not_found_paths.discard(file_path) if is_dir: - if not dir_hash_context: - continue - if dir_hash_context: - dir_hash_context.append_directory_hashes( - file_path, dir_content_hash_mappings.pop(file_path), dir_structure_hash_mappings.pop(file_path) - ) - else: - hash_string, success = seal_file_path(existing_history, file_path, hash_format, session) - if not success: - num_failed_verifications += 1 if not no_directory_hashes: - dir_hash_context.append_file_hash(file_path, hash_string) - dir_content_hash = None - dir_structure_hash = None - if dir_hash_context: - dir_content_hash = dir_hash_context.final_content_hash_str() - dir_structure_hash = dir_hash_context.final_structure_hash_str() - dir_content_hash_mappings[folder_path] = dir_content_hash - dir_structure_hash_mappings[folder_path] = dir_structure_hash + path_content_hash_lookup = dir_content_hash_mapping_lookup.pop(file_path) + path_structure_hash_lookup = dir_structure_hash_mapping_lookup.pop(file_path) + + for hash_format, dir_hash_context in dir_hash_context_lookup.items(): + dir_hash_context.append_directory_hashes( + file_path, + path_content_hash_lookup[hash_format], + path_structure_hash_lookup[hash_format], + ) + else: + seal_result = seal_file_path(existing_history, file_path, hash_format_list, session) + + for hash_format, result_tuple in seal_result.items(): + 
dir_hash_context = None + + if not no_directory_hashes: + dir_hash_context = dir_hash_context_lookup[hash_format] + + hash_string = result_tuple.hash_value + success = result_tuple.success + if not success: + num_failed_verifications += 1 + if dir_hash_context is not None: + dir_hash_context.append_file_hash(file_path, hash_string) + + # Calculate the directory hashes for each format + dir_content_hash_lookup = {} + dir_structure_hash_lookup = {} + + if not no_directory_hashes: + for hash_format, dir_hash_context in dir_hash_context_lookup.items(): + dir_content_hash = dir_hash_context.final_content_hash_str() + dir_structure_hash = dir_hash_context.final_structure_hash_str() + + if dir_content_hash_mapping_lookup and folder_path in dir_content_hash_mapping_lookup.keys(): + dir_content_hash_mapping_lookup[folder_path][hash_format] = dir_content_hash + else: + dir_content_hash_mapping_lookup[folder_path] = {hash_format: dir_content_hash} + + if dir_structure_hash_mapping_lookup and folder_path in dir_structure_hash_mapping_lookup.keys(): + dir_structure_hash_mapping_lookup[folder_path][hash_format] = dir_structure_hash + else: + dir_structure_hash_mapping_lookup[folder_path] = {hash_format: dir_structure_hash} + + dir_content_hash_lookup[hash_format] = dir_content_hash + dir_structure_hash_lookup[hash_format] = dir_structure_hash + modification_date = datetime.datetime.fromtimestamp(os.path.getmtime(folder_path)) - session.append_directory_hashes( - folder_path, modification_date, hash_format, dir_content_hash, dir_structure_hash + + session.append_multiple_format_directory_hashes( + folder_path, modification_date, dir_content_hash_lookup, dir_structure_hash_lookup ) commit_session(session, author_name, author_email, author_phone, author_role, location, comment) @@ -262,7 +302,7 @@ def create_for_folder_subcommand( def create_for_single_files_subcommand( root_path, verbose, - hash_format, + hash_formats, single_file, author_name, author_email, @@ -302,6 +342,9 @@ def create_for_single_files_subcommand( session = MHLGenerationCreationSession(existing_history) num_failed_verifications = 0 + + hash_format_list = sorted(hash_formats) + for path in single_file: if not os.path.isabs(path): path = os.path.join(os.getcwd(), path) @@ -311,11 +354,16 @@ def create_for_single_files_subcommand( file_path = os.path.join(folder_path, item_name) if is_dir: continue - _, success = seal_file_path(existing_history, file_path, hash_format, session) + seal_result = seal_file_path(existing_history, file_path, hash_format_list, session) + # Determine success based on the first format in the list + # TODO: Consider checking all results. Would it be practical to do so? + # For instance, are we concerned about one format failing while another one succeeds? 
+ success = seal_result[hash_format_list[0]].success if not success: num_failed_verifications += 1 else: - _, success = seal_file_path(existing_history, path, hash_format, session) + seal_result = seal_file_path(existing_history, path, hash_format_list, session) + success = seal_result[hash_format_list[0]].success if not success: num_failed_verifications += 1 @@ -376,6 +424,7 @@ def create_for_single_files_subcommand( ) # options @click.option( + # FIXME: Update to permit multiple hash formats "--hash_format", "-h", type=click.Choice(ascmhl_supported_hashformats), @@ -547,23 +596,47 @@ def verify_directory_hash_subcommand( ignore_spec = ignore.MHLIgnoreSpec(existing_history.latest_ignore_patterns(), ignore_list, ignore_spec_file) + # FIXME: Update once argument signature has been modified to supply a list of formats + hash_formats = [] + # choose the hash format of the latest root directory hash if hash_format is None: generation = -1 + # inspect the history and use all documented algorithms as the basis of verification for hash_list in existing_history.hash_lists: if hash_list.generation_number > generation: + # add each hash entry's format to the list of formats if len(hash_list.process_info.root_media_hash.hash_entries) > 0: - hash_format = hash_list.process_info.root_media_hash.hash_entries[0].hash_format - - if hash_format is None: + for entry in hash_list.process_info.root_media_hash.hash_entries: + entry_hash_format = entry.hash_format + # do not permit duplicate entries in the list + if entry_hash_format not in hash_formats: + hash_formats.append(entry_hash_format) + if not hash_formats: + hash_formats.append("c4") logger.verbose(f"default hash format: c4") - hash_format = "c4" else: logger.verbose(f"hash format from latest generation with directory hashes: {hash_format}") else: + hash_formats.append(hash_format) logger.verbose(f"hash format: {hash_format}") # start a verification session on the existing history + hash_format_list = sorted(hash_formats) + # a lookup containing the number of failures detected per hash format - will match the format Dict[str, int] + failures_per_format_lookup = {} + + # an inner function responsible for adding format failures to the lookup + def add_detected_failure_for_format(failed_hash_format): + nonlocal failures_per_format_lookup + if failures_per_format_lookup: + if failed_hash_format in failures_per_format_lookup.keys(): + failures_per_format_lookup[failed_hash_format] = failures_per_format_lookup[failed_hash_format] + 1 + else: + failures_per_format_lookup[failed_hash_format] = 1 + else: + failures_per_format_lookup = {failed_hash_format: 1} + session = MHLGenerationCreationSession(existing_history) num_failed_verifications = 0 @@ -571,87 +644,141 @@ def verify_directory_hash_subcommand( dir_content_hash_mappings = {} dir_structure_hash_mappings = {} for folder_path, children in post_order_lexicographic(root_path, ignore_spec.get_path_spec()): - # generate directory hashes - dir_hash_context = None - dir_hash_context = DirectoryHashContext(hash_format) + # generate directory hashes - will match the format dict[str, DirectoryHashContext] + dir_hash_context_lookup = {} + + # create a DirectoryHashContext for each hash format and store in the lookup + for hash_format in hash_format_list: + dir_hash_context_lookup[hash_format] = DirectoryHashContext(hash_format) + for item_name, is_dir in children: file_path = os.path.join(folder_path, item_name) if is_dir: - file_path = os.path.join(folder_path, item_name) relative_path = 
existing_history.get_relative_file_path(file_path) history, history_relative_path = existing_history.find_history_for_path(relative_path) # check if there are directory hashes in the generations directory_hash_entries = history.find_directory_hash_entries_for_path(history_relative_path) - content_hash = dir_content_hash_mappings.pop(file_path) - structure_hash = dir_structure_hash_mappings.pop(file_path) - dir_hash_context.append_directory_hashes(file_path, content_hash, structure_hash) + content_hash_lookup = dir_content_hash_mappings.pop(file_path) + structure_hash_lookup = dir_structure_hash_mappings.pop(file_path) + + # Add the content and hash values to the appropriate context + if dir_hash_context_lookup: + for hash_format, dir_hash_context in dir_hash_context_lookup.items(): + content_hash = None + structure_hash = None + if content_hash_lookup: + content_hash = content_hash_lookup[hash_format] + if structure_hash_lookup: + structure_hash = structure_hash_lookup[hash_format] + + dir_hash_context.append_directory_hashes(file_path, content_hash, structure_hash) num_successful_verifications = 0 - found_hash_format = False for directory_hash_entry in directory_hash_entries: - if directory_hash_entry.hash_format != hash_format: - continue - found_hash_format = True + content_hash = None + structure_hash = None - num_current_successful_verifications = -1 - if not root_only: - num_current_successful_verifications = _compare_and_log_directory_hashes( - relative_path, directory_hash_entry, content_hash, structure_hash - ) + if content_hash_lookup: + content_hash = content_hash_lookup[directory_hash_entry.hash_format] + if structure_hash_lookup: + structure_hash = structure_hash_lookup[directory_hash_entry.hash_format] - if num_current_successful_verifications == 2: - num_successful_verifications += 1 - if num_current_successful_verifications == 1: - num_failed_verifications += 1 + found_hash_format = False - if not calculate_only: - if not found_hash_format: - logger.error( - f"ERROR: verification of folder {relative_path}: No directory hash of type" - f" {hash_format} found" - ) - num_failed_verifications += 1 + if content_hash: + found_hash_format = True + + num_current_successful_verifications = -1 + if not root_only: + num_current_successful_verifications = _compare_and_log_directory_hashes( + relative_path, directory_hash_entry, content_hash, structure_hash + ) + + if num_current_successful_verifications == 2: + num_successful_verifications += 1 + if num_current_successful_verifications == 1: + num_failed_verifications += 1 + add_detected_failure_for_format(directory_hash_entry.hash_format) + + if not calculate_only: + if not found_hash_format: + logger.error( + f"ERROR: verification of folder {relative_path}: No directory hash of type" + f" {hash_format} found" + ) + num_failed_verifications += 1 + add_detected_failure_for_format(directory_hash_entry.hash_format) else: - hash_string = hash_file(file_path, hash_format) - dir_hash_context.append_file_hash(file_path, hash_string) - dir_content_hash = None - dir_structure_hash = None - if dir_hash_context: - dir_content_hash = dir_hash_context.final_content_hash_str() - dir_structure_hash = dir_hash_context.final_structure_hash_str() - dir_content_hash_mappings[folder_path] = dir_content_hash - dir_structure_hash_mappings[folder_path] = dir_structure_hash + # generate the hashes for the file + file_hash_lookup = multiple_format_hash_file(file_path, hash_format_list) + # add each hash to the appropriate context + for hash_format, hash_value 
in file_hash_lookup.items(): + dir_hash_context_lookup[hash_format].append_file_hash(file_path, hash_value) + + # all children have been handled. create the directory hashes + dir_content_hash_lookup = {} + dir_structure_hash_lookup = {} + + if dir_hash_context_lookup: + for hash_format, dir_hash_context in dir_hash_context_lookup.items(): + dir_content_hash = dir_hash_context.final_content_hash_str() + dir_structure_hash = dir_hash_context.final_structure_hash_str() + # add the hashes to the appropriate hash lookups to be added to the session + dir_content_hash_lookup[hash_format] = dir_content_hash + dir_structure_hash_lookup[hash_format] = dir_structure_hash + # add the hash lookups to the appropriate mappings for the folder + if dir_content_hash_lookup: + dir_content_hash_mappings[folder_path] = dir_content_hash_lookup + if dir_structure_hash_lookup: + dir_structure_hash_mappings[folder_path] = dir_structure_hash_lookup + modification_date = datetime.datetime.fromtimestamp(os.path.getmtime(folder_path)) logger.verbose_logging = calculate_only relative_path = session.root_history.get_relative_file_path(folder_path) if root_only and relative_path != ".": logger.verbose_logging = verbose - session.append_directory_hashes( - folder_path, modification_date, hash_format, dir_content_hash, dir_structure_hash + + session.append_multiple_format_directory_hashes( + folder_path, modification_date, dir_content_hash_lookup, dir_structure_hash_lookup ) logger.verbose_logging = verbose # compare root hashes, works differently if folder_path == root_path: - found_hash_format = False + for hash_list in existing_history.hash_lists: root_hash_entries = hash_list.process_info.root_media_hash.hash_entries if len(root_hash_entries) > 0: for root_hash_entry in root_hash_entries: - if root_hash_entry.hash_format == hash_format: + hash_format = root_hash_entry.hash_format + found_hash_format = False + dir_content_hash = None + dir_structure_hash = None + + if dir_content_hash_lookup: + dir_content_hash = dir_content_hash_lookup[hash_format] + if dir_structure_hash_lookup: + dir_structure_hash = dir_structure_hash_lookup[hash_format] + + if dir_content_hash: + found_hash_format = True _compare_and_log_directory_hashes( ".", root_hash_entry, dir_content_hash, dir_structure_hash ) - found_hash_format = True - if not calculate_only: - if not found_hash_format: - logger.error(f"ERROR: verification of root folder: No directory hash of type {hash_format} found") + if not calculate_only: + if not found_hash_format: + logger.error( + f"ERROR: verification of root folder: No directory hash of type {hash_format} found" + ) exception = None - if num_failed_verifications > 0: - exception = errors.VerificationDirectoriesFailedException() + + # check the failure lookup. 
if even one format verified, consider the entire process verified + if failures_per_format_lookup: + if len(failures_per_format_lookup.keys()) == len(hash_format_list): + exception = errors.VerificationDirectoriesFailedException() if exception: raise exception @@ -719,6 +846,7 @@ def _compare_and_log_directory_hashes( @click.command() @click.argument("file_path", type=click.Path(exists=True)) @click.option( + # FIXME: Update to permit multiple hash formats "--hash_format", "-h", type=click.Choice(ascmhl_supported_hashformats), @@ -1227,7 +1355,27 @@ def commit_session_for_collection( session.commit(creator_info, process_info) -def seal_file_path(existing_history, file_path, hash_format, session) -> (str, bool): +""" +A tuple for returning the result of a seal file path operation +attributes: +hash_value -- string value, a hash +success -- boolean value, indicates if the update was successful +""" +SealPathResult = namedtuple("SealPathResult", ["hash_value", "success"]) + + +def seal_file_path(existing_history, file_path, hash_formats: [str], session) -> Dict[str, SealPathResult]: + """ + Generates hashes for a file path. + Compares the generated hashes to any existing hash records + Adds the generated hashes to the hash history of the file path + :param existing_history: The existing hash record + :param file_path: The path for which to generate hashes + :param hash_formats: The hash formats to generate + :param session: The session to which the generated hashes will be added + :return: A dictionary keyed by hash_format strings. + Each entry contains the hash value and a boolean indicating if updating was successful + """ relative_path = existing_history.get_relative_file_path(file_path) file_size = os.path.getsize(file_path) file_modification_date = datetime.datetime.fromtimestamp(os.path.getmtime(file_path)) @@ -1236,22 +1384,72 @@ def seal_file_path(existing_history, file_path, hash_format, session) -> (str, b existing_child_history, existing_history_relative_path = existing_history.find_history_for_path(relative_path) existing_hash_formats = existing_child_history.find_existing_hash_formats_for_path(existing_history_relative_path) - # in case there is no hash in the required format to use yet, we need to verify also against - # one of the existing hash formats, we for simplicity use always the first hash format in this example - # but one could also use a different one if desired - success = True - if len(existing_hash_formats) > 0 and hash_format not in existing_hash_formats: - existing_hash_format = existing_hash_formats[0] - hash_in_existing_format = hash_file(file_path, existing_hash_format) - # FIXME: test what happens if the existing hash verification fails in other format fails - # should we then really create two entries - success &= session.append_file_hash( - file_path, file_size, file_modification_date, existing_hash_format, hash_in_existing_format - ) - current_format_hash = hash_file(file_path, hash_format) - # in case the existing hash verification failed we don't want to add the current format hash to the generation - # but we need to return it for directory hash creation - if not success: - return current_format_hash, False - success &= session.append_file_hash(file_path, file_size, file_modification_date, hash_format, current_format_hash) - return current_format_hash, success + # create a separate list of hash formats for which hashes will be generated + hash_formats_to_generate = [] + + # if there are existing entries for this path, the recorded formats must also be 
generated + if existing_hash_formats and len(existing_hash_formats) > 0: + # existing hash formats should be verified prior to any new formats + for hash_format in existing_hash_formats: + if hash_format in hash_formats: + hash_formats_to_generate.append(hash_format) + # if no formats are being carried over from the previous generation to this one, at least + # one of the previous generation hashes needs to at least be verified as correct + # TODO: Consider making the selection of the previous generation bench mark format less arbitrary + # TODO: Instead of selecting the first format, perhaps choose the most efficient or robust format + if not hash_formats_to_generate or len(hash_formats_to_generate) == 0: + hash_formats_to_generate.append(existing_hash_formats[0]) + + for hash_format in hash_formats: + if hash_format not in hash_formats_to_generate: + hash_formats_to_generate.append(hash_format) + + # generate the file hashes + current_hash_lookup = multiple_format_hash_file(file_path, hash_formats_to_generate) + + # the lookup where the results will be stored + hash_result_lookup = {} + + # any newly generated hash will require any previous hashes to be verified + existing_hashes_verified = True + + # verify the existing hash entries first + if existing_hash_formats: + for hash_format in existing_hash_formats: + # make sure the existing format was included in the hashes which were generated + if hash_format not in hash_formats_to_generate: + continue + + success = True + success &= session.append_file_hash( + file_path, file_size, file_modification_date, hash_format, current_hash_lookup[hash_format] + ) + + # if even one existing hash failed to verify existing hashes will be deemed corrupt + if not success: + existing_hashes_verified = False + + # if the existing format was requested add it to the results lookup + if hash_format in hash_formats: + hash_result_lookup[hash_format] = SealPathResult(current_hash_lookup[hash_format], success) + + # add the new hash formats + for hash_format in hash_formats_to_generate: + # existing hashes have already been handled + if existing_hash_formats and hash_format in existing_hash_formats: + continue + + # if this format has never been recorded the previous hashes must be verified for this hash to be verified + success = existing_hashes_verified + + # only add the new hash format to the session if the previous hashes are verified + if success: + success &= session.append_file_hash( + file_path, file_size, file_modification_date, hash_format, current_hash_lookup[hash_format] + ) + + # If the existing hash was requested by the caller it needs to be added to the lookup + if hash_format in hash_formats: + hash_result_lookup[hash_format] = SealPathResult(current_hash_lookup[hash_format], success) + + return hash_result_lookup diff --git a/ascmhl/generator.py b/ascmhl/generator.py index bae6c92..a41b4cf 100644 --- a/ascmhl/generator.py +++ b/ascmhl/generator.py @@ -41,6 +41,78 @@ def __init__(self, history: MHLHistory, ignore_spec: MHLIgnoreSpec = MHLIgnoreSp self.new_hash_lists = defaultdict(MHLHashList) self.ignore_spec = ignore_spec + def append_multiple_format_file_hashes( + self, file_path, file_size, hash_lookup: Dict[str, str], file_modification_date, action=None, hash_date=None + ) -> bool: + """ + Adds file hashes to the history + :param file_path: a string value representing the path to a file + :param file_size: size of the file path in bytes + :param hash_lookup: a dictionary of hash values keyed by the respective hash format + :param 
file_modification_date: date the file was last modified
+        :param action: a predetermined action for the entry. defaults to None
+        :param hash_date: date the hashes were generated
+        :return: a bool indicating if the hashes were successfully appended. returns false if any failures occur
+        """
+        relative_path = self.root_history.get_relative_file_path(file_path)
+        # TODO: handle if path is outside of history root path
+        # Keep track of the number of failures
+        failures = 0
+        history, history_relative_path = self.root_history.find_history_for_path(relative_path)
+        # for collections we cannot create a valid relative path (we are in the "wrong" history), but in that case
+        # the file_path is inputted already as the relative path (a bit of implicit functionality here)
+        if history_relative_path == None:
+            history_relative_path = file_path
+
+        # check if there is an existing hash in the other generations and verify
+        original_hash_entry = history.find_original_hash_entry_for_path(history_relative_path)
+
+        hash_entries = []
+        # TODO: sort the format keys into a standard order for consistent output
+        for hash_format, hash_string in hash_lookup.items():
+            hash_entry = MHLHashEntry(hash_format, hash_string, hash_date=hash_date)
+            if original_hash_entry is None:
+                hash_entry.action = "original"
+                logger.verbose(f"  created original hash for {relative_path} {hash_format}: {hash_string}")
+            else:
+                existing_hash_entry = history.find_first_hash_entry_for_path(history_relative_path, hash_format)
+                if existing_hash_entry is not None:
+                    if existing_hash_entry.hash_string == hash_string:
+                        hash_entry.action = "verified"
+                        logger.verbose(f"  verified {relative_path} {hash_format}: OK")
+                    else:
+                        hash_entry.action = "failed"
+                        failures += 1
+                        logger.error(
+                            f"ERROR: hash mismatch for {relative_path} "
+                            f"{hash_format} (old): {existing_hash_entry.hash_string}, "
+                            f"{hash_format} (new): {hash_string}"
+                        )
+                else:
+                    # in case there is no hash entry for this hash format yet
+                    hash_entry.action = (  # mark as 'new' here, will be changed to verified in _validate_new_hash_list
+                        "new"
+                    )
+                    logger.verbose(f"  created new (verif.) 
hash for {relative_path} {hash_format}: {hash_string}") + # collection behavior: overwrite action with action from flattened history + if action != None: + hash_entry.action = action + + # Add the generated entry to the list + hash_entries.append(hash_entry) + + # in case the same file is hashes multiple times we want to add all hash entries + new_hash_list = self.new_hash_lists[history] + media_hash = new_hash_list.find_or_create_media_hash_for_path( + history_relative_path, file_size, file_modification_date + ) + + # Add the new hash entries + for hash_entry in hash_entries: + media_hash.append_hash_entry(hash_entry) + + return failures == 0 + def append_file_hash( self, file_path, file_size, file_modification_date, hash_format, hash_string, action=None, hash_date=None ) -> bool: @@ -66,7 +138,7 @@ def append_file_hash( if existing_hash_entry is not None: if existing_hash_entry.hash_string == hash_string: hash_entry.action = "verified" - logger.verbose(f" verified {relative_path} OK") + logger.verbose(f" verified {relative_path} {hash_format}: OK") else: hash_entry.action = "failed" logger.error( @@ -77,9 +149,7 @@ def append_file_hash( else: # in case there is no hash entry for this hash format yet hash_entry.action = "new" # mark as 'new' here, will be changed to verified in _validate_new_hash_list - logger.verbose( - f" created new, verified hash for {relative_path} {hash_format}: {hash_string}" - ) + logger.verbose(f" created new (verif.) hash for {relative_path} {hash_format}: {hash_string}") # in case the same file is hashes multiple times we want to add all hash entries new_hash_list = self.new_hash_lists[history] @@ -94,6 +164,69 @@ def append_file_hash( media_hash.append_hash_entry(hash_entry) return hash_entry.action != "failed" + def append_multiple_format_directory_hashes( + self, path, modification_date, content_hash_lookup: Dict[str, str], structure_hash_lookup: Dict[str, str] + ) -> None: + """ + Adds directory hashes to the history + :param path: a string value representing the path to a file + :param modification_date: date the file was last modified + :param content_hash_lookup: a dictionary of content hash values keyed by the respective hash format + :param structure_hash_lookup: a dictionary of structure hash values keyed by the respective hash format + :return: none + """ + relative_path = self.root_history.get_relative_file_path(path) + # TODO: handle if path is outside of history root path + + history, history_relative_path = self.root_history.find_history_for_path(relative_path) + + # in case the same file is hashes multiple times we want to add all hash entries + new_hash_list = self.new_hash_lists[history] + media_hash = new_hash_list.find_or_create_media_hash_for_path(history_relative_path, None, modification_date) + media_hash.is_directory = True + + # Add the content entries + if content_hash_lookup: + for hash_format, content_hash_string in content_hash_lookup.items(): + # Find the structure hash string + structure_hash_string = structure_hash_lookup[hash_format] + + hash_entry = MHLHashEntry(hash_format, content_hash_string) + # Attempt to add the structure, if available + hash_entry.structure_hash_string = structure_hash_string + media_hash.append_hash_entry(hash_entry) + + if relative_path == ".": + logger.verbose( + f" calculated root hash {hash_format}: " + f"{content_hash_string} (content), " + f"{structure_hash_string} (structure)" + ) + else: + logger.verbose( + f" calculated directory hash for {relative_path} {hash_format}: " + 
f"{content_hash_string} (content), " + f"{structure_hash_string} (structure)" + ) + else: + logger.verbose(f" added directory entry for {relative_path}") + + # in case we just created the root media hash of the current hash list we also add it one history level above + if new_hash_list.process_info.root_media_hash is media_hash and history.parent_history: + parent_history = history.parent_history + parent_relative_path = parent_history.get_relative_file_path(path) + parent_hash_list = self.new_hash_lists[parent_history] + parent_media_hash = parent_hash_list.find_or_create_media_hash_for_path( + parent_relative_path, None, modification_date + ) + parent_media_hash.is_directory = True + if content_hash_lookup: + for hash_format, content_hash_string in content_hash_lookup.items(): + structure_hash_string = structure_hash_lookup[hash_format] + hash_entry = MHLHashEntry(hash_format, content_hash_string) + hash_entry.structure_hash_string = structure_hash_string + parent_media_hash.append_hash_entry(hash_entry) + def append_directory_hashes( self, path, modification_date, hash_format, content_hash_string, structure_hash_string ) -> None: diff --git a/ascmhl/hasher.py b/ascmhl/hasher.py index e583f51..5a84c8c 100644 --- a/ascmhl/hasher.py +++ b/ascmhl/hasher.py @@ -13,6 +13,7 @@ import os from enum import Enum, unique from abc import ABC, abstractmethod +from typing import Dict class Hasher(ABC): @@ -242,12 +243,84 @@ class HashType(Enum): c4 = C4 +class AggregateHasher: + def __init__(self, hash_formats: [str]): + + # Build a hasher for each format + hasher_lookup = Dict[str, str] + for hash_format in hash_formats: + hasher_lookup[hash_format] = new_hasher_for_hash_type(hash_format) + + self.hash_formats = hash_formats + self.hasher_lookup = hasher_lookup + + """ + Handles multiple hashing to facilitate a read-once create-many hashing paradigm + """ + + @classmethod + def hash_file(cls, file_path: str, hash_formats: [str]) -> Dict[str, str]: + """ + computes and returns new hash strings for a file + + arguments: + file_path -- string value, path of file to generate hash for. + hash_formats -- array string values, each entry should be one of the supported hash formats, e.g. 'md5', 'xxh64' + """ + + # Build a hasher for each supplied format + hasher_lookup = {} + for hash_format in hash_formats: + hasher = new_hasher_for_hash_type(hash_format) + hasher_lookup[hash_format] = hasher + + # Open the file + with open(file_path, "rb") as fd: + # process files in chunks so that large files won't cause excessive memory consumption. + size = 1024 * 1024 # chunk size 1MB + chunk = fd.read(size) + while chunk: + # Update each stored hasher with the read chunk + for hash_format in hasher_lookup: + hasher_lookup[hash_format].update(chunk) + + chunk = fd.read(size) + + # Get the digest from each hasher + hash_output_lookup = {} + for hash_format in hasher_lookup: + hash_output_lookup[hash_format] = hasher_lookup[hash_format].string_digest() + + return hash_output_lookup + + @classmethod + def hash_data(cls, input_data: bytes, hash_formats: [str]) -> Dict[str, str]: + """ + computes and returns new hash strings for a file + + arguments: + input_data -- the bytes to compute the hash from. + hash_formats -- array string values, each entry should be one of the supported hash formats, e.g. 
'md5', 'xxh64' + """ + + # Build a hash for each supplied format + hash_output_lookup = {} + for hash_format in hash_formats: + hash_generator = new_hasher_for_hash_type(hash_format) + hash_generator.update(input_data) + computed_hash = hash_generator.string_digest() + hash_output_lookup[hash_format] = computed_hash + + return hash_output_lookup + + class DirectoryHashContext: """ DirectoryHashContext wraps the data necessary to compute directory checksums. """ def __init__(self, hash_format: str): + self.hash_format = hash_format self.hasher = new_hasher_for_hash_type(hash_format) self.content_hash_strings = [] @@ -318,6 +391,17 @@ def hash_of_hash_list(hash_list: [str], hash_format: str) -> str: return hasher.hash_of_hash_list(hash_list) +def multiple_format_hash_file(file_path: str, hash_formats: [str]) -> Dict[str, str]: + """ + computes and returns a new hash strings for a file + + arguments: + file_path -- string value, path of file to generate hash for. + hash_formats -- string values, each entry is one of the supported hash formats, e.g. 'md5', 'xxh64' + """ + return AggregateHasher.hash_file(file_path, hash_formats) + + def hash_file(filepath: str, hash_format: str) -> str: """ computes and returns a new hash string for a file @@ -342,6 +426,16 @@ def hash_data(input_data: bytes, hash_format: str) -> str: return hasher.hash_data(input_data) +def multiple_format_hash_data(input_data: bytes, hash_formats: [str]) -> Dict[str, str]: + """ + computes and returns new hash strings from the input data + arguments: + input_data -- the bytes to compute the hash from + hash_formats -- string values, each entry is one of the supported hash formats, e.g. 'md5', 'xxh64' + """ + return AggregateHasher.hash_data(input_data, hash_formats) + + def bytes_for_hash_string(hash_string: str, hash_format: str) -> bytes: """ wraps the different Hasher string to byte conversions diff --git a/examples/scenarios/Output/scenario_02/log.txt b/examples/scenarios/Output/scenario_02/log.txt index 2a89b86..66ea089 100644 --- a/examples/scenarios/Output/scenario_02/log.txt +++ b/examples/scenarios/Output/scenario_02/log.txt @@ -25,10 +25,10 @@ this will verify all hashes, check for completeness and create a second generati $ ascmhl.py create -v /file_server/A002R2EC -h xxh64 Creating new generation for folder at path: /file_server/A002R2EC ... - verified Clips/A002C006_141024_R2EC.mov OK - verified Clips/A002C007_141024_R2EC.mov OK + verified Clips/A002C006_141024_R2EC.mov xxh64: OK + verified Clips/A002C007_141024_R2EC.mov xxh64: OK calculated directory hash for Clips xxh64: 4c226b42e27d7af3 (content), 906faa843d591a9f (structure) - verified Sidecar.txt OK + verified Sidecar.txt xxh64: OK calculated root hash xxh64: 8d02114c32e28cbe (content), f557f8ca8e5a88ef (structure) Created new generation ascmhl/0002_A002R2EC_2020-01-17_143000.mhl diff --git a/examples/scenarios/Output/scenario_03/log.txt b/examples/scenarios/Output/scenario_03/log.txt index 243fc88..e178200 100644 --- a/examples/scenarios/Output/scenario_03/log.txt +++ b/examples/scenarios/Output/scenario_03/log.txt @@ -28,13 +28,13 @@ and create a second generation with additional (new) MD5 hashes. $ ascmhl.py create -v -h md5 /file_server/A002R2EC Creating new generation for folder at path: /file_server/A002R2EC ... 
- verified Clips/A002C006_141024_R2EC.mov OK - created new, verified hash for Clips/A002C006_141024_R2EC.mov md5: f5ac8127b3b6b85cdc13f237c6005d80 - verified Clips/A002C007_141024_R2EC.mov OK - created new, verified hash for Clips/A002C007_141024_R2EC.mov md5: 614dd0e977becb4c6f7fa99e64549b12 + verified Clips/A002C006_141024_R2EC.mov xxh64: OK + created new (verif.) hash for Clips/A002C006_141024_R2EC.mov md5: f5ac8127b3b6b85cdc13f237c6005d80 + verified Clips/A002C007_141024_R2EC.mov xxh64: OK + created new (verif.) hash for Clips/A002C007_141024_R2EC.mov md5: 614dd0e977becb4c6f7fa99e64549b12 calculated directory hash for Clips md5: 202a2d71b56b080d9b089c1f4f29a4ba (content), 4a739024fd19d928e9dea6bb5c480200 (structure) - verified Sidecar.txt OK - created new, verified hash for Sidecar.txt md5: 6425c5a180ca0f420dd2b25be4536a91 + verified Sidecar.txt xxh64: OK + created new (verif.) hash for Sidecar.txt md5: 6425c5a180ca0f420dd2b25be4536a91 calculated root hash md5: 6fae2da9bc6dca45486cb91bfea6db70 (content), be1f2eaed208efbed061845a64cacdfa (structure) Created new generation ascmhl/0002_A002R2EC_2020-01-17_143000.mhl diff --git a/examples/scenarios/Output/scenario_04/log.txt b/examples/scenarios/Output/scenario_04/log.txt index d92593f..9a8e4e4 100644 --- a/examples/scenarios/Output/scenario_04/log.txt +++ b/examples/scenarios/Output/scenario_04/log.txt @@ -29,8 +29,8 @@ An error is shown and create a new generation that documents the failed verifica $ ascmhl.py create -v /file_server/A002R2EC -h xxh64 Creating new generation for folder at path: /file_server/A002R2EC ... - verified Clips/A002C006_141024_R2EC.mov OK - verified Clips/A002C007_141024_R2EC.mov OK + verified Clips/A002C006_141024_R2EC.mov xxh64: OK + verified Clips/A002C007_141024_R2EC.mov xxh64: OK calculated directory hash for Clips xxh64: 4c226b42e27d7af3 (content), 906faa843d591a9f (structure) ERROR: hash mismatch for Sidecar.txt xxh64 (old): 3ab5a4166b9bde44, xxh64 (new): 70d2cf31aaa3eac4 calculated root hash xxh64: 8e52e9c3d15e055c (content), 32706d5f4b48f047 (structure) diff --git a/examples/scenarios/Output/scenario_05/log.txt b/examples/scenarios/Output/scenario_05/log.txt index b382a85..97850ef 100644 --- a/examples/scenarios/Output/scenario_05/log.txt +++ b/examples/scenarios/Output/scenario_05/log.txt @@ -45,15 +45,15 @@ of the card sub folders. $ ascmhl.py create -v /file_server/Reels -h xxh64 Creating new generation for folder at path: /file_server/Reels ... 
- verified A002R2EC/Clips/A002C006_141024_R2EC.mov OK - verified A002R2EC/Clips/A002C007_141024_R2EC.mov OK + verified A002R2EC/Clips/A002C006_141024_R2EC.mov xxh64: OK + verified A002R2EC/Clips/A002C007_141024_R2EC.mov xxh64: OK calculated directory hash for A002R2EC/Clips xxh64: 4c226b42e27d7af3 (content), 906faa843d591a9f (structure) - verified A002R2EC/Sidecar.txt OK + verified A002R2EC/Sidecar.txt xxh64: OK calculated directory hash for A002R2EC xxh64: 8d02114c32e28cbe (content), f557f8ca8e5a88ef (structure) - verified A003R2EC/Clips/A003C011_141024_R2EC.mov OK - verified A003R2EC/Clips/A003C012_141024_R2EC.mov OK + verified A003R2EC/Clips/A003C011_141024_R2EC.mov xxh64: OK + verified A003R2EC/Clips/A003C012_141024_R2EC.mov xxh64: OK calculated directory hash for A003R2EC/Clips xxh64: f2afc6434255a53d (content), a25d5ca89c95f9e2 (structure) - verified A003R2EC/Sidecar.txt OK + verified A003R2EC/Sidecar.txt xxh64: OK calculated directory hash for A003R2EC xxh64: 7a82373c131cf40a (content), 1131a950fcc55e4b (structure) created original hash for Summary.txt xxh64: 0ac48e431d4538ba calculated root hash xxh64: 92950bc8fda076ec (content), 2c2ce52605558158 (structure) diff --git a/examples/scenarios/README.md b/examples/scenarios/README.md index ff3fad6..2899782 100644 --- a/examples/scenarios/README.md +++ b/examples/scenarios/README.md @@ -52,10 +52,10 @@ this will verify all hashes, check for completeness and create a second generati $ ascmhl.py create -v /file_server/A002R2EC -h xxh64 Creating new generation for folder at path: /file_server/A002R2EC ... - verified Clips/A002C006_141024_R2EC.mov OK - verified Clips/A002C007_141024_R2EC.mov OK + verified Clips/A002C006_141024_R2EC.mov xxh64: OK + verified Clips/A002C007_141024_R2EC.mov xxh64: OK calculated directory hash for Clips xxh64: 4c226b42e27d7af3 (content), 906faa843d591a9f (structure) - verified Sidecar.txt OK + verified Sidecar.txt xxh64: OK calculated root hash xxh64: 8d02114c32e28cbe (content), f557f8ca8e5a88ef (structure) Created new generation ascmhl/0002_A002R2EC_2020-01-17_143000.mhl @@ -94,13 +94,13 @@ and create a second generation with additional (new) MD5 hashes. $ ascmhl.py create -v -h md5 /file_server/A002R2EC Creating new generation for folder at path: /file_server/A002R2EC ... - verified Clips/A002C006_141024_R2EC.mov OK - created new, verified hash for Clips/A002C006_141024_R2EC.mov md5: f5ac8127b3b6b85cdc13f237c6005d80 - verified Clips/A002C007_141024_R2EC.mov OK - created new, verified hash for Clips/A002C007_141024_R2EC.mov md5: 614dd0e977becb4c6f7fa99e64549b12 + verified Clips/A002C006_141024_R2EC.mov xxh64: OK + created new (verif.) hash for Clips/A002C006_141024_R2EC.mov md5: f5ac8127b3b6b85cdc13f237c6005d80 + verified Clips/A002C007_141024_R2EC.mov xxh64: OK + created new (verif.) hash for Clips/A002C007_141024_R2EC.mov md5: 614dd0e977becb4c6f7fa99e64549b12 calculated directory hash for Clips md5: 202a2d71b56b080d9b089c1f4f29a4ba (content), 4a739024fd19d928e9dea6bb5c480200 (structure) - verified Sidecar.txt OK - created new, verified hash for Sidecar.txt md5: 6425c5a180ca0f420dd2b25be4536a91 + verified Sidecar.txt xxh64: OK + created new (verif.) 
hash for Sidecar.txt md5: 6425c5a180ca0f420dd2b25be4536a91 calculated root hash md5: 6fae2da9bc6dca45486cb91bfea6db70 (content), be1f2eaed208efbed061845a64cacdfa (structure) Created new generation ascmhl/0002_A002R2EC_2020-01-17_143000.mhl @@ -140,8 +140,8 @@ An error is shown and create a new generation that documents the failed verifica $ ascmhl.py create -v /file_server/A002R2EC -h xxh64 Creating new generation for folder at path: /file_server/A002R2EC ... - verified Clips/A002C006_141024_R2EC.mov OK - verified Clips/A002C007_141024_R2EC.mov OK + verified Clips/A002C006_141024_R2EC.mov xxh64: OK + verified Clips/A002C007_141024_R2EC.mov xxh64: OK calculated directory hash for Clips xxh64: 4c226b42e27d7af3 (content), 906faa843d591a9f (structure) ERROR: hash mismatch for Sidecar.txt xxh64 (old): 3ab5a4166b9bde44, xxh64 (new): 70d2cf31aaa3eac4 calculated root hash xxh64: 8e52e9c3d15e055c (content), 32706d5f4b48f047 (structure) @@ -200,15 +200,15 @@ of the card sub folders. $ ascmhl.py create -v /file_server/Reels -h xxh64 Creating new generation for folder at path: /file_server/Reels ... - verified A002R2EC/Clips/A002C006_141024_R2EC.mov OK - verified A002R2EC/Clips/A002C007_141024_R2EC.mov OK + verified A002R2EC/Clips/A002C006_141024_R2EC.mov xxh64: OK + verified A002R2EC/Clips/A002C007_141024_R2EC.mov xxh64: OK calculated directory hash for A002R2EC/Clips xxh64: 4c226b42e27d7af3 (content), 906faa843d591a9f (structure) - verified A002R2EC/Sidecar.txt OK + verified A002R2EC/Sidecar.txt xxh64: OK calculated directory hash for A002R2EC xxh64: 8d02114c32e28cbe (content), f557f8ca8e5a88ef (structure) - verified A003R2EC/Clips/A003C011_141024_R2EC.mov OK - verified A003R2EC/Clips/A003C012_141024_R2EC.mov OK + verified A003R2EC/Clips/A003C011_141024_R2EC.mov xxh64: OK + verified A003R2EC/Clips/A003C012_141024_R2EC.mov xxh64: OK calculated directory hash for A003R2EC/Clips xxh64: f2afc6434255a53d (content), a25d5ca89c95f9e2 (structure) - verified A003R2EC/Sidecar.txt OK + verified A003R2EC/Sidecar.txt xxh64: OK calculated directory hash for A003R2EC xxh64: 7a82373c131cf40a (content), 1131a950fcc55e4b (structure) created original hash for Summary.txt xxh64: 0ac48e431d4538ba calculated root hash xxh64: 92950bc8fda076ec (content), 2c2ce52605558158 (structure) diff --git a/tests/test_create.py b/tests/test_create.py index 29e1074..5339b2c 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -98,13 +98,13 @@ def test_create_directory_hashes(fs): file.write("!!") runner = CliRunner() - result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "xxh64"]) + result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "xxh64", "-h", "md5"]) assert "ERROR: hash mismatch for A/A2.txt" in result.output hash_list = MHLHistory.load_from_path("/root").hash_lists[-1] # an altered file leads to a different root directory hash - assert hash_list.process_info.root_media_hash.hash_entries[0].hash_string == "28ed09733f793dfc" + assert hash_list.process_info.root_media_hash.hash_entries[1].hash_string == "28ed09733f793dfc" # structure hash stays the same - assert hash_list.process_info.root_media_hash.hash_entries[0].structure_hash_string == "89e4debdb80cc068" + assert hash_list.process_info.root_media_hash.hash_entries[1].structure_hash_string == "89e4debdb80cc068" # test that the directory-hash command creates the same root hash # FIXME: command doesn't exist any more, replace with tests of verify directory hashes command? 
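
For reference, a minimal sketch of the option behavior the multi-format test invocations above rely on: with multiple=True, click collects every repeated -h/--hash_format occurrence into a tuple, and the create subcommand then sorts that tuple (hash_format_list = sorted(hash_formats)). The snippet is illustrative only; the choice list is a truncated stand-in for ascmhl_supported_hashformats and the small "demo" command is not part of ascmhl.

import click

@click.command()
@click.option(
    "--hash_format",
    "-h",
    type=click.Choice(["c4", "md5", "sha1", "xxh64"]),  # truncated stand-in for ascmhl_supported_hashformats
    multiple=True,
    default=["xxh64"],
)
def demo(hash_format):
    # click hands the repeated option over as a tuple, e.g. ("xxh64", "md5")
    hash_format_list = sorted(hash_format)
    click.echo(", ".join(hash_format_list))

if __name__ == "__main__":
    demo()  # invoked as "demo -h xxh64 -h md5", this prints "md5, xxh64"
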
@@ -112,29 +112,44 @@ def test_create_directory_hashes(fs): # assert result.exit_code == 0 # assert "root hash: xxh64: adf18c910489663c" in result.output - assert hash_list.find_media_hash_for_path("B").hash_entries[0].hash_string == "aab0eba57cd1aca9" - assert hash_list.find_media_hash_for_path("B").hash_entries[0].structure_hash_string == "fac2a2ceb0fa0c0b" - assert hash_list.process_info.root_media_hash.hash_entries[0].hash_string == "28ed09733f793dfc" - assert hash_list.process_info.root_media_hash.hash_entries[0].structure_hash_string == "89e4debdb80cc068" + assert hash_list.find_media_hash_for_path("B").hash_entries[0].hash_string == "d6df246725efff6ceaee31f663a32cf8" + assert hash_list.find_media_hash_for_path("B").hash_entries[1].hash_string == "aab0eba57cd1aca9" + + assert ( + hash_list.find_media_hash_for_path("B").hash_entries[0].structure_hash_string + == "a21e164c1df944733e5e3d4e4ed64f90" + ) + assert hash_list.find_media_hash_for_path("B").hash_entries[1].structure_hash_string == "fac2a2ceb0fa0c0b" + + assert hash_list.process_info.root_media_hash.hash_entries[1].hash_string == "28ed09733f793dfc" + assert hash_list.process_info.root_media_hash.hash_entries[1].structure_hash_string == "89e4debdb80cc068" # rename one file os.rename("/root/B/B1.txt", "/root/B/B2.txt") runner = CliRunner() - result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "xxh64"]) + result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "xxh64", "-h", "c4"]) assert "ERROR: hash mismatch for A/A2.txt" in result.output # in addition to the failing verification we also have a missing file B1/B1.txt assert "missing file(s):\n B/B1.txt" in result.output hash_list = MHLHistory.load_from_path("/root").hash_lists[-1] # the file name is part of the structure directory hash of the containing directory so it's hash changes - assert hash_list.find_media_hash_for_path("B").hash_entries[0].structure_hash_string == "7ae620e883160eb3" + assert ( + hash_list.find_media_hash_for_path("B").hash_entries[0].structure_hash_string + == "c42qegPDBxh16Vqi4qFGh1EQv39nEbVmZ9R1LGkaVr1dEBRcD69pH3r5vdGDSwceQLEZc872kQho5Cforb95s2wjH8" + ) + assert hash_list.find_media_hash_for_path("B").hash_entries[1].structure_hash_string == "7ae620e883160eb3" # .. and the content hash stays the same - assert hash_list.find_media_hash_for_path("B").hash_entries[0].hash_string == "aab0eba57cd1aca9" + assert ( + hash_list.find_media_hash_for_path("B").hash_entries[0].hash_string + == "c43X1ve8nmicwGit4fnhs428pTCV6ZjXQsorxPLNx3396oRuQFaq79iLR2ZsPoWN8yckFzZdkqZ21igH8K7rWAoDMa" + ) + assert hash_list.find_media_hash_for_path("B").hash_entries[1].hash_string == "aab0eba57cd1aca9" # a renamed file also leads to a different root structure directory hash - assert hash_list.process_info.root_media_hash.hash_entries[0].structure_hash_string == "0bba67923d19d36b" + assert hash_list.process_info.root_media_hash.hash_entries[1].structure_hash_string == "0bba67923d19d36b" # and an unchanged content hash - assert hash_list.process_info.root_media_hash.hash_entries[0].hash_string == "28ed09733f793dfc" + assert hash_list.process_info.root_media_hash.hash_entries[1].hash_string == "28ed09733f793dfc" # test that the directory-hash command creates the same root hash # FIXME: command doesn't exist any more, replace with tests of verify directory hashes command? 
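
The shifted hash_entries indices asserted above follow from format ordering: the create command builds its directory hash contexts from sorted(hash_formats), so directory and root hash entries are appended in alphabetical order of format name, which places md5 and c4 ahead of xxh64. A small illustrative check of that ordering assumption (entry order is an implementation detail of the current code, not something the MHL format guarantees):

for requested in (["xxh64", "md5"], ["xxh64", "c4"]):
    ordered = sorted(requested)
    # md5 and c4 sort before xxh64, so the xxh64 entry lands at index 1
    assert ordered.index("xxh64") == 1
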
@@ -310,3 +325,39 @@ def test_creator_info(fs, simple_mhl_history): assert "franz@example.com" in result.output assert "123-4567" in result.output assert "Data Manager" in result.output + + +def test_create_mulitple_hashformats(fs, simple_mhl_history): + runner = CliRunner() + result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-n", "-h", "md5", "-h", "sha1"]) + assert result.exit_code == 0 + + assert "A/A1.txt md5: fe6975a937016c20b43b17540e6c6246" in result.output + assert "A/A1.txt sha1: 4a5b95edbea7de5ed2367432645df88cd4f1d1b6" in result.output + + +def test_create_mulitple_hashformats_no_dash_n(fs, simple_mhl_history): + runner = CliRunner() + result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "md5", "-h", "sha1"]) + assert result.exit_code == 0 + + assert "A/A1.txt md5: fe6975a937016c20b43b17540e6c6246" in result.output + assert "A/A1.txt sha1: 4a5b95edbea7de5ed2367432645df88cd4f1d1b6" in result.output + + +@freeze_time("2020-01-16 09:15:00") +def test_create_mulitple_hashformats_double_hashformat(fs, simple_mhl_history): + runner = CliRunner() + result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "c4"]) + result = runner.invoke(ascmhl.commands.create, ["/root", "-v", "-h", "md5", "-h", "sha1"]) + + # check if mhl file exists + mhlfilepath = "/root/ascmhl/0003_root_2020-01-16_091500.mhl" + assert os.path.isfile(mhlfilepath) + + # check if mhl file validates + result = runner.invoke(ascmhl.commands.xsd_schema_check, [mhlfilepath]) + if result.exit_code != 0: + print(result.output) + + assert result.exit_code == 0 diff --git a/tests/test_hasher.py b/tests/test_hasher.py index 853fa11..62ca4f6 100644 --- a/tests/test_hasher.py +++ b/tests/test_hasher.py @@ -37,6 +37,36 @@ def test_hash_data(): assert h == v # assert our computed hash equals expected hash +def test_aggregate_hashing_of_data(fs): + # the data to hash + data = b"media-hash-list" + + # map of hash algorithm to expected hash value + hash_type_and_value = { + "md5": "9db0fc9f30f5ee70041a7538809e2858", + "sha1": "7b57673ac5633937a55b59009ad0c57ee08188b7", + "xxh32": "f67c5a4f", + "xxh64": "584b2ea1974f2b7c", + "xxh3": "6d4cbd75905c81aa", + "xxh128": "61a67c014f703a456ee7a776fd8c06bd", + "c4": "c456LycWwpMMS7VDZEKvYv2L1uJS6s4qAFnaJdnQiy5JVbBFZMA8aLDS6SPaJjLqxXH4qZdnbuktopMt9frtC2qL1R", + } + + # Generate the hash pairings for the file and specified formats + hash_lookup = multiple_format_hash_data(data, hash_type_and_value.keys()) + + # Make sure each pair's value matches the known hash value + evaluated_formats = [] + for hash_format, hash_value in hash_lookup.items(): + known_hash = hash_type_and_value[hash_format] + evaluated_formats.append(hash_format) + assert hash_value == known_hash + + # Make sure each stored format was represented in the hash pairings + for k in hash_type_and_value: + assert k in evaluated_formats + + def test_hash_file(fs): # write some data to a file so we can hash it. file, data = "/data-file.txt", "media-hash-list" @@ -57,6 +87,36 @@ def test_hash_file(fs): assert h == v # assert our computed hash equals expected hash +def test_aggregate_hashing_of_file(fs): + # write some data to a file so we can hash it. 
+ file, data = "/data-file.txt", "media-hash-list" + fs.create_file(file, contents=data) + # map of hash algorithm to expected hash value + hash_type_and_value = { + "md5": "9db0fc9f30f5ee70041a7538809e2858", + "sha1": "7b57673ac5633937a55b59009ad0c57ee08188b7", + "xxh32": "f67c5a4f", + "xxh64": "584b2ea1974f2b7c", + "xxh3": "6d4cbd75905c81aa", + "xxh128": "61a67c014f703a456ee7a776fd8c06bd", + "c4": "c456LycWwpMMS7VDZEKvYv2L1uJS6s4qAFnaJdnQiy5JVbBFZMA8aLDS6SPaJjLqxXH4qZdnbuktopMt9frtC2qL1R", + } + + # Generate the hash pairings for the file and specified formats + hash_lookup = multiple_format_hash_file(file, hash_type_and_value.keys()) + + # Make sure each pair's value matches the known hash value + evaluated_formats = [] + for hash_format, hash_value in hash_lookup.items(): + known_hash = hash_type_and_value[hash_format] + evaluated_formats.append(hash_format) + assert hash_value == known_hash + + # Make sure each stored format was represented in the hash pairings + for k in hash_type_and_value: + assert k in evaluated_formats + + def test_hash_of_hash_list(): # the hash list to hash - purposefully in reverse order here to ensure data ends up sorted. hash_list = ["bb", "aa10", "aa01", "aa"]
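
As a usage sketch of the read-once, hash-many helpers added in hasher.py: the snippet assumes the ascmhl package is importable, and the expected digests are the ones asserted in test_hasher.py above.

from ascmhl.hasher import hash_data, multiple_format_hash_data

data = b"media-hash-list"
lookup = multiple_format_hash_data(data, ["md5", "xxh64"])

# each requested format maps to its digest string
assert lookup["md5"] == "9db0fc9f30f5ee70041a7538809e2858"
assert lookup["xxh64"] == "584b2ea1974f2b7c"

# each digest matches what the existing single-format helper returns;
# the input is read (or, for files, streamed) only once
for hash_format, hash_value in lookup.items():
    assert hash_value == hash_data(data, hash_format)
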