Skip to content

Commit

Permalink
Staging/main/0.10.6 (#1065)
Browse files Browse the repository at this point in the history
* Add null ratio to column stats (#1052)

* Delay transforming priority_order into ndarray (#1045)

In the changed code, we had a mypy error because numpy ndarrays are not
compatible with random.Random.shuffle() (expected argument type is
MutableSequence[Any])

We fix this by first instantiating priority_order as a list, then
shuffling it, then creating an ndarray from it afterwards.

* Rename references to degree of freedom from df to deg_of_free (#1056)

* change references to degrees of freedom in chi2 from df to deg_of_free

* reformated using black pre-commit hook

* add_s3_connection_remote_loading_s3uri_feature (#1054)

* add_s3_connection_remote_loading_s3uri_feature

* pre-commit fix

* created S3Helper class and refactored data_utils and unit test

* enhanced test_data.py with test_read_s3_uri

* enhanced unit tests and refactored is_s3_uri

* refactored some unit-tests structure

* rename TestCreateS3Client to TestS3Helper

* fix directions for contrib branch (#1059)

* Feature: Plugins (#1060)

* Reservoir sampling (#826)

* add code for reservoir sampling and insert sample_nrows options

* pre commit fix

* add tests for reservoir sampling

* fixed mypy issues

* fix import to relative path

---------

Co-authored-by: Taylor Turner <[email protected]>
Co-authored-by: Richard Bann <[email protected]>

* plugins loading + preset plugin fetching implementation (#911)

* test

* Plugin implementation

* comments added to functions

* plugin test implementation for plugin presets

* forgot an import

* added None catch

* preset plugin test

* removing stuff I forgot to delete

* snake_case function names

* relative path

* relative path

* made new file for plugin testing

* forgot to delete function from old file

* now ive fixed if statement

* ok this should be it

* Plugin testing (#947)

* test

* plugin test implementation for plugin presets

* forgot an import

* added None catch

* preset plugin test

* snake_case function names

* relative path

* relative path

* forgot to delete function from old file

* nothing yet, just want this in two different repos

* new test for plugins feature and small update to plugin init

* pass

* didnt want dir to be overwritten

* forgot a dir

* fix isort pre-commit

* reservoir sample

* fix imports

* fix testing

* fix req to match dev

---------

Co-authored-by: Rushabh Vinchhi <[email protected]>
Co-authored-by: Richard Bann <[email protected]>
Co-authored-by: Liz Smith <[email protected]>

* version bump (#1064)

* empty test

---------

Co-authored-by: Suprabhat Gurrala <[email protected]>
Co-authored-by: Junho Lee <[email protected]>
Co-authored-by: Main Uddin Khan <[email protected]>
Co-authored-by: Mohammad Motamedi <[email protected]>
Co-authored-by: Rushabh Vinchhi <[email protected]>
Co-authored-by: Richard Bann <[email protected]>
Co-authored-by: Liz Smith <[email protected]>
  • Loading branch information
8 people authored Nov 13, 2023
1 parent 3ef1daa commit 302a458
Show file tree
Hide file tree
Showing 27 changed files with 550 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ For more nuanced testing runs, check out more detailed documentation [here](http
## Creating [Pull Requests](https://github.com/capitalone/DataProfiler/pulls)
Pull requests are the best way to propose changes to the codebase. We actively welcome your pull requests:

1. Fork the repo and create your branch from `main`.
1. Fork the repo and create your branch from `dev`.
2. If you've added code that should be tested, add tests.
3. If you've changed APIs, update the documentation.
4. Ensure the test suite passes.
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dataprofiler/labelers/embeddings/glove-reduced-64D.txt

.cache/
.idea/
.vscode
.vscode*
*.pyc
*.pkl
*.whl
Expand Down Expand Up @@ -134,3 +134,6 @@ venv.bak/
env3/

*.bak

#Pipfiles
Pipfile*
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ repos:
typing-extensions>=3.10.0.2,
HLL>=2.0.3,
datasketches>=4.1.0,
boto3>=1.28.61,

# requirements-dev.txt
check-manifest>=0.48,
Expand Down Expand Up @@ -110,7 +111,7 @@ repos:
additional_dependencies: ['h5py', 'wheel', 'future', 'numpy', 'pandas',
'python-dateutil', 'pytz', 'pyarrow', 'chardet', 'fastavro',
'python-snappy', 'charset-normalizer', 'psutil', 'scipy', 'requests',
'networkx','typing-extensions', 'HLL', 'datasketches']
'networkx','typing-extensions', 'HLL', 'datasketches', 'boto3']
# Pyupgrade - standardize and modernize Python syntax for newer versions of the language
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.0
Expand Down
4 changes: 4 additions & 0 deletions dataprofiler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
UnstructuredDataLabeler,
train_structured_labeler,
)
from .plugins import load_plugins
from .profilers.graph_profiler import GraphProfiler
from .profilers.profile_builder import (
Profiler,
Expand Down Expand Up @@ -41,3 +42,6 @@ def set_seed(seed=None):
if seed is not None and (not isinstance(seed, int) or seed < 0):
raise ValueError("Seed should be a non-negative integer.")
settings._seed = seed


load_plugins()
11 changes: 9 additions & 2 deletions dataprofiler/data_readers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .. import dp_logging
from .avro_data import AVROData
from .csv_data import CSVData
from .data_utils import is_valid_url, url_to_bytes
from .data_utils import S3Helper, is_valid_url, url_to_bytes
from .graph_data import GraphData
from .json_data import JSONData
from .parquet_data import ParquetData
Expand Down Expand Up @@ -65,7 +65,14 @@ def __new__(
options = dict()

if is_valid_url(input_file_path):
input_file_path = url_to_bytes(input_file_path, options)
if S3Helper.is_s3_uri(input_file_path, logger=logger):
storage_options = options.pop("storage_options", {})
s3 = S3Helper.create_s3_client(**storage_options)
input_file_path = S3Helper.get_s3_uri(
s3_uri=input_file_path, s3_client=s3
)
else:
input_file_path = url_to_bytes(input_file_path, options)

for data_class_info in cls.data_classes:
data_class = data_class_info["data_class"]
Expand Down
126 changes: 126 additions & 0 deletions dataprofiler/data_readers/data_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Contains functions for data readers."""
import json
import logging
import os
import re
import urllib
from collections import OrderedDict
Expand All @@ -19,6 +21,8 @@
cast,
)

import boto3
import botocore
import dateutil
import pandas as pd
import pyarrow.parquet as pq
Expand Down Expand Up @@ -843,3 +847,125 @@ def url_to_bytes(url_as_string: Url, options: Dict) -> BytesIO:

stream.seek(0)
return stream


class S3Helper:
"""
A utility class for working with Amazon S3.
This class provides methods to check if a path is an S3 URI
and to create an S3 client.
"""

@staticmethod
def is_s3_uri(path: str, logger: logging.Logger) -> bool:
"""
Check if the given path is an S3 URI.
This function checks for common S3 URI prefixes "s3://" and "s3a://".
Args:
path (str): The path to check for an S3 URI.
logger (logging.Logger): The logger instance for logging.
Returns:
bool: True if the path is an S3 URI, False otherwise.
"""
# Define the S3 URI prefixes to check
s3_uri_prefixes = ["s3://", "s3a://"]
path = path.strip()
# Check if the path starts with any of the specified prefixes
is_s3 = any(path.startswith(prefix) for prefix in s3_uri_prefixes)
if not is_s3:
logger.debug(f"'{path}' is not a valid S3 URI")

return is_s3

@staticmethod
def _create_boto3_client(
aws_access_key_id: Optional[str],
aws_secret_access_key: Optional[str],
aws_session_token: Optional[str],
region_name: Optional[str],
) -> boto3.client:
return boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
region_name=region_name,
)

@staticmethod
def create_s3_client(
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
region_name: Optional[str] = None,
) -> boto3.client:
"""
Create and return an S3 client.
Args:
aws_access_key_id (str): The AWS access key ID.
aws_secret_access_key (str): The AWS secret access key.
aws_session_token (str): The AWS session token
(optional, typically used for temporary credentials).
region_name (str): The AWS region name (default is 'us-east-1').
Returns:
boto3.client: A S3 client instance.
"""
# Check if credentials are not provided
# and use environment variables as fallback
if aws_access_key_id is None:
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
if aws_secret_access_key is None:
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if aws_session_token is None:
aws_session_token = os.environ.get("AWS_SESSION_TOKEN")

# Check if region is not provided and use environment variable as fallback
if region_name is None:
region_name = os.environ.get("AWS_REGION", "us-east-1")

# Check if IAM roles for service accounts are available
try:
s3 = S3Helper._create_boto3_client(
aws_access_key_id, aws_secret_access_key, aws_session_token, region_name
)
except botocore.exceptions.NoCredentialsError:
# IAM roles are not available, so fall back to provided credentials
if aws_access_key_id is None or aws_secret_access_key is None:
raise ValueError(
"AWS access key ID and secret access key are required."
)
s3 = S3Helper._create_boto3_client(
aws_access_key_id, aws_secret_access_key, aws_session_token, region_name
)

return s3

@staticmethod
def get_s3_uri(s3_uri: str, s3_client: boto3.client) -> BytesIO:
"""
Download an object from an S3 URI and return its content as BytesIO.
Args:
s3_uri (str): The S3 URI specifying the location of the object to download.
s3_client (boto3.client): An initialized AWS S3 client
for accessing the S3 service.
Returns:
BytesIO: A BytesIO object containing the content of
the downloaded S3 object.
"""
# Parse the S3 URI
parsed_uri = urllib.parse.urlsplit(s3_uri)
bucket_name = parsed_uri.netloc
file_key = parsed_uri.path.lstrip("/")
# Download the S3 object
response = s3_client.get_object(Bucket=bucket_name, Key=file_key)

# Return the object's content as BytesIO
return BytesIO(response["Body"].read())
6 changes: 3 additions & 3 deletions dataprofiler/labelers/data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2047,9 +2047,9 @@ def process(
elif aggregation_func == "random":
num_labels = max(label_mapping.values()) + 1
random_state: random.Random = self._parameters["random_state"]
priority_order = np.array(list(range(num_labels)))
random_state.shuffle(priority_order) # type: ignore
self.priority_prediction(results, priority_order)
priority_order = list(range(num_labels))
random_state.shuffle(priority_order)
self.priority_prediction(results, np.array(priority_order))
else:
raise ValueError(
f"`{aggregation_func}` is not a valid aggregation function"
Expand Down
38 changes: 38 additions & 0 deletions dataprofiler/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import importlib
import os

from .decorators import plugin_decorator, plugins_dict


def load_plugins():
"""
Digs through plugins folder for possible plugins to be imported
and consequently added to the plugins_dict if properly decorated
:return: None
"""
plugin_path = os.path.dirname(os.path.abspath(__file__))
for folder in os.listdir(plugin_path):
option_path = os.path.join(plugin_path, folder)
if os.path.isdir(option_path):
if folder == "__pycache__":
continue
for filename in os.listdir(option_path):
if filename is None or not filename.endswith(".py"):
continue
spec = importlib.util.spec_from_file_location(
filename, os.path.join(option_path, filename)
)
if spec is not None:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)


def get_plugins(typ):
"""
Fetches a dictionary of plugins of a certain type
:param typ: Broader classification/type of a plugin
:return: dict
"""
return plugins_dict.get(typ)
28 changes: 28 additions & 0 deletions dataprofiler/plugins/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Contains function for generating plugins data."""
from collections import defaultdict
from typing import Any, DefaultDict, Dict

plugins_dict: DefaultDict[str, Dict[str, Any]] = defaultdict(dict)


def plugin_decorator(typ, name):
"""
Populate plugins_dict with decorated plugin functions.
:param typ: Broader classification/type of a plugin
:param name: Specific name of a plugin
:return: function
"""

def __inner_factory_function(fn):
"""
Actual population of plugin_dict.
:param fn: Plugin function
:return: function
"""
global plugins_dict
plugins_dict[typ][name] = fn
return fn

return __inner_factory_function
16 changes: 8 additions & 8 deletions dataprofiler/profilers/numerical_column_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,8 @@ def _perform_t_test(
) -> dict:
results: dict = {
"t-statistic": None,
"conservative": {"df": None, "p-value": None},
"welch": {"df": None, "p-value": None},
"conservative": {"deg_of_free": None, "p-value": None},
"welch": {"deg_of_free": None, "p-value": None},
}

invalid_stats = False
Expand Down Expand Up @@ -647,17 +647,17 @@ def _perform_t_test(

s_delta = var1 / n1 + var2 / n2
t = (mean1 - mean2) / np.sqrt(s_delta)
conservative_df = min(n1, n2) - 1
welch_df = s_delta**2 / (
conservative_deg_of_free = min(n1, n2) - 1
welch_deg_of_free = s_delta**2 / (
(var1 / n1) ** 2 / (n1 - 1) + (var2 / n2) ** 2 / (n2 - 1)
)
results["t-statistic"] = t
results["conservative"]["df"] = float(conservative_df)
results["welch"]["df"] = float(welch_df)
results["conservative"]["deg_of_free"] = float(conservative_deg_of_free)
results["welch"]["deg_of_free"] = float(welch_deg_of_free)

conservative_t = scipy.stats.t(conservative_df)
conservative_t = scipy.stats.t(conservative_deg_of_free)
conservative_p_val = (1 - conservative_t.cdf(abs(t))) * 2
welch_t = scipy.stats.t(welch_df)
welch_t = scipy.stats.t(welch_deg_of_free)
welch_p_val = (1 - welch_t.cdf(abs(t))) * 2

results["conservative"]["p-value"] = float(conservative_p_val)
Expand Down
7 changes: 7 additions & 0 deletions dataprofiler/profilers/profile_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(
self.sample_size: int = 0
self.sample: list[str] = list()
self.null_count: int = 0
self.null_ratio: float | None = None
self.null_types: list[str] = list()
self.null_types_index: dict = {}
self._min_id: int | None = None
Expand Down Expand Up @@ -292,6 +293,9 @@ def diff(self, other_profile: StructuredColProfiler, options: dict = None) -> di
"null_count": profiler_utils.find_diff_of_numbers(
self.null_count, other_profile.null_count
),
"null_ratio": profiler_utils.find_diff_of_numbers(
self.null_ratio, other_profile.null_ratio
),
"null_types": profiler_utils.find_diff_of_lists_and_sets(
self.null_types, other_profile.null_types
),
Expand Down Expand Up @@ -428,6 +432,7 @@ def _update_base_stats(self, base_stats: dict) -> None:
self._last_batch_size = base_stats["sample_size"]
self.sample = base_stats["sample"]
self.null_count += base_stats["null_count"]
self.null_ratio = base_stats["null_count"] / base_stats["sample_size"]
self.null_types = profiler_utils._combine_unique_sets(
self.null_types, list(base_stats["null_types"].keys())
)
Expand Down Expand Up @@ -570,6 +575,7 @@ def clean_data_and_get_base_stats(
{
"sample_size": 0,
"null_count": 0,
"null_ratio": None,
"null_types": dict(),
"sample": [],
"min_id": None,
Expand Down Expand Up @@ -658,6 +664,7 @@ def clean_data_and_get_base_stats(
base_stats = {
"sample_size": total_sample_size,
"null_count": total_na,
"null_ratio": total_na / total_sample_size,
"null_types": na_columns,
"sample": rng.choice(
list(df_series.values), (min(len(df_series), 5),), replace=False
Expand Down
Loading

0 comments on commit 302a458

Please sign in to comment.