Ability to read Excel files (#37)
- introduces a pandas module
- abstracts the Reader class into a common BaseReader (as part of the models module)
- re-implements koheesio.spark.readers.Reader based on BaseReader
- introduces ExcelReader at koheesio.pandas.readers.excel.ExcelReader
- introduces ExcelReader at koheesio.spark.readers.excel.ExcelReader (usage sketch below)
- adds unit tests to cover the above
- adds the 'excel' extra dependency
- adds docs
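For a quick sense of what this enables, here is a minimal usage sketch (illustrative only, not part of the diff; the file name is a placeholder, the 'excel' extra is assumed to be installed, and the Spark variant assumes an active SparkSession):

# Minimal usage sketch; "data.xlsx" is a placeholder file.
from koheesio.pandas.readers.excel import ExcelReader as PandasExcelReader
from koheesio.spark.readers.excel import ExcelReader as SparkExcelReader

pandas_df = PandasExcelReader(path="data.xlsx", sheet_name="Sheet1").read()  # returns a pandas DataFrame
spark_df = SparkExcelReader(path="data.xlsx", sheet_name="Sheet1").read()    # returns a Spark DataFrame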

---------

Co-authored-by: Danny Meijer <[email protected]>
dannymeijer authored Jun 7, 2024
1 parent d7d3243 commit fc11f0e
Showing 22 changed files with 416 additions and 85 deletions.
22 changes: 19 additions & 3 deletions .github/workflows/test.yml
@@ -36,15 +36,31 @@ jobs:
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          ref: ${{ github.head_ref }}
+      - name: Fetch main branch
+        run: git fetch origin main:main
      - name: Check changes
        id: check
        run: |
-          echo "python_changed=$(git diff --name-only ${{ github.event.before }} ${{ github.event.after }} | grep '\.py$')" >> "$GITHUB_OUTPUT"
-          echo "toml_changed=$(git diff --name-only ${{ github.event.before }} ${{ github.event.after }} | grep '\.toml$')" >> "$GITHUB_OUTPUT"
+          # Set the base reference for the git diff
+          BASE_REF=${{ github.event.pull_request.base.ref || 'main' }}
+          # Check for changes in this PR / commit
+          git_diff_output=$(git diff --name-only $BASE_REF ${{ github.event.after }})
+          # Count the number of changes to Python and TOML files
+          python_changed=$(echo "$git_diff_output" | grep '\.py$' | wc -l)
+          toml_changed=$(echo "$git_diff_output" | grep '\.toml$' | wc -l)
+          # Write the changes to the GITHUB_OUTPUT environment file
+          echo "python_changed=$python_changed" >> $GITHUB_OUTPUT
+          echo "toml_changed=$toml_changed" >> $GITHUB_OUTPUT
  tests:
    needs: check_changes
-    if: needs.check_changes.outputs.python_changed != '' || needs.check_changes.outputs.toml_changed != '' || github.event_name == 'workflow_dispatch'
+    if: needs.check_changes.outputs.python_changed > 0 || needs.check_changes.outputs.toml_changed > 0 || github.event_name == 'workflow_dispatch'
    name: Python ${{ matrix.python-version }} with PySpark ${{ matrix.pyspark-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }}
    runs-on: ${{ matrix.os }}

20 changes: 9 additions & 11 deletions pyproject.toml
@@ -62,6 +62,7 @@ se = ["spark-expectations>=2.1.0"]
# SFTP dependencies in to_csv line_iterator
sftp = ["paramiko>=2.6.0"]
delta = ["delta-spark>=2.2"]
+excel = ["openpyxl>=3.0.0"]
dev = ["black", "isort", "ruff", "mypy", "pylint", "colorama", "types-PyYAML"]
test = [
"chispa",
@@ -175,6 +176,7 @@ features = [
"pyspark",
"sftp",
"delta",
"excel",
"se",
"box",
"dev",
@@ -184,16 +186,15 @@ features = [
[tool.hatch.envs.default.scripts]
# TODO: add scripts section based on Makefile
# TODO: add bandit
# TODO: move scripts from linting and style here
# Code Quality commands
black-check = "black --check --diff ."
black-fmt = "black ."
isort-check = "isort . --check --diff --color"
isort-fmt = "isort ."
ruff-check = "ruff check ."
ruff-fmt = "ruff check . --fix"
mypy-check = "mypy koheesio"
pylint-check = "pylint --output-format=colorized -d W0511 koheesio"
mypy-check = "mypy src"
pylint-check = "pylint --output-format=colorized -d W0511 src"
check = [
"- black-check",
"- isort-check",
@@ -213,15 +214,7 @@ non-spark-tests = "test -m \"not spark\""
# scripts.run = "- log-versions && pytest tests/ {env:HATCH_TEST_ARGS:} {args}"
# run ="echo {args}"
# run = "- pytest tests/ {env:HATCH_TEST_ARGS:} {args}"
-# run-cov = "coverage run -m pytest{env:HATCH_TEST_ARGS:} {args}"
-# cov-combine = "coverage combine"
-# cov-report = "coverage report"
-# log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark"
-#
-#
-#
-# coverage = "- pytest tests/ {env:HATCH_TEST_ARGS:} {args} --cov=koheesio --cov-report=html --cov-report=term-missing --cov-fail-under=90"
-# cov = "coverage"


### ~~~~~~~~~~~~~~~~~~~~~ ###
@@ -249,6 +242,7 @@ features = [
"pyspark",
"sftp",
"delta",
"excel",
"dev",
"test",
]
@@ -284,6 +278,9 @@ matrix.version.extra-dependencies = [
{ value = "spark-expectations>=2.1.0", if = [
"pyspark33",
] },
{ value = "pandas<2", if = [
"pyspark33",
] },
{ value = "pyspark>=3.4,<3.5", if = [
"pyspark34",
] },
@@ -400,6 +397,7 @@ features = [
"se",
"sftp",
"delta",
"excel",
"dev",
"test",
"docs",
8 changes: 5 additions & 3 deletions src/koheesio/integrations/spark/dq/spark_expectations.py
@@ -4,15 +4,17 @@

from typing import Any, Dict, Optional, Union

-import pyspark
-from pydantic import Field
-from pyspark.sql import DataFrame
from spark_expectations.config.user_config import Constants as user_config
from spark_expectations.core.expectations import (
SparkExpectations,
WrappedDataFrameWriter,
)

+from pydantic import Field
+
+import pyspark
+from pyspark.sql import DataFrame

from koheesio.spark.transformations import Transformation
from koheesio.spark.writers import BatchOutputMode

50 changes: 50 additions & 0 deletions src/koheesio/models/reader.py
@@ -0,0 +1,50 @@
"""
Module for the BaseReader class
"""

from typing import Optional, TypeVar
from abc import ABC, abstractmethod

from koheesio import Step

# Define a type variable that can be any type of DataFrame
DataFrameType = TypeVar("DataFrameType")


class BaseReader(Step, ABC):
"""Base class for all Readers
A Reader is a Step that reads data from a source based on the input parameters
and stores the result in self.output.df (DataFrame).
When implementing a Reader, the execute() method should be implemented.
The execute() method should read from the source and store the result in self.output.df.
The Reader class implements a standard read() method that calls the execute() method and returns the result. This
method can be used to read data from a Reader without having to call the execute() method directly. Read method
does not need to be implemented in the child class.
The Reader class also implements a shorthand for accessing the output Dataframe through the df-property. If the
output.df is None, .execute() will be run first.
"""

@property
def df(self) -> Optional[DataFrameType]:
"""Shorthand for accessing self.output.df
If the output.df is None, .execute() will be run first
"""
if not self.output.df:
self.execute()
return self.output.df

@abstractmethod
def execute(self) -> Step.Output:
"""Execute on a Reader should handle self.output.df (output) as a minimum
Read from whichever source -> store result in self.output.df
"""
pass

def read(self) -> DataFrameType:
"""Read from a Reader without having to call the execute() method directly"""
self.execute()
return self.output.df
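To make the contract concrete, a hypothetical subclass might look like this (not part of this commit; the CsvReader name and the pandas dependency are illustrative assumptions):

import pandas as pd

from koheesio.models.reader import BaseReader


class CsvReader(BaseReader):
    """Hypothetical reader that loads a CSV file into self.output.df."""

    path: str

    def execute(self):
        # execute() only needs to populate self.output.df; read() and the
        # df property are inherited from BaseReader.
        self.output.df = pd.read_csv(self.path)


# CsvReader(path="data.csv").read() returns the DataFrame directly, while
# CsvReader(path="data.csv").df lazily triggers execute() first.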
27 changes: 27 additions & 0 deletions src/koheesio/pandas/__init__.py
@@ -0,0 +1,27 @@
"""Base class for a Pandas step
Extends the Step class with Pandas DataFrame support. The following:
- Pandas steps are expected to return a Pandas DataFrame as output.
"""

from typing import Optional
from abc import ABC

from koheesio import Step, StepOutput
from koheesio.models import Field
from koheesio.spark.utils import import_pandas_based_on_pyspark_version

pandas = import_pandas_based_on_pyspark_version()


class PandasStep(Step, ABC):
"""Base class for a Pandas step
Extends the Step class with Pandas DataFrame support. The following:
- Pandas steps are expected to return a Pandas DataFrame as output.
"""

class Output(StepOutput):
"""Output class for PandasStep"""

df: Optional[pandas.DataFrame] = Field(default=None, description="The Pandas DataFrame")
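As a sketch of how the typed output is meant to be used (a hypothetical step, not part of the diff; it reuses the module-level pandas alias defined above):

from koheesio.pandas import PandasStep, pandas


class MakeFrame(PandasStep):
    """Hypothetical step that places a pandas DataFrame on its Output."""

    def execute(self):
        # Output.df is declared as an Optional pandas.DataFrame, so the
        # assignment below is validated against that type.
        self.output.df = pandas.DataFrame({"id": [1, 2]})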
34 changes: 34 additions & 0 deletions src/koheesio/pandas/readers/__init__.py
@@ -0,0 +1,34 @@
"""
Base class for all Readers
"""

from abc import ABC, abstractmethod

from koheesio.models.reader import BaseReader
from koheesio.pandas import PandasStep


class Reader(BaseReader, PandasStep, ABC):
"""Base class for all Readers
A Reader is a Step that reads data from a source based on the input parameters
and stores the result in self.output.df (DataFrame).
When implementing a Reader, the execute() method should be implemented.
The execute() method should read from the source and store the result in self.output.df.
The Reader class implements a standard read() method that calls the execute() method and returns the result. This
method can be used to read data from a Reader without having to call the execute() method directly. Read method
does not need to be implemented in the child class.
The Reader class also implements a shorthand for accessing the output Dataframe through the df-property. If the
output.df is None, .execute() will be run first.
"""

@abstractmethod
def execute(self) -> PandasStep.Output:
"""Execute on a Reader should handle self.output.df (output) as a minimum
Read from whichever source -> store result in self.output.df
"""
# self.output.df # output dataframe
...
50 changes: 50 additions & 0 deletions src/koheesio/pandas/readers/excel.py
@@ -0,0 +1,50 @@
"""
Excel reader for Spark
Note
----
Ensure the 'excel' extra is installed before using this reader.
Default implementation uses openpyxl as the engine for reading Excel files.
Other implementations can be used by passing the correct keyword arguments to the reader.
See Also
--------
- https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html
- koheesio.pandas.readers.excel.ExcelReader
"""

from typing import List, Optional, Union
from pathlib import Path

import pandas as pd

from koheesio.models import ExtraParamsMixin, Field
from koheesio.pandas.readers import Reader


class ExcelReader(Reader, ExtraParamsMixin):
"""Read data from an Excel file
See Also
--------
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html
Attributes
----------
path : Union[str, Path]
The path to the Excel file
sheet_name : str
The name of the sheet to read
header : Optional[Union[int, List[int]]]
Row(s) to use as the column names
Any other keyword arguments will be passed to pd.read_excel.
"""

path: Union[str, Path] = Field(description="The path to the Excel file")
sheet_name: str = Field(default="Sheet1", description="The name of the sheet to read")
header: Optional[Union[int, List[int]]] = Field(default=0, description="Row(s) to use as the column names")

def execute(self):
extra_params = self.params or {}
self.output.df = pd.read_excel(self.path, sheet_name=self.sheet_name, header=self.header, **extra_params)
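A usage sketch for this reader (illustrative; the path, sheet name, and skiprows value are placeholders, and the assumption is that undeclared keyword arguments land in self.params via ExtraParamsMixin, as execute() above suggests):

from koheesio.pandas.readers.excel import ExcelReader

reader = ExcelReader(
    path="report.xlsx",   # placeholder path
    sheet_name="Q1",      # defaults to "Sheet1" when omitted
    header=0,
    skiprows=2,           # not a declared field; forwarded to pd.read_excel
)
df = reader.read()        # runs execute() and returns output.df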
24 changes: 3 additions & 21 deletions src/koheesio/spark/readers/__init__.py
@@ -6,15 +6,13 @@
[reference/concepts/steps/readers](../../../reference/concepts/readers.md) section of the Koheesio documentation.
"""

-from typing import Optional
from abc import ABC, abstractmethod

-from pyspark.sql import DataFrame
-
+from koheesio.models.reader import BaseReader
from koheesio.spark import SparkStep


-class Reader(SparkStep, ABC):
+class Reader(BaseReader, SparkStep, ABC):
    """Base class for all Readers

    A Reader is a Step that reads data from a source based on the input parameters

@@ -33,24 +31,8 @@ class Reader(SparkStep, ABC):
    output.df is None, .execute() will be run first.
    """

-    @property
-    def df(self) -> Optional[DataFrame]:
-        """Shorthand for accessing self.output.df
-        If the output.df is None, .execute() will be run first
-        """
-        if not self.output.get("df"):
-            self.execute()
-        return self.output.df
-
    @abstractmethod
-    def execute(self):
+    def execute(self) -> SparkStep.Output:
        """Execute on a Reader should handle self.output.df (output) as a minimum
        Read from whichever source -> store result in self.output.df
        """
        # self.output.df # output dataframe
        ...

-    def read(self) -> Optional[DataFrame]:
-        """Read from a Reader without having to call the execute() method directly"""
-        self.execute()
-        return self.output.df
40 changes: 40 additions & 0 deletions src/koheesio/spark/readers/excel.py
@@ -0,0 +1,40 @@
"""
Excel reader for Spark
Note
----
Ensure the 'excel' extra is installed before using this reader.
Default implementation uses openpyxl as the engine for reading Excel files.
Other implementations can be used by passing the correct keyword arguments to the reader.
See Also
--------
- https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html
- koheesio.pandas.readers.excel.ExcelReader
"""

from pyspark.pandas import DataFrame as PandasDataFrame

from koheesio.pandas.readers.excel import ExcelReader as PandasExcelReader
from koheesio.spark.readers import Reader


class ExcelReader(Reader, PandasExcelReader):
"""Read data from an Excel file
This class is a wrapper around the PandasExcelReader class. It reads an Excel file first using pandas, and then
converts the pandas DataFrame to a Spark DataFrame.
Attributes
----------
path: str
The path to the Excel file
sheet_name: str
The name of the sheet to read
header: int
The row to use as the column names
"""

def execute(self):
pdf: PandasDataFrame = PandasExcelReader.from_step(self).execute().df
self.output.df = self.spark.createDataFrame(pdf)
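And a matching sketch for the Spark variant (illustrative; assumes the 'excel' extra is installed and a SparkSession is available to the step). Note the design choice visible in execute() above: pandas performs the file I/O, so the whole sheet is materialized in driver memory before spark.createDataFrame converts it.

from koheesio.spark.readers.excel import ExcelReader

# "report.xlsx" is a placeholder; read() returns the converted Spark DataFrame.
spark_df = ExcelReader(path="report.xlsx", sheet_name="Sheet1", header=0).read()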