# DataFrame Module

Configurable extension of the PySpark DataFrame API.

## Description

`TidyDataFrame` is a custom class that primarily lets you configure how your
DataFrame executes during its lifetime. The most beneficial of these
configurations is the built-in logging process (inspired by tidylog in R).
Additional configurations include:

- Disabling count/display operations
- Specifying multiple logging handlers
- Documenting workflows in code

On top of these configurations, `TidyDataFrame` also offers experimental features
seen throughout the rest of the Tidy Tools package, including:

- Column selectors (like Polars)
- Functional filters
- A Pythonic API

Let's see how someone would use `TidyDataFrame`.

## Usage

A simple example shows the immediate benefits of `TidyDataFrame`:
```python
from tidy_tools.dataframe import TidyDataFrame


# with PySpark, I do not know the impact of each query
result = (
    spark_data
    .select(...)       # how many columns did I select?
    .filter(...)       # how many rows did I remove?
    .withColumn(...)   # what column did I add to my DataFrame?
)

# with TidyDataFrame, I know the exact impact of each query
result = (
    TidyDataFrame(spark_data)
    .select(...)
    .filter(...)
    .withColumn(...)
    ._data
)
#> INFO | enter: TidyDataFrame[M rows x N cols]
#> INFO | select: selected Y columns
#> INFO | filter: removed X rows, M-X remaining
#> INFO | mutate: added `<column-name>` (type: <column-type>)
#> INFO | exit: DataFrame[M-X rows x N-Y+1 cols]
```
With minor modifications to your existing code, you get this insight into your
workflow in real time, as it runs. That greatly reduces the need to re-verify
your code after running it, since the logs tell you exactly what each operation
did.

Let's see how else we can use `TidyDataFrame`.
```python
from tidy_tools.dataframe import TidyContext
from tidy_tools.dataframe import TidyLogHandler, TidyFileHandler


# 01. simple configuration
context = TidyContext(name="Cool Data", display=False)
TidyDataFrame(spark_data, context).display()
#> INFO | Cool Data[M rows x N cols]
#> WARNING | display: disabled by context


# 02. passing multiple logging handlers (logs to all sinks provided)
context = TidyContext(
    name="Logging Data",
    handlers=[TidyLogHandler(), TidyFileHandler("logging_data.log")],
)
(
    TidyDataFrame(spark_data, context)
    .comment("Removing invalid entries from dataset")  # persist comments to logs
    .filter(...)
)
#> INFO | Logging Data[M rows x N cols]
#> INFO | comment: Removing invalid entries from dataset
#> INFO | filter: removed X rows, M-X remaining
```
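These options compose. As a sketch, the example below combines only the pieces already shown - a named context, `display=False`, and both handlers - so each logged operation is written both to the console and to a log file:

```python
# a sketch combining the configurations demonstrated above;
# the context name and file path are made up for illustration
context = TidyContext(
    name="Pipeline Data",
    display=False,
    handlers=[TidyLogHandler(), TidyFileHandler("pipeline_data.log")],
)

result = (
    TidyDataFrame(spark_data, context)
    .comment("Standardizing the raw extract")  # recorded in both sinks
    .select(...)
    .filter(...)
    ._data
)
```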
# Model Module

Class-driven workflows powered by *attrs*.

## Description

`TidyDataModel` is a custom class that allows you to **express your expectations**
of a data model. In data pipelines, it's essential to know if and when your data no
longer meets your expectations - but how is your pipeline meant to know this?

With a simple class - supported by *attrs* - we can capture all field-level expectations,
including:

- **Type Hints**: what type is your data meant to be?
- **Conversions**: how should values be treated?
- **Validations**: what values should/shouldn't exist?
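Before applying this to `TidyDataModel`, here is a minimal plain-*attrs* sketch of those three ideas on a single toy field (the class and field names are made up for illustration):

```python
from attrs import define, field, validators


@define
class Transaction:
    # type hint: the value is meant to be an int
    # converter: coerce incoming values (e.g. "250") before validation
    # validator: reject values that shouldn't exist (negative amounts)
    amount_cents: int = field(
        converter=int,
        validator=validators.ge(0),
    )


Transaction("250")   # converted: Transaction(amount_cents=250)
Transaction(-1)      # raises ValueError: 'amount_cents' must be >= 0
```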
Let's see how someone would use `TidyDataModel` for their bank statements.

## Usage

Assume you have the following columns in your bank statement:

- `account_no`: the account associated with the transaction
- `posted_date`: the date of the transaction
- `amount`: the amount charged, in the currency of the transaction's country
- `description`: details of the transaction (e.g. store, retailer, etc.)

A simple model would include these fields' names and their data types.
```python
import datetime
import decimal

from attrs import define, field
from tidy_tools.model import TidyDataModel


@define
class BankStatement(TidyDataModel):
    """Basic implementation of bank statement data."""

    account_no: str
    posted_date: datetime.date
    amount: decimal.Decimal
    description: str
```
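For comparison, the same four fields written as a plain PySpark `StructType` would look roughly like this (a sketch - the nullability flags and decimal precision are assumptions):

```python
from pyspark.sql.types import (
    DateType,
    DecimalType,
    StringType,
    StructField,
    StructType,
)

# roughly equivalent schema definition, with assumed nullability/precision
schema = StructType([
    StructField("account_no", StringType(), nullable=True),
    StructField("posted_date", DateType(), nullable=True),
    StructField("amount", DecimalType(10, 2), nullable=True),
    StructField("description", StringType(), nullable=True),
])
```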
As that comparison suggests, this implementation is similar to creating a
`StructType` object to represent our schema. So what's the point? Our
expectations don't just stop at each field's name and data type. Let's add
more to our model.
```python
from attrs import validators
from pyspark.sql import Column


def convert_currency(currency: str):
    """Return a converter that scales a column by the exchange rate for `currency`."""
    match currency.strip().upper():
        case "CAD":
            currency_rate = 1.44
        case "YEN":
            currency_rate = 70
        case _:
            currency_rate = 1

    def closure(column: Column) -> Column:
        return column * currency_rate

    return closure


@define
class BankStatement(TidyDataModel):
    """Complete implementation of bank statement data."""

    # we expect:
    #   - all account numbers to have a specific format
    account_no: str = field(validator=validators.matches_re(r"\d{5}-\d{3}-\d{2}"))

    # we expect:
    #   - each date to be within the year 2024
    #   - each date to be formatted like '01/01/1900'
    posted_date: datetime.date = field(
        validator=[
            validators.ge(datetime.date(2024, 1, 1)),
            validators.le(datetime.date(2024, 12, 31)),
        ],
        metadata={"format": "MM/dd/yyyy"},
    )

    # we expect:
    #   - currency in 'USD', so we convert to 'CAD'
    amount: decimal.Decimal = field(converter=convert_currency("CAD"))

    # we have no "significant" expectations for `description`
    description: str
```
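Because `convert_currency` just returns a function of a `Column`, it can also be sanity-checked outside the model. A quick sketch, assuming a DataFrame `df` with an `amount` column:

```python
import pyspark.sql.functions as F

# hypothetical standalone use of the converter defined above;
# matching is case-insensitive thanks to .strip().upper()
to_cad = convert_currency("cad")
df = df.withColumn("amount_cad", to_cad(F.col("amount")))
```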
Models are more than just definitions of data - they are also actionable
objects. Using your model, you can load data directly into your environment.
This step will:

- enforce the model's schema
- concatenate all sources (if multiple are passed)
- run all field-level conversions
- run all field-level validations
- return a converted DataFrame
```python
bank_activity = BankStatement.load(
    "activity/january.csv",
    "activity/february.csv",
    "activity/march.csv",
    read_func=spark.read.csv,
    read_options={"header": "true"},
)
```
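Per the list above, the result is a converted DataFrame, so ordinary PySpark operations apply from there (assuming the returned object exposes the standard DataFrame API):

```python
# inspect the loaded, converted, and validated data
bank_activity.printSchema()
bank_activity.show(5)
```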