diff --git a/schema/ingestion_config/v1.0.0/ingestion_config_models_extended.py b/schema/ingestion_config/v1.0.0/ingestion_config_models_extended.py index 56698e0cf..b661b25ba 100644 --- a/schema/ingestion_config/v1.0.0/ingestion_config_models_extended.py +++ b/schema/ingestion_config/v1.0.0/ingestion_config_models_extended.py @@ -79,6 +79,7 @@ ("Gatan", "GATAN"): ["K2", "K2 SUMMIT", "K3", "K3 BIOQUANTUM", "UltraCam", "UltraScan"], ("simulated"): ["simulated"], } +SOURCE_CHILDREN_NON_FINDERS = {"parent_filters", "exclude"} # Flag to determine if network validation should be run (set by provided Container arg) running_network_validation = False @@ -504,43 +505,27 @@ def validate_publications(publications: Optional[str]) -> Optional[str]: def validate_sources(source_list: List[StandardSource] | List[VoxelSpacingSource]) -> None: total_errors = [] - # For verifying that all source entries each only have one finder type - multiple_finders_in_each_source_entries_errors = [] - only_parent_filter_in_source_entry_errors = [] - only_exclude_in_source_entry_errors = [] - - for i, source_element in enumerate(source_list): + for index, source_element in enumerate(source_list): finders_in_source_entry = [] - has_parent_filters = False - has_exclude = False for finder in source_element.model_fields: - # If the finder is not None, add it to the list of finders in the source entry - if getattr(source_element, finder) is None: - continue - if finder == "exclude": - has_exclude = True - elif finder == "parent_filters": - has_parent_filters = True - else: + # If the finder is not None and an actual finder, add it to the list of finders in the source entry + if getattr(source_element, finder) is not None and finder not in SOURCE_CHILDREN_NON_FINDERS: finders_in_source_entry.append(finder) # If there are more than one finder in the source entry, add the source entry index and the finders to the error set if len(finders_in_source_entry) > 1: - multiple_finders_in_each_source_entries_errors.append((i, finders_in_source_entry)) - if len(finders_in_source_entry) == 0 and has_parent_filters: + total_errors.append( + ValueError( + f"Source entry {index} cannot have multiple finders (split into multiple entries): {finders_in_source_entry}", + ), + ) + if len(finders_in_source_entry) == 0: # means there's only a parent_filters entry - only_parent_filter_in_source_entry_errors.append(i) - if len(finders_in_source_entry) == 0 and has_exclude: - # means there's only an exclude entry - only_exclude_in_source_entry_errors.append(i) - - for i, finders in multiple_finders_in_each_source_entries_errors: - total_errors.append( - ValueError(f"Source entry {i} cannot have multiple finders (split into multiple entries): {finders}"), - ) - for index in only_parent_filter_in_source_entry_errors: - total_errors.append(ValueError(f"Source entry {index} cannot have a parent_filters entry without a finder")) - for index in only_exclude_in_source_entry_errors: - total_errors.append(ValueError(f"Source entry {index} cannot have an exclude entry without a finder")) + for source_child in SOURCE_CHILDREN_NON_FINDERS: + if getattr(source_element, source_child) is None: + continue + total_errors.append( + ValueError(f"Source entry {index} cannot have a {source_child} entry without a finder"), + ) if total_errors: raise ValueError(total_errors) @@ -633,8 +618,6 @@ def valid_sources(cls: Self, source_list: List[AnnotationSource]) -> List[Annota used_shapes.add(shape) # For verifying that all source entries each only have one shape entry - multiple_shapes_in_each_source_entries_errors = [] - for i, source_element in enumerate(source_list): shapes_in_source_entry = [] for shape in source_element.model_fields: @@ -647,11 +630,13 @@ def valid_sources(cls: Self, source_list: List[AnnotationSource]) -> List[Annota shapes_in_source_entry.append(shape) # If there are more than one shape in the source entry, add the source entry index and the shapes to the error set if len(shapes_in_source_entry) > 1: - multiple_shapes_in_each_source_entries_errors.append((i, shapes_in_source_entry)) + total_errors.append( + ValueError( + f"Source entry {i} cannot have multiple shapes (split into multiple entries): {shapes_in_source_entry}", + ), + ) # For verifying that all source entries have exactly one of glob_string and glob_strings - multiple_glob_strings_errors = [] - for i, source_element in enumerate(source_list): for shape in source_element.model_fields: source_entry: AnnotationSourceFile | None = getattr(source_element, shape) @@ -661,12 +646,13 @@ def valid_sources(cls: Self, source_list: List[AnnotationSource]) -> List[Annota has_glob_string = getattr(source_entry, "glob_string", None) is not None has_glob_strings = getattr(source_entry, "glob_strings", None) is not None and source_entry.glob_strings if has_glob_string and has_glob_strings: - multiple_glob_strings_errors.append((i, shape)) + total_errors.append( + ValueError( + f"Source entry {i} shape {shape} must have exactly one of glob_string or glob_strings", + ), + ) # For verifying that all source entries do not only have parent_filters or exclude - only_parent_filter_in_source_entry_errors = [] - only_exclude_in_source_entry_errors = [] - for i, source_element in enumerate(source_list): has_parent_filters = False has_exclude = False @@ -682,26 +668,14 @@ def valid_sources(cls: Self, source_list: List[AnnotationSource]) -> List[Annota has_shape = True if not has_shape and has_parent_filters: - only_parent_filter_in_source_entry_errors.append(i) + total_errors.append(ValueError(f"Source entry {i} cannot have a parent_filters entry without a shape")) if not has_shape and has_exclude: - only_exclude_in_source_entry_errors.append(i) + total_errors.append(ValueError(f"Source entry {i} cannot have an exclude entry without a shape")) if shapes_used_multiple_times_errors: total_errors.append( ValueError(f"Annotation cannot have multiple same-shape sources: {shapes_used_multiple_times_errors}"), ) - for i, shapes in multiple_shapes_in_each_source_entries_errors: - total_errors.append( - ValueError(f"Source entry {i} cannot have multiple shapes (split into multiple entries): {shapes}"), - ) - for i, shape in multiple_glob_strings_errors: - total_errors.append( - ValueError(f"Source entry {i} shape {shape} must have exactly one of glob_string or glob_strings"), - ) - for index in only_parent_filter_in_source_entry_errors: - total_errors.append(ValueError(f"Source entry {index} cannot have a parent_filters entry without a shape")) - for index in only_exclude_in_source_entry_errors: - total_errors.append(ValueError(f"Source entry {index} cannot have an exclude entry without a shape")) if total_errors: raise ValueError(total_errors) diff --git a/schema/ingestion_config/v1.0.0/ingestion_config_validate.py b/schema/ingestion_config/v1.0.0/ingestion_config_validate.py index f14703c79..65a4807c7 100644 --- a/schema/ingestion_config/v1.0.0/ingestion_config_validate.py +++ b/schema/ingestion_config/v1.0.0/ingestion_config_validate.py @@ -12,6 +12,7 @@ from ingestion_config_models_extended import ExtendedValidationContainer from pydantic import ValidationError +ORIGINAL_DIR = os.getcwd() os.chdir(os.path.dirname(os.path.abspath(__file__))) ROOT_DIR = "../../../" sys.path.append(ROOT_DIR) # To import the helper function from common.py @@ -153,7 +154,7 @@ def main( # if input files are relative paths, make them absolute if input_files: - input_files = [os.path.abspath(file) for file in input_files] + input_files = [os.path.join(ORIGINAL_DIR, file) if not os.path.isabs(file) else file for file in input_files] files_to_validate = get_yaml_config_files( input_files=input_files,