
[pre-commit.ci] pre-commit autoupdate #5

Merged (14 commits) on Feb 15, 2024
Refactor code in _omop.py files
xinyuejohn committed Feb 15, 2024
commit 012b30aaffcc4dac71cfce3f98b5a7f6d519b650
42 changes: 20 additions & 22 deletions ehrdata/pl/_omop.py
@@ -1,8 +1,11 @@
from typing import List, Union, Literal, Optional
from ehrdata.utils.omop_utils import *
from ehrdata.tl import get_concept_name
import seaborn as sns
from typing import Literal

import matplotlib.pyplot as plt
import seaborn as sns

from ehrdata.tl import get_concept_name
from ehrdata.utils.omop_utils import get_column_types, map_concept_id, read_table


# TODO allow users to pass features
def feature_counts(
@@ -17,29 +20,24 @@ def feature_counts(
"condition_occurrence",
],
number=20,
key = None
):

if source == 'measurement':
columns = ["value_as_number", "time", "visit_occurrence_id", "measurement_concept_id"]
elif source == 'observation':
columns = ["value_as_number", "value_as_string", "measurement_datetime"]
elif source == 'condition_occurrence':
columns = None
else:
raise KeyError(f"Extracting data from {source} is not supported yet")

filepath_dict = adata.uns['filepath_dict']
tables = adata.uns['tables']

key=None,
):
# if source == 'measurement':
# columns = ["value_as_number", "time", "visit_occurrence_id", "measurement_concept_id"]
# elif source == 'observation':
# columns = ["value_as_number", "value_as_string", "measurement_datetime"]
# elif source == 'condition_occurrence':
# columns = None
# else:
# raise KeyError(f"Extracting data from {source} is not supported yet")

column_types = get_column_types(adata.uns, table_name=source)
df_source = read_table(adata.uns, table_name=source, dtype=column_types, usecols=[f"{source}_concept_id"])
feature_counts = df_source[f"{source}_concept_id"].value_counts()
if adata.uns['use_dask']:
if adata.uns["use_dask"]:
feature_counts = feature_counts.compute()
feature_counts = feature_counts.to_frame().reset_index(drop=False)[0:number]


feature_counts[f"{source}_concept_id_1"], feature_counts[f"{source}_concept_id_2"] = map_concept_id(
adata.uns, concept_id=feature_counts[f"{source}_concept_id"], verbose=False
)
@@ -56,4 +54,4 @@ def feature_counts(
ax = sns.barplot(feature_counts, x="feature_name", y="count")
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")
plt.tight_layout()
return feature_counts
return feature_counts
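
For orientation, a minimal usage sketch of the refactored plotting helper follows. It is hedged: it assumes `adata` is an AnnData object whose `.uns` was populated by ehrdata's OMOP reader with `filepath_dict`, `tables`, `delimiter`, and `use_dask` (the reader is not part of this diff), and it imports directly from the module path shown above.

# Hypothetical usage of the refactored feature_counts (ehrdata/pl/_omop.py).
# Building `adata` is not shown here; its .uns must already carry the OMOP metadata.
from ehrdata.pl._omop import feature_counts

counts = feature_counts(
    adata,                 # AnnData built from an OMOP export (loader not in this diff)
    source="measurement",  # table whose <source>_concept_id column is counted
    number=20,             # keep the 20 most frequent concepts
)
# The function maps the concept ids to names, draws a seaborn barplot of the counts,
# and returns the counts as a DataFrame.
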
93 changes: 52 additions & 41 deletions ehrdata/pp/_omop.py
@@ -1,7 +1,12 @@
from typing import List, Union, Literal, Optional
from ehrdata.utils.omop_utils import *
import ehrapy as ep
import warnings
from typing import Literal, Union

import ehrapy as ep
import pandas as pd
from rich import print as rprint

from ehrdata.utils.omop_utils import get_column_types, get_feature_info, read_table


def get_feature_statistics(
adata,
@@ -14,10 +19,12 @@ def get_feature_statistics(
"drug_exposure",
"condition_occurrence",
],
features: Union[str, int , List[Union[str, int]]] = None,
features: Union[str, int, list[Union[str, int]]] = None,
level="stay_level",
value_col: str = None,
aggregation_methods: Union[Literal["min", "max", "mean", "std", "count"], List[Literal["min", "max", "mean", "std", "count"]]]=None,
aggregation_methods: Union[
Literal["min", "max", "mean", "std", "count"], list[Literal["min", "max", "mean", "std", "count"]]
] = None,
add_aggregation_to_X: bool = True,
verbose: bool = False,
use_dask: bool = None,
@@ -28,16 +35,22 @@ def get_feature_statistics(
key = f"{source.split('_')[0]}_concept_id"
else:
raise KeyError(f"Extracting data from {source} is not supported yet")

if source == 'measurement':
value_col = 'value_as_number'
warnings.warn(f"Extracting values from {value_col}. Value in measurement table could be saved in these columns: value_as_number, value_source_value.\nSpecify value_col to extract value from desired column.")
source_table_columns = ['visit_occurrence_id', 'measurement_datetime', key, value_col]
elif source == 'observation':
value_col = 'value_as_number'
warnings.warn(f"Extracting values from {value_col}. Value in observation table could be saved in these columns: value_as_number, value_as_string, value_source_value.\nSpecify value_col to extract value from desired column.")
source_table_columns = ['visit_occurrence_id', "observation_datetime", key, value_col]
elif source == 'condition_occurrence':

if source == "measurement":
value_col = "value_as_number"
warnings.warn(
f"Extracting values from {value_col}. Value in measurement table could be saved in these columns: value_as_number, value_source_value.\nSpecify value_col to extract value from desired column.",
stacklevel=2,
)
source_table_columns = ["visit_occurrence_id", "measurement_datetime", key, value_col]
elif source == "observation":
value_col = "value_as_number"
warnings.warn(
f"Extracting values from {value_col}. Value in observation table could be saved in these columns: value_as_number, value_as_string, value_source_value.\nSpecify value_col to extract value from desired column.",
stacklevel=2,
)
source_table_columns = ["visit_occurrence_id", "observation_datetime", key, value_col]
elif source == "condition_occurrence":
source_table_columns = None
else:
raise KeyError(f"Extracting data from {source} is not supported yet")
@@ -49,62 +62,60 @@ def get_feature_statistics(
use_dask = True

column_types = get_column_types(adata.uns, table_name=source)
df_source = read_table(adata.uns, table_name=source, dtype=column_types, usecols=source_table_columns, use_dask=use_dask)

df_source = read_table(
adata.uns, table_name=source, dtype=column_types, usecols=source_table_columns, use_dask=use_dask
)

info_df = get_feature_info(adata.uns, features=features, verbose=verbose)
info_dict = info_df[['feature_id', 'feature_name']].set_index('feature_id').to_dict()['feature_name']
info_dict = info_df[["feature_id", "feature_name"]].set_index("feature_id").to_dict()["feature_name"]

# Select features
df_source = df_source[df_source[key].isin(list(info_df.feature_id))]
#TODO Select time
#da_measurement = da_measurement[(da_measurement.time >= 0) & (da_measurement.time <= 48*60*60)]
#df_source[f'{source}_name'] = df_source[key].map(info_dict)
# TODO Select time
# da_measurement = da_measurement[(da_measurement.time >= 0) & (da_measurement.time <= 48*60*60)]
# df_source[f'{source}_name'] = df_source[key].map(info_dict)
if aggregation_methods is None:
aggregation_methods = ["min", "max", "mean", "std", "count"]
if level == 'stay_level':
result = df_source.groupby(['visit_occurrence_id', key]).agg({
value_col: aggregation_methods})

if level == "stay_level":
result = df_source.groupby(["visit_occurrence_id", key]).agg({value_col: aggregation_methods})

if use_dask:
result = result.compute()
result = result.reset_index(drop=False)
result.columns = ["_".join(a) for a in result.columns.to_flat_index()]
result.columns = result.columns.str.removesuffix('_')
result.columns = result.columns.str.removeprefix(f'{value_col}_')
result[f'{source}_name'] = result[key].map(info_dict)
result.columns = result.columns.str.removesuffix("_")
result.columns = result.columns.str.removeprefix(f"{value_col}_")
result[f"{source}_name"] = result[key].map(info_dict)

df_statistics = result.pivot(index='visit_occurrence_id',
columns=f'{source}_name',
values=aggregation_methods)
df_statistics = result.pivot(index="visit_occurrence_id", columns=f"{source}_name", values=aggregation_methods)
df_statistics.columns = df_statistics.columns.swaplevel()
df_statistics.columns = ["_".join(a) for a in df_statistics.columns.to_flat_index()]


# TODO
sort_columns = True
if sort_columns:
new_column_order = []
for feature in features:
for suffix in (f'_{aggregation_method}' for aggregation_method in aggregation_methods):
col_name = f'{feature}{suffix}'
for suffix in (f"_{aggregation_method}" for aggregation_method in aggregation_methods):
col_name = f"{feature}{suffix}"
if col_name in df_statistics.columns:
new_column_order.append(col_name)

df_statistics.columns = new_column_order

df_statistics.index = df_statistics.index.astype(str)
adata.obs = pd.merge(adata.obs, df_statistics, how='left', left_index=True, right_index=True)

adata.obs = pd.merge(adata.obs, df_statistics, how="left", left_index=True, right_index=True)

if add_aggregation_to_X:
uns = adata.uns
obsm = adata.obsm
varm = adata.varm
layers = adata.layers
# layers = adata.layers
adata = ep.ad.move_to_x(adata, list(df_statistics.columns))
adata.uns = uns
adata.obsm = obsm
adata.varm = varm
# It will change
# adata.layers = layers
return adata
return adata
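
A hedged sketch of calling the refactored get_feature_statistics. The AnnData construction and the concept ids below are illustrative placeholders, not values from this PR:

# Hypothetical call to get_feature_statistics (ehrdata/pp/_omop.py) after this refactor.
from ehrdata.pp._omop import get_feature_statistics

# adata: AnnData whose .uns holds filepath_dict/delimiter/tables/use_dask,
# produced by ehrdata's OMOP reader (outside this diff).
adata = get_feature_statistics(
    adata,
    source="measurement",              # value_as_number is used unless value_col is set
    features=[3012888, 3004249],       # placeholder measurement concept ids
    level="stay_level",                # aggregate per visit_occurrence_id
    aggregation_methods=["min", "max", "mean"],
    add_aggregation_to_X=True,         # move the new per-visit columns from obs into X
)
# The per-visit aggregates are merged into adata.obs first and then moved
# into adata.X via ep.ad.move_to_x when add_aggregation_to_X is True.
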
29 changes: 15 additions & 14 deletions ehrdata/tl/_omop.py
@@ -1,28 +1,28 @@
from ehrdata.utils.omop_utils import * #get_column_types, read_table, df_to_dict
from typing import List, Union, Literal, Optional, Dict
import numbers
from rich import print as rprint
from typing import Union

from anndata import AnnData
from rich import print as rprint

from ehrdata.utils.omop_utils import df_to_dict, get_column_types, read_table

def get_concept_name(
adata: Union[AnnData, Dict],
concept_id: Union[str, List],
raise_error=False,
verbose=True):


def get_concept_name(adata: Union[AnnData, dict], concept_id: Union[str, list], raise_error=False, verbose=True):
if isinstance(concept_id, numbers.Integral):
concept_id = [concept_id]

if isinstance(adata, AnnData):
adata_dict = adata.uns
else:
adata_dict = adata

column_types = get_column_types(adata_dict, table_name="concept")
df_concept = read_table(adata_dict, table_name="concept", dtype=column_types)
# TODO dask Support
#df_concept.compute().dropna(subset=["concept_id", "concept_name"], inplace=True, ignore_index=True) # usecols=vocabularies_tables_columns["concept"]
df_concept.dropna(subset=["concept_id", "concept_name"], inplace=True, ignore_index=True) # usecols=vocabularies_tables_columns["concept"]
# df_concept.compute().dropna(subset=["concept_id", "concept_name"], inplace=True, ignore_index=True) # usecols=vocabularies_tables_columns["concept"]
df_concept.dropna(
subset=["concept_id", "concept_name"], inplace=True, ignore_index=True
) # usecols=vocabularies_tables_columns["concept"]
concept_dict = df_to_dict(df=df_concept, key="concept_id", value="concept_name")
concept_name = []
concept_name_not_found = []
@@ -43,6 +43,7 @@ def get_concept_name(
else:
return concept_name


# TODO
def get_concept_id():
pass
pass
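
The concept-name lookup can also be exercised directly; a hedged sketch follows, in which the folder path, delimiter, and concept ids are placeholders. Since get_concept_name accepts either an AnnData or a plain dict, the metadata dict is built by hand here:

# Hypothetical usage of get_concept_name with a hand-built metadata dict.
# "./omop_csv" and the concept ids are placeholders, not values from this repository.
from ehrdata.tl import get_concept_name
from ehrdata.utils.omop_utils import check_with_omop_cdm

adata_dict = {
    "filepath_dict": check_with_omop_cdm(folder_path="./omop_csv", delimiter=","),
    "delimiter": ",",
    "use_dask": False,
}
# Resolves the ids against the concept table; ids without a match are collected
# and reported (set raise_error=True to raise instead).
names = get_concept_name(adata_dict, concept_id=[3012888, 3004249])
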
450 changes: 293 additions & 157 deletions ehrdata/utils/omop_utils.py
@@ -1,20 +1,25 @@
import pandas as pd
import os
import csv
import glob
import numbers
import os
import warnings
import dask.dataframe as dd
from pathlib import Path
from typing import List, Union, Literal, Optional, Dict
import numbers
from typing import Union

import dask.dataframe as dd
import pandas as pd
from rich import print as rprint
import glob
from difflib import SequenceMatcher
from heapq import nlargest as _nlargest


def get_table_catalog_dict():
"""Get the table catalog dictionary of the OMOP CDM v5.4
Returns
-------
Dictionary: a dictionary of the table catalog. The key is the category of the table, and the value is a list of table names
"""
table_catalog_dict = {}
table_catalog_dict['Clinical data'] = [
table_catalog_dict["Clinical data"] = [
"person",
"observation_period",
"specimen",
@@ -34,7 +39,13 @@ def get_table_catalog_dict():

table_catalog_dict["Health system data"] = ["location", "care_site", "provider"]
table_catalog_dict["Health economics data"] = ["payer_plan_period", "cost"]
table_catalog_dict["Standardized derived elements"] = ["cohort", "cohort_definition", "drug_era", "dose_era", "condition_era"]
table_catalog_dict["Standardized derived elements"] = [
"cohort",
"cohort_definition",
"drug_era",
"dose_era",
"condition_era",
]
table_catalog_dict["Metadata"] = ["cdm_source", "metadata"]
table_catalog_dict["Vocabulary"] = [
"concept",
@@ -50,42 +61,68 @@ def get_table_catalog_dict():
]
return table_catalog_dict


def get_dtype_mapping():
dtype_mapping = {'integer': "Int64",
'Integer': "Int64",
'float': float,
'bigint': "Int64",
'varchar(MAX)': str,
'varchar(2000)': str,
'varchar(1000)': str,
'varchar(255)': str,
'varchar(250)': str,
'varchar(80)': str,
'varchar(60)': str,
'varchar(50)': str,
'varchar(25)': str,
'varchar(20)': str,
'varchar(10)': str,
'varchar(9)': str,
'varchar(3)': str,
'varchar(2)': str,
'varchar(1)': str,
'datetime': object,
'date': object}

"""Get the data type mapping of the OMOP CDM v5.4
Returns
-------
Dictionary: a dictionary of the data type mapping from OMOP CDM v5.4 to Python
"""
dtype_mapping = {
"integer": "Int64",
"Integer": "Int64",
"float": float,
"bigint": "Int64",
"varchar(MAX)": str,
"varchar(2000)": str,
"varchar(1000)": str,
"varchar(255)": str,
"varchar(250)": str,
"varchar(80)": str,
"varchar(60)": str,
"varchar(50)": str,
"varchar(25)": str,
"varchar(20)": str,
"varchar(10)": str,
"varchar(9)": str,
"varchar(3)": str,
"varchar(2)": str,
"varchar(1)": str,
"datetime": object,
"date": object,
}

return dtype_mapping


def get_omop_cdm_field_level():
"""Get the field level table sof the OMOP CDM v5.4
Returns
-------
Pandas DataFrame
"""
pth = f"{Path(__file__).resolve().parent}/OMOP_CDMv5.4_Field_Level.csv"
df = pd.read_csv(pth)
return df

def check_with_omop_cdm(
delimiter,
folder_path=str,
make_filename_lowercase=True):



def check_with_omop_cdm(folder_path: str, delimiter: str = None, make_filename_lowercase: bool = True) -> dict:
"""Check if the data adheres to the OMOP Common Data Model (CDM) version 5.4 standards
Check if the table name and column names adhere to the OMOP CDM v5.4
Args:
folder_path (str): The path of the folder containing the OMOP data
delimiter (str, optional): The delimiter of the CSV file. Defaults to None.
make_filename_lowercase (bool, optional): Whether to make the filename into lowercase. Defaults to True.
Returns
-------
dict: a dictionary of the table path. The key is the table name, and the value is the path of the table
"""
# TODO check if each column's data type adheres to the OMOP CDM
print("Checking if your data adheres to the OMOP Common Data Model (CDM) version 5.4 standards.")
filepath_list = glob.glob(os.path.join(folder_path, "*.csv")) + glob.glob(os.path.join(folder_path, "*.parquet"))
filepath_dict = {}
@@ -94,7 +131,7 @@ def check_with_omop_cdm(
is_single_file = True
else:
is_single_file = False

# TODO support table stored in a folder
"""
# If not a single file, only check the first one's column names
@@ -105,50 +142,61 @@ def check_with_omop_cdm(
is_single_file = False
"""
if is_single_file and not check_csv_has_only_header(path):

# Make filename into lowercase
if make_filename_lowercase:
new_path = os.path.join(folder_path, path.split("/")[-1].lower())
if path != new_path:
warnings(f"Rename file [{path}] to [{new_path}]")
os.rename(path, new_path)
path = new_path

# check if table name adheres to the OMOP CDM
file_name = os.path.basename(path).split(".")[0]
field_level = get_omop_cdm_field_level()
if file_name not in set(field_level.cdmTableName):
raise KeyError(f"Table [{file_name}] is not defined in OMOP CDM v5.4! Please change the table name manually!")


raise KeyError(
f"Table [{file_name}] is not defined in OMOP CDM v5.4! Please change the table name manually!"
)

# check if column names adhere to the OMOP CDM
if path.endswith('csv'):
with open(path, "r") as f:
if path.endswith("csv"):
with open(path) as f:
dict_reader = csv.DictReader(f, delimiter=delimiter)
columns = dict_reader.fieldnames
columns = list(filter(None, columns))
elif path.endswith('parquet'):
columns = list(filter(None, columns))
elif path.endswith("parquet"):
df = dd.read_parquet(path)
columns = list(df.columns)
else:
raise TypeError("Only support CSV and Parquet file!")

invalid_column_name = []
for _, column in enumerate(columns):
cdm_columns = set(field_level[field_level.cdmTableName == file_name]['cdmFieldName'])
cdm_columns = set(field_level[field_level.cdmTableName == file_name]["cdmFieldName"])
if column not in cdm_columns:
invalid_column_name.append(column)
if len(invalid_column_name) > 0:
print(f"Column {invalid_column_name} is not defined in Table [{file_name}] in OMOP CDM v5.4! Please change the column name manually!\nFor more information, please refer to: https://ohdsi.github.io/CommonDataModel/cdm54.html#{file_name.upper()}")
print(
f"Column {invalid_column_name} is not defined in Table [{file_name}] in OMOP CDM v5.4! Please change the column name manually!\nFor more information, please refer to: https://ohdsi.github.io/CommonDataModel/cdm54.html#{file_name.upper()}"
)
raise KeyError

filepath_dict[file_name] = path
return filepath_dict

def check_csv_has_only_header(file_path):
if file_path.endswith('csv'):
with open(file_path, 'r') as file:


def check_csv_has_only_header(file_path: str) -> bool:
"""Check if the CSV file has only header
Args:
file_path (str): The path of the CSV file
Returns
-------
bool: True if the CSV file has only header, False otherwise
"""
if file_path.endswith("csv"):
with open(file_path) as file:
reader = csv.reader(file)
header = next(reader, None)
if header is not None:
@@ -158,24 +206,33 @@ def check_csv_has_only_header(file_path):
return False
else:
return False

def get_column_types(adata_dict,
table_name: str = None):

path = adata_dict['filepath_dict'][table_name]


def get_column_types(adata_dict: dict, table_name: str) -> dict:
"""Get the column types of the table
Args:
adata_dict (dict): a dictionary containing filepath_dict and delimiter information
table_name (str): Table name in OMOP CDM v5.4.
Returns
-------
dict: a dictionary of the column types. The key is the column name, and the value is the column type
"""
path = adata_dict["filepath_dict"][table_name]
column_types = {}
# If not a single file, read the first one
if not os.path.isfile(path):
folder_walk = os.walk(path)
first_file_in_folder = next(folder_walk)[2][0]
path = os.path.join(path, first_file_in_folder)
if path.endswith('csv'):
with open(path, "r") as f:
dict_reader = csv.DictReader(f, delimiter=adata_dict['delimiter'])

if path.endswith("csv"):
with open(path) as f:
dict_reader = csv.DictReader(f, delimiter=adata_dict["delimiter"])
columns = dict_reader.fieldnames
columns = list(filter(None, columns))
elif path.endswith('parquet'):
columns = list(filter(None, columns))
elif path.endswith("parquet"):
df = dd.read_parquet(path)
columns = list(df.columns)
else:
@@ -184,35 +241,75 @@ def get_column_types(adata_dict,
for _, column in enumerate(columns_lowercase):
dtype_mapping = get_dtype_mapping()
field_level = get_omop_cdm_field_level()
column_types[column] = dtype_mapping[field_level[(field_level.cdmTableName == table_name) & (field_level.cdmFieldName == column)]['cdmDatatype'].values[0]]
column_types[column] = dtype_mapping[
field_level[(field_level.cdmTableName == table_name) & (field_level.cdmFieldName == column)][
"cdmDatatype"
].values[0]
]
return column_types


def get_primary_key(table_name):
def get_primary_key(table_name: str) -> str:
"""Get the primary key of the table
Args:
table_name (str, optional): Table name in OMOP CDM v5.4.
Returns
-------
str: the primary key of the table
"""
field_level = get_omop_cdm_field_level()
primary_key = field_level[(field_level.cdmTableName == table_name) & (field_level.isPrimaryKey == 'Yes')]['cdmFieldName'].values[0]
primary_key = field_level[(field_level.cdmTableName == table_name) & (field_level.isPrimaryKey == "Yes")][
"cdmFieldName"
].values[0]
return primary_key

def read_table(adata_dict, table_name: str = None, dtype=None, parse_dates=None, index=None, usecols=None, use_dask=None):



def read_table(
adata_dict: dict,
table_name: str,
dtype: dict = None,
parse_dates: Union[list[str], str] = None,
index: str = None,
usecols: Union[list[str], str] = None,
use_dask: bool = None,
) -> Union[pd.DataFrame, dd.DataFrame]:
"""Read the table either in CSV or Parquet format using pandas or dask
Args:
adata_dict (dict): a dictionary containing filepath_dict, delimiter, use_dask, tables information
table_name (str, optional): Table name in OMOP CDM v5.4.
dtype (dict, optional): Data type of the columns. Defaults to None.
parse_dates (Union[List[str], str], optional): Columns to parse as dates. Defaults to None.
index (str, optional): set the index of the DataFrame. Defaults to None.
usecols (Union[List[str], str], optional): Columns to read. Defaults to None.
use_dask (bool, optional): Whether to use dask. It is recommended to use dask when the table is large. Defaults to None.
Returns
-------
Union[pd.DataFrame, dd.DataFrame]: a pandas or dask DataFrame
"""
if not use_dask:
use_dask = adata_dict['use_dask']
path = adata_dict['filepath_dict'][table_name]
use_dask = adata_dict["use_dask"]
path = adata_dict["filepath_dict"][table_name]
if use_dask:
if not os.path.isfile(path):
folder_walk = os.walk(path)
filetype = next(folder_walk)[2][0].split(".")[-1]
else:
filetype = path.split(".")[-1]
if filetype == 'csv':
if filetype == "csv":
if not os.path.isfile(path):
path = f"{path}/*.csv"
if usecols:
dtype = {key: dtype[key] for key in usecols if key in dtype}
if parse_dates:
parse_dates = {key: parse_dates[key] for key in usecols if key in parse_dates}
df = dd.read_csv(path, delimiter=adata_dict['delimiter'], dtype=dtype, parse_dates=parse_dates, usecols=usecols)
elif filetype == 'parquet':
df = dd.read_csv(
path, delimiter=adata_dict["delimiter"], dtype=dtype, parse_dates=parse_dates, usecols=usecols
)
elif filetype == "parquet":
if not os.path.isfile(path):
path = f"{path}/*.parquet"
if usecols:
@@ -226,47 +323,59 @@ def read_table(adata_dict, table_name: str = None, dtype=None, parse_dates=None,
if not os.path.isfile(path):
raise TypeError("Only support reading a single file!")
filetype = path.split(".")[-1]
if filetype == 'csv':
if filetype == "csv":
if usecols:
dtype = {key: dtype[key] for key in usecols if key in dtype}
if parse_dates:
parse_dates = {key: parse_dates[key] for key in usecols if key in parse_dates}
df = pd.read_csv(path, delimiter=adata_dict['delimiter'], dtype=dtype, parse_dates=parse_dates, usecols=usecols)
elif filetype == 'parquet':
df = pd.read_csv(
path, delimiter=adata_dict["delimiter"], dtype=dtype, parse_dates=parse_dates, usecols=usecols
)
elif filetype == "parquet":
df = pd.read_parquet(path, columns=usecols)

else:
raise TypeError("Only support CSV and Parquet file!")



if index:
df = df.set_index(index)
return df


def map_concept_id(
adata_dict,
concept_id: Union[str, List],
verbose=True):

filepath_dict = adata_dict['filepath_dict']
tables = adata_dict['tables']
delimiter = adata_dict['delimiter']

adata_dict: dict, concept_id: Union[str, list[int]], verbose: bool = True
) -> tuple[list[int], list[int]]:
"""Map between concept_id_1 and concept_id_2 using concept_relationship table
Args:
adata_dict (dict): a dictionary containing filepath_dict, delimiter, tables information.
concept_id (Union[str, list[int]]): It could be a single concept_id or a list of concept_id.
verbose (bool, optional): Defaults to True.
Returns
-------
Tuple[list[int], list[int]]: a tuple of list of concept_id_1 and list of concept_id_2. If no map is found, the concept_id_1 and concept_id_2 will be the same.
"""
filepath_dict = adata_dict["filepath_dict"]
tables = adata_dict["tables"]
delimiter = adata_dict["delimiter"]

if isinstance(concept_id, numbers.Integral):
concept_id = [concept_id]
concept_id_1 = []
concept_id_2 = []
concept_id_mapped_not_found = []

if "concept_relationship" in tables:
column_types = get_column_types(adata_dict, table_name="concept_relationship")
df_concept_relationship = pd.read_csv(
filepath_dict["concept_relationship"], dtype=column_types
filepath_dict["concept_relationship"], dtype=column_types, delimiter=delimiter
)
# TODO dask Support
#df_concept_relationship.compute().dropna(subset=["concept_id_1", "concept_id_2", "relationship_id"], inplace=True) # , usecols=vocabularies_tables_columns["concept_relationship"],
df_concept_relationship.dropna(subset=["concept_id_1", "concept_id_2", "relationship_id"], inplace=True) # , usecols=vocabularies_tables_columns["concept_relationship"],
# df_concept_relationship.compute().dropna(subset=["concept_id_1", "concept_id_2", "relationship_id"], inplace=True) # , usecols=vocabularies_tables_columns["concept_relationship"],
df_concept_relationship.dropna(
subset=["concept_id_1", "concept_id_2", "relationship_id"], inplace=True
) # , usecols=vocabularies_tables_columns["concept_relationship"],
concept_relationship_dict = df_to_dict(
df=df_concept_relationship[df_concept_relationship["relationship_id"] == "Maps to"],
key="concept_id_1",
@@ -296,67 +405,90 @@ def map_concept_id(
else:
concept_id_1 = concept_id
concept_id_2 = concept_id

if len(concept_id_1) == 1:
return concept_id_1[0], concept_id_2[0]
else:
return concept_id_1, concept_id_2


def df_to_dict(df, key, value):


def df_to_dict(df: pd.DataFrame, key: str, value: str) -> dict:
"""Convert a DataFrame to a dictionary
Args:
df (pd.DataFrame): a DataFrame
key (str): the column name to be used as the key of the dictionary
value (str): the column name to be used as the value of the dictionary
Returns
-------
dict: a dictionary
"""
if isinstance(df, dd.DataFrame):
return pd.Series(df[value].compute().values, index=df[key].compute()).to_dict()
else:
return pd.Series(df[value].values, index=df[key]).to_dict()


def get_close_matches_using_dict(word, possibilities, n=2, cutoff=0.6):
"""Use SequenceMatcher to return a list of the indexes of the best
"good enough" matches. word is a sequence for which close matches
are desired (typically a string).
possibilities is a dictionary of sequences.
Optional arg n (default 2) is the maximum number of close matches to
return. n must be > 0.
Optional arg cutoff (default 0.6) is a float in [0, 1]. Possibilities
that don't score at least that similar to word are ignored.
"""

if not n > 0:
raise ValueError("n must be > 0: %r" % (n,))
if not 0.0 <= cutoff <= 1.0:
raise ValueError("cutoff must be in [0.0, 1.0]: %r" % (cutoff,))
result = []
s = SequenceMatcher()
s.set_seq2(word)
for _, (key, value) in enumerate(possibilities.items()):
s.set_seq1(value)
if s.real_quick_ratio() >= cutoff and s.quick_ratio() >= cutoff and s.ratio() >= cutoff:
result.append((s.ratio(), value, key))

# Move the best scorers to head of list
result = _nlargest(n, result)
# def get_close_matches_using_dict(word, possibilities, n=2, cutoff=0.6):
# """Use SequenceMatcher to return a list of the indexes of the best
# "good enough" matches. word is a sequence for which close matches
# are desired (typically a string).
# possibilities is a dictionary of sequences.
# Optional arg n (default 2) is the maximum number of close matches to
# return. n must be > 0.
# Optional arg cutoff (default 0.6) is a float in [0, 1]. Possibilities
# that don't score at least that similar to word are ignored.
# """
# if not n > 0:
# raise ValueError("n must be > 0: %r" % (n,))
# if not 0.0 <= cutoff <= 1.0:
# raise ValueError("cutoff must be in [0.0, 1.0]: %r" % (cutoff,))
# result = []
# s = SequenceMatcher()
# s.set_seq2(word)
# for _, (key, value) in enumerate(possibilities.items()):
# s.set_seq1(value)
# if s.real_quick_ratio() >= cutoff and s.quick_ratio() >= cutoff and s.ratio() >= cutoff:
# result.append((s.ratio(), value, key))

# Strip scores for the best n matches
return [(value, key, score) for score, value, key in result]
# # Move the best scorers to head of list
# result = _nlargest(n, result)

# # Strip scores for the best n matches
# return [(value, key, score) for score, value, key in result]


def get_feature_info(
adata_dict: Dict,
features: Union[str, int, List[Union[str, int]]] = None,
adata_dict: dict,
features: Union[str, int, list[Union[str, int]]] = None,
ignore_not_shown_in_concept_table: bool = True,
exact_match: bool = True,
verbose: bool = True,
):
) -> pd.DataFrame:
"""Get the feature information from the concept table
Args:
adata_dict (dict): a dictionary containing filepath_dict, delimiter, tables information.
features (Union[str, int, list[Union[str, int]]], optional): a feature name or a feature id. Defaults to None.
ignore_not_shown_in_concept_table (bool, optional): If True, it will ignore the features that are not shown in the concept table. Defaults to True.
exact_match (bool, optional): If True, it will only return the exact match if the feature name is input. Defaults to True.
verbose (bool, optional): Defaults to True.
if "concept" in adata_dict['tables']:
Returns
-------
pd.DataFrame: a DataFrame containing the feature information
"""
if "concept" in adata_dict["tables"]:
column_types = get_column_types(adata_dict, table_name="concept")

df_concept = read_table(adata_dict, table_name="concept", dtype=column_types).dropna(
subset=["concept_id", "concept_name"]
) # usecols=vocabularies_tables_columns["concept"],
#concept_dict = df_to_dict(df=df_concept, key="concept_name", value="concept_id")

# concept_dict = df_to_dict(df=df_concept, key="concept_name", value="concept_id")
else:
rprint("concept table is not found in the OMOP CDM v5.4!")
raise ValueError
fetures_not_shown_in_concept_table = []

info_df = pd.DataFrame([])
@@ -369,7 +501,7 @@ def get_feature_info(
feature_id = feature
feature_id_1, feature_id_2 = map_concept_id(adata_dict=adata_dict, concept_id=feature_id, verbose=False)
try:
feature_name = df_concept[df_concept['concept_id'] == feature_id_1]['concept_name'].values[0]
feature_name = df_concept[df_concept["concept_id"] == feature_id_1]["concept_name"].values[0]
except KeyError:
if ignore_not_shown_in_concept_table:
fetures_not_shown_in_concept_table.append(feature)
@@ -382,23 +514,21 @@ def get_feature_info(
# if the input is feature name
elif isinstance(feature, str):
# return a list of (value, key, score)
#result = get_close_matches_using_dict(feature, concept_dict, n=2, cutoff=0.2)
# result = get_close_matches_using_dict(feature, concept_dict, n=2, cutoff=0.2)
from thefuzz import process

# the thefuzz match returns a list of tuples of (matched string, match ratio)
result = process.extract(feature, list(df_concept['concept_name'].values), limit=2)

# the thefuzz match returns a list of tuples of (matched string, match ratio)
result = process.extract(feature, list(df_concept["concept_name"].values), limit=2)

match_1 = result[0]
match_1_name = match_1[0]
match_1_ratio = match_1[1]
# Most of the case: if find 2 best matches
if len(result) == 2:

match_2 = result[1]
match_2_name = match_2[0]
match_2_ratio = match_2[1]

if match_1_ratio != 100:
if exact_match:
rprint(
@@ -407,35 +537,41 @@ def get_feature_info(
raise ValueError
else:
if match_2_ratio == 100:
match_1_id = df_concept[df_concept['concept_name'] == match_1_name]['concept_id'].values[0]
match_2_id = df_concept[df_concept['concept_name'] == match_2_name]['concept_id'].values[0]
match_1_id = df_concept[df_concept["concept_name"] == match_1_name]["concept_id"].values[0]
match_2_id = df_concept[df_concept["concept_name"] == match_2_name]["concept_id"].values[0]
rprint(
f"Found multiple exact matches for [blue]{feature}[/] in the concept table.\n1) concept id: [blue]{match_1_id}[/] 2) concept id: [blue]{match_2_id}[/]. Please specify concept_id directly."
)
raise ValueError



# Very rare: if only find 1 match
else:
if exact_match and match_1_ratio != 1:
rprint(
f"Unable to find an exact match for [red]{feature}[/] in the concept table. Similiar one: [blue]{match_1_name}[/] with match ratio [red]{match_1_ratio}[/]"
)
raise ValueError

feature_name = match_1_name
feature_id = df_concept[df_concept['concept_name'] == feature_name]['concept_id'].values[0]
feature_id = df_concept[df_concept["concept_name"] == feature_name]["concept_id"].values[0]
feature_id_1, feature_id_2 = map_concept_id(adata_dict=adata_dict, concept_id=feature_id, verbose=False)

else:
rprint(
f"Please input either [red]feature name (string)[/] or [red]feature id (integer)[/] that you want to extarct"
"Please input either [red]feature name (string)[/] or [red]feature id (integer)[/] that you want to extarct"
)
raise TypeError

info_df = pd.concat([info_df, pd.DataFrame(data=[[feature_name, feature_id_1, feature_id_2]], columns=['feature_name', 'feature_id_1', 'feature_id_2'])])



info_df = pd.concat(
[
info_df,
pd.DataFrame(
data=[[feature_name, feature_id_1, feature_id_2]],
columns=["feature_name", "feature_id_1", "feature_id_2"],
),
]
)

# feature_name_list.append(feature_name)
# domain_id_list.append(df_concept.loc[df_concept["concept_id"] == feature_id, "domain_id"].reset_index(drop=True).compute()[0])
# concept_class_id_list.append(df_concept.loc[df_concept["concept_id"] == feature_id, "concept_class_id"].reset_index(drop=True).compute()[0])
@@ -446,10 +582,10 @@ def get_feature_info(
f"Detected: feature [green]{feature_name}[/], feature ID [green]{feature_id}[/] in concept table, match ratio = [green]{match_1_ratio}."
)

if info_df[f"feature_id_1"].equals(info_df[f"feature_id_2"]):
info_df.drop(f"feature_id_2", axis=1, inplace=True)
if info_df["feature_id_1"].equals(info_df["feature_id_2"]):
info_df.drop("feature_id_2", axis=1, inplace=True)
info_df = info_df.rename(columns={"feature_id_1": "feature_id"})
info_df = info_df.reset_index(drop=True)
else:
info_df = info_df.reset_index(drop=True)
return info_df
return info_df
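
Finally, a hedged end-to-end sketch of the utility layer this commit reformats. The folder path, table list, and concept id are placeholders; the dict keys mirror what these helpers read from adata.uns:

# Hypothetical driver for the omop_utils helpers touched in this commit.
from ehrdata.utils.omop_utils import (
    check_with_omop_cdm,
    get_column_types,
    map_concept_id,
    read_table,
)

adata_dict = {
    "filepath_dict": check_with_omop_cdm(folder_path="./omop_csv", delimiter=","),
    "tables": ["person", "measurement", "concept", "concept_relationship"],  # illustrative
    "delimiter": ",",
    "use_dask": False,
}

# Column dtypes are looked up in the bundled OMOP CDM v5.4 field-level CSV.
dtypes = get_column_types(adata_dict, table_name="measurement")
df = read_table(
    adata_dict,
    table_name="measurement",
    dtype=dtypes,
    usecols=["measurement_concept_id", "value_as_number"],
)

# Maps the id through concept_relationship ("Maps to"); if no mapping exists,
# both returned values equal the input id.
cid_1, cid_2 = map_concept_id(adata_dict, concept_id=3012888, verbose=False)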