Skip to content

Commit

Permalink
fix(import-pipeline): dict and bytes mixup (#28525)
Browse files Browse the repository at this point in the history
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
  • Loading branch information
EDsCODE and greptile-apps[bot] authored Feb 11, 2025
1 parent 69285cd commit 8e2f6fc
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def sql_table(
metadata: Optional[MetaData] = None,
incremental: Optional[dlt.sources.incremental[Any]] = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
backend: TableBackend = "sqlalchemy",
backend: TableBackend = "pyarrow",
detect_precision_hints: Optional[bool] = None,
reflection_level: Optional[ReflectionLevel] = "full",
defer_table_reflect: Optional[bool] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ def row_tuples_to_arrow(rows: Sequence[RowAny], columns: TTableSchemaColumns, tz
[None if x is not None and math.isnan(x) else x for x in columnar_known_types[field.name]]
)

if issubclass(py_type, bytes) or issubclass(py_type, str):
# For bytes/str columns, ensure any dict values are serialized to JSON strings
# Convert to numpy array after processing
processed_values = [
None if x is None else json_dumps(x) if isinstance(x, dict | list) else x
for x in columnar_known_types[field.name]
]
columnar_known_types[field.name] = np.array(processed_values, dtype=object)

# If there are unknown type columns, first create a table to infer their types
if columnar_unknown_types:
new_schema_fields = []
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
import pyarrow as pa
from posthog.temporal.data_imports.pipelines.sql_database.arrow_helpers import json_dumps
from posthog.temporal.data_imports.pipelines.sql_database.arrow_helpers import json_dumps, row_tuples_to_arrow
from dlt.common.json import json


Expand All @@ -20,3 +20,18 @@ def test_handle_large_integers():
json_str_array = pa.array([None if s is None else json_dumps(s) for s in [{"a": -(2**64)}]])
loaded = json.loads(json_str_array[0].as_py())
assert loaded["a"] == float(-(2**64))


def test_row_tuples_to_arrow_string_column_with_dict():
    """row_tuples_to_arrow must JSON-serialize dict values found in a text column.

    Regression test: previously a dict in a string-typed column made the
    Arrow conversion fail; now it should be dumped to a JSON string.
    """
    payload = {"key": "value"}
    column_schema = {
        "string_col": {"name": "string_col", "data_type": "text", "nullable": True},
    }

    # Conversion must succeed even though the second row holds a dict.
    result = row_tuples_to_arrow([("",), (payload,)], column_schema, "UTC")  # type: ignore

    values = result.column("string_col")
    # Plain strings pass through untouched.
    assert values[0].as_py() == ""
    # The dict round-trips through its JSON-string representation.
    assert json.loads(values[1].as_py()) == payload

0 comments on commit 8e2f6fc

Please sign in to comment.