
Commit

Add SQLite mixed-type compatibility for _get_table_columns_and_types (#82)

* add mixed-type capabilities for column queries

* add a test for more thorough checks

* comments regarding chunk_size and SQL limits

* fix a SQL typo
d33bs authored Aug 2, 2023
1 parent e610b85 commit 03f3428
Showing 3 changed files with 93 additions and 38 deletions.
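
Every change in this commit follows the same pattern: try to read the SQLite source through DuckDB first, and when DuckDB raises its "Mismatch Type Error" for columns whose rows mix value types (which SQLite's flexible typing permits), fall back to a sqlite3-based reader that yields an Arrow table. The sketch below is a minimal, hypothetical illustration of that pattern (not the project's code); it assumes duckdb and pyarrow are installed and that DuckDB's sqlite extension is available:

    import sqlite3

    import duckdb
    import pyarrow as pa

    def read_table_with_fallback(sqlite_path: str, table_name: str) -> pa.Table:
        # try DuckDB's sqlite_scan first: fast, but strict about column types
        try:
            return (
                duckdb.connect()
                .execute(f"SELECT * FROM sqlite_scan('{sqlite_path}', '{table_name}')")
                .arrow()
            )
        except duckdb.Error as exc:
            if "Mismatch Type Error" not in str(exc):
                raise
            # fall back to sqlite3, which tolerates mixed-type columns
            with sqlite3.connect(sqlite_path) as conn:
                cursor = conn.execute(f"SELECT * FROM {table_name};")
                names = [desc[0] for desc in cursor.description]
                rows = [dict(zip(names, row)) for row in cursor.fetchall()]
            return pa.Table.from_pylist(rows)
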
71 changes: 57 additions & 14 deletions cytotable/convert.py
@@ -35,7 +35,9 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]

import pathlib

-from cytotable.utils import _duckdb_reader
+import duckdb
+
+from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

source_path = source["source_path"]
source_type = str(pathlib.Path(source_path).suffix).lower()
@@ -47,14 +49,16 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
else f"sqlite_scan('{source_path}', '{source['table_name']}')"
)

-# query top 5 results from table and use pragma_storage_info() to
-# gather duckdb interpreted data typing
-select_query = f"""
+# query top 5 results from the table and use pragma_storage_info() to
+# gather duckdb-interpreted data typing. We gather 5 values for
+# each column to help with type inference (smaller samples
+# may yield lower type accuracy for the full table).
+select_query = """
/* we create an in-mem table for later use with the pragma_storage_info call
as this call only functions with materialized tables and not views or related */
CREATE TABLE column_details AS
(SELECT *
-FROM {select_source}
+FROM &select_source
LIMIT 5
);
@@ -64,13 +68,47 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
column_name,
segment_type as column_dtype
FROM pragma_storage_info('column_details')
/* avoid duplicate entries in the form of VALIDITY segment_types */
WHERE segment_type != 'VALIDITY';
"""

-# perform the query and create a list of dictionaries with the column data for table
-return _duckdb_reader().execute(select_query).arrow().to_pylist()
+# attempt to read the data through duckdb, with exception handling
+# that falls back to reading mixed-type data through sqlite3
+# and a special utility function
+try:
+    # perform the query and create a list of dictionaries with the column data for the table
+    return (
+        _duckdb_reader()
+        .execute(select_query.replace("&select_source", select_source))
+        .arrow()
+        .to_pylist()
+    )
+
+except duckdb.Error as e:
+    # if we see a mismatched type error,
+    # run a more nuanced query through sqlite3
+    # to handle the mixed types
+    if "Mismatch Type Error" in str(e) and source_type == ".sqlite":
+        arrow_data_tbl = _sqlite_mixed_type_query_to_parquet(
+            source_path=str(source["source_path"]),
+            table_name=str(source["table_name"]),
+            # chunk size is set to 5 as a limit similar
+            # to the SQL above within the select_query variable
+            chunk_size=5,
+            # offset is set to 0 to start at the first row
+            # of the table
+            offset=0,
+        )
+        return (
+            _duckdb_reader()
+            .execute(select_query.replace("&select_source", "arrow_data_tbl"))
+            .arrow()
+            .to_pylist()
+        )
+    else:
+        raise
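
For context, pragma_storage_info() is the DuckDB feature the query above depends on: it reports, per column, the storage segments DuckDB created, and the segment_type field carries the inferred data type (VALIDITY segments only track nulls, hence the filter). It also only works against materialized tables, which is why the query creates the column_details table rather than a view. A small standalone sketch of the same query shape, independent of this codebase:

    import duckdb

    con = duckdb.connect()
    # materialize a small table; pragma_storage_info() does not accept views
    con.execute("CREATE TABLE column_details AS SELECT 1 AS col_a, 'x' AS col_b")
    column_types = (
        con.execute(
            """
            SELECT DISTINCT column_name, segment_type AS column_dtype
            FROM pragma_storage_info('column_details')
            /* VALIDITY segments would otherwise duplicate each column */
            WHERE segment_type != 'VALIDITY';
            """
        )
        .arrow()
        .to_pylist()
    )
    print(column_types)  # e.g. [{'column_name': 'col_a', 'column_dtype': 'BIGINT'}, ...]
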


@python_app
@@ -238,6 +276,7 @@ def _source_chunk_to_parquet(

import duckdb
from cloudpathlib import AnyPath
+from pyarrow import parquet

from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

@@ -292,13 +331,17 @@ def _source_chunk_to_parquet(
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
):
-result_filepath = _sqlite_mixed_type_query_to_parquet(
-    source_path=str(source["source_path"]),
-    table_name=str(source["table_name"]),
-    chunk_size=chunk_size,
-    offset=offset,
-    result_filepath=result_filepath,
-)
+parquet.write_table(
+    table=_sqlite_mixed_type_query_to_parquet(
+        source_path=str(source["source_path"]),
+        table_name=str(source["table_name"]),
+        chunk_size=chunk_size,
+        offset=offset,
+    ),
+    where=result_filepath,
+)
else:
raise

# return the filepath for the chunked output file
return result_filepath
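
The function above pages through the source table with chunk_size and offset, and its except branch forwards the same pair to the sqlite3 fallback so both paths emit identical chunks. A hypothetical sketch of that pagination contract (table and file names invented for illustration):

    import sqlite3

    chunk_size, offset = 50_000, 0
    with sqlite3.connect("example.sqlite") as conn:
        while True:
            rows = conn.execute(
                "SELECT * FROM Per_Image LIMIT ? OFFSET ?;",
                (chunk_size, offset),
            ).fetchall()
            if not rows:
                break
            # ...convert rows to Arrow and write this chunk to parquet here...
            offset += chunk_size
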
20 changes: 5 additions & 15 deletions cytotable/utils.py
@@ -162,12 +162,11 @@ def _sqlite_mixed_type_query_to_parquet(
table_name: str,
chunk_size: int,
offset: int,
-result_filepath: str,
) -> str:
"""
Performs SQLite table data extraction where one or many
columns include data values of potentially mismatched type
-such that the data may be exported to Arrow and a Parquet file.
+such that the data may be exported to Arrow for later use.
Args:
source_path: str:
@@ -178,17 +177,14 @@ def _sqlite_mixed_type_query_to_parquet(
Row count to use for chunked output.
offset: int:
The offset for chunking the data from source.
-    dest_path: str:
-        Path to store the output data.
Returns:
-    str:
-        The resulting filepath for the table exported to parquet.
+    pyarrow.Table:
+        The resulting arrow table for the data.
"""
import sqlite3

import pyarrow as pa
-import pyarrow.parquet as parquet

# open sqlite3 connection
with sqlite3.connect(source_path) as conn:
@@ -234,14 +230,8 @@ def _sqlite_mixed_type_query_to_parquet(
for row in cursor.fetchall()
]

-# write results to a parquet file
-parquet.write_table(
-    table=pa.Table.from_pylist(results),
-    where=result_filepath,
-)
-
-# return filepath
-return result_filepath
+# return arrow table with results
+return pa.Table.from_pylist(results)


def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
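
The mixed-type resolution itself happens in the collapsed middle of this function (hidden between the hunks above); the visible change only swaps its output from a Parquet file to an in-memory Arrow table. Below is a rough, hypothetical sketch of one way such a resolution step can work: keep a value only when its runtime typeof() matches the column's declared type, which would reproduce the [1, None] and [0.5, None] expectations in the tests below. The project's actual implementation may differ:

    import sqlite3

    import pyarrow as pa

    def read_mixed_type_rows(
        source_path: str, table_name: str, chunk_size: int, offset: int
    ) -> pa.Table:
        with sqlite3.connect(source_path) as conn:
            # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
            cols = conn.execute(f"PRAGMA table_info({table_name});").fetchall()
            # NULL out any value whose stored type disagrees with the declared type
            selects = ", ".join(
                f"CASE WHEN typeof({name}) = '{ctype.lower()}' THEN {name} END AS {name}"
                for _cid, name, ctype, *_rest in cols
            )
            cursor = conn.execute(
                f"SELECT {selects} FROM {table_name} LIMIT ? OFFSET ?;",
                (chunk_size, offset),
            )
            names = [description[0] for description in cursor.description]
            results = [dict(zip(names, row)) for row in cursor.fetchall()]
        # a list of dictionaries converts directly to an Arrow table, as above
        return pa.Table.from_pylist(results)
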
40 changes: 31 additions & 9 deletions tests/test_convert.py
@@ -970,36 +970,58 @@ def test_sqlite_mixed_type_query_to_parquet(
# run a more nuanced query through sqlite
# to handle the mixed types
if "Mismatch Type Error" in str(duckdb_exc):
-result = _sqlite_mixed_type_query_to_parquet(
-    source_path=example_sqlite_mixed_types_database,
-    table_name=table_name,
-    chunk_size=2,
-    offset=0,
-    result_filepath=result_filepath,
-)
+parquet.write_table(
+    table=_sqlite_mixed_type_query_to_parquet(
+        source_path=example_sqlite_mixed_types_database,
+        table_name=table_name,
+        chunk_size=2,
+        offset=0,
+    ),
+    where=result_filepath,
+)

# check schema names
-assert parquet.read_schema(where=result).names == [
+assert parquet.read_schema(where=result_filepath).names == [
"col_integer",
"col_text",
"col_blob",
"col_real",
]
# check schema types
-assert parquet.read_schema(where=result).types == [
+assert parquet.read_schema(where=result_filepath).types == [
pa.int64(),
pa.string(),
pa.binary(),
pa.float64(),
]
# check the values per column
-assert parquet.read_table(source=result).to_pydict() == {
+assert parquet.read_table(source=result_filepath).to_pydict() == {
"col_integer": [1, None],
"col_text": ["sample", "sample"],
"col_blob": [b"sample_blob", b"another_blob"],
"col_real": [0.5, None],
}

+# run full convert on mixed type database
+result = convert(
+    source_path=example_sqlite_mixed_types_database,
+    dest_path=result_filepath,
+    dest_datatype="parquet",
+    source_datatype="sqlite",
+    compartments=[table_name],
+    join=False,
+)
+
+# assert that the single table result looks like the following dictionary
+assert parquet.read_table(
+    source=result["Tbl_a.sqlite"][0]["table"][0]
+).to_pydict() == {
+    "Tbl_a_col_integer": [1, None],
+    "Tbl_a_col_text": ["sample", "sample"],
+    "Tbl_a_col_blob": [b"sample_blob", b"another_blob"],
+    "Tbl_a_col_real": [0.5, None],
+}


def test_convert_hte_cellprofiler_csv(
get_tempdir: str,
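
The assertions above lean on the example_sqlite_mixed_types_database fixture. Its actual contents live in the test suite, but a hypothetical script along these lines would build an equivalent database; SQLite's flexible typing accepts the mismatched second row, while DuckDB's sqlite_scan rejects it with a Mismatch Type Error:

    import sqlite3

    # hypothetical recreation of a mixed-type fixture table
    with sqlite3.connect("example_mixed_types.sqlite") as conn:
        conn.execute(
            "CREATE TABLE Tbl_a (col_integer INTEGER, col_text TEXT,"
            " col_blob BLOB, col_real REAL);"
        )
        conn.execute(
            "INSERT INTO Tbl_a VALUES (1, 'sample', X'73616D706C655F626C6F62', 0.5);"
        )
        # the second row stores text in the INTEGER and REAL columns
        conn.execute(
            "INSERT INTO Tbl_a VALUES ('sample_mismatch', 'sample',"
            " X'616E6F746865725F626C6F62', 'nan');"
        )
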
