diff --git a/nlu/pipe/col_substitution/col_name_substitution_utils.py b/nlu/pipe/col_substitution/col_name_substitution_utils.py
index 6ffb2a80..d4532dd4 100644
--- a/nlu/pipe/col_substitution/col_name_substitution_utils.py
+++ b/nlu/pipe/col_substitution/col_name_substitution_utils.py
@@ -6,17 +6,16 @@
 - we remove all _ suffixex
 - replace all '@' with '_'
 """
+import logging
 from typing import List
+from nlu.universe.feature_node_ids import NLP_NODE_IDS
 from sparknlp.annotator import *
 
 import nlu
-from nlu.universe.feature_universes import NLP_FEATURES
 from nlu.pipe.col_substitution import substitution_map_OS
-from nlu.pipe.col_substitution import col_substitution_OS
-import logging
-
 from nlu.pipe.extractors.extractor_base_data_classes import SparkOCRExtractorConfig
+from nlu.universe.feature_universes import NLP_FEATURES
 from nlu.universe.logic_universes import AnnoTypes
 from nlu.universe.universes import Licenses
 
 
@@ -47,7 +46,6 @@ class ColSubstitutionUtils:
     """Utils for substituting col names in Pythonify to short and meaningful names.
     Uses custom rename methods for either PySpark or Pandas
     """
-    from sparknlp.annotator import MarianTransformer
     cleanable_splits = ['ner_converter', 'spell', 'ner_to_chunk_converter', 'train', 'classify', 'ner', 'med_ner',
                         'dl', 'match', 'clean', 'sentiment', 'embed', 'embed_sentence', 'embed_chunk',
                         'explain', 'pos', 'resolve_chunk', 'resolve', ]
@@ -66,7 +64,6 @@ def substitute_col_names(df, anno_2_ex, pipe, stranger_cols=[], get_embeddings=F
         anno2final_cols = {}  # mapping of final col names to annotator class Key=AnnoModel, Value=List of Result cols
         new_cols = {}
         if pipe.has_licensed_components:
-            from nlu.pipe.col_substitution import col_substitution_HC
             from nlu.pipe.col_substitution import substitution_map_HC
         deducted_component_names = ColSubstitutionUtils.deduct_component_names(pipe)
         for c in pipe.components:
@@ -79,7 +76,9 @@ def substitute_col_names(df, anno_2_ex, pipe, stranger_cols=[], get_embeddings=F
                 continue
             if 'embedding' in c.type and get_embeddings == False: continue
             cols_to_substitute = ColSubstitutionUtils.get_final_output_cols_of_component(c, df, anno_2_ex)
-
+            if len(cols_to_substitute) == 0:
+                # all cols of this component were cleaned away by a Finisher
+                continue
             if type(c.model) in substitution_map_OS.OS_anno2substitution_fn.keys():
                 substitution_fn = substitution_map_OS.OS_anno2substitution_fn[type(c.model)]['default']
             else:
@@ -104,7 +103,8 @@ def substitute_col_names(df, anno_2_ex, pipe, stranger_cols=[], get_embeddings=F
         for k in cols_to_rename:
             # some cols might not exist because no annotations generated, so we need to double check it really exists
             if k not in df.columns: del new_cols[k]
-        return df.rename(columns=new_cols)[list(set(new_cols.values()).union(set(stranger_cols)))] if drop_debug_cols else \
+        return df.rename(columns=new_cols)[
+            list(set(new_cols.values()).union(set(stranger_cols)))] if drop_debug_cols else \
             df.rename(columns=new_cols)
 
     @staticmethod
@@ -113,8 +113,15 @@ def get_final_output_cols_of_component(c, df, anno_2_ex) -> List[str]:
        """Get's a list of all columns that have been derived in the pythonify procedure from the component_to_resolve
        os_components in dataframe df for anno_2_ex configs """
         og_output_col = c.spark_output_column_names[0]
-
+        # the col may be missing because a Finisher cleaned it
+        if og_output_col not in df.columns: return []
         configs = anno_2_ex[og_output_col]
+
+        if c.name == NLP_NODE_IDS.FINISHER:
+            result_cols = c.model.getOutputCols()
+            if c.model.getIncludeMetadata():
+                result_cols = result_cols + [f'{col}_metadata' for col in result_cols]
+            return result_cols
         result_cols = []
         if isinstance(configs, SparkOCRExtractorConfig):
             # TODO better OCR-EX handling --> Col Name generator function which we use everywhere for unified col naming !!!!!
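The new Finisher branch in `get_final_output_cols_of_component` derives final column names directly from the Finisher model instead of the extractor configs. A minimal sketch of the expected behaviour, with hypothetical column names:

```python
# Sketch only, not part of the diff; mirrors the new Finisher branch above.
# 'finished_token' / 'finished_pos' are hypothetical Finisher output cols.
result_cols = ['finished_token', 'finished_pos']  # c.model.getOutputCols()
include_metadata = True                           # c.model.getIncludeMetadata()
if include_metadata:
    result_cols = result_cols + [f'{col}_metadata' for col in result_cols]
assert result_cols == ['finished_token', 'finished_pos',
                       'finished_token_metadata', 'finished_pos_metadata']
```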
diff --git a/nlu/pipe/col_substitution/col_substitution_OS.py b/nlu/pipe/col_substitution/col_substitution_OS.py
index 984a6282..3b450f10 100644
--- a/nlu/pipe/col_substitution/col_substitution_OS.py
+++ b/nlu/pipe/col_substitution/col_substitution_OS.py
@@ -744,6 +744,12 @@ def substitute_doc_norm_cols(c, cols, nlu_identifier=True):
     return new_cols
 
 
+def substitute_finisher_cols(c, cols, nlu_identifier=True):
+    """
+    Substitute col names for the Finisher. For now, keeps the original names.
+    """
+    return dict(zip(cols, cols))
+
 
 def substitute_spell_context_cols(c, cols, nlu_identifier=True):
     """
     Substitute col name for normalized, will be new base col namem
diff --git a/nlu/pipe/extractors/extractor_base_data_classes.py b/nlu/pipe/extractors/extractor_base_data_classes.py
index 738f737f..4a4df6c9 100644
--- a/nlu/pipe/extractors/extractor_base_data_classes.py
+++ b/nlu/pipe/extractors/extractor_base_data_classes.py
@@ -150,3 +150,14 @@ class SparkOCRExtractorConfig(SparkNLPExtractorConfig):
     get_exception: bool = field(default=False)
     # Position struct fields
     get_img_positions: bool = field(default=False)
+
+
+@dataclass
+class FinisherExtractorConfig:
+    """
+    Universal configuration class for defining how to extract data from a Finisher
+    """
+    source_col_name: str
+    output_as_array: bool = field(default=True)
+    is_meta_field: bool = field(default=False)
+    annotation_split_symbol: Optional[str] = field(default=None)
+    value_split_symbol: Optional[str] = field(default=None)
diff --git a/nlu/pipe/extractors/extractor_configs_OS.py b/nlu/pipe/extractors/extractor_configs_OS.py
index 97bd6089..ddada467 100644
--- a/nlu/pipe/extractors/extractor_configs_OS.py
+++ b/nlu/pipe/extractors/extractor_configs_OS.py
@@ -10,7 +10,8 @@
 
 """
 
-from nlu.pipe.extractors.extractor_base_data_classes import SparkNLPExtractor, SparkNLPExtractorConfig
+from nlu.pipe.extractors.extractor_base_data_classes import SparkNLPExtractor, SparkNLPExtractorConfig, \
+    FinisherExtractorConfig
 from nlu.pipe.extractors.extractor_methods.helper_extractor_methods import *
 
 """
@@ -41,6 +42,16 @@ def default_full_config(output_col_prefix='DEFAULT'):
     )
 
 
+def default_finisher_config(output_col_prefix='DEFAULT'):
+    return FinisherExtractorConfig(
+        source_col_name=output_col_prefix,
+        output_as_array=True,
+        is_meta_field=False,
+        annotation_split_symbol=None,
+        value_split_symbol=None,
+    )
+
+
 def default_document_config(output_col_prefix='document'):
     return SparkNLPExtractorConfig(
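Note that `source_col_name` has no default on the dataclass, so `default_finisher_config` must pass the prefix through (the call above includes it). A quick sanity check, assuming the module paths from this diff:

```python
# Sketch only: sanity-check the factory added above.
from nlu.pipe.extractors.extractor_configs_OS import default_finisher_config

conf = default_finisher_config(output_col_prefix='finished_token')  # hypothetical col
assert conf.source_col_name == 'finished_token'
assert conf.output_as_array and not conf.is_meta_field
assert conf.annotation_split_symbol is None and conf.value_split_symbol is None
```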
diff --git a/nlu/pipe/extractors/extractor_methods/base_extractor_methods.py b/nlu/pipe/extractors/extractor_methods/base_extractor_methods.py
index d4e91dec..04384acc 100644
--- a/nlu/pipe/extractors/extractor_methods/base_extractor_methods.py
+++ b/nlu/pipe/extractors/extractor_methods/base_extractor_methods.py
@@ -206,6 +206,7 @@ def extract_base_sparknlp_features(row: pd.Series, configs: SparkNLPExtractorCon
     return {**beginnings, **endings, **results, **annotator_types, **embeddings, **origins}  # Merge dicts NLP output
 
 
+
 def extract_sparknlp_metadata(row: pd.Series, configs: SparkNLPExtractorConfig) -> dict:
     """
     Extract base features common in all saprk NLP annotators
@@ -267,7 +268,25 @@ def extract_master(row: pd.Series, configs: SparkNLPExtractorConfig) -> pd.Serie
     if isinstance(configs, SparkOCRExtractorConfig):
         base_annos = extract_base_sparkocr_features(row, configs)
     else:
-        base_annos = extract_base_sparknlp_features(row, configs)
+        if isinstance(configs, FinisherExtractorConfig):
+            # 1. if normal finisher col, just return val -> {finisher_col: [values]}
+            # 2. if meta finisher col, return grouped meta dict -> {m1: [v1,v2,..], m2: [v1,v2,..], ...}
+            # 3. double check with other anno behaviour
+            if configs.is_meta_field:
+                return pd.Series(
+                    {
+                        configs.source_col_name: row
+                    })
+            else:
+                return pd.Series(
+                    {
+                        configs.source_col_name: row
+                    })
+
+        else:
+            base_annos = extract_base_sparknlp_features(row, configs)
+
+    # TODO proper Finisher handling!
     # Get Metadata
     all_metas = extract_sparknlp_metadata(row, configs) if configs.get_meta or configs.get_full_meta else {}
diff --git a/nlu/pipe/pipeline.py b/nlu/pipe/pipeline.py
index f0cfc749..966d03f7 100644
--- a/nlu/pipe/pipeline.py
+++ b/nlu/pipe/pipeline.py
@@ -16,6 +16,8 @@
 from nlu.pipe.utils.data_conversion_utils import DataConversionUtils
 from nlu.pipe.utils.output_level_resolution_utils import OutputLevelUtils
 from nlu.pipe.utils.resolution.storage_ref_utils import StorageRefUtils
+from nlu.pipe.extractors.extractor_base_data_classes import FinisherExtractorConfig
+from nlu.universe.feature_node_ids import NLP_NODE_IDS
 from nlu.universe.universes import Licenses
 from nlu.utils.environment.env_utils import is_running_in_databricks, try_import_streamlit
 
@@ -230,6 +231,9 @@ def get_extraction_configs(self, full_meta, positions, get_embeddings, processed
                 continue
             if 'embedding' in c.type and not get_embeddings:
                 continue
+            if c.name == NLP_NODE_IDS.FINISHER:
+                anno_2_ex_config = {**anno_2_ex_config, **self.__get_finisher_conf(c)}
+                continue
             for col in c.spark_output_column_names:
                 if 'default' in c.pdf_extractor_methods.keys() and not full_meta:
                     anno_2_ex_config[col] = c.pdf_extractor_methods['default'](output_col_prefix=col)
@@ -330,19 +334,19 @@ def pythonify_spark_dataframe(self, processed,
         self.prediction_output_level = output_level
 
         # Get mapping from component to feature extractor method configs
-        anno_2_ex_config = self.get_extraction_configs(output_metadata, positions, get_embeddings, processed)
+        col_2_ex_config = self.get_extraction_configs(output_metadata, positions, get_embeddings, processed)
 
         # Processed becomes pandas after applying extractors
         processed = self.unpack_and_apply_extractors(processed, keep_stranger_features, stranger_features,
-                                                     anno_2_ex_config, self.light_pipe_configured, get_embeddings)
+                                                     col_2_ex_config, self.light_pipe_configured, get_embeddings)
         # Get mapping between column_name and pipe_prediction_output_level
-        same_level = OutputLevelUtils.get_columns_at_same_level_of_pipe(self, processed, anno_2_ex_config,
+        same_level = OutputLevelUtils.get_columns_at_same_level_of_pipe(self, processed, col_2_ex_config,
                                                                         get_embeddings)
         logger.info(f"Extracting for same_level_cols = {same_level}\n")
         processed = zip_and_explode(processed, same_level)
         processed = self.convert_embeddings_to_np(processed)
-        processed = ColSubstitutionUtils.substitute_col_names(processed, anno_2_ex_config, self, stranger_features,
+        processed = ColSubstitutionUtils.substitute_col_names(processed, col_2_ex_config, self, stranger_features,
                                                               get_embeddings)
         processed = processed.loc[:, ~processed.columns.duplicated()]
@@ -943,3 +947,23 @@ def __configure_light_pipe_usage__(self, data_instances, use_multi=True, force=F
         self.light_pipe_configured = True
         logger.info("Enabling light pipeline")
         self.light_transformer_pipe = LightPipeline(self.vanilla_transformer_pipe, parse_embeddings=True)
+
+    @staticmethod
+    def __get_finisher_conf(finisher: NluComponent) -> Dict[str, FinisherExtractorConfig]:
+        """
+        Returns a dict where key=col name and value=FinisherExtractorConfig for that col of the Finisher.
+        For the Finisher we need to know, per col: whether it is a meta field, whether to output as array,
+        and if not, which separator symbols to use.
+        """
+        confs = {}
+        m = finisher.model
+        as_arr = m.getOutputAsArray()
+        for c in m.getOutputCols():
+            confs[c] = FinisherExtractorConfig(output_as_array=as_arr,
+                                               is_meta_field=c.endswith('_metadata'),
+                                               annotation_split_symbol=m.getAnnotationSplitSymbol(),
+                                               value_split_symbol=m.getValueSplitSymbol(),
+                                               source_col_name=c,
+                                               )
+        return confs
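For review, this is roughly the shape `__get_finisher_conf` yields for a single-column Finisher; the column name is hypothetical and `'#'`/`'@'` are the Spark NLP Finisher split-symbol defaults as we understand them:

```python
# Sketch only: expected result of __get_finisher_conf for one output col.
from nlu.pipe.extractors.extractor_base_data_classes import FinisherExtractorConfig

expected = {
    'finished_token': FinisherExtractorConfig(
        source_col_name='finished_token',
        output_as_array=True,        # Finisher.getOutputAsArray()
        is_meta_field=False,         # name does not end with '_metadata'
        annotation_split_symbol='#', # Finisher.getAnnotationSplitSymbol()
        value_split_symbol='@',      # Finisher.getValueSplitSymbol()
    )
}
```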
diff --git a/nlu/pipe/utils/output_level_resolution_utils.py b/nlu/pipe/utils/output_level_resolution_utils.py
index 56674cb2..a985be92 100644
--- a/nlu/pipe/utils/output_level_resolution_utils.py
+++ b/nlu/pipe/utils/output_level_resolution_utils.py
@@ -1,11 +1,13 @@
-from typing import Dict, Any, List
 import logging
+from typing import Dict, Any, List
+
+from nlu.pipe.col_substitution.col_name_substitution_utils import ColSubstitutionUtils
 from nlu.universe.atoms import NlpLevel
-from nlu.universe.feature_node_ids import NLP_HC_NODE_IDS
-from nlu.universe.logic_universes import NLP_LEVELS
+from nlu.universe.feature_node_ids import NLP_HC_NODE_IDS, NLP_NODE_IDS
 from nlu.universe.feature_universes import NLP_FEATURES
+from nlu.universe.logic_universes import NLP_LEVELS
 from nlu.universe.universes import Licenses
-from nlu.pipe.col_substitution.col_name_substitution_utils import ColSubstitutionUtils
+
 
 logger = logging.getLogger('nlu')
@@ -84,9 +86,42 @@ def get_columns_at_same_level_of_pipe(pipe, df, anno_2_ex_config, get_embeddings
         at same output level as the pipe.prediction_output_level
         """
         same_output_level_cols = []
+        # if a Finisher exists, we must handle it first and separately, and blacklist all annotators if it cleans annotation cols
+        cleaned_components = []
         for c in pipe.components:
+            if c.name == NLP_NODE_IDS.FINISHER:
+                """
+                Every col needs to have its own output level.
+                For every input col of the Finisher:
+                1. Find the provider component
+                2. Resolve the output level of the provider component
+                3. Use the output level of the provider component as the output level of the Finisher col
+                """
+                cleaned = c.model.getCleanAnnotations()
+                get_meta = c.model.getIncludeMetadata()
+                for col_in, col_out in zip(c.model.getInputCols(), c.model.getOutputCols()):
+                    for provider_component in pipe.components:
+                        if cleaned:
+                            cleaned_components.append(provider_component)
+
+                        m = provider_component.model
+                        single_out = hasattr(m, 'getOutputCol')
+                        if (single_out and col_in in m.getOutputCol()) or (not single_out and col_in in m.getOutputCols()):
+                            output_level = OutputLevelUtils.resolve_component_to_output_level(pipe, provider_component)
+                            if output_level == pipe.prediction_output_level:
+                                # Normally we need to use the generated col, but Finisher cols are not generated, except meta(?!)
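                                same_output_level_cols.append(col_out)
+                                if get_meta:
+                                    same_output_level_cols.append(col_out + '_metadata')
+                continue
+
+        for c in pipe.components:
+            if c in cleaned_components:
+                continue
             if 'embedding' in c.type and get_embeddings is False:
                 continue
+            if c.name == NLP_NODE_IDS.FINISHER:
+                continue
             output_level = OutputLevelUtils.resolve_component_to_output_level(pipe, c)
             if output_level == pipe.prediction_output_level:
                 generated_cols = ColSubstitutionUtils.get_final_output_cols_of_component(c, df, anno_2_ex_config)

The provider-lookup logic above is easiest to follow on toy data; a self-contained sketch of the core mapping, with hypothetical column and level names:

```python
# Sketch only: a Finisher output col inherits the resolved output level of the
# component that produced its input col, and is kept when that level matches
# the pipe's prediction output level.
provider_levels = {'token': 'token', 'sentence': 'document'}  # col_in -> resolved level
finisher_io = [('token', 'finished_token'), ('sentence', 'finished_sentence')]
prediction_output_level = 'token'
include_metadata = True

same_output_level_cols = []
for col_in, col_out in finisher_io:
    if provider_levels[col_in] == prediction_output_level:
        same_output_level_cols.append(col_out)
        if include_metadata:
            same_output_level_cols.append(col_out + '_metadata')

assert same_output_level_cols == ['finished_token', 'finished_token_metadata']
```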
diff --git a/nlu/pipe/utils/pipe_utils.py b/nlu/pipe/utils/pipe_utils.py
index cddad1e2..db4c16a0 100644
--- a/nlu/pipe/utils/pipe_utils.py
+++ b/nlu/pipe/utils/pipe_utils.py
@@ -4,6 +4,7 @@
 
 import nlu
 from nlu import Licenses
+from nlu.pipe.extractors.extractor_base_data_classes import FinisherExtractorConfig
 from nlu.pipe.nlu_component import NluComponent
 from nlu.pipe.pipeline import NLUPipeline
 from nlu.pipe.utils.resolution.storage_ref_utils import StorageRefUtils
@@ -15,7 +16,7 @@
 logger = logging.getLogger('nlu')
 from nlu.pipe.utils.component_utils import ComponentUtils
-from typing import List, Union
+from typing import List, Union, Dict
 from nlu.universe.annotator_class_universe import AnnoClassRef
 from nlu.utils.environment.env_utils import is_running_in_databricks
 import os
diff --git a/nlu/universe/component_universes.py b/nlu/universe/component_universes.py
index 5ff9447c..6d8062c6 100644
--- a/nlu/universe/component_universes.py
+++ b/nlu/universe/component_universes.py
@@ -1566,19 +1566,16 @@ class ComponentUniverse:
                                jsl_anno_py_class=ACR.JSL_anno2_py_class[A.DOCUMENT_NORMALIZER],
                                ),
 
-        A.FINISHER: partial(NluComponent,  # TODO WIP
+        A.FINISHER: partial(NluComponent,
                             name=A.FINISHER,
                             type=T.HELPER_ANNO,
                             get_default_model=SdfFinisher.get_default_model,
-                            # TODO EXTRACTOR
                             pdf_extractor_methods={'default': default_full_config,
                                                    'default_full': default_full_config, },
-                            # TODO SUBSTITOR
-                            pdf_col_name_substitutor=None,  # TODO no sub defined
+                            pdf_col_name_substitutor=substitute_finisher_cols,
                             output_level=L.DOCUMENT,
-                            # TODO sub-token actually(?)
                             node=NLP_FEATURE_NODES.nodes[A.FINISHER],
-                            description='Get lemmatized base version of tokens',
+                            description='Finisher transformer to output the results of a pipeline.',
                             provider=ComponentBackends.open_source,
                             license=Licenses.open_source,
                             computation_context=ComputeContexts.spark,
@@ -3549,7 +3546,7 @@ class ComponentUniverse:
                               pdf_extractor_methods={'default': default_de_identification_config,
                                                      'default_full': default_full_config, },
                               pdf_col_name_substitutor=substitute_de_identification_cols,
-                              output_level=L.DOCUMENT,
+                              output_level=L.INPUT_DEPENDENT_DOCUMENT_CLASSIFIER,
                               node=NLP_HC_FEATURE_NODES.nodes[H_A.DE_IDENTIFICATION],
                               description='De-Identify named entity according to various Healthcare Data Protection standards',
                               provider=ComponentBackends.hc,
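Taken together, these changes should let NLU pythonify pipelines that end in a Finisher. A hedged end-to-end sketch (column names hypothetical; we assume `nlu.to_nlu_pipe` accepts the fitted Spark pipeline as in current NLU releases):

```python
# Sketch only: exercise the new Finisher path end to end.
import nlu
import sparknlp
from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer

spark = sparknlp.start()
documenter = DocumentAssembler().setInputCol('text').setOutputCol('document')
tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('token')
finisher = (Finisher()
            .setInputCols(['token'])
            .setOutputCols(['finished_token'])  # hypothetical output col
            .setOutputAsArray(True)             # -> FinisherExtractorConfig.output_as_array
            .setIncludeMetadata(True)           # -> extra 'finished_token_metadata' col
            .setCleanAnnotations(True))         # -> exercises the empty-cols guard

fitted = Pipeline(stages=[documenter, tokenizer, finisher]) \
    .fit(spark.createDataFrame([['dummy']]).toDF('text'))

# With this PR, predict() should return the finished cols instead of raising.
df = nlu.to_nlu_pipe(fitted).predict('Hello world')
print(df.columns)
```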