Add order to limit and offset queries for deterministic results (#182)
* add order to limit and offset queries

* order sqlite based results
d33bs authored Apr 18, 2024
1 parent 327a2d8 commit 496ff36
Showing 4 changed files with 19 additions and 20 deletions.
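Why this change matters: SQL engines make no guarantee about row order unless a query includes an ORDER BY clause, so paging through a table with bare LIMIT/OFFSET can return overlapping or missing rows from one chunk to the next. A minimal sketch of the idea behind the fix, using DuckDB's ORDER BY ALL shorthand (the connection setup and in-memory table below are illustrative assumptions, not code from this repository):

    import duckdb

    # illustrative in-memory data, not from the repository
    con = duckdb.connect()
    con.execute(
        "CREATE TABLE cells AS SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c')) t(id, label)"
    )

    chunk_size, offset = 2, 0
    while True:
        # ORDER BY ALL sorts on every column, so each LIMIT/OFFSET window
        # is stable across repeated reads of the same data
        chunk = con.execute(
            f"SELECT * FROM cells ORDER BY ALL LIMIT {chunk_size} OFFSET {offset}"
        ).arrow()
        if chunk.num_rows == 0:
            break
        offset += chunk_size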
3 changes: 3 additions & 0 deletions cytotable/convert.py
@@ -348,6 +348,8 @@ def _source_chunk_to_parquet(
    table=ddb_reader.execute(
        f"""
        {base_query}
+       /* order by all columns for deterministic output */
+       ORDER BY ALL
        LIMIT {chunk_size} OFFSET {offset}
        """
    ).arrow(),
@@ -750,6 +752,7 @@ def _join_source_chunk(
    result = ddb_reader.execute(
        f"""
        {joins}
+       {"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
        LIMIT {chunk_size} OFFSET {offset}
        """
    ).arrow()
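The second hunk guards against emitting invalid SQL: if the caller-supplied joins statement already contains an ORDER BY, appending ORDER BY ALL would produce two ordering clauses. A sketch of that guard in isolation (build_chunk_query is a hypothetical helper named here for illustration, not a function in this repository):

    def build_chunk_query(joins: str, chunk_size: int, offset: int) -> str:
        # hypothetical helper mirroring the guard in _join_source_chunk:
        # append ORDER BY ALL only when the joins SQL does not already order rows
        order_clause = "ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""
        return f"""
        {joins}
        {order_clause}
        LIMIT {chunk_size} OFFSET {offset}
        """

    # a joins string that already orders its rows is left untouched
    print(build_chunk_query("SELECT * FROM a JOIN b USING (id) ORDER BY a.id", 50, 0))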
7 changes: 6 additions & 1 deletion cytotable/utils.py
@@ -257,7 +257,12 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:

    # perform the select using the cases built above and using chunksize + offset
    cursor.execute(
-       f'SELECT {", ".join(query_parts)} FROM {table_name} LIMIT {chunk_size} OFFSET {offset};'
+       f"""
+       SELECT {', '.join(query_parts)}
+       FROM {table_name}
+       ORDER BY {', '.join([col['column_name'] for col in column_info])}
+       LIMIT {chunk_size} OFFSET {offset};
+       """
    )
    # collect the results and include the column name with values
    results = [
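SQLite has no ORDER BY ALL shorthand, so the change above enumerates every column name from column_info instead. A self-contained sketch of the same pattern (the table, columns, and sample rows are illustrative assumptions):

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE tbl (col_integer INTEGER, col_text TEXT)")
    con.executemany("INSERT INTO tbl VALUES (?, ?)", [(1, "sample"), (None, "sample")])

    column_info = [{"column_name": "col_integer"}, {"column_name": "col_text"}]
    chunk_size, offset = 100, 0

    cursor = con.execute(
        f"""
        SELECT {', '.join(col['column_name'] for col in column_info)}
        FROM tbl
        ORDER BY {', '.join(col['column_name'] for col in column_info)}
        LIMIT {chunk_size} OFFSET {offset};
        """
    )
    # SQLite treats NULL as smaller than any other value, so in ascending
    # order the NULL row arrives first: [(None, 'sample'), (1, 'sample')]
    print(cursor.fetchall())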
20 changes: 10 additions & 10 deletions tests/test_convert.py
@@ -417,10 +417,10 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
    assert result_table.equals(
        other=pa.Table.from_pydict(
            {
-               "id1": [1, 2],
-               "id2": ["a", "a"],
-               "field1": ["foo", "bar"],
-               "field2": [True, False],
+               "field1": ["foo", "foo"],
+               "field2": [True, True],
+               "id1": [1, 1],
+               "id2": ["a", "b"],
            },
            # use schema from result as a reference for col order
            schema=result_table.schema,
@@ -977,10 +977,10 @@ def test_sqlite_mixed_type_query_to_parquet(
]
# check the values per column
    assert parquet.read_table(source=result_filepath).to_pydict() == {
-       "col_integer": [1, None],
+       "col_integer": [None, 1],
        "col_text": ["sample", "sample"],
-       "col_blob": [b"sample_blob", b"another_blob"],
-       "col_real": [0.5, None],
+       "col_blob": [b"another_blob", b"sample_blob"],
+       "col_real": [None, 0.5],
    }

# run full convert on mixed type database
@@ -997,10 +997,10 @@
    assert parquet.read_table(
        source=result["Tbl_a.sqlite"][0]["table"][0]
    ).to_pydict() == {
-       "Tbl_a_col_integer": [1, None],
+       "Tbl_a_col_integer": [None, 1],
        "Tbl_a_col_text": ["sample", "sample"],
-       "Tbl_a_col_blob": [b"sample_blob", b"another_blob"],
-       "Tbl_a_col_real": [0.5, None],
+       "Tbl_a_col_blob": [b"another_blob", b"sample_blob"],
+       "Tbl_a_col_real": [None, 0.5],
    }
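
The flipped expectations above follow from the new deterministic ordering: the chunk queries now sort before applying LIMIT/OFFSET, and SQLite treats NULL as smaller than any other value, so in ascending order the rows containing NULLs arrive first ([1, None] becomes [None, 1]). Likewise, the test_join_source_chunk expectation changes because ORDER BY ALL sorts the joined rows by every column before the first chunk is taken.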


9 changes: 0 additions & 9 deletions tests/test_convert_threaded.py
@@ -16,7 +16,6 @@
from pyarrow import parquet

from cytotable.convert import convert
-from cytotable.presets import config
from cytotable.sources import _get_source_filepaths


@@ -125,11 +124,6 @@ def test_convert_s3_path_sqlite(
race conditions with nested pytest fixture post-yield deletions.
"""

-   # create a modified join sql for deterministic comparisons
-   modified_joins = (
-       str(config["cellprofiler_sqlite_pycytominer"]["CONFIG_JOINS"]) + " ORDER BY ALL"
-   )
-
# local sqlite read
local_cytotable_table = parquet.read_table(
source=convert(
@@ -141,7 +135,6 @@
dest_datatype="parquet",
chunk_size=100,
preset="cellprofiler_sqlite_pycytominer",
-       joins=modified_joins,
)
)

@@ -161,7 +154,6 @@
# sequential s3 SQLite files. See below for more information
# https://cloudpathlib.drivendata.org/stable/caching/#automatically
local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/1",
-       joins=modified_joins,
)
)

@@ -181,7 +173,6 @@
# sequential s3 SQLite files. See below for more information
# https://cloudpathlib.drivendata.org/stable/caching/#automatically
local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/2",
-       joins=modified_joins,
)
)

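With ordering now applied inside the library itself, this test no longer needs to bolt ORDER BY ALL onto the preset's join SQL for deterministic comparisons, so the modified_joins workaround, its joins= overrides, and the now-unused config import are all removed.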
