Add tablenumber capabilities to distinguish repeated image numbers from multiple datasets #188

Merged (14 commits) on Oct 25, 2024
126 changes: 123 additions & 3 deletions cytotable/convert.py
@@ -173,6 +173,100 @@ def _prep_cast_column_data_types(
return columns


@python_app
def _set_tablenumber(
sources: Dict[str, List[Dict[str, Any]]],
add_tablenumber: Optional[bool] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Gathers a "TableNumber" for each table: a unique identifier intended
to help differentiate between repeated ImageNumbers across datasets
so that results remain distinct.
Note:
- If using CSV data sources, the image.csv table is used for checksum.
- If using SQLite data sources, the entire SQLite database is used for checksum.
Args:
sources: Dict[str, List[Dict[str, Any]]]
Contains metadata about data tables and related contents.
add_tablenumber: Optional[bool]
Whether to add a calculated tablenumber.
Note: when False, adds None as the tablenumber
Returns:
Dict[str, List[Dict[str, Any]]]
New source group with added TableNumber details.
"""

from cloudpathlib import AnyPath

from cytotable.utils import _gather_tablenumber_checksum

image_table_groups = {
# create a data structure with the common parent for each dataset
# and the calculated checksum from the image table.
# note: the source_path parent is used for non-SQLite files
# whereas the direct source path is used for SQLite files.
(
str(source["source_path"].parent)
if source["source_path"].suffix != ".sqlite"
else str(source["source_path"])
): source["source_path"]
for source_group_name, source_group_vals in sources.items()
# use only the image table references as the basis
# for these calculations.
if any(
value in str(AnyPath(source_group_name).stem).lower()
for value in ["image", "per_image"]
)
for source in source_group_vals
}

# determine if we need to add tablenumber data
if (
# case where only one (or no) image table exists,
# meaning no differentiation is needed
add_tablenumber is None
and (len(image_table_groups) <= 1)
) or (
# case where tablenumbers were explicitly disabled
add_tablenumber is False
):
return {
source_group_name: [
dict(
source,
**{
"tablenumber": None,
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in sources.items()
}

# compute a checksum for each image table, keyed by
# the group path determined above
tablenumber_table = {
group: _gather_tablenumber_checksum(path)
for group, path in image_table_groups.items()
}

# return a modified sources data structure with the tablenumber added
return {
source_group_name: [
dict(
source,
**{"tablenumber": tablenumber_table[group_key]},
)
for source in source_group_vals
# rebuild the same group key used above: the parent directory
# for non-SQLite sources and the file path itself for SQLite
for group_key in [
(
str(source["source_path"].parent)
if source["source_path"].suffix != ".sqlite"
else str(source["source_path"])
)
]
if group_key in tablenumber_table
]
for source_group_name, source_group_vals in sources.items()
}
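
For orientation, here is a minimal sketch of the grouping behavior above, assuming hypothetical local CSV datasets; the `sources` structure mirrors, but does not reproduce, CytoTable's internal source metadata:

```python
# a minimal sketch, assuming hypothetical local CSV datasets
import pathlib

sources = {
    "image.csv": [
        {"source_path": pathlib.Path("dataset_a/image.csv")},
        {"source_path": pathlib.Path("dataset_b/image.csv")},
    ],
}

# key each image table by its parent directory (the non-SQLite case above)
image_table_groups = {
    str(source["source_path"].parent): source["source_path"]
    for source_group_name, source_group_vals in sources.items()
    if "image" in pathlib.Path(source_group_name).stem.lower()
    for source in source_group_vals
}

print(image_table_groups)
# {'dataset_a': PosixPath('dataset_a/image.csv'),
#  'dataset_b': PosixPath('dataset_b/image.csv')}
```

Each parent directory then receives its own checksum, which becomes the TableNumber for every table in that dataset.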


@python_app
def _get_table_keyset_pagination_sets(
chunk_size: int,
@@ -310,15 +404,26 @@ def _source_pageset_to_parquet(
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

# build tablenumber segment addition (if necessary)
tablenumber_sql = (
# cast to BIGINT (an 8-byte integer) here so that the TableNumber
# column avoids concat or join conflicts later caused by
# mismatched automatic data typing.
f"CAST({source['tablenumber']} AS BIGINT) as TableNumber, "
if source["tablenumber"] is not None
# if we don't have a tablenumber value, don't introduce the column
else ""
)

# add source table columns
casted_source_cols = [
# here we cast the column to the specified type, ensuring the column name remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]

# create selection statement from lists above
select_columns = ",".join(
# create selection statement from tablenumber_sql + lists above
select_columns = tablenumber_sql + ",".join(
# if we should sort the output, add the metadata_cols
casted_source_cols
if sort_output
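
Illustrative only: assuming a hypothetical tablenumber of 123 and a single float column, the construction above yields a SELECT list shaped like the following.

```python
# hypothetical inputs standing in for CytoTable's source metadata
source = {
    "tablenumber": 123,
    "columns": [{"column_name": "AreaShape_Area", "column_dtype": "FLOAT"}],
}

tablenumber_sql = (
    f"CAST({source['tablenumber']} AS BIGINT) as TableNumber, "
    if source["tablenumber"] is not None
    else ""
)
casted_source_cols = [
    f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
    for column in source["columns"]
]
print(tablenumber_sql + ",".join(casted_source_cols))
# CAST(123 AS BIGINT) as TableNumber, CAST("AreaShape_Area" AS FLOAT) AS "AreaShape_Area"
```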
@@ -376,6 +481,7 @@ def _source_pageset_to_parquet(
page_key=source["page_key"],
pageset=pageset,
sort_output=sort_output,
tablenumber=source["tablenumber"],
),
where=result_filepath,
)
@@ -994,6 +1100,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
sort_output: bool,
page_keys: Dict[str, str],
data_type_cast_map: Optional[Dict[str, str]] = None,
add_tablenumber: Optional[bool] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], str]:
"""
@@ -1137,6 +1244,12 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
for source_group_name, source_group_vals in invalid_files_dropped.items()
}

# add tablenumber details, attaching None when add_tablenumber is disabled
tablenumber_prepared = _set_tablenumber(
sources=evaluate_futures(column_names_and_types_gathered),
add_tablenumber=add_tablenumber,
).result()

results = {
source_group_name: [
dict(
@@ -1165,7 +1278,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
for source in source_group_vals
]
for source_group_name, source_group_vals in evaluate_futures(
column_names_and_types_gathered
tablenumber_prepared
).items()
}

@@ -1273,6 +1386,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
infer_common_schema: bool = True,
drop_null: bool = False,
data_type_cast_map: Optional[Dict[str, str]] = None,
add_tablenumber: Optional[bool] = None,
page_keys: Optional[Dict[str, str]] = None,
sort_output: bool = True,
preset: Optional[str] = "cellprofiler_csv",
@@ -1322,6 +1436,11 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
add_tablenumber: Optional[bool]
Whether to add a calculated tablenumber which helps differentiate
various repeated values (such as ObjectNumber) within source data.
Useful when processing multiple SQLite or CSV data sources together
while retaining the distinction between each dataset.
page_keys: Optional[Dict[str, str]]
The table and column names to be used for key pagination.
Uses the form: {"table_name":"column_name"}.
@@ -1462,6 +1581,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
infer_common_schema=infer_common_schema,
drop_null=drop_null,
data_type_cast_map=data_type_cast_map,
add_tablenumber=add_tablenumber,
sort_output=sort_output,
page_keys=cast(dict, page_keys),
**kwargs,
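
A hedged usage sketch of the new parameter (the paths below are hypothetical placeholders):

```python
import cytotable

result = cytotable.convert(
    source_path="./many_sqlite_datasets",  # hypothetical path
    dest_path="./combined.parquet",
    dest_datatype="parquet",
    # explicitly enable TableNumber so rows from each source dataset
    # remain distinguishable after joins or concatenation
    add_tablenumber=True,
)
```

Leaving `add_tablenumber` at its default of `None` lets `_set_tablenumber` decide automatically: when only a single image table is detected, no TableNumber column is added.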
66 changes: 61 additions & 5 deletions cytotable/utils.py
@@ -182,6 +182,7 @@ def _sqlite_mixed_type_query_to_parquet(
page_key: str,
pageset: Tuple[Union[int, float], Union[int, float]],
sort_output: bool,
tablenumber: Optional[int] = None,
) -> pa.Table:
"""
Performs SQLite table data extraction where one or many
@@ -201,6 +202,9 @@ def _sqlite_mixed_type_query_to_parquet(
Specifies whether to sort cytotable output or not.
add_cytotable_meta: bool, default=False:
Whether to add CytoTable metadata fields or not
tablenumber: Optional[int], default=None:
An optional table number to append to the results.
Returns:
pyarrow.Table:
@@ -256,9 +260,19 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
# return the translated type for use in SQLite
return translated_type[0]

# build tablenumber segment addition (if necessary)
tablenumber_sql = (
# cast to INTEGER here so the TableNumber column arrives with a
# consistent type in the results
f"CAST({tablenumber} AS INTEGER) as TableNumber, "
if tablenumber is not None
# if we don't have a tablenumber value, don't introduce the column
else ""
)

# create cases for mixed-type handling in each column discovered above
query_parts = [
f"""
query_parts = tablenumber_sql + ", ".join(
[
f"""
CASE
/* when the storage class type doesn't match the column, return nulltype */
WHEN typeof({col['column_name']}) !=
@@ -267,13 +281,14 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
ELSE {col['column_name']}
END AS {col['column_name']}
"""
for col in column_info
]
for col in column_info
]
)

# perform the select using the cases built above and using chunksize + offset
sql_stmt = f"""
SELECT
{', '.join(query_parts)}
{query_parts}
FROM {table_name}
WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}
{"ORDER BY " + page_key if sort_output else ""};
@@ -482,6 +497,47 @@ def _write_parquet_table_with_metadata(table: pa.Table, **kwargs) -> None:
)


def _gather_tablenumber_checksum(pathname: str, buffer_size: int = 1048576) -> int:
"""
Build and return a checksum for use as a unique identifier across datasets
referenced from cytominer-database:
https://github.com/cytomining/cytominer-database/blob/master/cytominer_database/ingest_variable_engine.py#L129
Args:
pathname: str:
A path to the file from which to generate the checksum.
buffer_size: int:
Buffer size to use for reading data.
Returns:
int
An integer representing the checksum of the pathname file.
"""

import os
import zlib

# shrink the buffer size if it is larger than the file size
file_size = os.path.getsize(pathname)
if file_size < buffer_size:
buffer_size = file_size

# open file
with open(str(pathname), "rb") as stream:
# begin result formation
result = zlib.crc32(bytes(0))
while True:
# read data from stream using buffer size
buffer = stream.read(buffer_size)
if not buffer:
# if we have no more data to use, break while loop
break
# use buffer read data to form checksum
result = zlib.crc32(buffer, result)

return result & 0xFFFFFFFF
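
A small self-contained check of the streaming approach above, assuming any local file: chunked CRC32 matches a single-pass `zlib.crc32` over the same bytes.

```python
import tempfile
import zlib

# write a throwaway file to checksum
with tempfile.NamedTemporaryFile(delete=False) as handle:
    handle.write(b"example bytes for checksum comparison")
    path = handle.name

# single-pass reference value
with open(path, "rb") as stream:
    reference = zlib.crc32(stream.read()) & 0xFFFFFFFF

# rolling value, using an intentionally tiny buffer to force iterations
result = zlib.crc32(bytes(0))
with open(path, "rb") as stream:
    while buffer := stream.read(8):
        result = zlib.crc32(buffer, result)

assert (result & 0xFFFFFFFF) == reference
```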


def _unwrap_value(val: Union[parsl.dataflow.futures.AppFuture, Any]) -> Any:
"""
Helper function to unwrap futures from values or return values
1 change: 1 addition & 0 deletions docs/source/architecture.data.md
@@ -25,6 +25,7 @@ Data are organized into tables of generally two categories:

Identifying or key fields for image and compartment tables may include the following:

- __TableNumber__: Provides a unique number, derived from the file referenced to build CytoTable output, to help distinguish repeated values in ImageNumber, ObjectNumber, or other referenced metadata columns. Typically useful when using multiple SQLite or CSV-based source datasets (see the sketch after this list).
- __ImageNumber__: Provides specificity on what image is being referenced (there may be many).
- __ObjectNumber__: Provides specificity for a specific compartment object within an ImageNumber.
- __Parent_Cells__: Provides a related Cell compartment ObjectNumber. This field is canonically referenced from the Cytoplasm compartment for joining Cytoplasm and Cell compartment data. (see [Cytoplasm Compartment Data Relationships](architecture.data.md#cytoplasm-compartment-data-relationships) below for greater detail)
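
As a hedged illustration of why TableNumber matters (the values and DataFrames below are hypothetical):

```python
import pandas as pd

# two datasets both start ImageNumber at 1, so TableNumber is what
# keeps their rows distinct once the datasets are combined
dataset_a = pd.DataFrame({"TableNumber": [1111, 1111], "ImageNumber": [1, 2]})
dataset_b = pd.DataFrame({"TableNumber": [2222, 2222], "ImageNumber": [1, 2]})

combined = pd.concat([dataset_a, dataset_b], ignore_index=True)
# without TableNumber, ImageNumber == 1 would conflate rows from both datasets
print(combined[combined["ImageNumber"] == 1])
```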
4 changes: 4 additions & 0 deletions docs/source/python-api.md
@@ -45,6 +45,10 @@ Convert
|
.. autofunction:: _set_tablenumber
|
.. autofunction:: _prepend_column_name
|
97 changes: 5 additions & 92 deletions poetry.lock
92 changes: 92 additions & 0 deletions tests/conftest.py
@@ -214,6 +214,98 @@ def cytominerdatabase_to_pycytominer_merge_single_cells_parquet(
return output_paths


@pytest.fixture()
def cytominerdatabase_to_manual_join_parquet(
fx_tempdir: str,
cytominerdatabase_sqlite_static: List[str],
) -> List[str]:
"""
Processed cytominer-database test SQLite data as
manually joined single-cell parquet files
"""

output_paths = []
for sqlite_file in cytominerdatabase_sqlite_static:
destination_path = (
f"{fx_tempdir}/manual_join.{pathlib.Path(sqlite_file).name}.parquet"
)
df_cytominerdatabase = (
pd.read_sql(
sql="""
WITH Image_Filtered AS (
SELECT
TableNumber,
ImageNumber
FROM
Image
),
/* gather unique objectnumber column names from each
compartment so as to retain differentiation */
Cytoplasm_renamed AS (
SELECT
ObjectNumber AS Cytoplasm_ObjectNumber,
*
FROM Cytoplasm
),
Cells_renamed AS (
SELECT
ObjectNumber AS Cells_ObjectNumber,
*
FROM Cells
),
Nuclei_renamed AS (
SELECT
ObjectNumber AS Nuclei_ObjectNumber,
*
FROM Nuclei
)
SELECT *
FROM Cytoplasm_renamed cytoplasm
LEFT JOIN Cells_renamed cells ON
cells.ImageNumber = cytoplasm.ImageNumber
AND cells.TableNumber = cytoplasm.TableNumber
AND cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
LEFT JOIN Nuclei_renamed nuclei ON
nuclei.ImageNumber = cytoplasm.ImageNumber
AND nuclei.TableNumber = cytoplasm.TableNumber
AND nuclei.Nuclei_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Nuclei
LEFT JOIN Image_Filtered image ON
image.ImageNumber = cytoplasm.ImageNumber
AND image.TableNumber = cytoplasm.TableNumber
""",
con=sqlite_file,
)
# replacing 'nan' strings with None
.replace(to_replace="nan", value=None)
# renaming columns as appropriate
.rename(
columns={
"ImageNumber": "Metadata_ImageNumber",
"TableNumber": "Metadata_TableNumber",
"Cytoplasm_Parent_Cells": "Metadata_Cytoplasm_Parent_Cells",
"Cytoplasm_Parent_Nuclei": "Metadata_Cytoplasm_Parent_Nuclei",
"Cells_Parent_Nuclei": "Metadata_Cells_Parent_Nuclei",
}
# drop generic objectnumber column gathered from each compartment
# (we'll rely on the compartment prefixed name instead for comparisons)
).drop(columns="ObjectNumber")
)

# drop duplicate column names
df_cytominerdatabase = df_cytominerdatabase.loc[
:, ~df_cytominerdatabase.columns.duplicated()
].copy()

# sort the columns and export to parquet
df_cytominerdatabase[
sorted(sorted(df_cytominerdatabase.columns.tolist()), key=_column_sort)
].to_parquet(destination_path)

output_paths.append(destination_path)

return output_paths


@pytest.fixture(name="example_tables")
def fixture_example_tables() -> Tuple[pa.Table, ...]:
"""
78 changes: 78 additions & 0 deletions tests/test_convert_threaded.py
@@ -6,6 +6,7 @@


import pathlib
from typing import List

import pandas as pd
import pyarrow as pa
@@ -161,6 +162,83 @@ def test_get_source_filepaths(
assert len(set(single_dir_result.keys())) == 4


def test_gather_tablenumber(
load_parsl_threaded: None,
fx_tempdir: str,
data_dirs_cytominerdatabase: List[str],
cytominerdatabase_to_manual_join_parquet: List[str],
):
"""
Tests tablenumber gathering via convert (exercising _set_tablenumber and _gather_tablenumber_checksum)
"""

for unprocessed_cytominerdatabase, processed_cytominerdatabase in zip(
data_dirs_cytominerdatabase, cytominerdatabase_to_manual_join_parquet
):
test_table = parquet.read_table(
source=convert(
source_path=unprocessed_cytominerdatabase,
dest_path=(
f"{fx_tempdir}/{pathlib.Path(unprocessed_cytominerdatabase).name}.test_table.parquet"
),
dest_datatype="parquet",
source_datatype="csv",
join=True,
joins="""
WITH Image_Filtered AS (
SELECT
Metadata_TableNumber,
Metadata_ImageNumber
FROM
read_parquet('image.parquet')
)
SELECT
image.*,
cytoplasm.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
nuclei.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
cells.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber)
FROM
read_parquet('cytoplasm.parquet') AS cytoplasm
LEFT JOIN read_parquet('cells.parquet') AS cells ON
cells.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
AND cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
nuclei.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
AND nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND nuclei.Nuclei_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
LEFT JOIN Image_Filtered AS image ON
image.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
AND image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
""",
preset="cell-health-cellprofiler-to-cytominer-database",
)
)
control_table = parquet.read_table(source=processed_cytominerdatabase)

control_unique_tablenumbers = pc.unique(control_table["Metadata_TableNumber"])

# use pandas to assert equality while accounting for differences in how
# data may be rounded by CytoTable vs cytominer-database (which use
# different data parsers and related conversions).
# See here for more information: https://github.com/cytomining/CytoTable/issues/187
pd.testing.assert_frame_equal(
test_table.filter(
# we use only those tablenumbers which appear in cytominer-database related results
# to help compare. CytoTable only removes datasets which have no image table whereas
# cytominer-database removes any dataset which has no image table or problematic
# compartment tables (any compartment table with errors triggers the entire dataset
# being removed).
pc.field("Metadata_TableNumber").isin(control_unique_tablenumbers)
)
.sort_by([(name, "ascending") for name in test_table.column_names])
.to_pandas(),
control_table.sort_by(
[(name, "ascending") for name in control_table.column_names]
).to_pandas(),
)


def test_avoid_na_row_output(
load_parsl_threaded: None, fx_tempdir: str, data_dir_cellprofiler: str
):