From 8ae0db933332a6e41da38b5030b820792130c2fb Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 21 Feb 2025 16:54:37 +0100 Subject: [PATCH] fix(data-warehouse): Handle IP addresses (#29064) --- .../test/__snapshots__/test_cluster.ambr | 6 ++--- .../pipeline/test/test_pipeline_utils.py | 27 +++++++++++++++++++ .../data_imports/pipelines/pipeline/utils.py | 9 +++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/posthog/clickhouse/test/__snapshots__/test_cluster.ambr b/posthog/clickhouse/test/__snapshots__/test_cluster.ambr index 360cbcb73dbf8..74d21e08289bc 100644 --- a/posthog/clickhouse/test/__snapshots__/test_cluster.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_cluster.ambr @@ -3,20 +3,20 @@ ''' 1 future(s) did not return a result: - * HostInfo(connection_info=ConnectionInfo(address='172.18.0.17', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Syntax error: failed at position 1 ('invalid'): invalid query. Expected one of: Query, Query with output, EXPLAIN, EXPLAIN, SELECT query, possibly with UNION, list of union elements, SELECT query, subquery, possibly with UNI... + * HostInfo(connection_info=ConnectionInfo(address='127.0.0.1', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Syntax error: failed at position 1 ('invalid'): invalid query. Expected one of: Query, Query with output, EXPLAIN, EXPLAIN, SELECT query, possibly with UNION, list of union elements, SELECT query, subquery, possibly with UNI... ''' # --- # name: test_exception_summary.1 ''' 1 future(s) did not return a result: - * HostInfo(connection_info=ConnectionInfo(address='172.18.0.17', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Unknown table expression identifier 'invalid_table_name' in scope SELECT * FROM invalid_table_name. Stack trace:\n\n0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000000000000\n1. DB::Exceptio... + * HostInfo(connection_info=ConnectionInfo(address='127.0.0.1', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Unknown table expression identifier 'invalid_table_name' in scope SELECT * FROM invalid_table_name. Stack trace:\n\n0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000000000000\n1. DB::Exceptio... ''' # --- # name: test_exception_summary.2 ''' 1 future(s) did not return a result: - * HostInfo(connection_info=ConnectionInfo(address='172.18.0.17', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ValueError('custom error') + * HostInfo(connection_info=ConnectionInfo(address='127.0.0.1', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ValueError('custom error') ''' # --- diff --git a/posthog/temporal/data_imports/pipelines/pipeline/test/test_pipeline_utils.py b/posthog/temporal/data_imports/pipelines/pipeline/test/test_pipeline_utils.py index 8b6ba401a65d1..ba93086583936 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline/test/test_pipeline_utils.py +++ b/posthog/temporal/data_imports/pipelines/pipeline/test/test_pipeline_utils.py @@ -1,3 +1,4 @@ +from ipaddress import IPv4Address, IPv6Address from dateutil import parser import decimal import uuid @@ -219,3 +220,29 @@ def test_table_from_py_list_with_schema_and_too_small_decimal_type(): ) ) assert table.schema.equals(expected_schema) + + +def test_table_from_py_list_with_ipv4_address(): + table = table_from_py_list([{"column": IPv4Address("127.0.0.1")}]) + + assert table.equals(pa.table({"column": ["127.0.0.1"]})) + assert table.schema.equals( + pa.schema( + [ + ("column", pa.string()), + ] + ) + ) + + +def test_table_from_py_list_with_ipv6_address(): + table = table_from_py_list([{"column": IPv6Address("::1")}]) + + assert table.equals(pa.table({"column": ["::1"]})) + assert table.schema.equals( + pa.schema( + [ + ("column", pa.string()), + ] + ) + ) diff --git a/posthog/temporal/data_imports/pipelines/pipeline/utils.py b/posthog/temporal/data_imports/pipelines/pipeline/utils.py index 475a38ec4af92..3dcb8ae153e38 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline/utils.py +++ b/posthog/temporal/data_imports/pipelines/pipeline/utils.py @@ -1,4 +1,5 @@ import decimal +from ipaddress import IPv4Address, IPv6Address import json from collections.abc import Sequence import math @@ -549,6 +550,14 @@ def _convert_to_decimal_or_none(x: decimal.Decimal | float | None) -> decimal.De if arrow_schema: arrow_schema = arrow_schema.set(field_index, arrow_schema.field(field_index).with_type(pa.string())) + # Convert IP types to string + if issubclass(py_type, IPv4Address | IPv6Address): + str_array = pa.array([None if s is None else str(s) for s in columnar_table_data[field_name].tolist()]) + columnar_table_data[field_name] = str_array + py_type = str + if arrow_schema: + arrow_schema = arrow_schema.set(field_index, arrow_schema.field(field_index).with_type(pa.string())) + # Remove any binary columns if issubclass(py_type, bytes): drop_column_names.append(field_name)