Skip to content

Commit

Permalink
fix(data-warehouse): Handle IP addresses (PostHog#29064)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Feb 21, 2025
1 parent 6eed2f6 commit 8ae0db9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
6 changes: 3 additions & 3 deletions posthog/clickhouse/test/__snapshots__/test_cluster.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@
'''
1 future(s) did not return a result:

* HostInfo(connection_info=ConnectionInfo(address='172.18.0.17', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Syntax error: failed at position 1 ('invalid'): invalid query. Expected one of: Query, Query with output, EXPLAIN, EXPLAIN, SELECT query, possibly with UNION, list of union elements, SELECT query, subquery, possibly with UNI...
* HostInfo(connection_info=ConnectionInfo(address='127.0.0.1', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Syntax error: failed at position 1 ('invalid'): invalid query. Expected one of: Query, Query with output, EXPLAIN, EXPLAIN, SELECT query, possibly with UNION, list of union elements, SELECT query, subquery, possibly with UNI...
'''
# ---
# name: test_exception_summary.1
'''
1 future(s) did not return a result:

* HostInfo(connection_info=ConnectionInfo(address='172.18.0.17', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Unknown table expression identifier 'invalid_table_name' in scope SELECT * FROM invalid_table_name. Stack trace:\n\n0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000000000000\n1. DB::Exceptio...
* HostInfo(connection_info=ConnectionInfo(address='127.0.0.1', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ServerException("DB::Exception: Unknown table expression identifier 'invalid_table_name' in scope SELECT * FROM invalid_table_name. Stack trace:\n\n0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000000000000\n1. DB::Exceptio...
'''
# ---
# name: test_exception_summary.2
'''
1 future(s) did not return a result:

* HostInfo(connection_info=ConnectionInfo(address='172.18.0.17', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ValueError('custom error')
* HostInfo(connection_info=ConnectionInfo(address='127.0.0.1', port=9000), shard_num=1, replica_num=1, host_cluster_type='online', host_cluster_role='data'): ValueError('custom error')
'''
# ---
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ipaddress import IPv4Address, IPv6Address
from dateutil import parser
import decimal
import uuid
Expand Down Expand Up @@ -219,3 +220,29 @@ def test_table_from_py_list_with_schema_and_too_small_decimal_type():
)
)
assert table.schema.equals(expected_schema)


def test_table_from_py_list_with_ipv4_address():
table = table_from_py_list([{"column": IPv4Address("127.0.0.1")}])

assert table.equals(pa.table({"column": ["127.0.0.1"]}))
assert table.schema.equals(
pa.schema(
[
("column", pa.string()),
]
)
)


def test_table_from_py_list_with_ipv6_address():
table = table_from_py_list([{"column": IPv6Address("::1")}])

assert table.equals(pa.table({"column": ["::1"]}))
assert table.schema.equals(
pa.schema(
[
("column", pa.string()),
]
)
)
9 changes: 9 additions & 0 deletions posthog/temporal/data_imports/pipelines/pipeline/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import decimal
from ipaddress import IPv4Address, IPv6Address
import json
from collections.abc import Sequence
import math
Expand Down Expand Up @@ -549,6 +550,14 @@ def _convert_to_decimal_or_none(x: decimal.Decimal | float | None) -> decimal.De
if arrow_schema:
arrow_schema = arrow_schema.set(field_index, arrow_schema.field(field_index).with_type(pa.string()))

# Convert IP types to string
if issubclass(py_type, IPv4Address | IPv6Address):
str_array = pa.array([None if s is None else str(s) for s in columnar_table_data[field_name].tolist()])
columnar_table_data[field_name] = str_array
py_type = str
if arrow_schema:
arrow_schema = arrow_schema.set(field_index, arrow_schema.field(field_index).with_type(pa.string()))

# Remove any binary columns
if issubclass(py_type, bytes):
drop_column_names.append(field_name)
Expand Down

0 comments on commit 8ae0db9

Please sign in to comment.