Skip to content

Commit

Permalink
Staging/main/0.10.8 (#1081)
Browse files Browse the repository at this point in the history
* Feature: added parquet sampling (#1070)

* parquet sampling function developed in data_utils.py; Added sample_nrows argument in ParquetData class; Added test_len_sampled_data in test_parquet_data.py

* resolved conflict with dev, added more tests

* fixed sample empty column bug

* fixed comments in data_utils.py, including:
1. added type of return in sample_parquet function;
2. changed variable names in sample_parquet function to more descriptive names (select -> sample_index, out -> sample_df);
3. created convert_unicode_col_to_utf8 function to reduce repeating code in sample_parquet and read_parquet_df functions

* 1. renamed variable names in covert_unicode_col_to_utf8 function (data_utils.py) to be more descriptive (types -> input_column_types, col -> iter_column), other part unchanged

2. test_parquet_data.py, move import statement to the top of file

3. test_parquet_data.py, merged all tests about parquet sample feature to their original tests

* checked the datatype and input file path before and after reload with sampling option enabled

* test

* delete test edit in avro_data.py, updated fastavro version in  requirment.txt

* remove fastavro.reader type

* change fastavro version back to original

* 1. sample_parquet function description
2. test_len_data method keep one sample length test
3. remove sampling test in test_specifying_data_type
4. remove sampling test in test_reload_data

* Depedency: `matplotlib` version bump  (#1072)

* bump tag matplotlib

* bumpt to most recent

* 3.9.0 update

* Bump actions/setup-python from 4 to 5 (#1078)

Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4 to 5.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](actions/setup-python@v4...v5)

---
updated-dependencies:
- dependency-name: actions/setup-python
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Taylor Turner <[email protected]>

* Make _assimilate_histogram not use self (#1071)

Co-authored-by: Taylor Turner <[email protected]>

* version bump

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: WML <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Junho Lee <[email protected]>
  • Loading branch information
4 people authored Jan 11, 2024
1 parent ad2eb80 commit a92ab1e
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
2 changes: 1 addition & 1 deletion dataprofiler/data_readers/avro_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _load_data_from_file(self, input_file_path: str) -> List:
# even when the option encoding='utf-8' is added. It may come from
# some special compression codec, e.g., snappy. Then, binary mode
# reading is currently used to get the dict-formatted lines.
df_reader: fastavro.reader = fastavro.reader(input_file)
df_reader = fastavro.reader(input_file)
lines: List = list()
for line in df_reader:
lines.append(line)
Expand Down
134 changes: 107 additions & 27 deletions dataprofiler/data_readers/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import os
import random
import re
import urllib
from collections import OrderedDict
Expand All @@ -24,6 +25,7 @@
import boto3
import botocore
import dateutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
Expand Down Expand Up @@ -439,52 +441,130 @@ def read_csv_df(
return data


def read_parquet_df(
def convert_unicode_col_to_utf8(input_df: pd.DataFrame) -> pd.DataFrame:
"""
Convert all unicode columns in input dataframe to utf-8.
:param input_df: input dataframe
:type input_df: pd.DataFrame
:return: corrected dataframe
:rtype: pd.DataFrame
"""
# Convert all the unicode columns to utf-8
input_column_types = input_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)

mixed_and_unicode_cols = input_column_types[
input_column_types == "unicode"
].index.union(input_column_types[input_column_types == "mixed"].index)

for iter_column in mixed_and_unicode_cols:
# Encode sting to bytes
input_df[iter_column] = input_df[iter_column].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)

# Decode bytes back to string
input_df[iter_column] = input_df[iter_column].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)

return input_df


def sample_parquet(
file_path: str,
sample_nrows: int,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.
Read parquet file, sample specified number of rows from it and return a data frame.
:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
"""
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):
# read parquet file into table
if selected_columns:
parquet_table = pq.read_table(file_path, columns=selected_columns)
else:
parquet_table = pq.read_table(file_path)

data_row_df = parquet_file.read_row_group(i).to_pandas()
# sample
n_rows = parquet_table.num_rows
if n_rows > sample_nrows:
sample_index = np.array([False] * n_rows)
sample_index[random.sample(range(n_rows), sample_nrows)] = True
else:
sample_index = np.array([True] * n_rows)
sample_df = parquet_table.filter(sample_index).to_pandas()

# Convert all the unicode columns to utf-8
types = data_row_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)
# Convert all the unicode columns to utf-8
sample_df = convert_unicode_col_to_utf8(sample_df)

mixed_and_unicode_cols = types[types == "unicode"].index.union(
types[types == "mixed"].index
)
original_df_dtypes = sample_df.dtypes
if read_in_string:
sample_df = sample_df.astype(str)

for col in mixed_and_unicode_cols:
data_row_df[col] = data_row_df[col].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)
data_row_df[col] = data_row_df[col].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)
return sample_df, original_df_dtypes

if selected_columns:
data_row_df = data_row_df[selected_columns]

data = pd.concat([data, data_row_df])
def read_parquet_df(
file_path: str,
sample_nrows: Optional[int] = None,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.
original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
"""
if sample_nrows is None:
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):

data_row_df = parquet_file.read_row_group(i).to_pandas()

return data, original_df_dtypes
# Convert all the unicode columns to utf-8
data_row_df = convert_unicode_col_to_utf8(data_row_df)

if selected_columns:
data_row_df = data_row_df[selected_columns]

data = pd.concat([data, data_row_df])

original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
return data, original_df_dtypes
else:
data, original_df_dtypes = sample_parquet(
file_path,
sample_nrows,
selected_columns=selected_columns,
read_in_string=read_in_string,
)
return data, original_df_dtypes


def read_text_as_list_of_strs(
Expand Down
11 changes: 10 additions & 1 deletion dataprofiler/data_readers/parquet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self._data_formats["json"] = self._get_data_as_json
self._selected_data_format: str = options.get("data_format", "dataframe")
self._selected_columns: List[str] = options.get("selected_columns", list())
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
Expand All @@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
"""Return selected columns."""
return self._selected_columns

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

@property
def is_structured(self) -> bool:
"""Determine compatibility with StructuredProfiler."""
Expand All @@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
"""Return data from file."""
data, original_df_dtypes = data_utils.read_parquet_df(
input_file_path, self.selected_columns, read_in_string=True
input_file_path,
selected_columns=self.selected_columns,
read_in_string=True,
sample_nrows=self.sample_nrows,
)
self._original_df_dtypes = original_df_dtypes
return data
Expand Down
26 changes: 19 additions & 7 deletions dataprofiler/profilers/numerical_column_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import pandas as pd
import scipy.stats

from . import histogram_utils, profiler_utils
from . import float_column_profile, histogram_utils, profiler_utils
from .base_column_profilers import BaseColumnProfiler
from .profiler_options import NumericalOptions

Expand Down Expand Up @@ -710,6 +710,9 @@ def _preprocess_for_calculate_psi(
entity_count_per_bin=self_histogram["bin_counts"],
bin_edges=self_histogram["bin_edges"],
suggested_bin_count=num_psi_bins,
is_float_histogram=isinstance(
self, float_column_profile.FloatColumn
),
options={
"min_edge": min_min_edge,
"max_edge": max_max_edge,
Expand All @@ -731,6 +734,9 @@ def _preprocess_for_calculate_psi(
entity_count_per_bin=other_histogram["bin_counts"],
bin_edges=other_histogram["bin_edges"],
suggested_bin_count=num_psi_bins,
is_float_histogram=isinstance(
self, float_column_profile.FloatColumn
),
options={
"min_edge": min_min_edge,
"max_edge": max_max_edge,
Expand Down Expand Up @@ -1360,7 +1366,12 @@ def _update_histogram(self, df_series: pd.Series) -> None:
self._stored_histogram["total_loss"] += histogram_loss

def _regenerate_histogram(
self, entity_count_per_bin, bin_edges, suggested_bin_count, options=None
self,
entity_count_per_bin,
bin_edges,
suggested_bin_count,
is_float_histogram,
options=None,
) -> tuple[dict[str, np.ndarray], float]:

# create proper binning
Expand All @@ -1372,6 +1383,11 @@ def _regenerate_histogram(
new_bin_edges = np.linspace(
options["min_edge"], options["max_edge"], suggested_bin_count + 1
)

# if it's not a float histogram, then assume it only contains integer values
if not is_float_histogram:
bin_edges = np.round(bin_edges)

return self._assimilate_histogram(
from_hist_entity_count_per_bin=entity_count_per_bin,
from_hist_bin_edges=bin_edges,
Expand Down Expand Up @@ -1417,11 +1433,6 @@ def _assimilate_histogram(

bin_edge = from_hist_bin_edges[bin_id : bin_id + 3]

# if we know not float, we can assume values in bins are integers.
is_float_profile = self.__class__.__name__ == "FloatColumn"
if not is_float_profile:
bin_edge = np.round(bin_edge)

# loop until we have a new bin which contains the current bin.
while (
bin_edge[0] >= dest_hist_bin_edges[new_bin_id + 1]
Expand Down Expand Up @@ -1513,6 +1524,7 @@ def _histogram_for_profile(
entity_count_per_bin=bin_counts,
bin_edges=bin_edges,
suggested_bin_count=suggested_bin_count,
is_float_histogram=isinstance(self, float_column_profile.FloatColumn),
)

def _get_best_histogram_for_profile(self) -> dict:
Expand Down
22 changes: 20 additions & 2 deletions dataprofiler/tests/data_readers/test_parquet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import unittest
from io import BytesIO

import pandas as pd

from dataprofiler.data_readers.data import Data
from dataprofiler.data_readers.parquet_data import ParquetData

Expand Down Expand Up @@ -123,13 +125,24 @@ def test_data_formats(self):
self.assertEqual(input_data_obj.data_format, data_format)
data = input_data_obj.data
if data_format == "dataframe":
import pandas as pd

self.assertIsInstance(data, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data, list)
self.assertIsInstance(data[0], str)

input_data_obj_sampled = Data(
input_file["path"], options={"sample_nrows": 100}
)
for data_format in list(input_data_obj_sampled._data_formats.keys()):
input_data_obj_sampled.data_format = data_format
self.assertEqual(input_data_obj_sampled.data_format, data_format)
data_sampled = input_data_obj_sampled.data
if data_format == "dataframe":
self.assertIsInstance(data_sampled, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data_sampled, list)
self.assertIsInstance(data_sampled[0], str)

def test_mixed_string_col(self):
"""
Determine if parquet can handle mixed string column types.
Expand Down Expand Up @@ -181,6 +194,11 @@ def test_len_data(self):
self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

data_sampled = Data(input_file["path"], options={"sample_nrows": 100})
self.assertEqual(
min(100, input_file["count"]), len(data_sampled), msg=input_file["path"]
)

def test_file_encoding(self):
"""Tests to ensure file_encoding set to None"""
for input_file in self.file_or_buf_list:
Expand Down
2 changes: 1 addition & 1 deletion dataprofiler/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

MAJOR = 0
MINOR = 10
MICRO = 7
MICRO = 8
POST = None # otherwise None

VERSION = "%d.%d.%d" % (MAJOR, MINOR, MICRO)
Expand Down
2 changes: 1 addition & 1 deletion requirements-reports.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
matplotlib>=3.3.3,<3.6.1
matplotlib>=3.3.3,<3.9.0
seaborn>=0.11.1

0 comments on commit a92ab1e

Please sign in to comment.