From 496ff36bda83aa32d75907879c60371aa0b04997 Mon Sep 17 00:00:00 2001
From: Dave Bunten
Date: Thu, 18 Apr 2024 12:09:29 -0600
Subject: [PATCH] Add order to limit and offset queries for deterministic
 results (#182)

* add order to limit and offset queries

* order sqlite based results
---
 cytotable/convert.py           |  3 +++
 cytotable/utils.py             |  7 ++++++-
 tests/test_convert.py          | 20 ++++++++++----------
 tests/test_convert_threaded.py |  9 ---------
 4 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 4064526a..84972935 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -348,6 +348,8 @@ def _source_chunk_to_parquet(
                 table=ddb_reader.execute(
                     f"""
                     {base_query}
+                    /* order by all columns for deterministic output */
+                    ORDER BY ALL
                     LIMIT {chunk_size} OFFSET {offset}
                     """
                 ).arrow(),
@@ -750,6 +752,7 @@ def _join_source_chunk(
     result = ddb_reader.execute(
         f"""
         {joins}
+        {"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
         LIMIT {chunk_size} OFFSET {offset}
         """
     ).arrow()
diff --git a/cytotable/utils.py b/cytotable/utils.py
index 9789f42e..c93c95d1 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -257,7 +257,12 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
 
     # perform the select using the cases built above and using chunksize + offset
     cursor.execute(
-        f'SELECT {", ".join(query_parts)} FROM {table_name} LIMIT {chunk_size} OFFSET {offset};'
+        f"""
+        SELECT {', '.join(query_parts)}
+        FROM {table_name}
+        ORDER BY {', '.join([col['column_name'] for col in column_info])}
+        LIMIT {chunk_size} OFFSET {offset};
+        """
     )
     # collect the results and include the column name with values
     results = [
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 16a03d5b..07598b8e 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -417,10 +417,10 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
     assert result_table.equals(
         other=pa.Table.from_pydict(
             {
-                "id1": [1, 2],
-                "id2": ["a", "a"],
-                "field1": ["foo", "bar"],
-                "field2": [True, False],
+                "field1": ["foo", "foo"],
+                "field2": [True, True],
+                "id1": [1, 1],
+                "id2": ["a", "b"],
             },
             # use schema from result as a reference for col order
             schema=result_table.schema,
@@ -977,10 +977,10 @@ def test_sqlite_mixed_type_query_to_parquet(
     ]
     # check the values per column
     assert parquet.read_table(source=result_filepath).to_pydict() == {
-        "col_integer": [1, None],
+        "col_integer": [None, 1],
         "col_text": ["sample", "sample"],
-        "col_blob": [b"sample_blob", b"another_blob"],
-        "col_real": [0.5, None],
+        "col_blob": [b"another_blob", b"sample_blob"],
+        "col_real": [None, 0.5],
     }
 
     # run full convert on mixed type database
     result = convert(
@@ -997,10 +997,10 @@
     assert parquet.read_table(
         source=result["Tbl_a.sqlite"][0]["table"][0]
     ).to_pydict() == {
-        "Tbl_a_col_integer": [1, None],
+        "Tbl_a_col_integer": [None, 1],
         "Tbl_a_col_text": ["sample", "sample"],
-        "Tbl_a_col_blob": [b"sample_blob", b"another_blob"],
-        "Tbl_a_col_real": [0.5, None],
+        "Tbl_a_col_blob": [b"another_blob", b"sample_blob"],
+        "Tbl_a_col_real": [None, 0.5],
     }
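Reviewer note: the sketch below is illustrative only and is not part of the patch. It shows the failure mode the convert.py and utils.py hunks above address: a LIMIT/OFFSET query without an ORDER BY lets the engine return rows in whatever order it chooses, so consecutive chunks can repeat or drop rows between runs. DuckDB's ORDER BY ALL is shorthand for ordering by every selected column, left to right, which pins down one total order before pagination. The table and column names here (example, id1, id2) are hypothetical.

# Hypothetical standalone sketch, not CytoTable code.
# Assumes the duckdb and pyarrow packages are installed.
import duckdb

conn = duckdb.connect()
conn.execute(
    "CREATE TABLE example AS "
    "SELECT * FROM (VALUES (1, 'a'), (2, 'a'), (1, 'b')) AS t(id1, id2)"
)

base_query = "SELECT * FROM example"
chunk_size, row_count = 2, 3

# Without ORDER BY, each LIMIT/OFFSET chunk may come back in any
# engine-chosen order, so rows can repeat or vanish across chunks.
# ORDER BY ALL fixes one total order before pagination is applied.
for offset in range(0, row_count, chunk_size):
    chunk = conn.execute(
        f"""
        {base_query}
        ORDER BY ALL
        LIMIT {chunk_size} OFFSET {offset}
        """
    ).arrow()
    print(chunk.to_pydict())

The conditional added to _join_source_chunk applies the same idea while skipping the clause when the configured join SQL already contains its own ORDER BY, avoiding a duplicate ordering clause.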
diff --git a/tests/test_convert_threaded.py b/tests/test_convert_threaded.py
index b62d1370..88ca7905 100644
--- a/tests/test_convert_threaded.py
+++ b/tests/test_convert_threaded.py
@@ -16,7 +16,6 @@
 from pyarrow import parquet
 
 from cytotable.convert import convert
-from cytotable.presets import config
 from cytotable.sources import _get_source_filepaths
 
 
@@ -125,11 +124,6 @@ def test_convert_s3_path_sqlite(
     race conditions with nested pytest fixture post-yield deletions.
     """
 
-    # create a modified join sql for deterministic comparisons
-    modified_joins = (
-        str(config["cellprofiler_sqlite_pycytominer"]["CONFIG_JOINS"]) + " ORDER BY ALL"
-    )
-
     # local sqlite read
     local_cytotable_table = parquet.read_table(
         source=convert(
@@ -141,7 +135,6 @@ def test_convert_s3_path_sqlite(
             dest_datatype="parquet",
             chunk_size=100,
             preset="cellprofiler_sqlite_pycytominer",
-            joins=modified_joins,
         )
     )
@@ -161,7 +154,6 @@ def test_convert_s3_path_sqlite(
             # sequential s3 SQLite files. See below for more information
             # https://cloudpathlib.drivendata.org/stable/caching/#automatically
             local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/1",
-            joins=modified_joins,
         )
     )
@@ -181,7 +173,6 @@ def test_convert_s3_path_sqlite(
             # sequential s3 SQLite files. See below for more information
             # https://cloudpathlib.drivendata.org/stable/caching/#automatically
             local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/2",
-            joins=modified_joins,
         )
     )
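Reviewer note: a companion sketch (again illustrative, not part of the patch, with a hypothetical table tbl_example) for the SQLite path changed in cytotable/utils.py, which now orders on every selected column before applying LIMIT and OFFSET. SQLite sorts NULL before all other values in ascending order, which is why the expected values in the updated test_convert.py assertions now begin with None.

# Hypothetical standalone sketch, not CytoTable code.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE tbl_example (col_integer INTEGER, col_real REAL);
    INSERT INTO tbl_example VALUES (1, 0.5), (NULL, NULL);
    """
)

# mirror the patched query shape: order on every selected column so that
# LIMIT/OFFSET paging yields the same chunks on every run
column_info = [{"column_name": "col_integer"}, {"column_name": "col_real"}]
table_name, chunk_size, offset = "tbl_example", 2, 0
order_cols = ", ".join(col["column_name"] for col in column_info)

cursor = conn.cursor()
cursor.execute(
    f"""
    SELECT {order_cols}
    FROM {table_name}
    ORDER BY {order_cols}
    LIMIT {chunk_size} OFFSET {offset};
    """
)
# NULL sorts first in SQLite ascending order, hence the row of Nones leads
print(cursor.fetchall())  # [(None, None), (1, 0.5)]

This ordering behavior also explains why the modified_joins workaround in test_convert_threaded.py could be removed: ordering now happens inside CytoTable itself rather than in test-supplied join SQL.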