[FIX] Spark expectations dependencies (#17)
<!--- Provide a general summary of your changes in the Title above -->

## Description
Re-enable the Spark Expectations (`se`) extra, which now depends on `spark-expectations>=2.1.0`.

## Related Issue
#16 

## How Has This Been Tested?
Ran the full test suite.

## Screenshots (if appropriate):

## Types of changes
<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->
- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->
- [x] My code follows the code style of this project.
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
- [ ] I have added tests to cover my changes.
- [x] All new and existing tests passed.
mikita-sakalouski authored May 30, 2024
1 parent 5957a76 commit ba5b4ea
Showing 22 changed files with 170 additions and 105 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/release.yml
@@ -1,4 +1,4 @@
name: release
name: Release

on:
push:
@@ -83,8 +83,9 @@ jobs:
with:
name: python-artifacts
path: dist

- name: Install Hatch
uses: pypa/hatch@install

- name: Publish package to PyPI
uses: pypa/[email protected]
with:
print-hash: true
run: hatch publish --yes --no-prompt
17 changes: 16 additions & 1 deletion .github/workflows/test.yaml
@@ -1,9 +1,23 @@
name: test
name: Test

on:
pull_request:
branches:
- main
paths:
- '**.py'
- '**.toml'
workflow_dispatch:
inputs:
logLevel:
description: 'Log level'
required: true
default: 'warning'
type: choice
options:
- info
- warning
- debug

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }}
@@ -18,6 +32,7 @@ jobs:
tests:
name: Python ${{ matrix.python-version }} with PySpark ${{ matrix.pyspark-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }}
runs-on: ${{ matrix.os }}

strategy:
fail-fast: false
matrix:
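
The job `name:` in this hunk relies on the `&&` / `||` chaining idiom that GitHub Actions expressions use in place of a ternary operator. A rough Python equivalent is sketched below. `os_label` is a hypothetical helper, and the runner labels in the loop are assumed examples, since the actual `matrix.os` values are not visible in this hunk.

```python
def os_label(runner: str) -> str:
    """Mirror the workflow's job-name expression for one matrix.os value."""
    # GitHub's startsWith() ignores case, so compare on a lowercased copy.
    name = runner.lower()
    if name.startswith("macos-"):
        return "macOS"
    if name.startswith("windows-"):
        return "Windows"
    # Anything else falls through to the final `|| 'Linux'` branch.
    return "Linux"


# Assumed runner labels, for illustration only.
for runner in ("ubuntu-latest", "macos-latest", "windows-latest"):
    print(runner, "->", os_label(runner))
```
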
64 changes: 32 additions & 32 deletions README.md
@@ -7,20 +7,18 @@

| | |
|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| CI/CD | [![CI - Test](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml) [![CD - Build Koheesio](https://github.com/Nike-Inc/koheesio/actions/workflows/build_koheesio.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/release.yml) |
| CI/CD | [![CI - Test](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml) [![CD - Release Koheesio](https://github.com/Nike-Inc/koheesio/actions/workflows/release.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/release.yml) |
| Package | [![PyPI - Version](https://img.shields.io/pypi/v/koheesio.svg?logo=pypi&label=PyPI&logoColor=gold)](https://pypi.org/project/koheesio/) [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/koheesio.svg?logo=python&label=Python&logoColor=gold)](https://pypi.org/project/koheesio/) [![PyPI - Installs](https://img.shields.io/pypi/dm/koheesio.svg?color=blue&label=Installs&logo=pypi&logoColor=gold)](https://pypi.org/project/koheesio/) [![Release - Downloads](https://img.shields.io/github/downloads/Nike-Inc/koheesio/total?label=Downloads)](https://github.com/Nike-Inc/koheesio/releases) |
| Meta | [![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch) [![linting - Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![types - Mypy](https://img.shields.io/badge/types-Mypy-blue.svg)](https://github.com/python/mypy) [![docstring - numpydoc](https://img.shields.io/badge/docstring-numpydoc-blue)](https://numpydoc.readthedocs.io/en/latest/format.html) [![code style - black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License - Apache 2.0](https://img.shields.io/badge/License-Apache_2.0-green.svg)](LICENSE.txt) |
| Meta | [![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch) [![linting - Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![types - Mypy](https://img.shields.io/badge/types-Mypy-blue.svg)](https://github.com/python/mypy) [![docstring - numpydoc](https://img.shields.io/badge/docstring-numpydoc-blue)](https://numpydoc.readthedocs.io/en/latest/format.html) [![code style - black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License - Apache 2.0](https://img.shields.io/github/license/Nike-Inc/koheesio)](LICENSE.txt) |

Koheesio, named after the Finnish word for cohesion, is a robust Python framework for building efficient data pipelines.
It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components.


Koheesio, named after the Finnish word for cohesion, is a robust Python framework for building efficient data pipelines.
It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components.

The framework is versatile, aiming to support multiple implementations and working seamlessly with various data
The framework is versatile, aiming to support multiple implementations and working seamlessly with various data
processing libraries or frameworks. This ensures that Koheesio can handle any data processing task, regardless of the
underlying technology or data scale.

Koheesio uses [Pydantic] for strong typing, data validation, and settings management, ensuring a high level of type
Koheesio uses [Pydantic] for strong typing, data validation, and settings management, ensuring a high level of type
safety and structured configurations within pipeline components.

[Pydantic]: docs/includes/glossary.md#pydantic
@@ -29,25 +27,25 @@ Koheesio's goal is to ensure predictable pipeline execution through a solid foun
set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable
Data Pipelines.

## What sets Koheesio apart from other libraries?"

### What sets Koheesio apart from other libraries?"
Koheesio encapsulates years of data engineering expertise, fostering a collaborative and innovative community. While
similar libraries exist, Koheesio's focus on data pipelines, integration with PySpark, and specific design for tasks
Koheesio encapsulates years of data engineering expertise, fostering a collaborative and innovative community. While
similar libraries exist, Koheesio's focus on data pipelines, integration with PySpark, and specific design for tasks
like data transformation, ETL jobs, data validation, and large-scale data processing sets it apart.

Koheesio aims to provide a rich set of features including readers, writers, and transformations for any type of Data
processing. Koheesio is not in competition with other libraries. Its aim is to offer wide-ranging support and focus
processing. Koheesio is not in competition with other libraries. Its aim is to offer wide-ranging support and focus
on utility in a multitude of scenarios. Our preference is for integration, not competition...

We invite contributions from all, promoting collaboration and innovation in the data engineering community.


## Koheesio Core Components

Here are the key components included in Koheesio:

- __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline,
taking in inputs and producing outputs.

```text
┌─────────┐ ┌──────────────────┐ ┌──────────┐
│ Input 1 │───────▶│ ├───────▶│ Output 1 │
@@ -61,11 +59,11 @@ Here are the key components included in Koheesio:
│ Input 3 │───────▶│ ├───────▶│ Output 3 │
└─────────┘ └──────────────────┘ └──────────┘
```

- __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share
variables across tasks and adapt the behavior of a Task based on its environment.
- __Logger__: This is a class for logging messages at different levels.


## Installation

You can install Koheesio using either pip or poetry.
@@ -82,7 +80,11 @@ pip install koheesio

If you're using Hatch for package management, you can add Koheesio to your project by simply adding koheesio to your
`pyproject.toml`.


```toml
[dependencies]
koheesio = "<version>"
```

### Using Poetry

@@ -92,47 +94,45 @@ If you're using poetry for package management, you can add Koheesio to your proj
poetry add koheesio
```

or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace
`...` with the version you want to have installed:
or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace `...` with the version you want to have installed:

```toml
koheesio = {version = "..."}
```

### Extras
### Features

Koheesio also provides some additional features that can be useful in certain scenarios. These include:

- __Spark Expectations:__ Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module; installable through the `se` extra.
- SE Provides Data Quality checks for Spark DataFrames.
- For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations).
- __Spark Expectations__: Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module;
- Installable through the `se` extra.
- SE Provides Data Quality checks for Spark DataFrames. For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations).

[//]: # (- **Brickflow:** Available through the `koheesio.steps.integration.workflow` module; installable through the `bf` extra.)
[//]: # ( - Brickflow is a workflow orchestration tool that allows you to define and execute workflows in a declarative way.)
[//]: # ( - For more information, refer to the [Brickflow docs]&#40;https://engineering.nike.com/brickflow&#41;)

- __Box__: Available through the `koheesio.steps.integration.box` module; installable through the `box` extra.
- Box is a cloud content management and file sharing service for businesses.
- __Box__: Available through the `koheesio.steps.integration.box` module
- Installable through the `box` extra.
- Box is a cloud content management and file sharing service for businesses.

- __SFTP__: Available through the `koheesio.steps.integration.spark.sftp` module; installable through the `sftp` extra.
- SFTP is a network protocol used for secure file transfer over a secure shell.
- __SFTP__: Available through the `koheesio.steps.integration.spark.sftp` module;
- Installable through the `sftp` extra.
- SFTP is a network protocol used for secure file transfer over a secure shell.

> __Note:__
> Some of the steps require extra dependencies. See the [Extras](#extras) section for additional info.
> Extras can be added to Poetry by adding `extras=['name_of_the_extra']` to the toml entry mentioned above
> Some of the steps require extra dependencies. See the [Features](#features) section for additional info.
> Extras can be done by adding `features=['name_of_the_extra']` to the toml entry mentioned above
## Contributing

### How to Contribute

We welcome contributions to our project! Here's a brief overview of our development process:

- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes
these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull
request.
- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull request.

- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting
a pull request.
- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting a pull request.

- __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with
admin rights will create a new release on GitHub and publish the new version to PyPI.
29 changes: 15 additions & 14 deletions pyproject.toml
@@ -58,9 +58,7 @@ async_http = [
box = ["boxsdk[jwt]==3.8.1"]
pandas = ["pandas>=1.3", "setuptools"]
pyspark = ["pyspark>=3.2.0", "pyarrow>13"]
# FIXME: loose versioning in spark_excpectations for pluggy module
# se = ["spark-expectations>=1.1.0"]
se = []
se = ["spark-expectations>=2.1.0"]
# SFTP dependencies in to_csv line_iterator
sftp = ["paramiko>=2.6.0"]
delta = ["delta-spark>=2.2"]
@@ -249,7 +247,6 @@ features = [
"box",
"pandas",
"pyspark",
"se",
"sftp",
"delta",
"dev",
@@ -284,19 +281,27 @@ matrix.version.extra-dependencies = [
{ value = "pyspark>=3.3,<3.4", if = [
"pyspark33",
] },
{ value = "spark-expectations>=2.1.0", if = [
"pyspark33",
] },
{ value = "pyspark>=3.4,<3.5", if = [
"pyspark34",
] },
{ value = "spark-expectations>=2.1.0", if = [
"pyspark34",
] },
{ value = "pyspark>=3.5,<3.6", if = [
"pyspark35",
] },
]

name.".*".env-vars = [
# set number of workes for parallel testing
{ key = "PYTEST_XDIST_AUTO_NUM_WORKERS", value = "2" },
# disables Koheesio logo being printed during testing
{ key = "KOHEESIO__PRINT_LOGO", value = "False" },
# set number of workes for parallel testing
{ key = "PYTEST_XDIST_AUTO_NUM_WORKERS", value = "2" },
# disables Koheesio logo being printed during testing
{ key = "KOHEESIO__PRINT_LOGO", value = "False" },
]

[tool.pytest.ini_options]
addopts = "-q --color=yes --order-scope=module"
log_level = "CRITICAL"
@@ -331,11 +336,7 @@ koheesio = ["src/koheesio", "*/koheesio/src/koheesio"]
tests = ["tests", "*/koheesio/tests"]

[tool.coverage.report]
exclude_lines = [
"no cov",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]
exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"]
omit = ["tests/*"]

### ~~~~ ###
@@ -650,4 +651,4 @@ enable = ["logging-not-lazy", "c-extension-no-member"]
notes = ["FIXME", "TODO"]

[tool.pylint.refactoring]
max-nested-blocks = 3
max-nested-blocks = 3
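
Taken together, the `matrix.version.extra-dependencies` entries visible in this file add `spark-expectations` only to the PySpark 3.3 and 3.4 test environments. The sketch below restates that selection in Python to make the effect explicit. `extra_dependencies` and the two lookup tables are hypothetical names used for illustration, not part of Hatch or Koheesio, and entries hidden in the collapsed part of the diff are not modelled.

```python
# Version pins and conditions copied from the extra-dependencies entries in the diff.
PYSPARK_PINS = {
    "pyspark33": "pyspark>=3.3,<3.4",
    "pyspark34": "pyspark>=3.4,<3.5",
    "pyspark35": "pyspark>=3.5,<3.6",
}
# Matrix values whose `if` list includes the spark-expectations requirement.
SE_ENABLED_FOR = {"pyspark33", "pyspark34"}


def extra_dependencies(version: str) -> list[str]:
    """Return the extra requirements for one value of the `version` matrix variable."""
    deps = [PYSPARK_PINS[version]]
    if version in SE_ENABLED_FOR:
        deps.append("spark-expectations>=2.1.0")
    return deps


for version in PYSPARK_PINS:
    print(version, extra_dependencies(version))
```

The PySpark 3.5 environment receives no `spark-expectations` requirement, which lines up with the import guard added to `spark_expectations.py` in the same commit.
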
15 changes: 9 additions & 6 deletions src/koheesio/__about__.py
@@ -13,12 +13,15 @@
LICENSE_INFO = "Licensed as Apache 2.0"
SOURCE = "https://github.com/Nike-Inc/koheesio"
__version__ = "0.7.0"
__logo__ = 75, (
b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15"
b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee"
b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc"
b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e"
b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00"
__logo__ = (
75,
(
b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15"
b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee"
b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc"
b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e"
b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00"
),
)
__short_description__ = __doc__.split("\n", maxsplit=1)[0]
__about__ = f"""Koheesio - v{__version__}
4 changes: 3 additions & 1 deletion src/koheesio/asyncio/__init__.py
@@ -67,7 +67,9 @@ def merge(self, other: Union[Dict, StepOutput]):
--------
```python
step_output = StepOutput(foo="bar")
step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
step_output.merge(
{"lorem": "ipsum"}
) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
```
Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`.
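
The dict-unpacking comparison in the docstring can be checked with plain dictionaries. This minimal sketch uses ordinary `dict` objects rather than `StepOutput`, so it illustrates only the `{**dict_a, **dict_b}` semantics the docstring refers to. The colliding `foo` key is an added example and is not taken from the source.

```python
dict_a = {"foo": "bar"}
dict_b = {"lorem": "ipsum", "foo": "baz"}

# Later mappings win on key collisions, and neither input is modified.
merged = {**dict_a, **dict_b}
print(merged)  # {'foo': 'baz', 'lorem': 'ipsum'}
```
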
8 changes: 6 additions & 2 deletions src/koheesio/integrations/box.py
@@ -610,12 +610,16 @@ class BoxFileWriter(BoxFolderBase):
from koheesio.steps.integrations.box import BoxFileWriter
auth_params = {...}
f1 = BoxFileWriter(**auth_params, path="/foo/bar", file="path/to/my/file.ext").execute()
f1 = BoxFileWriter(
**auth_params, path="/foo/bar", file="path/to/my/file.ext"
).execute()
# or
import io
b = io.BytesIO(b"my-sample-data")
f2 = BoxFileWriter(**auth_params, path="/foo/bar", file=b, name="file.ext").execute()
f2 = BoxFileWriter(
**auth_params, path="/foo/bar", file=b, name="file.ext"
).execute()
```
"""

10 changes: 6 additions & 4 deletions src/koheesio/integrations/spark/dq/spark_expectations.py
@@ -4,19 +4,21 @@

from typing import Any, Dict, Optional, Union

import pyspark
from pydantic import Field
from pyspark.sql import DataFrame
from spark_expectations.config.user_config import Constants as user_config
from spark_expectations.core.expectations import (
SparkExpectations,
WrappedDataFrameWriter,
)

from pydantic import Field

from pyspark.sql import DataFrame

from koheesio.spark.transformations import Transformation
from koheesio.spark.writers import BatchOutputMode

if pyspark.__version__.startswith("3.5"):
raise ImportError("Spark Expectations is not supported for Spark 3.5")


class SparkExpectationsTransformation(Transformation):
"""
Expand Down
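
The import-time guard added in this hunk raises `ImportError` on any PySpark 3.5 release. Below is a minimal sketch of the same check, written as a function that takes the version string as an argument so it can be exercised without PySpark installed. `ensure_supported_spark` is a hypothetical name and is not part of Koheesio.

```python
def ensure_supported_spark(version: str) -> None:
    """Raise ImportError for PySpark 3.5.x, mirroring the module-level guard."""
    # Prefix match on the version string, as in the diff.
    if version.startswith("3.5"):
        raise ImportError("Spark Expectations is not supported for Spark 3.5")


# Callers can treat the integration as optional by catching the error.
try:
    ensure_supported_spark("3.5.1")
except ImportError as exc:
    print(f"Skipping Spark Expectations: {exc}")
```

Because the check is a plain prefix match, it would also reject a hypothetical `3.50` release. Comparing parsed version components would avoid that edge case.
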