Skip to content

Commit

Permalink
SparkDataFrame / SparkConnectDataFrame
Browse files (browse the repository at this point in the history)
  • Loading branch information
Maxim Mityutko committed Sep 3, 2024
1 parent 5819ed9 commit d368b3b
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/koheesio/integrations/box.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
from boxsdk.object.file import File
from boxsdk.object.folder import Folder

from pyspark.sql import DataFrame
from pyspark.sql.functions import expr, lit
from pyspark.sql.types import StructType

from koheesio.spark import DataFrame
from koheesio import Step, StepOutput
from koheesio.models import (
Field,
Expand Down
20 changes: 10 additions & 10 deletions src/koheesio/spark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@

from __future__ import annotations

from typing import Optional
from typing import Optional, Union
from abc import ABC

from pydantic import Field

from pyspark.sql import Column
from pyspark.sql import DataFrame as PySparkSQLDataFrame
from pyspark.sql import SparkSession as OriginalSparkSession
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

try:
from pyspark.sql.utils import AnalysisException as SparkAnalysisException
from pyspark.sql.utils import AnalysisException
except ImportError:
from pyspark.errors.exceptions.base import AnalysisException as SparkAnalysisException
from pyspark.errors.exceptions.base import AnalysisException

from koheesio import Step, StepOutput
from koheesio.spark.utils import get_spark_minor_version

# TODO: Move to spark/__init__.py after reorganizing the code
# Will be used for typing checks and consistency, specifically for PySpark >=3.5
DataFrame = PySparkSQLDataFrame
SparkSession = OriginalSparkSession
AnalysisException = SparkAnalysisException
if get_spark_minor_version() >= 3.5:
from pyspark.sql.connect.session import DataFrame as SparkConnectDataFrame

DataFrame = Union[SparkDataFrame, SparkConnectDataFrame]


class SparkStep(Step, ABC):
Expand Down
4 changes: 1 addition & 3 deletions src/koheesio/spark/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
from typing import Dict, List, Optional, Union

from py4j.protocol import Py4JJavaError # type: ignore

from pyspark.sql import DataFrame
from pyspark.sql.types import DataType

from koheesio.models import Field, field_validator, model_validator
from koheesio.spark import AnalysisException, SparkStep
from koheesio.spark import AnalysisException, SparkStep, DataFrame
from koheesio.spark.utils import on_databricks


Expand Down
3 changes: 1 addition & 2 deletions src/koheesio/spark/etl_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

from datetime import datetime

from pyspark.sql import DataFrame

from koheesio import Step
from koheesio.models import Field, InstanceOf, conlist
from koheesio.spark import DataFrame
from koheesio.spark.readers import Reader
from koheesio.spark.transformations import Transformation
from koheesio.spark.writers import Writer
Expand Down
6 changes: 3 additions & 3 deletions src/koheesio/spark/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
"""

import json
from typing import Any, Dict, List, Optional, Set, Union
from abc import ABC
from copy import deepcopy
from textwrap import dedent
from typing import Any, Dict, List, Optional, Set, Union

from pyspark.sql import DataFrame, Window
from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql import types as t

Expand All @@ -61,7 +61,7 @@
field_validator,
model_validator,
)
from koheesio.spark import SparkStep
from koheesio.spark import SparkStep, DataFrame
from koheesio.spark.delta import DeltaTableStep
from koheesio.spark.readers.delta import DeltaTableReader, DeltaTableStreamReader
from koheesio.spark.readers.jdbc import JdbcReader
Expand Down

0 comments on commit d368b3b

Please sign in to comment.