diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 0000000..fa9f5f1 --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,93 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + schedule: + - cron: '36 17 * * 6' + +jobs: + analyze: + name: Analyze (${{ matrix.language }}) + # Runner size impacts CodeQL analysis time. To learn more, please see: + # - https://gh.io/recommended-hardware-resources-for-running-codeql + # - https://gh.io/supported-runners-and-hardware-resources + # - https://gh.io/using-larger-runners (GitHub.com only) + # Consider using larger runners or machines with greater resources for possible analysis time improvements. + runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }} + timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }} + permissions: + # required for all workflows + security-events: write + + # required to fetch internal or private CodeQL packs + packages: read + + # only required for workflows in private repositories + actions: read + contents: read + + strategy: + fail-fast: false + matrix: + include: + - language: python + build-mode: none + # CodeQL supports the following values keywords for 'language': 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' + # Use `c-cpp` to analyze code written in C, C++ or both + # Use 'java-kotlin' to analyze code written in Java, Kotlin or both + # Use 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both + # To learn more about changing the languages that are analyzed or customizing the build mode for your analysis, + # see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/customizing-your-advanced-setup-for-code-scanning. + # If you are analyzing a compiled language, you can modify the 'build-mode' for that language to customize how + # your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: + languages: ${{ matrix.language }} + build-mode: ${{ matrix.build-mode }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. 
+ + # For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs + # queries: security-extended,security-and-quality + + # If the analyze step fails for one of the languages you are analyzing with + # "We were unable to automatically build your code", modify the matrix above + # to set the build mode to "manual" for that language. Then modify this step + # to build your code. + # ℹ️ Command-line programs to run using the OS shell. + # πŸ“š See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + - if: matrix.build-mode == 'manual' + shell: bash + run: | + echo 'If you are using a "manual" build mode for one or more of the' \ + 'languages you are analyzing, replace this with the commands to build' \ + 'your code, for example:' + echo ' make bootstrap' + echo ' make release' + exit 1 + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 + with: + category: "/language:${{matrix.language}}" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5aae361..1f72446 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,7 +38,8 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 - ref: ${{ github.head_ref }} + ref: ${{ github.event.pull_request.head.ref }} + repository: ${{ github.event.pull_request.head.repo.full_name }} - name: Fetch main branch run: git fetch origin main:main - name: Check changes diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7cbfedb..0145c92 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,26 +2,18 @@ There are a few guidelines that we need contributors to follow so that we are able to process requests as efficiently as possible. - -[//]: # (If you have any questions or concerns please feel free to contact us at [opensource@nike.com](mailto:opensource@nike.com).) +If you have any questions or concerns please feel free to contact us at [opensource@nike.com](mailto:opensource@nike.com). -[//]: # () -[//]: # (## Getting Started) -[//]: # () -[//]: # (* Review our [Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md)) +## Getting Started -[//]: # (* Submit the [Individual Contributor License Agreement](https://www.clahub.com/agreements/Nike-Inc/fastbreak)) -[//]: # (* Make sure you have a [GitHub account](https://github.com/signup/free)) - -[//]: # (* Submit a ticket for your issue, assuming one does not already exist.) - -[//]: # ( * Clearly describe the issue including steps to reproduce when it is a bug.) - -[//]: # ( * Make sure you fill in the earliest version that you know has the issue.) - -[//]: # (* Fork the repository on GitHub) +* Review our [Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md) +* Make sure you have a [GitHub account](https://github.com/signup/free) +* Submit a ticket for your issue, assuming one does not already exist. + * Clearly describe the issue including steps to reproduce when it is a bug. + * Make sure you fill in the earliest version that you know has the issue. +* Fork the repository on GitHub ## Making Changes @@ -98,6 +90,5 @@ At the moment, the release process is manual. We try to make frequent releases. 
* [GitHub pull request documentation](https://help.github.com/send-pull-requests/) * [Nike's Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md) -[//]: # (* [Nike's Individual Contributor License Agreement](https://www.clahub.com/agreements/Nike-Inc/fastbreak)) [//]: # (* [Nike OSS](https://nike-inc.github.io/)) \ No newline at end of file diff --git a/Makefile b/Makefile index 163a4fd..54da8d0 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: help ## Display this message help: - @python koheesio/__about__.py + @python src/koheesio/__about__.py @echo "\nAvailable \033[34m'make'\033[0m commands:" @echo "\n\033[1mSetup:\033[0m" @grep -E '^.PHONY: .*?## setup - .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ".PHONY: |## (setup|hatch) - "}; {printf " \033[36m%-22s\033[0m %s\n", $$2, $$3}' diff --git a/README.md b/README.md index 23454f8..41e7f0a 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,8 @@ # Koheesio -
- +

Koheesio logo -

+

| | | |---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| @@ -11,10 +10,16 @@ | Package | [![PyPI - Version](https://img.shields.io/pypi/v/koheesio.svg?logo=pypi&label=PyPI&logoColor=gold)](https://pypi.org/project/koheesio/) [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/koheesio.svg?logo=python&label=Python&logoColor=gold)](https://pypi.org/project/koheesio/) [![PyPI - Downloads](https://img.shields.io/pypi/dm/koheesio?color=blue&label=Installs&logo=pypi&logoColor=gold)](https://pypi.org/project/koheesio/) | | Meta | [![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch) [![linting - Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![types - Mypy](https://img.shields.io/badge/types-Mypy-blue.svg)](https://github.com/python/mypy) [![docstring - numpydoc](https://img.shields.io/badge/docstring-numpydoc-blue)](https://numpydoc.readthedocs.io/en/latest/format.html) [![code style - black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License - Apache 2.0](https://img.shields.io/github/license/Nike-Inc/koheesio)](LICENSE.txt) | -Koheesio, named after the Finnish word for cohesion, is a robust Python framework for building efficient data pipelines. -It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components. +[//]: # (suggested edit: ) +# Koheesio: A Python Framework for Efficient Data Pipelines + +Koheesio - the Finnish word for cohesion - is a robust Python framework designed to build efficient data pipelines. It +encourages modularity and collaboration, allowing the creation of complex pipelines from simple, reusable components. + -The framework is versatile, aiming to support multiple implementations and working seamlessly with various data +## What is Koheesio? + +Koheesio is a versatile framework that supports multiple implementations and works seamlessly with various data processing libraries or frameworks. This ensures that Koheesio can handle any data processing task, regardless of the underlying technology or data scale. @@ -23,50 +28,127 @@ safety and structured configurations within pipeline components. [Pydantic]: docs/includes/glossary.md#pydantic -Koheesio's goal is to ensure predictable pipeline execution through a solid foundation of well-tested code and a rich -set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable -Data Pipelines. +The goal of Koheesio is to ensure predictable pipeline execution through a solid foundation of well-tested code and a +rich set of features. 
This makes it an excellent choice for developers and organizations seeking to build robust and +adaptable data pipelines. -## What sets Koheesio apart from other libraries?" -Koheesio encapsulates years of data engineering expertise, fostering a collaborative and innovative community. While -similar libraries exist, Koheesio's focus on data pipelines, integration with PySpark, and specific design for tasks -like data transformation, ETL jobs, data validation, and large-scale data processing sets it apart. - -Koheesio aims to provide a rich set of features including readers, writers, and transformations for any type of Data -processing. Koheesio is not in competition with other libraries. Its aim is to offer wide-ranging support and focus -on utility in a multitude of scenarios. Our preference is for integration, not competition... +### What Koheesio is Not + +Koheesio is **not** a workflow orchestration tool. It does not serve the same purpose as tools like Luigi, +Apache Airflow, or Databricks workflows, which are designed to manage complex computational workflows and generate +DAGs (Directed Acyclic Graphs). + +Instead, Koheesio is focused on providing a robust, modular, and testable framework for data tasks. It's designed to +make it easier to write, maintain, and test data processing code in Python, with a strong emphasis on modularity, +reusability, and error handling. + +If you're looking for a tool to orchestrate complex workflows or manage dependencies between different tasks, you might +want to consider dedicated workflow orchestration tools. + + +### The Strength of Koheesio + +The core strength of Koheesio lies in its **focus on the individual tasks within those workflows**. It's all about +making these tasks as robust, repeatable, and maintainable as possible. Koheesio aims to break down tasks into small, +manageable units of work that can be easily tested, reused, and composed into larger workflows orchestrated with other +tools or frameworks (such as Apache Airflow, Luigi, or Databricks Workflows). + +By using Koheesio, you can ensure that your data tasks are resilient, observable, and repeatable, adhering to good +software engineering practices. This makes your data pipelines more reliable and easier to maintain, ultimately leading +to more efficient and effective data processing. + + +### Promoting Collaboration and Innovation + +Koheesio encapsulates years of software and data engineering expertise. It fosters a collaborative and innovative +community, setting itself apart with its unique design and focus on data pipelines, data transformation, ETL jobs, +data validation, and large-scale data processing. + +The core components of Koheesio are designed to bring strong software engineering principles to data engineering. + +'Steps' break down tasks and workflows into manageable, reusable, and testable units. Each 'Step' comes with built-in +logging, providing transparency and traceability. The 'Context' component allows for flexible customization of task +behavior, making it adaptable to various data processing needs. + +In essence, Koheesio is a comprehensive solution for data engineering challenges, designed with the principles of +modularity, reusability, testability, and transparency at its core. It aims to provide a rich set of features including +utilities, readers, writers, and transformations for any type of data processing. It is not in competition with other +libraries, but rather aims to offer wide-ranging support and focus on utility in a multitude of scenarios. 
Our +preference is for integration, not competition. We invite contributions from all, promoting collaboration and innovation in the data engineering community. + +### Comparison to other libraries + +#### ML frameworks + +The libraries listed under this section are primarily focused on Machine Learning (ML) workflows. They provide various +functionalities, from orchestrating ML and data processing workflows, simplifying the deployment of ML workflows on +Kubernetes, to managing the end-to-end ML lifecycle. While these libraries have a strong emphasis on ML, Koheesio is a +more general data pipeline framework. It is designed to handle a variety of data processing tasks, not exclusively +focused on ML. This makes Koheesio a versatile choice for data pipeline construction, regardless of whether the +pipeline involves ML tasks or not. + +- [Flyte](https://flyte.org/): A cloud-native platform for orchestrating ML and data processing workflows. Unlike Koheesio, it requires Kubernetes for deployment and has a strong focus on workflow orchestration. +- [Kubeflow](https://kubeflow.org/): A project dedicated to simplifying the deployment of ML workflows on Kubernetes. Unlike Koheesio, it is more specialized for ML workflows. +- [Kedro](https://kedro.readthedocs.io/): A Python framework that applies software engineering best-practice to data and machine-learning pipelines. It is similar to Koheesio but has a stronger emphasis on machine learning pipelines. +- [Metaflow](https://docs.metaflow.org/): A human-centric framework for data science that addresses the entire data science lifecycle. It is more focused on data science projects compared to Koheesio. +- [MLflow](https://mlflow.org/docs/latest/index.html): An open source platform for managing the end-to-end machine learning lifecycle. It is more focused on machine learning projects compared to Koheesio. +- [TFX](https://www.tensorflow.org/tfx/guide): An end-to-end platform for deploying production ML pipelines. It is more focused on TensorFlow-based machine learning pipelines compared to Koheesio. +- [Seldon Core](https://docs.seldon.io/projects/seldon-core/en/latest/): An open source platform for deploying machine learning models on Kubernetes. Unlike Koheesio, it is more focused on model deployment. + + +#### Orchestration tools + +The libraries listed under this section are primarily focused on workflow orchestration. They provide various +functionalities, from authoring, scheduling, and monitoring workflows, to building complex pipelines of batch jobs, and +creating and executing Directed Acyclic Graphs (DAGs). Some of these libraries are designed for modern infrastructure +and powered by open-source workflow engines, while others use a Python-style language for defining workflows. While +these libraries have a strong emphasis on workflow orchestration, Koheesio is a more general data pipeline framework. It +is designed to handle a variety of data processing tasks, not limited to workflow orchestration. Code written with +Koheesio is often compatible with these orchestration engines (see the sketch after these comparison lists). This makes Koheesio a versatile choice for data pipeline +construction, regardless of how the pipeline orchestration is set up. + +- [Apache Airflow](https://airflow.apache.org/docs/): A platform to programmatically author, schedule and monitor workflows. Unlike Koheesio, it focuses on managing complex computational workflows. +- [Luigi](https://luigi.readthedocs.io/): A Python module that helps you build complex pipelines of batch jobs.
It is more focused on workflow orchestration compared to Koheesio. +- [Databricks Workflows](https://www.databricks.com/product/workflows): A set of tools for building, debugging, deploying, and running Apache Spark workflows on Databricks. +- [Prefect](https://docs.prefect.io/): A new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. It is more focused on workflow orchestration and management compared to Koheesio. +- [Snakemake](https://snakemake.readthedocs.io/en/stable/): A workflow management system that uses a Python-style language for defining workflows. While it's powerful for creating complex workflows, Koheesio's focus on modularity and reusability might make it easier to build, test, and maintain your data pipelines. +- [Dagster](https://docs.dagster.io/): A data orchestrator for machine learning, analytics, and ETL. It's more focused on orchestrating and visualizing data workflows compared to Koheesio. +- [Ploomber](https://ploomber.readthedocs.io/): A Python library for building robust data pipelines. In some ways it is similar to Koheesio, but has a very different API design more focused on workflow orchestration. +- [Pachyderm](https://docs.pachyderm.com/): A data versioning, data lineage, and workflow system running on Kubernetes. It is more focused on data versioning and lineage compared to Koheesio. +- [Argo](https://argoproj.github.io/): An open source container-native workflow engine for orchestrating parallel jobs on Kubernetes. Unlike Koheesio, it requires Kubernetes for deployment. + + +#### Others + The libraries listed under this section offer a variety of unique functionalities, from parallel and distributed + computing, to SQL-first transformation workflows, to data versioning and lineage, to data relation definition and + manipulation, and data warehouse management. Some of these libraries are designed for specific tasks such as + transforming data in warehouses using SQL, building concurrent, multi-stage data ingestion and processing pipelines, + or orchestrating parallel jobs on Kubernetes. + +- [Dask](https://dask.org/): A flexible parallel computing library for analytics. Unlike Koheesio, it is more focused on parallel and distributed computing. While not currently supported, Dask could be a future implementation pattern for Koheesio, just like Pandas and PySpark are at the moment. +- [dbt](https://www.getdbt.com/): A SQL-first transformation workflow that also supports Python. It excels in transforming data in warehouses using SQL. In contrast, Koheesio is a more general data pipeline framework with strong typing, capable of handling a variety of data processing tasks beyond transformations. +- [Broadway](https://elixir-broadway.org/): An Elixir library for building concurrent, multi-stage data ingestion and processing pipelines. If your team is more comfortable with Python or if you're looking for a framework that emphasizes modularity and collaboration, Koheesio could be a better fit. +- [Ray](https://docs.ray.io/en/latest/): A general-purpose distributed computing framework. Unlike Koheesio, it is more focused on distributed computing. +- [DataJoint](https://docs.datajoint.io/): A language for defining data relations and manipulating data. Unlike Koheesio, it is more focused on data relation definition and manipulation.
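To make the compatibility claim above concrete, here is a minimal, illustrative sketch of a Koheesio task scheduled from Apache Airflow. It is not part of this change set: the Koheesio classes mirror the getting-started example elsewhere in this diff, while the DAG id, task id, schedule, and `PythonOperator` wiring are assumptions for a recent Airflow 2.x installation.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_koheesio_etl() -> None:
    # Import inside the callable so the scheduler process does not need Spark installed.
    from pyspark.sql import SparkSession

    from koheesio.spark.etl_task import EtlTask
    from koheesio.spark.readers.dummy import DummyReader
    from koheesio.spark.transformations.camel_to_snake import CamelToSnakeTransformation
    from koheesio.spark.writers.dummy import DummyWriter

    SparkSession.builder.getOrCreate()  # the ETL task expects an active Spark session
    task = EtlTask(
        source=DummyReader(),
        transformations=[CamelToSnakeTransformation()],
        target=DummyWriter(),
    )
    task.run()


# The orchestrator only sees a plain Python callable; Koheesio handles the task internals.
with DAG(dag_id="koheesio_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    PythonOperator(task_id="run_etl", python_callable=run_koheesio_etl)
```

The same callable could be wrapped by Luigi, Databricks Workflows, or any of the other orchestrators listed above without changes to the Koheesio code.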
+ + ## Koheesio Core Components -Here are the key components included in Koheesio: +Here are the 3 core components included in Koheesio: - __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in inputs and producing outputs. - - ```text - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ Input 1 │───────▢│ β”œβ”€β”€β”€β”€β”€β”€β”€β–Άβ”‚ Output 1 β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β””β”€β”€β”€β”€βˆšβ”€β”€β”€β”€β”€β”˜ - β”‚ β”‚ - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ Input 2 │───────▢│ Step │───────▢│ Output 2 β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ β”‚ - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ Input 3 │───────▢│ β”œβ”€β”€β”€β”€β”€β”€β”€β–Άβ”‚ Output 3 β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - ``` - - __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share variables across tasks and adapt the behavior of a Task based on its environment. - __Logger__: This is a class for logging messages at different levels. ## Installation -You can install Koheesio using either pip or poetry. +You can install Koheesio using either pip, hatch, or poetry. ### Using Pip @@ -81,7 +163,7 @@ pip install koheesio If you're using Hatch for package management, you can add Koheesio to your project by simply adding koheesio to your `pyproject.toml`. - ```toml + ```toml title="pyproject.toml" [dependencies] koheesio = "" ``` @@ -94,34 +176,49 @@ If you're using poetry for package management, you can add Koheesio to your proj poetry add koheesio ``` -or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace `...` with the version you want to have installed: +or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace +`...` with the version you want to have installed: -```toml +```toml title="pyproject.toml" koheesio = {version = "..."} ``` -### Features +## Extras + +Koheesio also provides some additional features that can be useful in certain scenarios. We call these 'integrations'. +With an integration we mean a module that requires additional dependencies to be installed. -Koheesio also provides some additional features that can be useful in certain scenarios. These include: +Extras can be added by adding `extras=['name_of_the_extra']` (poetry) or `koheesio[name_of_the_extra]` (pip/hatch) to +the `pyproject.toml` entry mentioned above or installing through pip. -- __Spark Expectations__: Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module; - - Installable through the `se` extra. - - SE Provides Data Quality checks for Spark DataFrames. For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations). +### Integrations + +- __Spark Expectations:__ + Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module; installable through the `se` extra. + - SE Provides Data Quality checks for Spark DataFrames. + - For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations). 
[//]: # (- **Brickflow:** Available through the `koheesio.steps.integration.workflow` module; installable through the `bf` extra.) [//]: # ( - Brickflow is a workflow orchestration tool that allows you to define and execute workflows in a declarative way.) [//]: # ( - For more information, refer to the [Brickflow docs](https://engineering.nike.com/brickflow)) -- __Box__: Available through the `koheesio.steps.integration.box` module - - Installable through the `box` extra. - - Box is a cloud content management and file sharing service for businesses. +- __Box__: + Available through the `koheesio.integration.box` module; installable through the `box` extra. + - [Box](https://www.box.com) is a cloud content management and file sharing service for businesses. + +- __SFTP__: + Available through the `koheesio.integration.spark.sftp` module; installable through the `sftp` extra. + - SFTP is a network protocol used for secure file transfer over a secure shell. + - The SFTP integration of Koheesio relies on [paramiko](https://www.paramiko.org/) -- __SFTP__: Available through the `koheesio.steps.integration.spark.sftp` module; - - Installable through the `sftp` extra. - - SFTP is a network protocol used for secure file transfer over a secure shell. +[//]: # (TODO: add implementations) +[//]: # (## Implementations) +[//]: # (TODO: add async extra) +[//]: # (TODO: add spark extra) +[//]: # (TODO: add pandas extra) > __Note:__ -> Some of the steps require extra dependencies. See the [Features](#features) section for additional info. +> Some of the steps require extra dependencies. See the [Extras](#extras) section for additional info. > Extras can be done by adding `features=['name_of_the_extra']` to the toml entry mentioned above ## Contributing @@ -130,18 +227,21 @@ Koheesio also provides some additional features that can be useful in certain sc We welcome contributions to our project! Here's a brief overview of our development process: -- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull request. +- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes + these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull + request. -- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting a pull request. +- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting + a pull request. - __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with admin rights will create a new release on GitHub and publish the new version to PyPI. -For more detailed information, please refer to our [contribution guidelines](./docs/contribute.md). We also adhere to -[Nike's Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md) and [Nike's Individual Contributor License Agreement](https://www.clahub.com/agreements/Nike-Inc/fastbreak). +For more detailed information, please refer to our [contribution guidelines](https://github.com/Nike-Inc/koheesio/blob/main/CONTRIBUTING.md). +We also adhere to [Nike's Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md). 
### Additional Resources -- [General GitHub documentation](https://help.github.com/) -- [GitHub pull request documentation](https://help.github.com/send-pull-requests/) +- [General GitHub documentation](https://support.github.com/) +- [GitHub pull request documentation](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-pull-requests) - [Nike OSS](https://nike-inc.github.io/) diff --git a/docs/assets/documentation-system-overview.png b/docs/assets/documentation-system-overview.png new file mode 100644 index 0000000..88e6016 Binary files /dev/null and b/docs/assets/documentation-system-overview.png differ diff --git a/docs/community/approach-documentation.md b/docs/community/approach-documentation.md index 7e75a84..d53456e 100644 --- a/docs/community/approach-documentation.md +++ b/docs/community/approach-documentation.md @@ -1,8 +1,3 @@ ---- -tags: - - doctype/explanation ---- - ## Scope @@ -21,7 +16,7 @@ From [documentation.divio.com](https://documentation.divio.com): > They are: _tutorials, how-to guides, technical reference and explanation_. They represent four different purposes or functions, and require four different approaches to their creation. Understanding the implications of this will help improve most documentation - often immensely. > > **About the system** -> ![Documentation System Overview](assets/../../assets/documentation-system-overview.png) +> ![Documentation System Overview](../assets/documentation-system-overview.png) > The documentation system outlined here is a simple, comprehensive and nearly universally-applicable scheme. It is proven in practice across a wide variety of fields and applications. > > There are some very simple principles that govern documentation that are very rarely if ever spelled out. They seem to be a secret, though they shouldn’t be. diff --git a/docs/community/contribute.md b/docs/community/contribute.md index 03cec5b..c7805b6 100644 --- a/docs/community/contribute.md +++ b/docs/community/contribute.md @@ -17,15 +17,15 @@ There are a few guidelines that we need contributors to follow so that we are ab * Create a feature branch off of `main` before you start your work. * Please avoid working directly on the `main` branch. -* Setup the required package manager [hatch](#-package-manager) -* Setup the dev environment [see below](#-dev-environment-setup) +* Setup the required package manager [hatch](#package-manager) +* Setup the dev environment [see below](#dev-environment-setup) * Make commits of logical units. * You may be asked to squash unnecessary commits down to logical units. * Check for unnecessary whitespace with `git diff --check` before committing. * Write meaningful, descriptive commit messages. * Please follow existing code conventions when working on a file -* Make sure to check the standards on the code, [see below](#-linting-and-standards) -* Make sure to test the code before you push changes [see below](#-testing) +* Make sure to check the standards on the code, [see below](#linting-and-standards) +* Make sure to test the code before you push changes [see below](#testing) ## 🀝 Submitting Changes @@ -66,7 +66,7 @@ make hatch-install This will install hatch using brew if you are on a Mac. 
-If you are on a different OS, you can follow the instructions [here]( https://hatch.pypa.io/latest/install/) +If you are on a different OS, you can follow the instructions [here](https://hatch.pypa.io/latest/install/) ### πŸ“Œ Dev Environment Setup @@ -119,5 +119,4 @@ Make sure that all tests pass and that you have adequate coverage before submitt * [General GitHub documentation](https://help.github.com/) * [GitHub pull request documentation](https://help.github.com/send-pull-requests/) * [Nike's Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md) -* [Nike's Individual Contributor License Agreement](https://www.clahub.com/agreements/Nike-Inc/fastbreak) * [Nike OSS](https://nike-inc.github.io/) \ No newline at end of file diff --git a/docs/css/custom.css b/docs/css/custom.css index 684d330..b11661a 100644 --- a/docs/css/custom.css +++ b/docs/css/custom.css @@ -35,31 +35,7 @@ border-left: .05rem solid var(--md-typeset-table-color); } -/* Mark external links as such. */ -.md-content a.autorefs-external::after, -.md-content a[href^="http"]:after { - /* https://primer.style/octicons/arrow-up-right-24 */ - background-image: url('data:image/svg+xml,'); - content: ' '; - - display: inline-block; - position: relative; - top: 0.1em; - margin-left: 0.2em; - margin-right: 0.1em; - - height: 0.6em; - width: 0.6em; - border-radius: 100%; - background-color: var(--md-typeset-a-color); -} - -.md-content a.autorefs-external:hover::after, -.md-content a[href^="http"]:hover::after { - background-color: var(--md-accent-fg-color); - background-image: url('data:image/svg+xml,'); -} - +/* Gradient banner */ .md-header { background: linear-gradient(142deg, rgba(229,119,39,1) 3%, rgba(172,56,56,1) 31%, rgba(133,59,96,1) 51%, rgba(31,67,103,1) 79%, rgba(31,99,120,1) 94%, rgba(32,135,139,1) 100%); } diff --git a/docs/reference/concepts/concepts.md b/docs/reference/concepts/concepts.md index 7b01b3b..044ed2f 100644 --- a/docs/reference/concepts/concepts.md +++ b/docs/reference/concepts/concepts.md @@ -5,7 +5,14 @@ The core components are the following: > *Note:* click on the 'Concept' to take you to the corresponding module. The module documentation will have greater detail on the specifics of the implementation -## [**Step**](steps.md) + +[//]: # (References) +[Context]: context.md +[Logging]: logger.md +[Step]: step.md + + +## [Step] A custom unit of logic that can be executed. A Step is an atomic operation and serves as the building block of data pipelines built with the framework. A step can be seen as an operation on a set of inputs, and returns a set of @@ -53,39 +60,29 @@ Step ---> O3["Output 3"] Step is the core abstraction of the framework. Meaning, that it is the core building block of the framework and is used to define all the operations that can be executed. -Please see the [Step](steps.md) documentation for more details. - -## [**Task**](tasks.md) - -The unit of work of one execution of the framework. +Please see the [Step] documentation for more details. -An execution usually consists of an `Extract - Transform - Load` approach of one data object. -Tasks typically consist of a series of Steps. -Please see the [Task](tasks.md) documentation for more details. - -## [**Context**](context.md) +## [Context] The Context is used to configure the environment where a Task or Step runs. It is often based on configuration files and can be used to adapt behaviour of a Task or Step based on the environment it runs in. 
-Please see the [Context](context.md) documentation for more details. +Please see the [Context] documentation for more details. -## [**logger**](logging.md) -A logger object to log messages with different levels. +## [Logging] -Please see the [Logging](logging.md) documentation for more details. +A logger object to log messages with different levels. +Please see the [Logging] documentation for more details. The interactions between the base concepts of the model is visible in the below diagram: ```mermaid ---- -title: Koheesio Class Diagram ---- + classDiagram Step .. Task Step .. Transformation diff --git a/docs/reference/concepts/context.md b/docs/reference/concepts/context.md index ec3ca68..fd7a7c7 100644 --- a/docs/reference/concepts/context.md +++ b/docs/reference/concepts/context.md @@ -12,14 +12,14 @@ complex configurations. It also provides serialization and deserialization capab and load configurations in JSON, YAML, or TOML formats. Whether you're setting up the environment for a Task or Step, or managing variables shared across multiple tasks, -`Context` provides a robust and efficient solution. +`Context` provides a robust and efficient solution. This document will guide you through its key features and show you how to leverage its capabilities in your Koheesio applications. ## API Reference -See [API Reference](../../koheesio/context.html) for a detailed description of the `Context` class and its methods. +See [API Reference](../../api_reference/context.md) for a detailed description of the `Context` class and its methods. ## Key Features diff --git a/docs/reference/concepts/logging.md b/docs/reference/concepts/logger.md similarity index 100% rename from docs/reference/concepts/logging.md rename to docs/reference/concepts/logger.md diff --git a/docs/reference/concepts/steps.md b/docs/reference/concepts/step.md similarity index 100% rename from docs/reference/concepts/steps.md rename to docs/reference/concepts/step.md diff --git a/docs/reference/concepts/tasks.md b/docs/reference/concepts/tasks.md deleted file mode 100644 index 74a57fd..0000000 --- a/docs/reference/concepts/tasks.md +++ /dev/null @@ -1,8 +0,0 @@ -A Task is the unit of work of one execution of the framework. An execution usually consists of an Extract -> Transform --> Load approach of one data object. - -Tasks generally are made up of Steps chained one after another. - -## API Reference - -See [API Reference](../../koheesio/tasks) for a detailed description of the `Task` class and its methods. \ No newline at end of file diff --git a/docs/reference/concepts/readers.md b/docs/reference/spark/readers.md similarity index 97% rename from docs/reference/concepts/readers.md rename to docs/reference/spark/readers.md index 0700dc7..7c87920 100644 --- a/docs/reference/concepts/readers.md +++ b/docs/reference/spark/readers.md @@ -12,7 +12,7 @@ the `df` property of the `Reader`. ## API Reference -See [API Reference](../../koheesio/steps/readers) for a detailed description of the `Reader` class and its methods. +See [API Reference](../../api_reference/spark/readers/index.md) for a detailed description of the `Reader` class and its methods. 
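As a concrete illustration of the Reader pattern (a Step that reads from a source and exposes the result as a DataFrame), here is a minimal, hypothetical sketch. The class name, `path` field, and CSV options are invented for the example; only `Reader` itself comes from Koheesio.

```python
from pyspark.sql import SparkSession

from koheesio.spark.readers import Reader


class CsvFileReader(Reader):
    """Illustrative reader that loads a CSV file into a Spark DataFrame."""

    path: str  # input parameter of this step

    def execute(self) -> None:
        spark = SparkSession.builder.getOrCreate()
        # The result is stored on the step's output, per the Reader contract described here.
        self.output.df = spark.read.csv(self.path, header=True, inferSchema=True)


reader = CsvFileReader(path="data.csv")
reader.execute()
df = reader.output.df
```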
## Key Features of a Reader diff --git a/docs/reference/concepts/transformations.md b/docs/reference/spark/transformations.md similarity index 99% rename from docs/reference/concepts/transformations.md rename to docs/reference/spark/transformations.md index 933d0a9..21363a3 100644 --- a/docs/reference/concepts/transformations.md +++ b/docs/reference/spark/transformations.md @@ -15,8 +15,8 @@ pipeline. This can help avoid errors and make your code easier to understand and ## API Reference -See [API Reference](../../koheesio/steps/transformations) for a detailed description of the `Transformation` classes and -their methods. +See [API Reference](../../api_reference/spark/transformations/index.md) for a detailed description of the +`Transformation` classes and their methods. ## Types of Transformations diff --git a/docs/reference/concepts/writers.md b/docs/reference/spark/writers.md similarity index 100% rename from docs/reference/concepts/writers.md rename to docs/reference/spark/writers.md diff --git a/docs/tutorials/getting-started.md b/docs/tutorials/getting-started.md index 22bcdf1..8fd80ff 100644 --- a/docs/tutorials/getting-started.md +++ b/docs/tutorials/getting-started.md @@ -6,36 +6,53 @@ ## Installation -### Poetry - -If you're using Poetry, add the following entry to the `pyproject.toml` file: - -```toml title="pyproject.toml" -[[tool.poetry.source]] -name = "nike" -url = "https://artifactory.nike.com/artifactory/api/pypi/python-virtual/simple" -secondary = true -``` - -```bash -poetry add koheesio -``` - -### pip - -If you're using pip, run the following command to install Koheesio: - -Requires [pip](https://pip.pypa.io/en/stable/). - -```bash -pip install koheesio --extra-index-url https://artifactory.nike.com/artifactory/api/pypi/python-virtual/simple -``` +
+ hatch / hatchling + + If you're using hatch (or hatchling), simply add `koheesio` to the `dependencies` or section in your + `pyproject.toml` file: + + ```toml title="pyproject.toml" + dependencies = [ + "koheesio", + ] + ``` +
+ +
+ poetry + + If you're using Poetry, add the following entry to the `pyproject.toml` file: + + ```toml title="pyproject.toml" + [[tool.poetry.source]] + name = "nike" + url = "https://artifactory.nike.com/artifactory/api/pypi/python-virtual/simple" + secondary = true + ``` + + ```bash + poetry add koheesio + ``` +
+ +
+ pip + + If you're using pip, run the following command to install Koheesio: + + Requires [pip](https://pip.pypa.io/en/stable/). + + ```bash + pip install koheesio + ``` +
## Basic Usage Once you've installed Koheesio, you can start using it in your Python scripts. Here's a basic example: -```python +```python title="my_first_step.py" from koheesio import Step # Define a step @@ -50,17 +67,52 @@ step = MyStep() step.execute() ``` -### Advanced Usage -For more advanced usage, you can check out the examples in the `__notebooks__` directory of this repository. These examples show how to use Koheesio's features in more detail. +## Advanced Usage + +```python title="my_first_etl.py" +from pyspark.sql.functions import lit +from pyspark.sql import DataFrame, SparkSession + +# Step 1: import Koheesio dependencies +from koheesio.context import Context +from koheesio.spark.readers.dummy import DummyReader +from koheesio.spark.transformations.camel_to_snake import CamelToSnakeTransformation +from koheesio.spark.writers.dummy import DummyWriter +from koheesio.spark.etl_task import EtlTask + +# Step 2: Set up a SparkSession +spark = SparkSession.builder.getOrCreate() + +# Step 3: Configure your Context +context = Context({ + "source": DummyReader(), + "transformations": [CamelToSnakeTransformation()], + "target": DummyWriter(), + "my_favorite_movie": "inception", +}) + +# Step 4: Create a Task +class MyFavoriteMovieTask(EtlTask): + my_favorite_movie: str + + def transform(self, df: DataFrame = None) -> DataFrame: + df = df.withColumn("MyFavoriteMovie", lit(self.my_favorite_movie)) + return super().transform(df) + +# Step 5: Run your Task +task = MyFavoriteMovieTask(**context) +task.run() +``` ### Contributing -If you want to contribute to Koheesio, check out the `CONTRIBUTING.md` file in this repository. It contains guidelines for contributing, including how to submit issues and pull requests. +If you want to contribute to Koheesio, check out the `CONTRIBUTING.md` file in this repository. It contains guidelines +for contributing, including how to submit issues and pull requests. ### Testing To run the tests for Koheesio, use the following command: ```bash -make test +make dev-test ``` -This will run all the tests in the `test` directory. +This will run all the tests in the `tests` directory. diff --git a/docs/tutorials/how-to.md b/docs/tutorials/how-to.md deleted file mode 100644 index e49986e..0000000 --- a/docs/tutorials/how-to.md +++ /dev/null @@ -1,8 +0,0 @@ ---- -tags: - - doctype/how-to ---- - - - - diff --git a/docs/tutorials/learn-koheesio.md b/docs/tutorials/learn-koheesio.md index 5d71d08..7c237b8 100644 --- a/docs/tutorials/learn-koheesio.md +++ b/docs/tutorials/learn-koheesio.md @@ -1,40 +1,74 @@ # Learn Koheesio -Koheesio is designed to simplify the development of data engineering pipelines. It provides a structured way to define and execute data processing tasks, making it easier to build, test, and maintain complex data workflows. +Koheesio is designed to simplify the development of data engineering pipelines. It provides a structured way to define +and execute data processing tasks, making it easier to build, test, and maintain complex data workflows. ## Core Concepts Koheesio is built around several core concepts: -- **Step**: The fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in inputs and producing outputs. -- **Task**: A larger unit of work, typically encompassing an entire Extract - Transform - Load process for a data object. -- **Context**: A configuration class used to set up the environment for a Task. 
It can be used to share variables across tasks and adapt the behavior of a Task based on its environment. -- **Logger**: A class for logging messages at different levels. -- **Reader**: A type of Step that reads data from a source and stores the result (to make it available for subsequent steps). -- **Writer**: This controls how data is written to the output in both batch and streaming contexts. -- **Transformation**: A type of Step that takes a DataFrame as input and returns a DataFrame as output. +- **Step**: The fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in + inputs and producing outputs. + > See the [Step](../reference/concepts/step.md) documentation for more information. +- **Context**: A configuration class used to set up the environment for a Task. It can be used to share variables across + tasks and adapt the behavior of a Task based on its environment. + > See the [Context](../reference/concepts/context.md) documentation for more information. +- **Logger**: A class for logging messages at different levels. + > See the [Logger](../reference/concepts/logger.md) documentation for more information. -In any given pipeline, you can expect to use Readers, Writers, and Transformations to express the ETL logic. Readers are responsible for extracting data from various sources, such as databases, files, or APIs. Transformations then process this data, performing operations like filtering, aggregation, or conversion. Finally, Writers handle the loading of the transformed data to the desired destination, which could be a database, a file, or a data stream. The Logger and Context classes provide support for these operations, enabling detailed logging of the pipeline's execution and customization of the pipeline's behavior based on the environment, respectively. The Task class ties these components together, orchestrating the execution of the Steps in a pipeline. +The Logger and Context classes provide support, enabling detailed logging of the pipeline's execution and customization +of the pipeline's behavior based on the environment, respectively. -## Basic Usage -Here's a simple example of how to define and execute a Step: +## Implementations -```python -from koheesio import Step +In the context of Koheesio, an implementation refers to a specific way of executing Steps, the fundamental units of work +in Koheesio. Each implementation uses a different technology or approach to process data along with its own set of +Steps, designed to work with the specific technology or approach used by the implementation. -class MyStep(Step): - def execute(self): - # Your step logic here +For example, the Spark implementation includes Steps for reading data from a Spark DataFrame, transforming the data +using Spark operations, and writing the data to a Spark-supported destination. -step = MyStep() -step.execute() -``` +Currently, Koheesio supports two implementations: Spark, and AsyncIO. -## Advanced Usage +### Spark +_Requires:_ Apache Spark (pyspark) +_Installation:_ `pip install koheesio[spark]` +_Module:_ `koheesio.spark` + +This implementation uses Apache Spark, a powerful open-source unified analytics engine for large-scale data processing. + +Steps that use this implementation can leverage Spark's capabilities for distributed data processing, making it suitable +for handling large volumes of data. 
The Spark implementation includes the following types of Steps: + +- **Reader**: + `from koheesio.spark.readers import Reader` + A type of Step that reads data from a source and stores the result (to make it available for subsequent steps). + For more information, see the [Reader](../reference/spark/readers.md) documentation. + +- **Writer**: + `from koheesio.spark.writers import Writer` + This controls how data is written to the output in both batch and streaming contexts. + For more information, see the [Writer](../reference/spark/writers.md) documentation. + +- **Transformation**: + `from koheesio.spark.transformations import Transformation` + A type of Step that takes a DataFrame as input and returns a DataFrame as output. + For more information, see the [Transformation](../reference/spark/transformations.md) documentation. + +In any given pipeline, you can expect to use Readers, Writers, and Transformations to express the ETL logic. Readers are +responsible for extracting data from various sources, such as databases, files, or APIs. Transformations then process this +data, performing operations like filtering, aggregation, or conversion. Finally, Writers handle the loading of the +transformed data to the desired destination, which could be a database, a file, or a data stream. + +### Async +_Module:_ `koheesio.asyncio` + +This implementation uses Python's asyncio library for writing single-threaded concurrent code using coroutines, +multiplexing I/O access over sockets and other resources, running network clients and servers, and other related +primitives. Steps that use this implementation can perform data processing tasks asynchronously, which can be beneficial +for IO-bound tasks. -For more advanced usage, check out the examples in the [`__notebooks__`]("__notebooks__") directory of the Koheesio repository. -These examples show how to use Koheesio's features in more detail. ## Best Practices @@ -70,4 +104,14 @@ Here are some best practices for using Koheesio: and loading. This not only simplifies your code but also makes it more robust and efficient. Remember, these are general best practices and might need to be adapted based on your specific use case and -requirements. \ No newline at end of file +requirements. + + +## Pydantic + +Koheesio Steps are Pydantic models, which means they can be validated and serialized. This makes it easy to define +the inputs and outputs of a Step, and to validate them before running the Step. Pydantic models also provide a +consistent way to define the schema of the data that a Step expects and produces, making it easier to understand and +maintain the code. + +Learn more about Pydantic [here](https://docs.pydantic.dev/latest/). diff --git a/docs/tutorials/onboarding.md b/docs/tutorials/onboarding.md index 343c293..55276c1 100644 --- a/docs/tutorials/onboarding.md +++ b/docs/tutorials/onboarding.md @@ -1,7 +1,3 @@ - -tags: - - doctype/how-to - # Onboarding to Koheesio Koheesio is a Python library that simplifies the development of data engineering pipelines. It provides a structured diff --git a/mkdocs.yml b/mkdocs.yml index 0e9b7a9..2dc0774 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,6 +1,5 @@ site_name: Koheesio -#FIXME: Change the site_url to the correct one -site_url: https:///koheesio +site_url: https://engineering.nike.com/koheesio/ site_description: Koheesio is an open source python library that simplifies the development of data engineering pipelines. 
@@ -43,18 +42,19 @@ nav: - Reference: # reference folder - Concepts: - reference/concepts/concepts.md - - Steps: - - reference/concepts/steps.md - - Readers: reference/concepts/readers.md - - Writers: reference/concepts/writers.md - - Transformations: reference/concepts/transformations.md - - Tasks: reference/concepts/tasks.md + - Steps: reference/concepts/step.md - Context: reference/concepts/context.md - - Logging: reference/concepts/logging.md + - Logger: reference/concepts/logger.md + - Spark: + - Readers: reference/spark/readers.md + - Writers: reference/spark/writers.md + - Transformations: reference/spark/transformations.md + - Glossary: + - includes/glossary.md - API Reference: api_reference/ - Community: # community folder - Contribute: community/contribute.md - - Approach to Documentation: explanation/approach-documentation.md + - Approach to Documentation: community/approach-documentation.md theme: name: material @@ -63,27 +63,20 @@ theme: code: Roboto Mono favicon: assets/logo_koheesio.svg palette: - # Palette toggle for automatic mode - - media: "(prefers-color-scheme)" + - media: "(prefers-color-scheme: dark)" + scheme: slate + primary: indigo + accent: indigo toggle: - icon: material/brightness-auto + icon: material/weather-night name: Switch to light mode - # Palette toggle for light mode - media: "(prefers-color-scheme: light)" scheme: default - primary: custom - accent: custom + primary: indigo + accent: indigo toggle: - icon: material/brightness-7 + icon: material/weather-sunny name: Switch to dark mode - # Palette toggle for dark mode - - media: "(prefers-color-scheme: dark)" - scheme: slate - primary: custom - accent: custom - toggle: - icon: material/brightness-4 - name: Switch to light mode features: # - announce.dismiss - content.code.annotate diff --git a/pyproject.toml b/pyproject.toml index d1d297e..4b9f9de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ async_http = [ "nest-asyncio>=1.6.0", ] box = ["boxsdk[jwt]==3.8.1"] -pandas = ["pandas>=1.3", "setuptools"] +pandas = ["pandas>=1.3", "setuptools", "numpy<2.0.0"] pyspark = ["pyspark>=3.2.0", "pyarrow>13"] se = ["spark-expectations>=2.1.0"] # SFTP dependencies in to_csv line_iterator @@ -102,6 +102,7 @@ docs = [ "mkdocstrings-python>=1.7.5", "pygments>=2.17.2", "pymdown-extensions>=10.7.0", + "black", ] diff --git a/src/koheesio/integrations/spark/sftp.py b/src/koheesio/integrations/spark/sftp.py index 07249ee..672fdfd 100644 --- a/src/koheesio/integrations/spark/sftp.py +++ b/src/koheesio/integrations/spark/sftp.py @@ -121,7 +121,7 @@ class SFTPWriter(Writer): file through SFTP. Details on how the DataFrame is written to the buffer should be implemented in the implementation of the BufferWriter class. Any BufferWriter can be used here, as long as it implements the BufferWriter interface. - mode: SFTPWriteMode, optional, default=SFTPWriteMode.OVERWRITE + mode : SFTPWriteMode, optional, default=SFTPWriteMode.OVERWRITE Write mode: overwrite, append, ignore, exclusive, backup, or update. See the docstring of SFTPWriteMode for more details. """ @@ -341,7 +341,6 @@ class SendCsvToSftp(PandasCsvBufferWriter, SFTPWriter): Parameters ---------- - ### SFTP Parameters (Inherited from SFTPWriter) path : Union[str, Path] Path to the folder to write to. file_name : Optional[str] @@ -354,27 +353,27 @@ class SendCsvToSftp(PandasCsvBufferWriter, SFTPWriter): SFTP Server Username. password : SecretStr SFTP Server Password. 
- mode: SFTPWriteMode + mode : SFTPWriteMode Write mode: overwrite, append, ignore, exclusive, backup, or update. - - ### CSV Parameters (Inherited from PandasCsvBufferWriter) - header: bool + header : bool Whether to write column names as the first line. Default is True. - sep: str + sep : str Field delimiter for the output file. Default is ','. - quote: str + quote : str Character used to quote fields. Default is '"'. - quoteAll: bool + quoteAll : bool Whether all values should be enclosed in quotes. Default is False. - escape: str + escape : str Character used to escape sep and quote when needed. Default is '\\'. - timestampFormat: str + timestampFormat : str Date format for datetime objects. Default is '%Y-%m-%dT%H:%M:%S.%f'. - lineSep: str + lineSep : str Character used as line separator. Default is os.linesep. - compression: Optional[Literal["infer", "gzip", "bz2", "zip", "xz", "zstd", "tar"]] + compression : Optional[Literal["infer", "gzip", "bz2", "zip", "xz", "zstd", "tar"]] Compression to use for the output data. Default is None. + See Also + -------- For more details on the CSV parameters, refer to the PandasCsvBufferWriter class documentation. """ @@ -383,7 +382,7 @@ class SendCsvToSftp(PandasCsvBufferWriter, SFTPWriter): @model_validator(mode="after") def set_up_buffer_writer(self) -> "SendCsvToSftp": """Set up the buffer writer, passing all CSV related options to it.""" - self.buffer_writer = PandasCsvBufferWriter(**self.get_options(options_type="kohesio_pandas_buffer_writer")) + self.buffer_writer = PandasCsvBufferWriter(**self.get_options(options_type="koheesio_pandas_buffer_writer")) return self def execute(self): @@ -428,7 +427,6 @@ class SendJsonToSftp(PandasJsonBufferWriter, SFTPWriter): Parameters ---------- - ### SFTP Parameters (Inherited from SFTPWriter) path : Union[str, Path] Path to the folder on the SFTP server. file_name : Optional[str] @@ -441,21 +439,19 @@ class SendJsonToSftp(PandasJsonBufferWriter, SFTPWriter): SFTP Server Username. password : SecretStr SFTP Server Password. - mode: SFTPWriteMode + mode : SFTPWriteMode Write mode: overwrite, append, ignore, exclusive, backup, or update. - - ### JSON Parameters (Inherited from PandasJsonBufferWriter) - orient: Literal["split", "records", "index", "columns", "values", "table"] + orient : Literal["split", "records", "index", "columns", "values", "table"] Format of the JSON string. Default is 'records'. - lines: bool + lines : bool If True, output is one JSON object per line. Only used when orient='records'. Default is True. - date_format: Literal["iso", "epoch"] + date_format : Literal["iso", "epoch"] Type of date conversion. Default is 'iso'. - double_precision: int + double_precision : int Decimal places for encoding floating point values. Default is 10. - force_ascii: bool + force_ascii : bool If True, encoded string is ASCII. Default is True. - compression: Optional[Literal["gzip"]] + compression : Optional[Literal["gzip"]] Compression to use for output data. Default is None. See Also diff --git a/src/koheesio/logger.py b/src/koheesio/logger.py index 0ddfe03..9f00d36 100644 --- a/src/koheesio/logger.py +++ b/src/koheesio/logger.py @@ -1,7 +1,7 @@ """Loggers are used to log messages from your application. For a comprehensive guide on the usage, examples, and additional features of the logging classes, please refer to the -[reference/concepts/logging](../reference/concepts/logging.md) section of the Koheesio documentation. 
+[reference/concepts/logger](../reference/concepts/logger.md) section of the Koheesio documentation. Classes ------- diff --git a/src/koheesio/models/__init__.py b/src/koheesio/models/__init__.py index 77a4597..5ab80f1 100644 --- a/src/koheesio/models/__init__.py +++ b/src/koheesio/models/__init__.py @@ -46,8 +46,8 @@ class BaseModel(PydanticBaseModel, ABC): Additional methods and properties: --------------------------------- ### Fields - Every Koheesio BaseModel has two fields: `name` and `description`. These fields are used to provide a name and a - description to the model. + Every Koheesio BaseModel has two predefined fields: `name` and `description`. These fields are used to provide a + name and a description to the model. - `name`: This is the name of the Model. If not provided, it defaults to the class name. @@ -246,7 +246,7 @@ def log(self) -> Logger: return LoggingFactory.get_logger(name=self.__class__.__name__, inherit_from_koheesio=True) @classmethod - def from_basemodel(cls, basemodel: BaseModel, **kwargs): + def from_basemodel(cls, basemodel: BaseModel, **kwargs) -> InstanceOf[BaseModel]: """Returns a new BaseModel instance based on the data of another BaseModel""" kwargs = {**basemodel.model_dump(), **kwargs} return cls(**kwargs) @@ -669,4 +669,8 @@ def _list_of_columns_validation(columns_value): return list(dict.fromkeys(columns)) # dict.fromkeys is used to dedup while maintaining order -ListOfColumns = Annotated[List[str], BeforeValidator(_list_of_columns_validation)] +ListOfColumns = Annotated[Union[str, List[str]], BeforeValidator(_list_of_columns_validation)] +""" Annotated type for a list of column names. +Will ensure that there are no duplicate columns, empty strings, etc. +In case an individual column is passed, the value will be coerced to a list. +""" diff --git a/src/koheesio/models/sql.py b/src/koheesio/models/sql.py index 8906c2e..baa3bc2 100644 --- a/src/koheesio/models/sql.py +++ b/src/koheesio/models/sql.py @@ -15,11 +15,11 @@ class SqlBaseStep(Step, ExtraParamsMixin, ABC): Parameters ---------- - sql_path: Optional[Union[Path, str]], optional, default=None + sql_path : Optional[Union[Path, str]], optional, default=None Path to a SQL file - sql: Optional[str], optional, default=None + sql : Optional[str], optional, default=None SQL script to apply - params: Dict[str, Any], optional, default_factory=dict + params : Dict[str, Any], optional, default_factory=dict Placeholders (parameters) for templating. These are identified with `${placeholder}` in the SQL script.\n __Note__: any arbitrary kwargs passed to the class will be added to params. """ diff --git a/src/koheesio/secrets/cerberus.py b/src/koheesio/secrets/cerberus.py deleted file mode 100644 index 6996201..0000000 --- a/src/koheesio/secrets/cerberus.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Module for retrieving secrets from Cerberus. - -Secrets are stored as SecretContext and can be accessed accordingly. - -See CerberusSecret for more information. -""" - -import os -import re -from typing import Optional - -from boto3 import Session -from cerberus.client import CerberusClient - -from koheesio.models import Field, SecretStr, model_validator -from koheesio.steps.integrations.secrets import Secret - - -class CerberusSecret(Secret): - """ - Retrieve secrets from Cerberus and wrap them into Context class for easy access. - All secrets are stored under the "secret" root and "parent". "Parent" either derived from the - secure data path by replacing "/" and "-", or manually provided by the user. 
- Secrets are wrapped into the pydantic.SecretStr. - - Example: - ```python - context = { - "secrets": { - "parent": { - "webhook": SecretStr("**********"), - "description": SecretStr("**********"), - } - } - } - ``` - - Values can be decoded like this: - ```python - context.secrets.parent.webhook.get_secret_value() - ``` - or if working with dictionary is preferable: - ```python - for key, value in context.get_all().items(): - value.get_secret_value() - ``` - """ - - url: str = Field(default=..., description="Cerberus URL, eg. https://cerberus.domain.com") - path: str = Field(default=..., description="Secure data path, eg. 'app/my-sdb/my-secrets'") - aws_session: Optional[Session] = Field( - default=None, description="AWS Session to pass to Cerberus client, can be used for local execution." - ) - token: Optional[SecretStr] = Field( - default=os.environ.get("CERBERUS_TOKEN", None), - description="Cerberus token, can be used for local development without AWS auth mechanism." - "Note: Token has priority over AWS session.", - ) - verbose: bool = Field(default=False, description="Enable verbose for Cerberus client") - - @model_validator(mode="before") - def _set_parent_to_path(cls, values): - """ - Set default value for `parent` parameter on model initialization when it was not - explicitly set by the user. In this scenario secure data path will be used: - - 'app/my-sdb/my-secrets' -> app_my_sdb_my_secrets - """ - regex = re.compile(r"[/-]") - path = values.get("path") - if not values.get("parent"): - values["parent"] = regex.sub("_", path) - return values - - @property - def _client(self): - """ - Instantiated Cerberus client. - """ - self.token: Optional[SecretStr] - token = self.token.get_secret_value() if self.token else None - - return CerberusClient(cerberus_url=self.url, token=token, aws_session=self.aws_session, verbose=self.verbose) - - def _get_secrets(self): - """ - Dictionary of secrets. - """ - return self._client.get_secrets_data(self.path) diff --git a/src/koheesio/spark/readers/__init__.py b/src/koheesio/spark/readers/__init__.py index ab81a7d..fd9867c 100644 --- a/src/koheesio/spark/readers/__init__.py +++ b/src/koheesio/spark/readers/__init__.py @@ -3,7 +3,7 @@ self.output.df. For a comprehensive guide on the usage, examples, and additional features of Reader classes, please refer to the -[reference/concepts/steps/readers](../../../reference/concepts/readers.md) section of the Koheesio documentation. +[reference/spark/steps/readers](../../../reference/spark/readers.md) section of the Koheesio documentation. """ from abc import ABC, abstractmethod diff --git a/src/koheesio/spark/readers/databricks/autoloader.py b/src/koheesio/spark/readers/databricks/autoloader.py index 6b26e20..8444a54 100644 --- a/src/koheesio/spark/readers/databricks/autoloader.py +++ b/src/koheesio/spark/readers/databricks/autoloader.py @@ -3,9 +3,12 @@ Autoloader can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats. 
""" -from typing import Dict, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union from enum import Enum +from pyspark.sql.streaming import DataStreamReader +from pyspark.sql.types import AtomicType, StructType + from koheesio.models import Field, field_validator from koheesio.spark.readers import Reader @@ -53,7 +56,7 @@ class AutoLoader(Reader): Example ------- ```python - from koheesio.spark.readers.databricks import AutoLoader, AutoLoaderFormat + from koheesio.steps.readers.databricks import AutoLoader, AutoLoaderFormat result_df = AutoLoader( format=AutoLoaderFormat.JSON, @@ -82,11 +85,16 @@ class AutoLoader(Reader): description="The location for storing inferred schema and supporting schema evolution, " "used in `cloudFiles.schemaLocation`.", ) - options: Optional[Dict[str, str]] = Field( + options: Optional[Dict[str, Any]] = Field( default_factory=dict, description="Extra inputs to provide to the autoloader. For a full list of inputs, " "see https://docs.databricks.com/ingestion/auto-loader/options.html", ) + schema_: Optional[Union[str, StructType, List[str], Tuple[str, ...], AtomicType]] = Field( + default=None, + description="Explicit schema to apply to the input files.", + alias="schema", + ) @field_validator("format") def validate_format(cls, format_specified): @@ -107,9 +115,12 @@ def get_options(self): return self.options # @property - def reader(self): - """Return the reader for the autoloader""" - return self.spark.readStream.format("cloudFiles").options(**self.get_options()) + def reader(self) -> DataStreamReader: + reader = self.spark.readStream.format("cloudFiles") + if self.schema_ is not None: + reader = reader.schema(self.schema_) + reader = reader.options(**self.get_options()) + return reader def execute(self): """Reads from the given location with the given options using Autoloader""" diff --git a/src/koheesio/spark/readers/delta.py b/src/koheesio/spark/readers/delta.py index 99c08dd..54ee795 100644 --- a/src/koheesio/spark/readers/delta.py +++ b/src/koheesio/spark/readers/delta.py @@ -50,7 +50,7 @@ class DeltaTableReader(Reader): filter_cond : Optional[Union[Column, str]] Filter condition to apply to the dataframe. Filters can be provided by using Column or string expressions. For example: `f.col('state') == 'Ohio'`, `state = 'Ohio'` or `(col('col1') > 3) & (col('col2') < 9)` - columns: Optional[ListOfColumns] + columns : Optional[ListOfColumns] Columns to select from the table. One or many columns can be provided as strings. 
For example: `['col1', 'col2']`, `['col1']` or `'col1'` streaming : Optional[bool] diff --git a/src/koheesio/spark/readers/file_loader.py b/src/koheesio/spark/readers/file_loader.py index a497a0a..9d33806 100644 --- a/src/koheesio/spark/readers/file_loader.py +++ b/src/koheesio/spark/readers/file_loader.py @@ -97,6 +97,7 @@ class FileLoader(Reader, ExtraParamsMixin): schema_: Optional[Union[StructType, str]] = Field( default=None, description="Schema to use when reading the file", validate_default=False, alias="schema" ) + streaming: Optional[bool] = Field(default=False, description="Whether to read the files as a Stream or not") @field_validator("path") def ensure_path_is_str(cls, v): @@ -106,8 +107,9 @@ def ensure_path_is_str(cls, v): return v def execute(self): - """Reads the file using the specified format, schema, while applying any extra parameters.""" - reader = self.spark.read.format(self.format) + """Reads the file, in batch or as a stream, using the specified format and schema, while applying any extra parameters.""" + reader = self.spark.readStream if self.streaming else self.spark.read + reader = reader.format(self.format) if self.schema_: reader.schema(self.schema_) diff --git a/src/koheesio/spark/readers/hana.py b/src/koheesio/spark/readers/hana.py index 4d773ce..92cfbbd 100644 --- a/src/koheesio/spark/readers/hana.py +++ b/src/koheesio/spark/readers/hana.py @@ -14,7 +14,7 @@ class HanaReader(JdbcReader): Notes ----- - * Refer to [JdbcReader](jdbc.html#koheesio.spark.readers.jdbc.JdbcReader) for the list of all available parameters. + * Refer to [JdbcReader](jdbc.md#koheesio.spark.readers.jdbc.JdbcReader) for the list of all available parameters. * Refer to SAP HANA Client Interface Programming Reference docs for the list of all available connection string parameters: https://help.sap.com/docs/SAP_HANA_CLIENT/f1b440ded6144a54ada97ff95dac7adf/109397c2206a4ab2a5386d494f4cf75e.html diff --git a/src/koheesio/spark/readers/snowflake.py b/src/koheesio/spark/readers/snowflake.py index bca1bb1..bef8946 100644 --- a/src/koheesio/spark/readers/snowflake.py +++ b/src/koheesio/spark/readers/snowflake.py @@ -20,9 +20,9 @@ See Also -------- -- [koheesio.spark.readers.Reader](../readers#koheesio.spark.readers.Reader) +- [koheesio.spark.readers.Reader](index.md#koheesio.spark.readers.Reader) Base class for all Readers. -- [koheesio.steps.integrations.snowflake](../integrations/snowflake.html) +- [koheesio.spark.snowflake](../snowflake.md) Module containing Snowflake classes. More detailed class descriptions can be found in the class docstrings. diff --git a/src/koheesio/spark/readers/teradata.py b/src/koheesio/spark/readers/teradata.py index aa028fb..b4c8167 100644 --- a/src/koheesio/spark/readers/teradata.py +++ b/src/koheesio/spark/readers/teradata.py @@ -20,7 +20,7 @@ class TeradataReader(JdbcReader): See Also -------- - * Refer to [JdbcReader](../readers/jdbc.html#koheesio.spark.readers.jdbc.JdbcReader) for the list of all available + * Refer to [JdbcReader](jdbc.md#koheesio.spark.readers.jdbc.JdbcReader) for the list of all available parameters. 
* Refer to Teradata docs for the list of all available connection string parameters: https://teradata-docs.s3.amazonaws.com/doc/connectivity/jdbc/reference/current/jdbcug_chapter_2.html#BABJIHBJ diff --git a/src/koheesio/spark/snowflake.py b/src/koheesio/spark/snowflake.py index fa3ec32..466f912 100644 --- a/src/koheesio/spark/snowflake.py +++ b/src/koheesio/spark/snowflake.py @@ -5,7 +5,7 @@ Notes ----- -Every Step in this module is based on [SnowflakeBaseModel](#koheesio.steps.integrations.snowflake.SnowflakeBaseModel). +Every Step in this module is based on [SnowflakeBaseModel](./snowflake.md#koheesio.spark.snowflake.SnowflakeBaseModel). The following parameters are available for every Step. Parameters @@ -907,8 +907,9 @@ class SnowflakeWriter(SnowflakeBaseModel, Writer): See Also -------- - - [koheesio.steps.writers.Writer](../writers#koheesio.steps.writers.Writer) - - [koheesio.steps.writers.BatchOutputMode](../writers#koheesio.steps.writers.BatchOutputMode) + - [koheesio.steps.writers.Writer](writers/index.md#koheesio.spark.writers.Writer) + - [koheesio.steps.writers.BatchOutputMode](writers/index.md#koheesio.spark.writers.BatchOutputMode) + - [koheesio.steps.writers.StreamingOutputMode](writers/index.md#koheesio.spark.writers.StreamingOutputMode) """ table: str = Field(default=..., description="Target table name") diff --git a/src/koheesio/spark/transformations/__init__.py b/src/koheesio/spark/transformations/__init__.py index 938032c..251d66f 100644 --- a/src/koheesio/spark/transformations/__init__.py +++ b/src/koheesio/spark/transformations/__init__.py @@ -6,7 +6,7 @@ References ---------- For a comprehensive guide on the usage, examples, and additional features of Transformation classes, please refer to the -[reference/concepts/steps/transformations](../../../reference/concepts/transformations.md) section of the Koheesio +[reference/concepts/spark/transformations](../../../reference/spark/transformations.md) section of the Koheesio documentation. Classes @@ -45,7 +45,7 @@ class Transformation(SparkStep, ABC): Parameters ---------- - df: Optional[DataFrame] + df : Optional[DataFrame] The DataFrame to apply the transformation to. If not provided, the DataFrame has to be passed to the transform-method. @@ -158,21 +158,26 @@ class ColumnsTransformation(Transformation, ABC): Concept ------- - A ColumnsTransformation is a Transformation with a standardized input for column or columns. The `columns` are - stored as a list. Either a single string, or a list of strings can be passed to enter the `columns`. - `column` and `columns` are aliases to one another - internally the name `columns` should be used though. + A ColumnsTransformation is a Transformation with a standardized input for column or columns. - `columns` are stored as a list - either a single string, or a list of strings can be passed to enter the `columns` - `column` and `columns` are aliases to one another - internally the name `columns` should be used though. If more than one column is passed, the behavior of the Class changes this way: + - the transformation will be run in a loop against all the given columns Configuring the ColumnsTransformation ------------------------------------- - The ColumnsTransformation class has a `ColumnConfig` class that can be used to configure the behavior of the class. 
+ [ColumnConfig]: ./index.md#koheesio.spark.transformations.ColumnsTransformation.ColumnConfig + [SparkDatatype]: ../utils.md#koheesio.spark.utils.SparkDatatype + + The ColumnsTransformation class has a [ColumnConfig] class that can be used to configure the behavior of the class. + Users should not have to interact with the [ColumnConfig] class directly. + This class has the following fields: + - `run_for_all_data_type` allows to run the transformation for all columns of a given type. @@ -182,20 +187,19 @@ class ColumnsTransformation(Transformation, ABC): - `data_type_strict_mode` Toggles strict mode for data type validation. Will only work if `limit_data_type` is set. - Note that Data types need to be specified as a SparkDatatype enum. + Data types need to be specified as a [SparkDatatype] enum. - See the docstrings of the `ColumnConfig` class for more information. - See the SparkDatatype enum for a list of available data types. + --- - Users should not have to interact with the `ColumnConfig` class directly. - - Parameters - ---------- - columns: - The column (or list of columns) to apply the transformation to. Alias: column + + - See the docstrings of the [ColumnConfig] class for more information.
+ - See the [SparkDatatype] enum for a list of available data types.
+
Example ------- + Implementing a transformation using the `ColumnsTransformation` class: + ```python from pyspark.sql import functions as f from koheesio.steps.transformations import ColumnsTransformation @@ -206,6 +210,14 @@ def execute(self): for column in self.get_columns(): self.output.df = self.df.withColumn(column, f.col(column) + 1) ``` + + In the above example, the `execute` method is implemented to add 1 to the values of a given column. + + Parameters + ---------- + columns : ListOfColumns + The column (or list of columns) to apply the transformation to. Alias: column + """ columns: ListOfColumns = Field( @@ -220,19 +232,19 @@ class ColumnConfig: Parameters ---------- - run_for_all_data_type: Optional[List[SparkDatatype]] + run_for_all_data_type : Optional[List[SparkDatatype]] allows to run the transformation for all columns of a given type. A user can trigger this behavior by either omitting the `columns` parameter or by passing a single `*` as a column name. In both cases, the `run_for_all_data_type` will be used to determine the data type. Value should be be passed as a SparkDatatype enum. (default: [None]) - limit_data_type: Optional[List[SparkDatatype]] + limit_data_type : Optional[List[SparkDatatype]] allows to limit the transformation to a specific data type. Value should be passed as a SparkDatatype enum. (default: [None]) - data_type_strict_mode: bool + data_type_strict_mode : bool Toggles strict mode for data type validation. Will only work if `limit_data_type` is set. - when True, a ValueError will be raised if any column does not adhere to the `limit_data_type` - when False, a warning will be thrown and the column will be skipped instead @@ -307,14 +319,14 @@ def column_type_of_col( Parameters ---------- - col: Union[str, Column] + col : Union[str, Column] The column to check the type of - df: Optional[DataFrame] + df : Optional[DataFrame] The DataFrame belonging to the column. If not provided, the DataFrame passed to the constructor will be used. - simple_return_mode: bool + simple_return_mode : bool If True, the return value will be a simple string. If False, the return value will be a SparkDatatype enum. Returns @@ -352,7 +364,7 @@ def get_all_columns_of_specific_type(self, data_type: Union[str, SparkDatatype]) Parameters ---------- - data_type: Union[str, SparkDatatype] + data_type : Union[str, SparkDatatype] The data type to get the columns for Returns @@ -499,7 +511,7 @@ def func(self, column: Column) -> Column: Parameters ---------- - column: Column + column : Column The column to apply the transformation to Returns diff --git a/src/koheesio/spark/transformations/arrays.py b/src/koheesio/spark/transformations/arrays.py index eaf9a91..d58a133 100644 --- a/src/koheesio/spark/transformations/arrays.py +++ b/src/koheesio/spark/transformations/arrays.py @@ -17,9 +17,9 @@ See Also -------- -* [koheesio.spark.transformations](../transformations) +* [koheesio.spark.transformations](index.md) Module containing all transformation classes. -* [koheesio.spark.transformations.ColumnsTransformationWithTarget](../transformations/index.html#koheesio.spark.transformations.ColumnsTransformationWithTarget) +* [koheesio.spark.transformations.ColumnsTransformationWithTarget](index.md#koheesio.spark.transformations.ColumnsTransformationWithTarget) Base class for all transformations that operate on columns and have a target column. 
""" diff --git a/src/koheesio/spark/transformations/cast_to_datatype.py b/src/koheesio/spark/transformations/cast_to_datatype.py index 42730a5..004c0ef 100644 --- a/src/koheesio/spark/transformations/cast_to_datatype.py +++ b/src/koheesio/spark/transformations/cast_to_datatype.py @@ -20,35 +20,35 @@ ---- Dates, Arrays and Maps are not supported by this module. -- for dates, use the [koheesio.spark.transformations.date_time](date_time) module -- for arrays, use the [koheesio.spark.transformations.arrays](arrays.html) module +- for dates, use the [koheesio.spark.transformations.date_time](date_time/index.md) module +- for arrays, use the [koheesio.spark.transformations.arrays](arrays.md) module Classes ------- -CastToDatatype: - Cast a column or set of columns to a given datatype +CastToDatatype + Cast a column or set of columns to a given datatype CastToByte - Cast to Byte (a.k.a. tinyint) + Cast to Byte (a.k.a. tinyint) CastToShort - Cast to Short (a.k.a. smallint) + Cast to Short (a.k.a. smallint) CastToInteger - Cast to Integer (a.k.a. int) + Cast to Integer (a.k.a. int) CastToLong - Cast to Long (a.k.a. bigint) + Cast to Long (a.k.a. bigint) CastToFloat - Cast to Float (a.k.a. real) + Cast to Float (a.k.a. real) CastToDouble - Cast to Double + Cast to Double CastToDecimal - Cast to Decimal (a.k.a. decimal, numeric, dec, BigDecimal) + Cast to Decimal (a.k.a. decimal, numeric, dec, BigDecimal) CastToString - Cast to String + Cast to String CastToBinary - Cast to Binary (a.k.a. byte array) + Cast to Binary (a.k.a. byte array) CastToBoolean - Cast to Boolean + Cast to Boolean CastToTimestamp - Cast to Timestamp + Cast to Timestamp Note ---- @@ -820,7 +820,7 @@ class CastToTimestamp(CastToDatatype): See Also -------- - * [koheesio.spark.transformations.date_time](date_time) + * [koheesio.spark.transformations.date_time](date_time/index.md) * https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html#timestamp-pattern Unsupported datatypes: diff --git a/src/koheesio/spark/transformations/date_time/__init__.py b/src/koheesio/spark/transformations/date_time/__init__.py index c316232..9270110 100644 --- a/src/koheesio/spark/transformations/date_time/__init__.py +++ b/src/koheesio/spark/transformations/date_time/__init__.py @@ -194,12 +194,18 @@ class ToTimestamp(ColumnsTransformationWithTarget): -------- Related Koheesio classes: - * [koheesio.spark.transformations.ColumnsTransformation](transformations/index.html#koheesio.steps.transformation.ColumnsTransformation) : + [ColumnsTransformation]: ../index.md#koheesio.spark.transformations.ColumnsTransformation + [ColumnsTransformationWithTarget]: ../index.md#koheesio.spark.transformations.ColumnsTransformationWithTarget + [pyspark.sql.functions]: https://spark.apache.org/docs/3.5.1/api/python/reference/pyspark.sql/functions.html + + From the `koheesio.spark.transformations` module: + + * [ColumnsTransformation] : Base class for ColumnsTransformation. 
Defines column / columns field + recursive logic - * [koheesio.spark.transformations.ColumnsTransformationWithTarget](transformations/index.html#koheesio.steps.transformation.ColumnsTransformationWithTarget) : + * [ColumnsTransformationWithTarget] : Defines target_column / target_suffix field - pyspark.sql.functions: + [pyspark.sql.functions]: * datetime pattern : https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html diff --git a/src/koheesio/spark/transformations/date_time/interval.py b/src/koheesio/spark/transformations/date_time/interval.py index 576b757..9b574a7 100644 --- a/src/koheesio/spark/transformations/date_time/interval.py +++ b/src/koheesio/spark/transformations/date_time/interval.py @@ -35,10 +35,13 @@ --------- Related Koheesio classes: -* [koheesio.spark.transformations.ColumnsTransformation](transformations/index.html#koheesio.steps.transformation.ColumnsTransformation) : - Base class for ColumnsTransformation. Defines column / columns field + recursive logic -* [koheesio.spark.transformations.ColumnsTransformationWithTarget](transformations/index.html#koheesio.steps.transformation.ColumnsTransformationWithTarget) : - Defines target_column / target_suffix field +[ColumnsTransformation]: ../index.md#koheesio.spark.transformations.ColumnsTransformation +[ColumnsTransformationWithTarget]: ../index.md#koheesio.spark.transformations.ColumnsTransformationWithTarget + +From the koheesio.spark.transformations module: + +* [ColumnsTransformation] : Base class for ColumnsTransformation. Defines column / columns field + recursive logic +* [ColumnsTransformationWithTarget] : Defines target_column / target_suffix field pyspark.sql.functions: diff --git a/src/koheesio/spark/transformations/strings/__init__.py b/src/koheesio/spark/transformations/strings/__init__.py index 47c883e..1f97276 100644 --- a/src/koheesio/spark/transformations/strings/__init__.py +++ b/src/koheesio/spark/transformations/strings/__init__.py @@ -7,7 +7,7 @@ The following Transformations are included: -[change_case](change_case.html): +[change_case](change_case.md): - `Lower` Converts a string column to lower case. @@ -16,12 +16,12 @@ - `TitleCase` or `InitCap` Converts a string column to title case, where each word starts with a capital letter. -[concat](concat.html): +[concat](concat.md): - `Concat` Concatenates multiple input columns together into a single column, optionally using the given separator. -[pad](pad.html): +[pad](pad.md): - `Pad` Pads the values of `source_column` with the `character` up until it reaches `length` of characters @@ -30,31 +30,31 @@ - `RPad` Pad with a character on the right side of the string. -[regexp](regexp.html): +[regexp](regexp.md): - `RegexpExtract` Extract a specific group matched by a Java regexp from the specified string column. - `RegexpReplace` Searches for the given regexp and replaces all instances with what is in 'replacement'. -[replace](replace.html): +[replace](replace.md): - `Replace` Replace all instances of a string in a column with another string. -[split](split.html): +[split](split.md): - `SplitAll` Splits the contents of a column on basis of a split_pattern. - `SplitAtFirstMatch` Like SplitAll, but only splits the string once. You can specify whether you want the first or second part. -[substring](substring.html): +[substring](substring.md): - `Substring` Extracts a substring from a string column starting at the given position. -[trim](trim.html): +[trim](trim.md): - `Trim` Trim whitespace from the beginning and/or end of a string. 
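
To make the `ColumnsTransformation` / `ColumnsTransformationWithTarget` split referenced in the hunks above concrete, here is a minimal sketch of a subclass. It is not part of the patch: it assumes only the `func(self, column: Column) -> Column` hook and the `columns` / `target_column` fields described in the docstrings in this diff; the class name and the `+ 1` logic are purely illustrative.

```python
from pyspark.sql import Column

from koheesio.spark.transformations import ColumnsTransformationWithTarget


class AddOne(ColumnsTransformationWithTarget):
    """Hypothetical transformation: add 1 to every selected column."""

    def func(self, column: Column) -> Column:
        # The base class loops over `columns` (or all columns of the configured data
        # type) and writes the result to `target_column` / `target_suffix`.
        return column + 1


# Usage sketch (assumes an existing DataFrame `df` with an integer column "age"):
# output_df = AddOne(column="age", target_column="age_plus_one").transform(df)
```
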
diff --git a/src/koheesio/spark/transformations/strings/change_case.py b/src/koheesio/spark/transformations/strings/change_case.py index 0957ca8..42d6301 100644 --- a/src/koheesio/spark/transformations/strings/change_case.py +++ b/src/koheesio/spark/transformations/strings/change_case.py @@ -36,7 +36,7 @@ class LowerCase(ColumnsTransformationWithTarget): The name of the column or columns to convert to lower case. Alias: column. Lower case will be applied to all columns in the list. Column is required to be of string type. - target_column: str + target_column : str, optional The name of the column to store the result in. If None, the result will be stored in the same column as the input. @@ -94,7 +94,7 @@ class UpperCase(LowerCase): The name of the column or columns to convert to upper case. Alias: column. Upper case will be applied to all columns in the list. Column is required to be of string type. - target_column: str + target_column : str, optional The name of the column to store the result in. If None, the result will be stored in the same column as the input. @@ -143,11 +143,11 @@ class TitleCase(LowerCase): Parameters ---------- - columns: Union[str, List[str]] + columns : ListOfColumns The name of the column or columns to convert to title case. Alias: column. Title case will be applied to all columns in the list. Column is required to be of string type. - target_column: str + target_column : str, optional The name of the column to store the result in. If None, the result will be stored in the same column as the input. diff --git a/src/koheesio/spark/transformations/strings/regexp.py b/src/koheesio/spark/transformations/strings/regexp.py index 63b4de0..63f3171 100644 --- a/src/koheesio/spark/transformations/strings/regexp.py +++ b/src/koheesio/spark/transformations/strings/regexp.py @@ -31,7 +31,7 @@ class RegexpExtract(ColumnsTransformationWithTarget): Parameters ---------- - columns : Union[str, List[str]] + columns : ListOfColumns The column (or list of columns) to extract from. Alias: column target_column : Optional[str], optional, default=None The column to store the result in. If not provided, the result will be stored in the source column. @@ -113,14 +113,14 @@ class RegexpReplace(ColumnsTransformationWithTarget): Parameters ---------- - columns: + columns : ListOfColumns The column (or list of columns) to replace in. Alias: column - target_column: + target_column : Optional[str], optional, default=None The column to store the result in. If not provided, the result will be stored in the source column. Alias: target_suffix - if multiple columns are given as source, this will be used as a suffix. - regexp: + regexp : str The regular expression to replace - replacement: + replacement : str String to replace matched pattern with. Examples diff --git a/src/koheesio/spark/transformations/strings/trim.py b/src/koheesio/spark/transformations/strings/trim.py index 655df98..36a9105 100644 --- a/src/koheesio/spark/transformations/strings/trim.py +++ b/src/koheesio/spark/transformations/strings/trim.py @@ -37,13 +37,13 @@ class Trim(ColumnsTransformationWithTarget): Parameters ---------- - columns: + columns : ListOfColumns The column (or list of columns) to trim. Alias: column If no columns are provided, all string columns will be trimmed. - target_column: + target_column : ListOfColumns, optional The column to store the result in. If not provided, the result will be stored in the source column. 
Alias: target_suffix - if multiple columns are given as source, this will be used as a suffix. - direction: + direction : trim_type, optional, default "left-right" On which side to remove the spaces. Either "left", "right" or "left-right". Defaults to "left-right" Examples diff --git a/src/koheesio/spark/writers/buffer.py b/src/koheesio/spark/writers/buffer.py index 08d5896..64f57db 100644 --- a/src/koheesio/spark/writers/buffer.py +++ b/src/koheesio/spark/writers/buffer.py @@ -289,7 +289,7 @@ def get_options(self, options_type: str = "csv"): if options_type == "spark": csv_options["lineterminator"] = csv_options.pop(line_sep_option_naming) - elif options_type == "kohesio_pandas_buffer_writer": + elif options_type == "koheesio_pandas_buffer_writer": csv_options["line_terminator"] = csv_options.pop(line_sep_option_naming) return csv_options @@ -320,26 +320,29 @@ class PandasJsonBufferWriter(BufferWriter, ExtraParamsMixin): Parameters ----------- - orient: Literal["split", "records", "index", "columns", "values", "table"] + orient : Literal["split", "records", "index", "columns", "values", "table"] Format of the resulting JSON string. Default is 'records'. - lines: bool + lines : bool Format output as one JSON object per line. Only used when orient='records'. Default is True. - If true, the output will be formatted as one JSON object per line. - If false, the output will be written as a single JSON object. Note: this value is only used when orient='records' and will be ignored otherwise. - date_format: Literal["iso", "epoch"] + date_format : Literal["iso", "epoch"] Type of date conversion. Default is 'iso'. See `Date and Timestamp Formats` for a detailed description and more information. - double_precision: int + double_precision : int Number of decimal places for encoding floating point values. Default is 10. - force_ascii: bool + force_ascii : bool Force encoded string to be ASCII. Default is True. - compression: Optional[Literal["gzip"]] + compression : Optional[Literal["gzip"]] A string representing the compression to use for on-the-fly compression of the output data. Koheesio sets this default to 'None' leaving the data uncompressed. Can be set to gzip' optionally. Other compression options are currently not supported by Koheesio for JSON output. - ## Date and Timestamp Formats in JSON + Other Possible Parameters + ------------------------- + + ### Date and Timestamp Formats in JSON The `date_format` and `date_unit` parameters in pandas `to_json()` method are used to control the representation of dates and timestamps in the resulting JSON. @@ -354,7 +357,7 @@ class PandasJsonBufferWriter(BufferWriter, ExtraParamsMixin): options: 's' for seconds, 'ms' for milliseconds, 'us' for microseconds, and 'ns' for nanoseconds. The default is 'ms'. Note that this parameter is ignored when `date_format='iso'`. - ## Orient Parameter + ### Orient Parameter The `orient` parameter is used to specify the format of the resulting JSON string. Each option is useful in different scenarios depending on whether you need to preserve the index, data types, and/or column names of the original DataFrame. 
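
The `options_type` rename above (`kohesio_pandas_buffer_writer` to `koheesio_pandas_buffer_writer`) is easy to miss; a rough sketch of how it is consumed follows. This is not part of the patch: the construction is hypothetical, and the `sep` / `header` fields are assumed from the CSV parameters documented earlier in this diff.

```python
from koheesio.spark.writers.buffer import PandasCsvBufferWriter

# Hypothetical construction; all CSV fields have defaults per the docstrings above.
csv_writer = PandasCsvBufferWriter(sep=";", header=True)

# Per the hunk above:
#   "spark"                          -> line separator option returned as `lineterminator`
#   "koheesio_pandas_buffer_writer"  -> line separator option returned as `line_terminator`
pandas_kwargs = csv_writer.get_options(options_type="koheesio_pandas_buffer_writer")
```
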
The set of possible orients is: diff --git a/src/koheesio/spark/writers/delta/batch.py b/src/koheesio/spark/writers/delta/batch.py index dc04d7c..7334f27 100644 --- a/src/koheesio/spark/writers/delta/batch.py +++ b/src/koheesio/spark/writers/delta/batch.py @@ -233,7 +233,7 @@ def __merge_all(self) -> Union[DeltaMergeBuilder, DataFrameWriter]: return self.__merge(merge_builder=builder) - def _get_merge_builder(self, provided_merge_builder=None): + def _get_merge_builder(self, provided_merge_builder=None) -> DeltaMergeBuilder: """Resolves the merge builder. If provided, it will be used, otherwise it will be created from the args""" # A merge builder has been already created - case for merge_all @@ -274,19 +274,8 @@ def _merge_builder_from_args(self): .merge(self.df.alias(source_alias), merge_cond) ) - valid_clauses = [ - "whenMatchedUpdate", - "whenNotMatchedInsert", - "whenMatchedDelete", - "whenNotMatchedBySourceUpdate", - "whenNotMatchedBySourceDelete", - ] - for merge_clause in merge_clauses: clause_type = merge_clause.pop("clause", None) - if clause_type not in valid_clauses: - raise ValueError(f"Invalid merge clause '{clause_type}' provided") - method = getattr(builder, clause_type) builder = method(**merge_clause) @@ -314,6 +303,25 @@ def _validate_table(cls, table): return DeltaTableStep(table=table) return table + @field_validator("params") + def _validate_params(cls, params): + """Validates params. If an array of merge clauses is provided, they will be validated against the available + ones in DeltaMergeBuilder""" + + valid_clauses = {m for m in dir(DeltaMergeBuilder) if m.startswith("when")} + + if "merge_builder" in params: + merge_builder = params["merge_builder"] + if isinstance(merge_builder, list): + for merge_conf in merge_builder: + clause = merge_conf.get("clause") + if clause not in valid_clauses: + raise ValueError(f"Invalid merge clause '{clause}' provided") + elif not isinstance(merge_builder, DeltaMergeBuilder): + raise ValueError("merge_builder must be a list or merge clauses or a DeltaMergeBuilder instance") + + return params + @classmethod def get_output_mode(cls, choice: str, options: Set[Type]) -> Union[BatchOutputMode, StreamingOutputMode]: """Retrieve an OutputMode by validating `choice` against a set of option OutputModes. 
diff --git a/tests/spark/integrations/dq/test_spark_expectations.py b/tests/spark/integrations/dq/test_spark_expectations.py index 259af00..6aa05e6 100644 --- a/tests/spark/integrations/dq/test_spark_expectations.py +++ b/tests/spark/integrations/dq/test_spark_expectations.py @@ -73,6 +73,8 @@ def test_rows_are_dropped(self, spark: SparkSession, prepare_tables): err_table_df = spark.read.table("default.output_table_error") assert err_table_df.count() == 2 + spark.sql("drop table default.output_table_error") + def test_meta_columns_are_not_dropped(self, spark, prepare_tables): from koheesio.integrations.spark.dq.spark_expectations import ( SparkExpectationsTransformation, @@ -98,6 +100,8 @@ def test_meta_columns_are_not_dropped(self, spark, prepare_tables): # Spark Expectations should add either meta_dq_run_date (<=1.1.0) or meta_dq_run_datetime (>= v1.1.1) assert "meta_dq_run_date" in output_columns or "meta_dq_run_datetime" in output_columns + spark.sql("drop table default.output_table_error") + def test_meta_columns_are_dropped(self, spark, prepare_tables): from koheesio.integrations.spark.dq.spark_expectations import ( SparkExpectationsTransformation, @@ -122,6 +126,8 @@ def test_meta_columns_are_dropped(self, spark, prepare_tables): assert "meta_dq_run_id" not in output_columns assert "meta_dq_run_datetime" not in output_columns and "meta_dq_run_datetime" not in output_columns + spark.sql("drop table default.output_table_error") + @staticmethod def apply_spark_sql(spark: SparkSession, source_sql_files: Union[List[str], str]) -> None: if isinstance(source_sql_files, str): diff --git a/tests/spark/readers/test_auto_loader.py b/tests/spark/readers/test_auto_loader.py index ca07f55..8f2b168 100644 --- a/tests/spark/readers/test_auto_loader.py +++ b/tests/spark/readers/test_auto_loader.py @@ -21,10 +21,13 @@ def test_invalid_format(bad_format): def mock_reader(self): - return self.spark.read.format("json").options(**self.options) + reader = self.spark.read.format("json") + if self.schema_ is not None: + reader = reader.schema(self.schema_) + return reader.options(**self.options) -def test_read_json(spark, mocker, data_path): +def test_read_json_infer_schema(spark, mocker, data_path): mocker.patch("koheesio.spark.readers.databricks.autoloader.AutoLoader.reader", mock_reader) options = {"multiLine": "true"} @@ -49,3 +52,51 @@ def test_read_json(spark, mocker, data_path): ] expected_df = spark.createDataFrame(data_expected, schema_expected) assert_df_equality(result, expected_df, ignore_column_order=True) + + +def test_read_json_exact_explicit_schema_struct(spark, mocker, data_path): + mocker.patch("koheesio.spark.readers.databricks.autoloader.AutoLoader.reader", mock_reader) + + schema = StructType( + [ + StructField("string", StringType(), True), + StructField("int", LongType(), True), + StructField("array", ArrayType(LongType()), True), + ] + ) + options = {"multiLine": "true"} + json_file_path_str = f"{data_path}/readers/json_file/dummy.json" + auto_loader = AutoLoader( + format="json", location=json_file_path_str, schema_location="dummy_value", options=options, schema=schema + ) + + auto_loader.execute() + result = auto_loader.output.df + + data_expected = [ + {"string": "string1", "int": 1, "array": [1, 11, 111]}, + {"string": "string2", "int": 2, "array": [2, 22, 222]}, + ] + expected_df = spark.createDataFrame(data_expected, schema) + assert_df_equality(result, expected_df, ignore_column_order=True) + + +def test_read_json_different_explicit_schema_string(spark, mocker, data_path): + 
mocker.patch("koheesio.spark.readers.databricks.autoloader.AutoLoader.reader", mock_reader) + + schema = "string STRING,array ARRAY" + options = {"multiLine": "true"} + json_file_path_str = f"{data_path}/readers/json_file/dummy.json" + auto_loader = AutoLoader( + format="json", location=json_file_path_str, schema_location="dummy_value", options=options, schema=schema + ) + + auto_loader.execute() + result = auto_loader.output.df + + data_expected = [ + {"string": "string1", "array": [1, 11, 111]}, + {"string": "string2", "array": [2, 22, 222]}, + ] + expected_df = spark.createDataFrame(data_expected, schema) + assert_df_equality(result, expected_df, ignore_column_order=True) diff --git a/tests/spark/readers/test_file_loader.py b/tests/spark/readers/test_file_loader.py index d6c3d13..9605e56 100644 --- a/tests/spark/readers/test_file_loader.py +++ b/tests/spark/readers/test_file_loader.py @@ -1,5 +1,7 @@ import pytest +import pyspark.sql.types as T + from koheesio.spark import AnalysisException from koheesio.spark.readers.file_loader import ( AvroReader, @@ -106,6 +108,15 @@ def test_json_reader(json_file): assert actual_data == expected_data +def test_json_stream_reader(json_file): + schema = "string STRING, int INT, float FLOAT" + reader = JsonReader(path=json_file, schema=schema, streaming=True) + assert reader.path == json_file + df = reader.read() + assert df.isStreaming + assert df.schema == T._parse_datatype_string(schema) + + def test_parquet_reader(parquet_file): expected_data = [ {"id": 0}, diff --git a/tests/spark/writers/delta/test_delta_writer.py b/tests/spark/writers/delta/test_delta_writer.py index 4a36069..66306de 100644 --- a/tests/spark/writers/delta/test_delta_writer.py +++ b/tests/spark/writers/delta/test_delta_writer.py @@ -308,24 +308,25 @@ def test_merge_from_args(spark, dummy_df): ) -def test_merge_from_args_raise_value_error(spark, dummy_df): - table_name = "test_table_merge_from_args_value_error" - dummy_df.write.format("delta").saveAsTable(table_name) - - writer = DeltaTableWriter( - df=dummy_df, - table=table_name, - output_mode=BatchOutputMode.MERGE, - output_mode_params={ +@pytest.mark.parametrize( + "output_mode_params", + [ + { "merge_builder": [ {"clause": "NOT-SUPPORTED-MERGE-CLAUSE", "set": {"id": "source.id"}, "condition": "source.id=target.id"} ], "merge_cond": "source.id=target.id", }, - ) - + {"merge_builder": MagicMock()}, + ], +) +def test_merge_from_args_raise_value_error(spark, output_mode_params): with pytest.raises(ValueError): - writer._merge_builder_from_args() + DeltaTableWriter( + table="test_table_merge", + output_mode=BatchOutputMode.MERGE, + output_mode_params=output_mode_params, + ) def test_merge_no_table(spark):