018 Normalize data #33

Open · wants to merge 15 commits into base: develop
5 changes: 5 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py
@@ -12,3 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .spark.data_quality.duplicate_detection import *
from .spark.data_quality.normalization.normalization import *
from .spark.data_quality.normalization.normalization_mean import *
from .spark.data_quality.normalization.normalization_minmax import *
from .spark.data_quality.normalization.normalization_zscore import *
from .spark.data_quality.normalization.denormalization import *
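With these wildcard exports, the new components become importable from the package root, in the same style the docstrings below use (a minimal sketch; `NormalizationZScore` comes from the z-score module exported above):

```python
from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import (
    Denormalization,
    NormalizationMean,
    NormalizationMinMax,
    NormalizationZScore,
)
```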
@@ -0,0 +1,13 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
70 changes: 70 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/normalization/denormalization.py
@@ -0,0 +1,70 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql import DataFrame as PySparkDataFrame
from .....data_wranglers.interfaces import WranglerBaseInterface
from ....._pipeline_utils.models import Libraries, SystemType
from .normalization import (
NormalizationBaseClass,
)


class Denormalization(WranglerBaseInterface):
"""
    Applies the appropriate denormalization method to revert values to their original scale.

Example
--------
```python
from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import Denormalization
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

denormalization = Denormalization(normalized_df, normalization)
denormalized_df = denormalization.filter()
```

Parameters:
df (DataFrame): PySpark DataFrame to be reverted to its original scale.
normalization_to_revert (NormalizationBaseClass): An instance of the specific normalization subclass (NormalizationZScore, NormalizationMinMax, NormalizationMean) that was originally used to normalize the data.
"""

df: PySparkDataFrame
normalization_to_revert: NormalizationBaseClass

def __init__(
self, df: PySparkDataFrame, normalization_to_revert: NormalizationBaseClass
) -> None:
self.df = df
self.normalization_to_revert = normalization_to_revert

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def filter(self) -> PySparkDataFrame:
return self.normalization_to_revert.denormalize(self.df)
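For context, a minimal round-trip sketch of how the two components are meant to compose (the local SparkSession and the data are illustrative; `NormalizationMean` is defined later in this PR):

```python
from pyspark.sql import SparkSession

from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import (
    Denormalization,
    NormalizationMean,
)

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(10.0,), (20.0,), (30.0,)], ["value"])

# Normalize in place so that denormalization recomputes the original values
normalization = NormalizationMean(df, column_names=["value"], in_place=True)
normalized_df = normalization.filter()

# The same normalization instance must be passed, since it holds the
# reversal statistics captured during normalize()
denormalization = Denormalization(normalized_df, normalization)
restored_df = denormalization.filter()  # values back on the original scale
```

Note that with `in_place=False` the base class's `denormalize` simply drops the generated columns, since the original columns are left untouched.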
141 changes: 141 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/normalization/normalization.py
@@ -0,0 +1,141 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod
from pyspark.sql import DataFrame as PySparkDataFrame
from typing import Dict, List
from .....data_wranglers.interfaces import WranglerBaseInterface
from ....._pipeline_utils.models import Libraries, SystemType


class NormalizationBaseClass(WranglerBaseInterface):
"""
A base class for applying normalization techniques to multiple columns in a PySpark DataFrame.
This class serves as a framework to support various normalization methods (e.g., Z-Score, Min-Max, and Mean),
with specific implementations in separate subclasses for each normalization type.

Subclasses should implement specific normalization and denormalization methods by inheriting from this base class.


Example
--------
```python
from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import NormalizationZScore
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

normalization = NormalizationZScore(df, column_names=["value_column_1", "value_column_2"], in_place=False)
normalized_df = normalization.filter()
```

Parameters:
df (DataFrame): PySpark DataFrame to be normalized.
column_names (List[str]): List of columns in the DataFrame to be normalized.
        in_place (bool): If True, the normalized values overwrite the original column; otherwise they are written to a new, suffixed column.

    Attributes:
        NORMALIZATION_NAME_POSTFIX (str): Suffix appended to the column name when a new column is created for normalized values.
    """

df: PySparkDataFrame
column_names: List[str]
in_place: bool

    # Per-column statistics captured during normalization, keyed by column name
    reversal_value: Dict[str, List[float]]

# Appended to column name if new column is added
NORMALIZATION_NAME_POSTFIX: str = "normalization"

def __init__(
self, df: PySparkDataFrame, column_names: List[str], in_place: bool = False
) -> None:

        for column_name in column_names:
            if column_name not in df.columns:
                raise ValueError(f"{column_name} not found in the DataFrame.")

        self.df = df
        self.column_names = column_names
        self.in_place = in_place
        # Populated per column during normalization; required for reversal
        self.reversal_value = {}

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

    def filter(self) -> PySparkDataFrame:
        return self.normalize()

def normalize(self) -> PySparkDataFrame:
"""
Applies the specified normalization to each column in column_names.

Returns:
DataFrame: A PySpark DataFrame with the normalized values.
"""
normalized_df = self.df
for column in self.column_names:
normalized_df = self._normalize_column(normalized_df, column)
return normalized_df

    def denormalize(self, input_df: PySparkDataFrame) -> PySparkDataFrame:
        """
        Denormalizes the input DataFrame. Intended to be used by the denormalization component.

        If normalization was not applied in place, the generated normalized columns are
        simply dropped, since the original columns remain untouched.

        Parameters:
            input_df (DataFrame): PySpark DataFrame containing the current data.

        Returns:
            DataFrame: A PySpark DataFrame with values on their original scale.
        """
denormalized_df = input_df
if not self.in_place:
for column in self.column_names:
denormalized_df = denormalized_df.drop(
self._get_norm_column_name(column)
)
else:
for column in self.column_names:
denormalized_df = self._denormalize_column(denormalized_df, column)
return denormalized_df

@property
@abstractmethod
def NORMALIZED_COLUMN_NAME(self): ...

@abstractmethod
def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
pass

@abstractmethod
def _denormalize_column(
self, df: PySparkDataFrame, column: str
) -> PySparkDataFrame:
pass

def _get_norm_column_name(self, column_name: str) -> str:
if not self.in_place:
return f"{column_name}_{self.NORMALIZED_COLUMN_NAME}_{self.NORMALIZATION_NAME_POSTFIX}"
else:
return column_name
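To make the subclass contract concrete, here is a hypothetical `NormalizationMaxAbs` (not part of this PR, purely for illustration): a subclass supplies `NORMALIZED_COLUMN_NAME`, records its reversal statistics in `reversal_value`, and implements the two per-column hooks.

```python
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F


class NormalizationMaxAbs(NormalizationBaseClass):
    """Hypothetical example: scales each value by the column's maximum absolute value."""

    NORMALIZED_COLUMN_NAME = "maxabs"

    def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
        # Guard against an all-zero column omitted for brevity
        max_abs = df.select(F.max(F.abs(F.col(column)))).first()[0]
        self.reversal_value[column] = [max_abs]  # kept for denormalization
        return df.withColumn(
            self._get_norm_column_name(column), F.col(column) / F.lit(max_abs)
        )

    def _denormalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
        return df.withColumn(
            self._get_norm_column_name(column),
            F.col(column) * F.lit(self.reversal_value[column][0]),
        )
```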
62 changes: 62 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/normalization/normalization_mean.py
@@ -0,0 +1,62 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

from .normalization import NormalizationBaseClass
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F


class NormalizationMean(NormalizationBaseClass):
    """Applies Mean normalization: (value - mean) / (max - min)."""

    NORMALIZED_COLUMN_NAME = "mean"

def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
"""
Private method to apply Mean normalization to the specified column.
Mean normalization: (value - mean) / (max - min)
"""
        # Compute all three statistics in a single Spark job instead of three
        mean_val, min_val, max_val = df.select(
            F.mean(F.col(column)), F.min(F.col(column)), F.max(F.col(column))
        ).first()

divisor = max_val - min_val
if math.isclose(divisor, 0.0, abs_tol=10e-8) or not math.isfinite(divisor):
raise ZeroDivisionError("Division by Zero in Mean")

store_column = self._get_norm_column_name(column)
        self.reversal_value[column] = [mean_val, min_val, max_val]

return df.withColumn(
store_column,
(F.col(column) - F.lit(mean_val)) / (F.lit(max_val) - F.lit(min_val)),
)

def _denormalize_column(
self, df: PySparkDataFrame, column: str
) -> PySparkDataFrame:
"""
Private method to revert Mean normalization to the specified column.
Mean denormalization: normalized_value * (max - min) + mean = value
"""
        mean_val, min_val, max_val = self.reversal_value[column]

store_column = self._get_norm_column_name(column)

return df.withColumn(
store_column,
F.col(column) * (F.lit(max_val) - F.lit(min_val)) + F.lit(mean_val),
)
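As a quick sanity check of the formula in plain Python (no Spark): for values 10, 20 and 30 the mean is 20 and the range is 20, so the normalized values are -0.5, 0.0 and 0.5, and the reversal formula recovers the originals.

```python
values = [10.0, 20.0, 30.0]
mean_val = sum(values) / len(values)         # 20.0
min_val, max_val = min(values), max(values)  # 10.0, 30.0

normalized = [(v - mean_val) / (max_val - min_val) for v in values]
assert normalized == [-0.5, 0.0, 0.5]

# Denormalization: n * (max - min) + mean restores the original scale
assert [n * (max_val - min_val) + mean_val for n in normalized] == values
```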
60 changes: 60 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/normalization/normalization_minmax.py
@@ -0,0 +1,60 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

from .normalization import NormalizationBaseClass
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F


class NormalizationMinMax(NormalizationBaseClass):
    """Applies Min-Max normalization: (value - min) / (max - min)."""

    NORMALIZED_COLUMN_NAME = "minmax"

def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
"""
Private method to revert Min-Max normalization to the specified column.
Min-Max denormalization: normalized_value * (max - min) + min = value
"""
        # Compute both statistics in a single Spark job instead of two
        min_val, max_val = df.select(F.min(F.col(column)), F.max(F.col(column))).first()

divisor = max_val - min_val
if math.isclose(divisor, 0.0, abs_tol=10e-8) or not math.isfinite(divisor):
raise ZeroDivisionError("Division by Zero in MinMax")

store_column = self._get_norm_column_name(column)
        self.reversal_value[column] = [min_val, max_val]

return df.withColumn(
store_column,
(F.col(column) - F.lit(min_val)) / (F.lit(max_val) - F.lit(min_val)),
)

def _denormalize_column(
self, df: PySparkDataFrame, column: str
) -> PySparkDataFrame:
"""
Private method to revert Z-Score normalization to the specified column.
Z-Score denormalization: normalized_value * std_dev + mean = value
"""
        min_val, max_val = self.reversal_value[column]

store_column = self._get_norm_column_name(column)

return df.withColumn(
store_column,
(F.col(column) * (F.lit(max_val) - F.lit(min_val))) + F.lit(min_val),
)
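And the analogous check for Min-Max in plain Python: the same values map onto [0, 1], and the reversal formula restores them.

```python
values = [10.0, 20.0, 30.0]
min_val, max_val = min(values), max(values)  # 10.0, 30.0

normalized = [(v - min_val) / (max_val - min_val) for v in values]
assert normalized == [0.0, 0.5, 1.0]

# Denormalization: n * (max - min) + min restores the original scale
assert [n * (max_val - min_val) + min_val for n in normalized] == values
```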