From 2eb6e1ebe05597f18be3eac5ae75a8f81d370aea Mon Sep 17 00:00:00 2001 From: Katy Scott Date: Mon, 16 Dec 2024 09:53:58 -0500 Subject: [PATCH] feat: add data functions used for analysis code (#89) ## Summary by CodeRabbit - **New Features** - Introduced functions for manipulating and analyzing patient data, including patient ID handling and time conversion. - Added capabilities for selecting and filtering radiomic data within DataFrames. - Implemented methods for replacing values and splitting DataFrames based on specified criteria. - **Bug Fixes** - Enhanced error handling across multiple functions to ensure robust performance and logging of issues. --- src/readii/data/label.py | 307 ++++++++++++++++++++++++++++++++++++++ src/readii/data/select.py | 177 ++++++++++++++++++++++ src/readii/data/split.py | 103 +++++++++++++ 3 files changed, 587 insertions(+) create mode 100644 src/readii/data/label.py create mode 100644 src/readii/data/select.py create mode 100644 src/readii/data/split.py diff --git a/src/readii/data/label.py b/src/readii/data/label.py new file mode 100644 index 0000000..9512872 --- /dev/null +++ b/src/readii/data/label.py @@ -0,0 +1,307 @@ +import re +from pandas import DataFrame, Series +import numpy as np +from typing import Optional + +from readii.utils import logger +from readii.data.split import replaceColumnValues + +def getPatientIdentifierLabel(dataframe_to_search:DataFrame) -> str: + """Function to find a column in a dataframe that contains some form of patient ID or case ID (case-insensitive). + If multiple found, will return the first match. + + Current regex is: '(pat)?(ient)?(case)?(\s|.)?(id|#)' + + Parameters + ---------- + dataframe_to_search : DataFrame + Dataframe to look for a patient ID column in. + + Returns + ------- + str + Label for patient identifier column from the dataframe. + """ + + # regex to get patient identifier column name in the dataframes + # catches case-insensitive variations of patient_id, patid, pat id, case_id, case id, caseid, id + id_search_term = re.compile(pattern= r'(pat)?(ient)?(case)?(\s|.)?(id|#)', flags=re.IGNORECASE) + + # Get any columns from the dataframe based on the regex + patient_identifier = dataframe_to_search.filter(regex=id_search_term).columns.to_list() + + if len(patient_identifier) > 1: + logger.debug(f"Multiple patient identifier labels found. Using {patient_identifier[0]}.") + + elif len(patient_identifier) == 0: + msg = "Dataframe doesn't have a recognizeable patient ID column. Must contain patient or case ID." + logger.exception(msg) + raise ValueError() + + return patient_identifier[0] + + + +def setPatientIdAsIndex(dataframe_to_index:DataFrame, + patient_id_col:str = None): + """ Function to set the patient ID column as the index of a dataframe. + + Parameters + ---------- + dataframe : DataFrame + Dataframe to set the patient ID column as the index. + patient_id_col : str, optional + Name of the patient ID column to use as the index. If not provided, will find the patient ID column with getPatientIdentifierLabel. + """ + if not patient_id_col: + patient_id_col = getPatientIdentifierLabel(dataframe_to_index) + + pat_indexed_dataframe = dataframe_to_index.set_index(patient_id_col) + return pat_indexed_dataframe + + + +def convertDaysToYears(dataframe_with_outcome:DataFrame, + time_column_label:str, + divide_by:int = 365): + """ Function to create a copy of a time outcome column mesaured in days and convert it to years. + + Parameters + ---------- + dataframe_with_outcome : DataFrame + Dataframe containing the outcome column to convert. + time_column_label : str + Label for the time column to convert in the dataframe. + divide_by : int, optional + Value to divide the time column by. The default is 365. + + Returns + ------- + dataframe_with_outcome : DataFrame + Dataframe with a copy of the specified time column converted to years. + """ + + # Set up the new column name for the converted time column + years_column_label = time_column_label + "_years" + # Make a copy of the time column with the values converted from days to years and add suffic _years to the column name + dataframe_with_outcome[years_column_label] = dataframe_with_outcome[time_column_label] / divide_by + + return dataframe_with_outcome + + + +def timeOutcomeColumnSetup(dataframe_with_outcome:DataFrame, + outcome_column_label:str, + standard_column_label:str, + convert_to_years:bool = False, + ): + """ Function to set up a time outcome column in a dataframe. Makes a copy of the specified outcome column with a standardized column name and converts it to years if specified. + + Parameters + ---------- + dataframe_with_outcome : DataFrame + Dataframe containing the outcome column to convert. + outcome_column_label : str + Label for the outcome column to convert in the dataframe. + standard_column_label : str + Name of the column to save the standardized outcome column as. + convert_to_years : bool, optional + Whether to convert the time column to years. The default is False. + + Returns + ------- + dataframe_with_outcome : DataFrame + Dataframe with a copy of the specified outcome column converted to years. + """ + + # Check if the outcome column is numeric + if not np.issubdtype(dataframe_with_outcome[outcome_column_label].dtype, np.number): + msg = f"{outcome_column_label} is not numeric. Please confirm outcome_column_label is the correct column or convert the column in the dataframe to numeric." + logger.exception(msg) + raise ValueError() + + else: + # Make a copy of the dataframe to work on + dataframe_with_standardized_outcome = dataframe_with_outcome.copy() + + if convert_to_years: + logger.debug("Converting outcome column to years by dividing by 365.") + # Create a copy of the outcome column and convert it to years + dataframe_with_standardized_outcome = convertDaysToYears(dataframe_with_standardized_outcome, outcome_column_label) + + # Rename the converted column with the standardized name + dataframe_with_standardized_outcome.rename(columns={f"{outcome_column_label}_years": standard_column_label}, inplace=True) + else: + + # Make a copy of the outcome column with the standardized name + dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_standardized_outcome[outcome_column_label] + + return dataframe_with_standardized_outcome + + + +def survivalStatusToNumericMapping(event_outcome_column:Series): + """Convert a survival status column to a numeric column by iterating over unique values and assigning a numeric value to each. + Alive values will be assigned a value of 0, and dead values will be assigned a value of 1. + If "alive" is present, next event value index will start at 1. If "dead" is present, next event value index will start at 2. + Any NaN values will be assigned the value "unknown", then converted to a numeric value. + + Parameters + ---------- + event_outcome_column : Series + Series containing the survival status values. + + Returns + ------- + event_column_value_mapping : dict + Dictionary mapping the survival status values to numeric values. + """ + + # Create a dictionary to map event values to numeric values + event_column_value_mapping = {} + # Get a list of all unique event values, set NaN values to unknown, set remaining values to lower case + existing_event_values = event_outcome_column.fillna("unknown").str.lower().unique() + + # Set the conversion value for the first event value to 0 + other_event_num_value = 0 + + # See if alive is present, and set the conversion value for alive to 0 + if "alive" in existing_event_values: + event_column_value_mapping['alive'] = 0 + # Remove alive from the list of existing event values + existing_event_values.remove("alive") + # Update starting value for other event values + other_event_num_value = 1 + + # See if dead is present, and set the conversion value for dead to 1 + if "dead" in existing_event_values: + event_column_value_mapping['dead'] = 1 + # Remove dead from the list of existing event values + existing_event_values.remove("dead") + # Update starting value for other event values + other_event_num_value = 2 + + # Set the conversion value for the other event values to the next available value + for other_event_value in existing_event_values: + event_column_value_mapping[other_event_value] = other_event_num_value + other_event_num_value += 1 + + + return event_column_value_mapping + + + +def eventOutcomeColumnSetup(dataframe_with_outcome:DataFrame, + outcome_column_label:str, + standard_column_label:str, + event_column_value_mapping:Optional[dict]=None): + """ Function to set up an event outcome column in a dataframe. + + Parameters + ---------- + dataframe_with_outcome : DataFrame + Dataframe containing the outcome column to convert. + outcome_column_label : str + Label for the outcome column to convert in the dataframe. + standard_column_label : str + Name of the column to save the standardized outcome column as. + event_column_value_mapping : dict, optional + Dictionary of event values to convert to numeric values. Keys are the event values, values are the numeric values. If provided, all event values in the outcome column must be handled by the dictionary. + If not provided, will attempt to convert based on the values in the outcome column. By default alive and dead will be converted to 0 and 1, respectively, if present in the outcome column. + The default is an empty dictionary. + + Returns + ------- + dataframe_with_standardized_outcome : DataFrame + Dataframe with a copy of the specified outcome column converted to numeric values. + """ + + # Get the type of the existing event column + event_variable_type = dataframe_with_outcome[outcome_column_label].dtype + + # Make a copy of the dataframe to work on + dataframe_with_standardized_outcome = dataframe_with_outcome.copy() + + # Handle numeric event column + if np.issubdtype(event_variable_type, np.number): + dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_outcome[outcome_column_label] + + # Handle boolean event column + elif np.issubdtype(event_variable_type, np.bool_): + dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_outcome[outcome_column_label].astype(int) + + # Handle string event column + elif np.issubdtype(event_variable_type, np.object_): + try: + # Make values of outcome column lowercase + dataframe_with_standardized_outcome[outcome_column_label] = dataframe_with_outcome[outcome_column_label].str.lower() + except Exception as e: + msg = f"Error converting string event column {outcome_column_label} to lowercase. Please check the column is a string type and try again." + logger.exception(msg) + raise e + + # Get the existing event values in the provided dataframe and and sort them + existing_event_values = sorted(dataframe_with_standardized_outcome[outcome_column_label].unique()) + + if not event_column_value_mapping: + # Create a dictionary to map event values to numeric values + event_column_value_mapping = survivalStatusToNumericMapping(event_outcome_column=dataframe_with_standardized_outcome[outcome_column_label]) + + else: + # Convert all dictionary keys in provided mapping to lowercase + event_column_value_mapping = dict((status.lower(), value) for status, value in event_column_value_mapping.items()) + + # Check if user provided dictionary handles all event values in the outcome column + if set(existing_event_values) != set(event_column_value_mapping.keys()): + msg = f"Not all event values in {outcome_column_label} are handled by the provided event_column_value_mapping dictionary." + logger.exception(msg) + raise ValueError() + + # TODO: add handling for values not in the dictionary + + # Set up the new column name for the converted event column + dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_standardized_outcome[outcome_column_label] + + # Swap the keys and values in the mapping dictionary to use the replaceColumnValues function + # So the numeric values will be the keys and the string event values will be the values + replacement_value_data = dict([num_event, str_event] for str_event, num_event in event_column_value_mapping.items()) + + # Replace the string event values in the standardized column with the numeric event values + dataframe_with_standardized_outcome = replaceColumnValues(dataframe_with_standardized_outcome, + column_to_change=standard_column_label, + replacement_value_data=replacement_value_data) + + # end string handling + else: + msg = f"Event column {outcome_column_label} is not a valid type. Must be a string, boolean, or numeric." + logger.exception(msg) + raise TypeError() + + + return dataframe_with_standardized_outcome + + + +def addOutcomeLabels(feature_data_to_label:DataFrame, + clinical_data:DataFrame, + outcome_labels:Optional[list] = None): + """ Function to add survival labels to a feature dataframe based on a clinical dataframe. + + Parameters + ---------- + feature_data_to_label : DataFrame + Dataframe containing the feature data to add survival labels to. + clinical_data : DataFrame + Dataframe containing the clinical data to use for survival labels. + outcome_labels : list, optional + List of outcome labels to extract from the clinical dataframe. The default is ["survival_time_in_years", "survival_event_binary"]. + """ + if outcome_labels is None: + outcome_labels = ["survival_time_in_years", "survival_event_binary"] + + # Get the survival time and event columns as a dataframe + outcome_label_columns = clinical_data[outcome_labels] + + # Join the outcome label dataframe to the feature data dataframe + outcome_labelled_feature_data = outcome_label_columns.join(feature_data_to_label) + return outcome_labelled_feature_data \ No newline at end of file diff --git a/src/readii/data/select.py b/src/readii/data/select.py new file mode 100644 index 0000000..f2d4a51 --- /dev/null +++ b/src/readii/data/select.py @@ -0,0 +1,177 @@ +from pandas import DataFrame +from typing import Optional + +from readii.utils import logger + +from .label import setPatientIdAsIndex + +def dropUpToFeature(dataframe:DataFrame, + feature_name:str, + keep_feature_name_column:Optional[bool] = False + ): + """ Function to drop all columns up to and possibly including the specified feature. + + Parameters + ---------- + dataframe : DataFrame + Dataframe to drop columns from. + feature_name : str + Name of the feature to drop up to. + keep_feature_name_column : bool, optional + Whether to keep the specified feature name column in the dataframe or drop it. The default is False. + + Returns + ------- + dataframe : DataFrame + Dataframe with all columns up to and including the specified feature dropped. + """ + try: + if keep_feature_name_column: + # Get the column names up to but not including the specified feature + column_names = dataframe.columns.to_list()[:dataframe.columns.get_loc(feature_name)] + else: + # Get the column names up to and including the specified feature + column_names = dataframe.columns.to_list()[:dataframe.columns.get_loc(feature_name)+1] + + # Drop all columns up to and including the specified feature + dataframe_dropped_columns = dataframe.drop(columns=column_names) + + return dataframe_dropped_columns + + except KeyError: + logger.warning(f"Feature {feature_name} was not found as a column in dataframe. No columns dropped.") + return dataframe + + except Exception as e: + logger.exception(f"An error occurred in dropUpToFeature: {e}") + raise e + + + +def selectByColumnValue(dataframe:DataFrame, + include_col_values:Optional[dict] = None, + exclude_col_values:Optional[dict] = None) -> DataFrame: + """ + Get rows of pandas DataFrame based on row values in the columns labelled as keys of the include_col_values and not in the keys of the exclude_col_values. + Include variables will be processed first, then exclude variables, in the order they are provided in the corresponding dictionaries. + + Parameters + ---------- + dataframeToSubset : pd.DataFrame + Dataframe to subset. + include_col_values : dict + Dictionary of column names and values to include in the subset. ex. {"column_name": ["value1", "value2"]} + exclude_col_values : dict + Dictionary of column names and values to exclude from the subset. ex. {"column_name": ["value1", "value2"]} + + Returns + ------- + pd.DataFrame + Subset of the input dataframe. + + """ + try: + if (include_col_values is None) and (exclude_col_values is None): + raise ValueError("Must provide one of include_col_values or exclude_col_values.") + + if include_col_values is not None: + for key in include_col_values.keys(): + if key in ["Index", "index"]: + dataframe = dataframe[dataframe.index.isin(include_col_values[key])] + else: + dataframe = dataframe[dataframe[key].isin(include_col_values[key])] + + if exclude_col_values is not None: + for key in exclude_col_values.keys(): + if key in ["Index", "index"]: + dataframe = dataframe[~dataframe.index.isin(exclude_col_values[key])] + else: + dataframe = dataframe[~dataframe[key].isin(exclude_col_values[key])] + + return dataframe + + except Exception as e: + logger.exception(f"An error occurred in selectByColumnValue: {e}") + raise e + + +def getOnlyPyradiomicsFeatures(radiomic_data:DataFrame): + """ Function to get out just the features from a PyRadiomics output that includes metadata/diagnostics columns before the features. + Will look for the last diagnostics column or the first PyRadiomics feature column with the word "original" in it + Parameters + ---------- + radiomic_data : DataFrame + Dataframe of Pyradiomics features with diagnostics and other columns before the features + + Returns + ------- + pyradiomic_features_only : DataFrame + Dataframe with just the radiomic features + + """ + # Find all the columns that begin with diagnostics + diagnostic_data = radiomic_data.filter(regex=r"diagnostics_*") + + if not diagnostic_data.empty: + # Get the last diagnostics column name - the features begin in the next column + last_diagnostic_column = diagnostic_data.columns[-1] + # Drop all the columns before the features start + pyradiomic_features_only = dropUpToFeature(radiomic_data, last_diagnostic_column, keep_feature_name_column=False) + + else: + original_feature_data = radiomic_data.filter(regex=r'^original_*') + if not original_feature_data.empty: + # Get the first original feature column name - the features begin in this column + first_pyradiomic_feature_column = original_feature_data.columns[0] + # Drop all the columns before the features start + pyradiomic_features_only = dropUpToFeature(radiomic_data, first_pyradiomic_feature_column, keep_feature_name_column=True) + else: + msg = "PyRadiomics file doesn't contain any diagnostics or original feature columns, so can't find beginning of features. Use dropUpToFeature and specify the last non-feature or first PyRadiomic feature column name to get only PyRadiomics features." + logger.exception(msg) + raise ValueError(msg) + + return pyradiomic_features_only + + + +def getPatientIntersectionDataframes(dataframe_A:DataFrame, + dataframe_B:DataFrame, + need_pat_index_A:bool = True, + need_pat_index_B:bool = True): + """ Function to get the subset of two dataframes based on the intersection of their indices. Intersection will be based on the index of dataframe A. + + Parameters + ---------- + dataframe_A : DataFrame + Dataframe A to get the intersection of based on the index. + dataframe_B : DataFrame + Dataframe B to get the intersection of based on the index. + need_pat_index_A : bool, optional + Whether to run setPatientIdAsIndex on dataframe A. If False, assumes the patient ID column is already set as the index. The default is True. + need_pat_index_B : bool, optional + Whether to run setPatientIdAsIndex on dataframe B. If False, assumes the patient ID column is already set as the index. The default is True. + + Returns + ------- + intersection_index_dataframeA : DataFrame + Dataframe containing the rows of dataframe A that are in the intersection of the indices of dataframe A and dataframe B. + intersection_index_dataframeB : DataFrame + Dataframe containing the rows of dataframe B that are in the intersection of the indices of dataframe A and dataframe B. + """ + + # Set the patient ID column as the index for dataframe A if needed + if need_pat_index_A: + dataframe_A = setPatientIdAsIndex(dataframe_A) + + # Set the patient ID column as the index for dataframe B if needed + if need_pat_index_B: + dataframe_B = setPatientIdAsIndex(dataframe_B) + + # Get patients in common between dataframe A and dataframe B + intersection_index = dataframe_A.index.intersection(dataframe_B.index) + + # Select common patient rows from each dataframe + intersection_index_dataframeA = dataframe_A.loc[intersection_index] + intersection_index_dataframeB = dataframe_B.loc[intersection_index] + + return intersection_index_dataframeA, intersection_index_dataframeB \ No newline at end of file diff --git a/src/readii/data/split.py b/src/readii/data/split.py new file mode 100644 index 0000000..452865f --- /dev/null +++ b/src/readii/data/split.py @@ -0,0 +1,103 @@ +from pandas import DataFrame + +def replaceColumnValues(dataframe:DataFrame, + column_to_change:str, + replacement_value_data:dict + ): + """Function to replace specified values in a column with a new value. + + Parameters + ---------- + dataframe : DataFrame + Dataframe to replace values in. + column_to_change : str + Name of the column to replace values in. + replacement_value_data : dict + Dictionary of values to replace in the column. Key is the new value, value is the old value(s) to replace. + Can be a single value or a list of values. + + Returns + ------- + dataframe : DataFrame + Dataframe with values replaced. + """ + + # Check if the column name is a valid column in the dataframe + if column_to_change not in dataframe.columns: + raise ValueError(f"Column {column_to_change} not found in dataframe.") + + for new_value in replacement_value_data.keys(): + # Check if the replacement value is a valid value in the column + old_values = replacement_value_data[new_value] + values_not_found_in_column = set(old_values).difference(set(dataframe[column_to_change].unique())) + if values_not_found_in_column == set(old_values): + raise ValueError(f"All values in {values_not_found_in_column} are not found to be replaced in column {column_to_change}.") + # Replace the old values with the new value + dataframe = dataframe.replace(to_replace=replacement_value_data[new_value], + value=new_value) + + return dataframe + + +def splitDataByColumnValue(dataframe:DataFrame, + split_col_data:dict[str,list], + impute_value = None, + ): + """Function to split a dataframe into multiple dataframes based on the values in a specified column. Optionally, impute values in the split columns. + + Parameters + ---------- + dataframe : DataFrame + Dataframe to split. + split_col_data : dict[str,list] + Dictionary of a column name and values to split the dataframe by. Key is the column name, value is the list of values to split by. + impute_value (Optional) + If set, will impute any non-specified split values in the split column with this value. The default is None. + + Returns + ------- + split_dataframes : dict + Dictionary of dataframes, where the key is the split value and the value is the dataframe for that split value. + """ + + # Initialize dictionary to store the split dataframes + split_dataframes = {} + + for split_column_name in split_col_data.keys(): + # Check if the column name is a valid column in the dataframe + if split_column_name not in dataframe.columns: + raise ValueError(f"Column {split_column_name} not found in dataframe.") + + # Get split column values for this column + split_col_values = split_col_data[split_column_name] + + if impute_value is not None: + # Get all values in the column that are not one of the split_col_values + column_value_set = set(dataframe[split_column_name].unique()) + split_value_set = set(split_col_values) + non_split_values = list(column_value_set - split_value_set) + + # Replace all values not specified in the split_col_data with the impute_value specified for this column + dataframe = replaceColumnValues(dataframe, + column_to_change=split_column_name, + replacement_value_data={impute_value: non_split_values}) + + # End imputation + + # Split dataframe by the specified split_col_values + for split_value in split_col_values: + # Check if the split_value is a valid value in the column + if split_value not in dataframe[split_column_name].unique(): + raise ValueError(f"Split value {split_value} not found in column {split_column_name}.") + + # Split the dataframe by the specified split_value + split_dataframe = dataframe[dataframe[split_column_name] == split_value] + + split_dataframe.reset_index(inplace=True, drop=True) + + # Save the split dataframe to the dictionary + split_dataframes[split_value] = split_dataframe + + # End dataframe splitting + + return split_dataframes