Merge remote-tracking branch 'upstream/main' into add-tablenumbers
d33bs committed Oct 25, 2024
2 parents 22c5ee7 + 4e8c57f commit 01e92c3
Showing 3 changed files with 63 additions and 13 deletions.
26 changes: 15 additions & 11 deletions cytotable/convert.py
@@ -1109,7 +1109,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
     data_type_cast_map: Optional[Dict[str, str]] = None,
     add_tablenumber: Optional[bool] = None,
     **kwargs,
-) -> Union[Dict[str, List[Dict[str, Any]]], str]:
+) -> Union[Dict[str, List[Dict[str, Any]]], List[Any], str]:
     """
     Export data to parquet.

@@ -1364,15 +1364,19 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
             ).result()
         ]

-        # concat our join chunks together as one cohesive dataset
-        # return results in common format which includes metadata
-        # for lineage and debugging
-        results = _concat_join_sources(
-            dest_path=expanded_dest_path,
-            join_sources=[join.result() for join in join_sources_result],
-            sources=evaluated_results,
-            sort_output=sort_output,
-        )
+        if concat:
+            # concat our join chunks together as one cohesive dataset
+            # return results in common format which includes metadata
+            # for lineage and debugging
+            results = _concat_join_sources(
+                dest_path=expanded_dest_path,
+                join_sources=[join.result() for join in join_sources_result],
+                sources=evaluated_results,
+                sort_output=sort_output,
+            )
+        else:
+            # else we leave the joined chunks as-is and return them
+            return evaluate_futures(join_sources_result)

     # wrap the final result as a future and return
     return evaluate_futures(results)
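
For context, a minimal usage sketch of this change (the paths below are placeholders mirroring the new test added in tests/test_convert.py): with join=True and concat=False, convert returns the joined chunk files as a list rather than a single concatenated parquet file.

```python
import pyarrow.parquet as parquet

from cytotable import convert

# with join=True and concat=False, convert returns a list of joined
# per-chunk parquet files instead of one concatenated parquet file
chunk_paths = convert(
    source_path="all_cellprofiler.sqlite",  # placeholder source file
    dest_path="NF1_data.parquet",  # placeholder destination
    dest_datatype="parquet",
    source_datatype="sqlite",
    preset="cellprofiler_sqlite",
    chunk_size=100,  # small chunks so that multiple chunk files result
    join=True,
    concat=False,
)

# the list of chunk files can be read back as a single pyarrow table
table = parquet.ParquetDataset(path_or_paths=chunk_paths).read()
```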
@@ -1399,7 +1403,7 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
     preset: Optional[str] = "cellprofiler_csv",
     parsl_config: Optional[parsl.Config] = None,
     **kwargs,
-) -> Union[Dict[str, List[Dict[str, Any]]], str]:
+) -> Union[Dict[str, List[Dict[str, Any]]], List[Any], str]:
     """
     Convert file-based data from various sources to Pycytominer-compatible standards.

6 changes: 4 additions & 2 deletions cytotable/utils.py
@@ -593,14 +593,16 @@ def _unwrap_source(
     return _unwrap_value(source)


-def evaluate_futures(sources: Union[Dict[str, List[Dict[str, Any]]], str]) -> Any:
+def evaluate_futures(
+    sources: Union[Dict[str, List[Dict[str, Any]]], List[Any], str]
+) -> Any:
     """
     Evaluates any Parsl futures for use within other tasks.
     This enables a pattern of Parsl app usage as "tasks" and delayed
     future result evaluation for concurrency.

     Args:
-        sources: Union[Dict[str, List[Dict[str, Any]]], str]
+        sources: Union[Dict[str, List[Dict[str, Any]]], List[Any], str]
             Sources are an internal data structure used by CytoTable for
             processing and organizing data results. They may include futures
             which require asynchronous processing through Parsl, so we
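
The widened signature above lets evaluate_futures accept a list of results or futures. As an illustration only (not CytoTable's actual implementation), a minimal sketch of how such a helper can resolve the new list input, assuming only that Parsl AppFutures subclass concurrent.futures.Future and expose .result():

```python
from concurrent.futures import Future
from typing import Any, Dict, List, Union


def evaluate_futures_sketch(
    sources: Union[Dict[str, List[Dict[str, Any]]], List[Any], str]
) -> Any:
    """Hypothetical stand-in: resolve any Parsl futures within sources."""

    def resolve(value: Any) -> Any:
        # Parsl AppFutures subclass concurrent.futures.Future,
        # so .result() blocks until the app has completed
        return value.result() if isinstance(value, Future) else value

    if isinstance(sources, list):
        # the newly supported list branch: resolve each element in order
        return [resolve(item) for item in sources]
    if isinstance(sources, dict):
        return {
            key: [resolve(item) for item in items]
            for key, items in sources.items()
        }
    # a plain string (e.g. a destination path) passes through unchanged
    return sources
```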
44 changes: 44 additions & 0 deletions tests/test_convert.py
@@ -845,6 +845,50 @@ def test_convert_cellprofiler_sqlite(
     assert test_result.equals(control_result)


+def test_convert_cellprofiler_sqlite_join_with_no_concat(
+    load_parsl_default: None,
+    fx_tempdir: str,
+    data_dir_cellprofiler: str,
+    cellprofiler_merged_nf1data: pa.Table,
+):
+    """
+    Tests convert with cellprofiler sqlite exports
+    using joins but no concatenation.
+    """
+
+    control_result = cellprofiler_merged_nf1data
+
+    # gather results as a joined list of chunk files which aren't concatenated
+    test_result_data = convert(
+        source_path=(
+            f"{data_dir_cellprofiler}/NF1_SchwannCell_data/all_cellprofiler.sqlite"
+        ),
+        dest_path=f"{fx_tempdir}/NF1_data.parquet",
+        dest_datatype="parquet",
+        source_datatype="sqlite",
+        preset="cellprofiler_sqlite",
+        # explicitly set the chunk size to receive multiple chunk files
+        chunk_size=100,
+        join=True,
+        concat=False,
+    )
+
+    # read the result files as a single pyarrow table
+    test_result = parquet.ParquetDataset(path_or_paths=test_result_data).read()
+
+    # sort all values by the same columns
+    # we do this due to the potential for inconsistently ordered results
+    control_result = control_result.sort_by(
+        [(colname, "ascending") for colname in control_result.column_names]
+    )
+    test_result = test_result.sort_by(
+        [(colname, "ascending") for colname in test_result.column_names]
+    )
+
+    assert test_result.shape == control_result.shape
+    assert test_result.equals(control_result)
+
+
 def test_convert_cellprofiler_csv(
     load_parsl_default: None,
     fx_tempdir: str,
