Skip to content

Commit

Permalink
fix(data-warehouse): Revert "fix: Salesforce source errors" (PostHog#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Feb 22, 2025
1 parent ec83a41 commit cb3fb59
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 279 deletions.
10 changes: 5 additions & 5 deletions ee/billing/billing_manager.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
from django.conf import settings
from django.db.models import F
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, cast

import jwt
import requests
import structlog
from django.conf import settings
from django.db.models import F
from django.utils import timezone
from requests import JSONDecodeError
from rest_framework.exceptions import NotAuthenticated
from sentry_sdk import capture_message
from requests import JSONDecodeError # type: ignore[attr-defined]
from rest_framework.exceptions import NotAuthenticated
from posthog.exceptions_capture import capture_exception

from ee.billing.billing_types import BillingStatus
from ee.billing.quota_limiting import set_org_usage_summary, update_org_billing_quotas
from ee.models import License
from ee.settings import BILLING_SERVICE_URL
from posthog.cloud_utils import get_cached_instance_license
from posthog.exceptions_capture import capture_exception
from posthog.models import Organization
from posthog.models.organization import OrganizationMembership, OrganizationUsageInfo
from posthog.models.user import User
Expand Down
7 changes: 3 additions & 4 deletions posthog/api/test/test_exports.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime, timedelta
from typing import Optional
from unittest.mock import patch

from datetime import datetime, timedelta
import celery
import requests.exceptions
from boto3 import resource
Expand Down Expand Up @@ -398,7 +397,7 @@ def requests_side_effect(*args, **kwargs):

def raise_for_status():
if 400 <= response.status_code < 600:
raise requests.exceptions.HTTPError(response=response) # type: ignore[arg-type]
raise requests.exceptions.HTTPError(response=response)

response.raise_for_status = raise_for_status # type: ignore[attr-defined]
return response
Expand Down Expand Up @@ -502,7 +501,7 @@ def requests_side_effect(*args, **kwargs):

def raise_for_status():
if 400 <= response.status_code < 600:
raise requests.exceptions.HTTPError(response=response) # type: ignore[arg-type]
raise requests.exceptions.HTTPError(response=response)

response.raise_for_status = raise_for_status # type: ignore[attr-defined]
return response
Expand Down
26 changes: 14 additions & 12 deletions posthog/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import json
from django.http import HttpRequest
from rest_framework.decorators import action as drf_action
from functools import wraps
from posthog.api.documentation import extend_schema
import re
import socket
import urllib.parse
from enum import Enum, auto
from functools import wraps
from ipaddress import ip_address
from typing import Any, Literal, Optional, Union
from urllib.parse import urlparse

from requests.adapters import HTTPAdapter
from typing import Literal, Optional, Union, Any

from rest_framework.fields import Field
from urllib3 import HTTPSConnectionPool, HTTPConnectionPool, PoolManager
from uuid import UUID

import structlog
from django.core.exceptions import RequestDataTooBig
from django.db.models import QuerySet
from django.http import HttpRequest
from prometheus_client import Counter
from requests.adapters import HTTPAdapter
from rest_framework import request, serializers, status
from rest_framework.decorators import action as drf_action
from rest_framework import request, status, serializers
from rest_framework.exceptions import ValidationError
from rest_framework.fields import Field
from statshog.defaults.django import statsd
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, PoolManager

from posthog.api.documentation import extend_schema
from posthog.constants import EventDefinitionType
from posthog.exceptions import (
RequestParsingError,
Expand Down Expand Up @@ -363,13 +365,13 @@ def raise_if_connected_to_private_ip(conn):
class PublicIPOnlyHTTPConnectionPool(HTTPConnectionPool):
def _validate_conn(self, conn):
raise_if_connected_to_private_ip(conn)
super()._validate_conn(conn) # type: ignore[misc]
super()._validate_conn(conn)


class PublicIPOnlyHTTPSConnectionPool(HTTPSConnectionPool):
def _validate_conn(self, conn):
raise_if_connected_to_private_ip(conn)
super()._validate_conn(conn) # type: ignore[misc]
super()._validate_conn(conn)


class PublicIPOnlyHttpAdapter(HTTPAdapter):
Expand All @@ -388,7 +390,7 @@ def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
block=block,
**pool_kwargs,
)
self.poolmanager.pool_classes_by_scheme = { # type: ignore[attr-defined]
self.poolmanager.pool_classes_by_scheme = {
"http": PublicIPOnlyHTTPConnectionPool,
"https": PublicIPOnlyHTTPSConnectionPool,
}
Expand Down
18 changes: 9 additions & 9 deletions posthog/tasks/exports/test/test_csv_exporter.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from datetime import datetime
from io import BytesIO
from typing import Any, Optional
from unittest import mock
from unittest.mock import ANY, MagicMock, Mock, patch
from unittest.mock import MagicMock, Mock, patch, ANY
from dateutil.relativedelta import relativedelta

from freezegun import freeze_time
from openpyxl import load_workbook
from io import BytesIO
import pytest
from boto3 import resource
from botocore.client import Config
from dateutil.relativedelta import relativedelta
from django.test import override_settings
from django.utils.timezone import now
from freezegun import freeze_time
from openpyxl import load_workbook
from requests.exceptions import HTTPError

from posthog.hogql.constants import CSV_EXPORT_BREAKDOWN_LIMIT_INITIAL
from posthog.models import ExportedAsset
from posthog.models.utils import UUIDT
from posthog.settings import (
Expand All @@ -28,10 +27,11 @@
from posthog.tasks.exports import csv_exporter
from posthog.tasks.exports.csv_exporter import (
UnexpectedEmptyJsonResponse,
_convert_response_to_csv_data,
add_query_params,
_convert_response_to_csv_data,
)
from posthog.test.base import APIBaseTest, _create_event, _create_person, flush_persons_and_events
from posthog.hogql.constants import CSV_EXPORT_BREAKDOWN_LIMIT_INITIAL
from posthog.test.base import APIBaseTest, _create_event, flush_persons_and_events, _create_person
from posthog.test.test_journeys import journeys_for
from posthog.utils import absolute_uri

Expand Down Expand Up @@ -330,7 +330,7 @@ def test_failing_export_api_is_reported(self, _mock_logger: MagicMock) -> None:
def test_failing_export_api_is_reported_query_size_exceeded(self, _mock_logger: MagicMock) -> None:
with patch("posthog.tasks.exports.csv_exporter.make_api_call") as patched_make_api_call:
exported_asset = self._create_asset()
mock_error = HTTPError("Query size exceeded") # type: ignore[call-arg]
mock_error = HTTPError("Query size exceeded")
mock_error.response = Mock()
mock_error.response.text = "Query size exceeded"
patched_make_api_call.side_effect = mock_error
Expand Down
24 changes: 10 additions & 14 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,40 @@
import json
import re

import posthoganalytics
from django.db import close_old_connections
import posthoganalytics
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy


# TODO: remove dependency
from posthog.temporal.common.base import PostHogWorkflow
from posthog.temporal.common.logger import bind_temporal_worker_logger_sync
from posthog.temporal.data_imports.workflow_activities.check_billing_limits import (
CheckBillingLimitsActivityInputs,
check_billing_limits_activity,
)
from posthog.temporal.data_imports.workflow_activities.create_job_model import (
CreateExternalDataJobModelActivityInputs,
create_external_data_job_model_activity,
)
from posthog.temporal.data_imports.workflow_activities.import_data_sync import (
ImportDataActivityInputs,
import_data_activity_sync,
)
from posthog.temporal.data_imports.workflow_activities.import_data_sync import import_data_activity_sync
from posthog.temporal.data_imports.workflow_activities.sync_new_schemas import (
SyncNewSchemasActivityInputs,
sync_new_schemas_activity,
)
from posthog.temporal.utils import ExternalDataWorkflowInputs
from posthog.temporal.data_imports.workflow_activities.create_job_model import (
CreateExternalDataJobModelActivityInputs,
create_external_data_job_model_activity,
)
from posthog.temporal.data_imports.workflow_activities.import_data_sync import ImportDataActivityInputs
from posthog.utils import get_machine_id
from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source

from posthog.warehouse.external_data_source.jobs import (
update_external_job_status,
)
from posthog.warehouse.models import (
ExternalDataJob,
ExternalDataSource,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger_sync
from posthog.warehouse.models.external_data_schema import update_should_sync

Any_Source_Errors: list[str] = ["Could not establish session to SSH gateway"]
Expand Down Expand Up @@ -68,10 +68,6 @@
"No primary key defined for table",
"Access denied for user",
],
ExternalDataSource.Type.SALESFORCE: [
"400 Client Error: Bad Request for url",
"403 Client Error: Forbidden for url",
],
ExternalDataSource.Type.SNOWFLAKE: [
"This account has been marked for decommission",
"404 Not Found",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from ipaddress import IPv4Address, IPv6Address
from dateutil import parser
import decimal
import uuid
from ipaddress import IPv4Address, IPv6Address

import pyarrow as pa
import pytest
from dateutil import parser

from posthog.temporal.data_imports.pipelines.pipeline.utils import _get_max_decimal_type, table_from_py_list
from posthog.temporal.data_imports.pipelines.pipeline.utils import table_from_py_list


def test_table_from_py_list_uuid():
Expand Down Expand Up @@ -225,26 +222,6 @@ def test_table_from_py_list_with_schema_and_too_small_decimal_type():
assert table.schema.equals(expected_schema)


@pytest.mark.parametrize(
"decimals,expected",
[
([decimal.Decimal("1")], pa.decimal128(2, 1)),
([decimal.Decimal("1.001112")], pa.decimal128(7, 6)),
([decimal.Decimal("0.001112")], pa.decimal128(6, 6)),
([decimal.Decimal("1.0100000")], pa.decimal128(8, 7)),
# That is 1 followed by 37 zeroes to go over the pa.Decimal128 precision limit of 38.
([decimal.Decimal("10000000000000000000000000000000000000.1")], pa.decimal256(39, 1)),
],
)
def test_get_max_decimal_type_returns_correct_decimal_type(
decimals: list[decimal.Decimal],
expected: pa.Decimal128Type | pa.Decimal256Type,
):
"""Test whether expected PyArrow decimal type variant is returned."""
result = _get_max_decimal_type(decimals)
assert result == expected


def test_table_from_py_list_with_ipv4_address():
table = table_from_py_list([{"column": IPv4Address("127.0.0.1")}])

Expand Down
40 changes: 13 additions & 27 deletions posthog/temporal/data_imports/pipelines/pipeline/utils.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import decimal
from ipaddress import IPv4Address, IPv6Address
import json
from collections.abc import Sequence
import math
import uuid
from collections.abc import Hashable, Iterator, Sequence
from typing import Any, Optional

import deltalake as deltalake
import numpy as np
from collections.abc import Hashable
from collections.abc import Iterator
from dateutil import parser
import uuid
import orjson
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from dateutil import parser
from django.db.models import F
from dlt.common.data_types.typing import TDataType
from dlt.common.libs.deltalake import ensure_delta_compatible_arrow_schema
from dlt.common.normalizers.naming.snake_case import NamingConvention
from dlt.sources import DltResource

import deltalake as deltalake
from django.db.models import F
from posthog.temporal.common.logger import FilteringBoundLogger
from dlt.common.data_types.typing import TDataType
from dlt.common.normalizers.naming.snake_case import NamingConvention
from posthog.temporal.data_imports.pipelines.pipeline.typings import SourceResponse
from posthog.warehouse.models import ExternalDataJob, ExternalDataSchema

Expand Down Expand Up @@ -329,29 +329,15 @@ def build_pyarrow_decimal_type(precision: int, scale: int) -> pa.Decimal128Type


def _get_max_decimal_type(values: list[decimal.Decimal]) -> pa.Decimal128Type | pa.Decimal256Type:
"""Determine maximum precision and scale from all `decimal.Decimal` values.
Returns:
A `pa.Decimal128Type` or `pa.Decimal256Type` with enough precision and
scale to hold all `values`.
"""
max_precision = 1
max_scale = 0

for value in values:
_, digits, exponent = value.as_tuple()
sign, digits, exponent = value.as_tuple()
if not isinstance(exponent, int):
continue

# This implementation accounts for leading zeroes being excluded from digits
# It is based on Arrow, see:
# https://github.com/apache/arrow/blob/main/python/pyarrow/src/arrow/python/decimal.cc#L75
if exponent < 0:
precision = max(len(digits), -exponent)
scale = -exponent
else:
precision = len(digits) + exponent
scale = 0
precision = len(digits)
scale = -exponent if exponent < 0 else 0

max_precision = max(precision, max_precision)
max_scale = max(scale, max_scale)
Expand Down
Loading

0 comments on commit cb3fb59

Please sign in to comment.