Commit e43607b

proper handling for finisher
C-K-Loan committed Jan 18, 2024
1 parent da5c47c commit e43607b
Showing 9 changed files with 138 additions and 27 deletions.
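For context: Spark NLP's Finisher converts annotation structs into plain string or array columns and, with cleanAnnotations enabled, also drops the original annotation columns. That is why NLU's pythonify and column-substitution steps need the special handling added in this commit. A minimal sketch of such a pipeline (column names are illustrative, not taken from this commit):

    from pyspark.ml import Pipeline
    from sparknlp.annotator import Tokenizer
    from sparknlp.base import DocumentAssembler, Finisher

    document_assembler = DocumentAssembler().setInputCol('text').setOutputCol('document')
    tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('token')

    # The Finisher flattens annotation structs into plain columns. With
    # includeMetadata=True it also emits a '<col>_metadata' column per output col,
    # and with cleanAnnotations=True it drops the original annotation columns.
    finisher = (Finisher()
                .setInputCols(['token'])
                .setOutputCols(['finished_token'])
                .setOutputAsArray(True)
                .setIncludeMetadata(True)
                .setCleanAnnotations(True))

    pipeline = Pipeline(stages=[document_assembler, tokenizer, finisher])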
25 changes: 16 additions & 9 deletions nlu/pipe/col_substitution/col_name_substitution_utils.py
@@ -6,17 +6,16 @@
- we remove all _<field> suffixes
- replace all '@' with '_'
"""
import logging
from typing import List

from nlu.universe.feature_node_ids import NLP_NODE_IDS
from sparknlp.annotator import *

import nlu
from nlu.universe.feature_universes import NLP_FEATURES
from nlu.pipe.col_substitution import substitution_map_OS
from nlu.pipe.col_substitution import col_substitution_OS
import logging

from nlu.pipe.extractors.extractor_base_data_classes import SparkOCRExtractorConfig
from nlu.universe.feature_universes import NLP_FEATURES
from nlu.universe.logic_universes import AnnoTypes
from nlu.universe.universes import Licenses

@@ -47,7 +46,6 @@ class ColSubstitutionUtils:
"""Utils for substituting col names in Pythonify to short and meaningful names.
Uses custom rename methods for either PySpark or Pandas
"""
from sparknlp.annotator import MarianTransformer
cleanable_splits = ['ner_converter', 'spell', 'ner_to_chunk_converter', 'train', 'classify', 'ner', 'med_ner', 'dl',
'match', 'clean', 'sentiment', 'embed', 'embed_sentence', 'embed_chunk', 'explain', 'pos',
'resolve_chunk', 'resolve', ]
@@ -66,7 +64,6 @@ def substitute_col_names(df, anno_2_ex, pipe, stranger_cols=[], get_embeddings=F
anno2final_cols = {} # mapping of final col names to annotator class Key=AnnoModel, Value=List of Result cols
new_cols = {}
if pipe.has_licensed_components:
from nlu.pipe.col_substitution import col_substitution_HC
from nlu.pipe.col_substitution import substitution_map_HC
deducted_component_names = ColSubstitutionUtils.deduct_component_names(pipe)
for c in pipe.components:
@@ -79,7 +76,9 @@ def substitute_col_names(df, anno_2_ex, pipe, stranger_cols=[], get_embeddings=F
continue
if 'embedding' in c.type and get_embeddings == False: continue
cols_to_substitute = ColSubstitutionUtils.get_final_output_cols_of_component(c, df, anno_2_ex)

if len(cols_to_substitute) == 0:
# cols were cleaned from the df by the Finisher
continue
if type(c.model) in substitution_map_OS.OS_anno2substitution_fn.keys():
substitution_fn = substitution_map_OS.OS_anno2substitution_fn[type(c.model)]['default']
else:
@@ -104,7 +103,8 @@ def substitute_col_names(df, anno_2_ex, pipe, stranger_cols=[], get_embeddings=F
for k in cols_to_rename:
# some cols might not exist because no annotations generated, so we need to double check it really exists
if k not in df.columns: del new_cols[k]
return df.rename(columns=new_cols)[list(set(new_cols.values()).union(set(stranger_cols)))] if drop_debug_cols else \
return df.rename(columns=new_cols)[
list(set(new_cols.values()).union(set(stranger_cols)))] if drop_debug_cols else \
df.rename(columns=new_cols)

@staticmethod
@@ -113,8 +113,15 @@ def get_final_output_cols_of_component(c, df, anno_2_ex) -> List[str]:
"""Get's a list of all columns that have been derived in the pythonify procedure from the component_to_resolve
os_components in dataframe df for anno_2_ex configs """
og_output_col = c.spark_output_column_names[0]

# may be missing because of Finisher cleaning
if og_output_col not in df.columns: return []
configs = anno_2_ex[og_output_col]

if c.name == NLP_NODE_IDS.FINISHER:
result_cols = c.model.getOutputCols()
if c.model.getIncludeMetadata():
result_cols = result_cols + [f'{col}_metadata' for col in result_cols]
return result_cols
result_cols = []
if isinstance(configs, SparkOCRExtractorConfig):
# TODO better OCR-EX handling --> Col Name generator function which we use everywhere for unified col naming !!!!!
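To illustrate the new Finisher branch in get_final_output_cols_of_component: for a Finisher configured with output cols ['finished_token', 'finished_pos'] (hypothetical names) and includeMetadata enabled, the branch returns:

    result_cols = ['finished_token', 'finished_pos']
    result_cols = result_cols + [f'{col}_metadata' for col in result_cols]
    # -> ['finished_token', 'finished_pos',
    #     'finished_token_metadata', 'finished_pos_metadata']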
6 changes: 6 additions & 0 deletions nlu/pipe/col_substitution/col_substitution_OS.py
@@ -744,6 +744,12 @@ def substitute_doc_norm_cols(c, cols, nlu_identifier=True):
return new_cols


def substitute_finisher_cols(c, cols, nlu_identifier=True):
    """
    Substitute col names for the Finisher. For now keeps the original names.
    """
    return dict(zip(cols, cols))

def substitute_spell_context_cols(c, cols, nlu_identifier=True):
"""
Substitute col name for normalized, <spell> will be new base col name
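A quick usage illustration of the identity mapping (the component argument is unused here, column name hypothetical):

    substitute_finisher_cols(None, ['finished_token'])
    # -> {'finished_token': 'finished_token'}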
11 changes: 11 additions & 0 deletions nlu/pipe/extractors/extractor_base_data_classes.py
@@ -150,3 +150,14 @@ class SparkOCRExtractorConfig(SparkNLPExtractorConfig):
get_exception: bool = field(default=False)
# Position struct fields
get_img_positions: bool = field(default=False)

@dataclass
class FinisherExtractorConfig:
    """
    Universal configuration class for defining how to extract data from a Finisher
    """
    source_col_name: str
    output_as_array: bool = field(default=True)
    is_meta_field: bool = field(default=False)
    annotation_split_symbol: Optional[str] = field(default=None)
    value_split_symbol: Optional[str] = field(default=None)
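A hedged construction example for the new config class (values illustrative; in the pipeline they are read off the fitted Finisher):

    conf = FinisherExtractorConfig(
        source_col_name='finished_token',  # the Finisher output column to extract
        output_as_array=True,              # mirrors Finisher.getOutputAsArray()
        is_meta_field=False,               # True for '<col>_metadata' companion cols
    )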
13 changes: 12 additions & 1 deletion nlu/pipe/extractors/extractor_configs_OS.py
@@ -10,7 +10,8 @@
"""

from nlu.pipe.extractors.extractor_base_data_classes import SparkNLPExtractor, SparkNLPExtractorConfig
from nlu.pipe.extractors.extractor_base_data_classes import SparkNLPExtractor, SparkNLPExtractorConfig, \
FinisherExtractorConfig
from nlu.pipe.extractors.extractor_methods.helper_extractor_methods import *

"""
@@ -41,6 +42,16 @@ def default_full_config(output_col_prefix='DEFAULT'):

)

def default_finisher_config(output_col_prefix='DEFAULT'):
    return FinisherExtractorConfig(
        source_col_name=output_col_prefix,  # required field
        output_as_array=True,
        is_meta_field=False,
        annotation_split_symbol=None,
        value_split_symbol=None,
        # clean_annotations=True,
    )


def default_document_config(output_col_prefix='document'):
return SparkNLPExtractorConfig(
21 changes: 20 additions & 1 deletion nlu/pipe/extractors/extractor_methods/base_extractor_methods.py
@@ -206,6 +206,7 @@ def extract_base_sparknlp_features(row: pd.Series, configs: SparkNLPExtractorCon

return {**beginnings, **endings, **results, **annotator_types, **embeddings, **origins} # Merge dicts NLP output


def extract_sparknlp_metadata(row: pd.Series, configs: SparkNLPExtractorConfig) -> dict:
"""
Extract base features common in all Spark NLP annotators
@@ -267,7 +268,25 @@ def extract_master(row: pd.Series, configs: SparkNLPExtractorConfig) -> pd.Serie
if isinstance(configs, SparkOCRExtractorConfig):
base_annos = extract_base_sparkocr_features(row, configs)
else:
base_annos = extract_base_sparknlp_features(row, configs)
        if isinstance(configs, FinisherExtractorConfig):
            # 1. if normal finisher col, just return val -> {finisher_col: [values]}
            # 2. if meta finisher col, return grouped meta dict -> {m1: [v1,v2,..], m2: [v1,v2,..], ...}
            # 3. double check with other anno behaviour
            # NOTE: both branches currently return the raw row value; the meta grouping in (2) is still a TODO
            if configs.is_meta_field:
                return pd.Series({configs.source_col_name: row})
            else:
                return pd.Series({configs.source_col_name: row})
        else:
            base_annos = extract_base_sparknlp_features(row, configs)

    # TODO proper finisher handling!
# Get Metadata
all_metas = extract_sparknlp_metadata(row, configs) if configs.get_meta or configs.get_full_meta else {}

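A sketch of how the new Finisher branch of extract_master behaves per row, assuming the Spark column already sits in pandas as lists of finished values (data and column name hypothetical):

    import pandas as pd

    from nlu.pipe.extractors.extractor_base_data_classes import FinisherExtractorConfig
    from nlu.pipe.extractors.extractor_methods.base_extractor_methods import extract_master

    df = pd.DataFrame({'finished_token': [['hello', 'world'], ['foo']]})
    conf = FinisherExtractorConfig(source_col_name='finished_token')
    # each row is wrapped as pd.Series({'finished_token': <row value>}) and returned early
    extracted = pd.DataFrame([extract_master(row, conf) for row in df['finished_token']])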
32 changes: 28 additions & 4 deletions nlu/pipe/pipeline.py
@@ -16,6 +16,7 @@
from nlu.pipe.utils.data_conversion_utils import DataConversionUtils
from nlu.pipe.utils.output_level_resolution_utils import OutputLevelUtils
from nlu.pipe.utils.resolution.storage_ref_utils import StorageRefUtils
from nlu.universe.feature_node_ids import NLP_NODE_IDS
from nlu.universe.universes import Licenses
from nlu.utils.environment.env_utils import is_running_in_databricks, try_import_streamlit

@@ -230,6 +231,9 @@ def get_extraction_configs(self, full_meta, positions, get_embeddings, processed
continue
if 'embedding' in c.type and not get_embeddings:
continue
if c.name == NLP_NODE_IDS.FINISHER:
anno_2_ex_config = {**anno_2_ex_config, **self.__get_finisher_conf(c)}
continue
for col in c.spark_output_column_names:
if 'default' in c.pdf_extractor_methods.keys() and not full_meta:
anno_2_ex_config[col] = c.pdf_extractor_methods['default'](output_col_prefix=col)
@@ -330,19 +334,19 @@ def pythonify_spark_dataframe(self, processed,
self.prediction_output_level = output_level

# Get mapping from component to feature extractor method configs
anno_2_ex_config = self.get_extraction_configs(output_metadata, positions, get_embeddings, processed)
col_2_ex_config = self.get_extraction_configs(output_metadata, positions, get_embeddings, processed)

# Processed becomes pandas after applying extractors
processed = self.unpack_and_apply_extractors(processed, keep_stranger_features, stranger_features,
anno_2_ex_config, self.light_pipe_configured, get_embeddings)
col_2_ex_config, self.light_pipe_configured, get_embeddings)

# Get mapping between column_name and pipe_prediction_output_level
same_level = OutputLevelUtils.get_columns_at_same_level_of_pipe(self, processed, anno_2_ex_config,
same_level = OutputLevelUtils.get_columns_at_same_level_of_pipe(self, processed, col_2_ex_config,
get_embeddings)
logger.info(f"Extracting for same_level_cols = {same_level}\n")
processed = zip_and_explode(processed, same_level)
processed = self.convert_embeddings_to_np(processed)
processed = ColSubstitutionUtils.substitute_col_names(processed, anno_2_ex_config, self, stranger_features,
processed = ColSubstitutionUtils.substitute_col_names(processed, col_2_ex_config, self, stranger_features,
get_embeddings)
processed = processed.loc[:, ~processed.columns.duplicated()]

@@ -943,3 +947,23 @@ def __configure_light_pipe_usage__(self, data_instances, use_multi=True, force=F
self.light_pipe_configured = True
logger.info("Enabling light pipeline")
self.light_transformer_pipe = LightPipeline(self.vanilla_transformer_pipe, parse_embeddings=True)

@staticmethod
def __get_finisher_conf(finisher: NluComponent) -> Dict[str, FinisherExtractorConfig]:
"""
Returns a dict where key = col name and value = FinisherExtractorConfig for that col of the finisher.
For the Finisher we need to know for each col: whether it is a meta field, whether output is an array, and if not, which separator symbols are used.
"""

confs = {}
m = finisher.model
as_arr = m.getOutputAsArray()
for c in m.getOutputCols():
confs[c] = FinisherExtractorConfig(output_as_array=as_arr,
is_meta_field=c.endswith('_metadata'),
annotation_split_symbol=m.getAnnotationSplitSymbol(),
value_split_symbol=m.getValueSplitSymbol(),
source_col_name=c,
)

return confs
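Illustratively, for a fitted Finisher with outputCols ['finished_token'] (hypothetical), the mapping built by __get_finisher_conf would look roughly like:

    # {
    #     'finished_token': FinisherExtractorConfig(
    #         source_col_name='finished_token',
    #         output_as_array=True,          # from getOutputAsArray()
    #         is_meta_field=False,           # True only if the col name ends with '_metadata'
    #         annotation_split_symbol=...,   # from getAnnotationSplitSymbol()
    #         value_split_symbol=...,        # from getValueSplitSymbol()
    #     ),
    # }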
43 changes: 39 additions & 4 deletions nlu/pipe/utils/output_level_resolution_utils.py
@@ -1,11 +1,13 @@
from typing import Dict, Any, List
import logging
from typing import Dict, Any, List

from nlu.pipe.col_substitution.col_name_substitution_utils import ColSubstitutionUtils
from nlu.universe.atoms import NlpLevel
from nlu.universe.feature_node_ids import NLP_HC_NODE_IDS
from nlu.universe.logic_universes import NLP_LEVELS
from nlu.universe.feature_node_ids import NLP_HC_NODE_IDS, NLP_NODE_IDS
from nlu.universe.feature_universes import NLP_FEATURES
from nlu.universe.logic_universes import NLP_LEVELS
from nlu.universe.universes import Licenses
from nlu.pipe.col_substitution.col_name_substitution_utils import ColSubstitutionUtils

logger = logging.getLogger('nlu')


@@ -84,9 +86,42 @@ def get_columns_at_same_level_of_pipe(pipe, df, anno_2_ex_config, get_embeddings
at same output level as the pipe.prediction_output_level
"""
same_output_level_cols = []
# if a Finisher exists, we must handle it first separately and blacklist all annotators if it cleans annotation cols
cleaned_components = []
for c in pipe.components:
if c.name == NLP_NODE_IDS.FINISHER:
"""
Every col needs to have its own output level
for every input col of finisher:
1. Find provider component
2. Resolve output level of provider component
3. Use output level of provider component as output level of finisher col
"""
cleaned = c.model.getCleanAnnotations()
get_meta = c.model.getIncludeMetadata()
for col_in, col_out in zip(c.model.getInputCols(), c.model.getOutputCols()):
for provider_component in pipe.components:
if cleaned:
cleaned_components.append(provider_component)

m = provider_component.model
single_out = hasattr(m, 'getOutputCol')
if (single_out and col_in in m.getOutputCol()) or (not single_out and col_in in m.getOutputCols()):
output_level = OutputLevelUtils.resolve_component_to_output_level(pipe, provider_component)
if output_level == pipe.prediction_output_level:
# Normally need to use generated col, but finisher cols are not generated except meta?!
same_output_level_cols.append(col_out)
if get_meta:
same_output_level_cols.append(col_out + '_metadata')
continue

for c in pipe.components:
if c in cleaned_components:
continue
if 'embedding' in c.type and get_embeddings is False:
continue
if c.name == NLP_NODE_IDS.FINISHER:
continue
output_level = OutputLevelUtils.resolve_component_to_output_level(pipe, c)
if output_level == pipe.prediction_output_level:
generated_cols = ColSubstitutionUtils.get_final_output_cols_of_component(c, df, anno_2_ex_config)
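The provider-matching condition above can be read as this hypothetical helper (equivalent in intent; the name is not part of the codebase):

    def provides_column(model, col_in: str) -> bool:
        # single-output annotators expose getOutputCol(), multi-output ones getOutputCols()
        if hasattr(model, 'getOutputCol'):
            return col_in in model.getOutputCol()
        return col_in in model.getOutputCols()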
3 changes: 2 additions & 1 deletion nlu/pipe/utils/pipe_utils.py
@@ -4,6 +4,7 @@

import nlu
from nlu import Licenses
from nlu.pipe.extractors.extractor_base_data_classes import FinisherExtractorConfig
from nlu.pipe.nlu_component import NluComponent
from nlu.pipe.pipeline import NLUPipeline
from nlu.pipe.utils.resolution.storage_ref_utils import StorageRefUtils
@@ -15,7 +16,7 @@

logger = logging.getLogger('nlu')
from nlu.pipe.utils.component_utils import ComponentUtils
from typing import List, Union
from typing import List, Union, Dict
from nlu.universe.annotator_class_universe import AnnoClassRef
from nlu.utils.environment.env_utils import is_running_in_databricks
import os
11 changes: 4 additions & 7 deletions nlu/universe/component_universes.py
@@ -1566,19 +1566,16 @@ class ComponentUniverse:
jsl_anno_py_class=ACR.JSL_anno2_py_class[A.DOCUMENT_NORMALIZER],
),

A.FINISHER: partial(NluComponent, # TODO WIP
A.FINISHER: partial(NluComponent,
name=A.FINISHER,
type=T.HELPER_ANNO,
get_default_model=SdfFinisher.get_default_model,
# TODO EXTRACTOR
pdf_extractor_methods={'default': default_full_config,
'default_full': default_full_config, },
# TODO SUBSTITOR
pdf_col_name_substitutor=None, # TODO no sub defined
pdf_col_name_substitutor=substitute_finisher_cols,
output_level=L.DOCUMENT,
# TODO sub-token actually(?)
node=NLP_FEATURE_NODES.nodes[A.FINISHER],
description='Get lemmatized base version of tokens',
description='Finisher transformer to output the results of a pipeline.',
provider=ComponentBackends.open_source,
license=Licenses.open_source,
computation_context=ComputeContexts.spark,
@@ -3549,7 +3546,7 @@ class ComponentUniverse:
pdf_extractor_methods={'default': default_de_identification_config,
'default_full': default_full_config, },
pdf_col_name_substitutor=substitute_de_identification_cols,
output_level=L.DOCUMENT,
output_level=L.INPUT_DEPENDENT_DOCUMENT_CLASSIFIER,
node=NLP_HC_FEATURE_NODES.nodes[H_A.DE_IDENTIFICATION],
description='De-Identify named entity according to various Healthcare Data Protection standards',
provider=ComponentBackends.hc,
