Skip to content

Commit

Permalink
Closes #1456 - df.save() with SegArray Column (#1467)
Browse files Browse the repository at this point in the history
* Updating dataframe to only use save and load functions. Updating segarray save to use file_format. Updating test.

* Updated load to properly format the segarray within dataframes. Added test to ensure functional.

* Updating to use save/load in place of save_table/load_table

* Removing old commented-out code.

* Updating with black and isort to avoid merge conflicts.

* Rebase cleanup

* Removing unused import
  • Loading branch information
Ethan-DeBandi99 authored Jun 6, 2022
1 parent ce55b35 commit 8db89d0
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 41 deletions.
57 changes: 31 additions & 26 deletions arkouda/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ def to_pandas(self, datalimit=maxTransferBytes, retain_index=False):
else:
return pd.DataFrame(data=pandas_data)

def save(self, path, index=False):
def save(self, path, index=False, columns=None, file_format="HDF5"):
"""
Save DataFrame to disk, preserving column names.
Expand All @@ -1287,34 +1287,16 @@ def save(self, path, index=False):
File path to save data
index : bool
If True, save the index column. By default, do not save the index.
Notes
-----
This method saves one file per locale of the arkouda server. All
files are prefixed by the path argument and suffixed by their
locale number.
"""
tosave = {k: v for k, v in self.data.items() if (index or k != "index")}
save_all(tosave, path)

def save_table(self, prefix_path, columns=None, index=False, file_format="HDF5"):
"""
Save a dataframe as a table in Parquet
Parameters
__________
prefix_path: str
Path and filename prefix to save to
columns: List
List of columns to include in the file. If None, writes out all columns
file_format: str
'HDF5' or 'Parquet'. Defaults to 'HDF5'
index: Bool
If true, include the index values in the save file.
Notes
______
This function currently uses 'truncate' mode to ensure the file exists before appending.
-----
This method saves one file per locale of the arkouda server. All
files are prefixed by the path argument and suffixed by their
locale number.
"""
# if no columns are stored, we will save all columns
if columns is None:
Expand All @@ -1324,16 +1306,39 @@ def save_table(self, prefix_path, columns=None, index=False, file_format="HDF5")

if index:
data["Index"] = self.index
save_all(data, prefix_path=prefix_path, file_format=file_format)
save_all(data, prefix_path=path, file_format=file_format)

@classmethod
def load_table(cls, prefix_path, file_format="INFER"):
def load(cls, prefix_path, file_format="INFER"):
"""
Load dataframe from file
file_format needed for consistency with other load functions
"""
prefix, extension = os.path.splitext(prefix_path)
first_file = f"{prefix}_LOCALE0000{extension}"
filetype = get_filetype(first_file) if file_format.lower() == "infer" else file_format

# columns load backwards
df = cls(load_all(prefix_path, file_format=filetype))
df_dict = load_all(prefix_path, file_format=filetype)

# this assumes segments will always have corresponding values.
# This should happen due to save config
seg_cols = [col.split("_")[0] for col in df_dict.keys() if col.endswith("_segments")]
df_dict_keys = [
col.split("_")[0] if col.endswith("_segments") or col.endswith("_values") else col
for col in df_dict.keys()
]

# update dict to contain segarrays where applicable if any exist
if len(seg_cols) > 0:
df_dict = {
col: SegArray(df_dict[col + "_segments"], df_dict[col + "_values"])
if col in seg_cols
else df_dict[col]
for col in df_dict_keys
}

df = cls(df_dict)
if filetype == "HDF5":
return df
else:
Expand Down
4 changes: 2 additions & 2 deletions arkouda/pdarrayIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ def import_data(read_path: str, write_file: str = None, return_obj: bool = True,
df = DataFrame(df_def)

if write_file:
df.save_table(write_file, index=index, file_format=filetype)
df.save(write_file, index=index, file_format=filetype)

if return_obj:
return df
Expand Down Expand Up @@ -780,7 +780,7 @@ def export(
"File type not supported. Import is only supported for HDF5 and Parquet file formats."
)

akdf = DataFrame.load_table(read_path, file_format=filetype)
akdf = DataFrame.load(read_path, file_format=filetype)
df = akdf.to_pandas(retain_index=index)

if write_file:
Expand Down
11 changes: 9 additions & 2 deletions arkouda/segarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ def save(
segment_suffix="_segments",
value_suffix="_values",
mode="truncate",
file_format="HDF5",
):
"""
Save the SegArray to HDF5. The result is a collection of HDF5 files, one file
Expand All @@ -810,6 +811,8 @@ def save(
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', add data as a new column to existing files.
file_format : str {'HDF5' | 'Parquet'}
Defaults to `'HDF5'`. Indicates the file format to use to store data.
Returns
-------
Expand All @@ -822,8 +825,12 @@ def save(
"""
if segment_suffix == value_suffix:
raise ValueError("Segment suffix and value suffix must be different")
self.segments.save(prefix_path, dataset=dataset + segment_suffix, mode=mode)
self.values.save(prefix_path, dataset=dataset + value_suffix, mode="append")
self.segments.save(
prefix_path, dataset=dataset + segment_suffix, mode=mode, file_format=file_format
)
self.values.save(
prefix_path, dataset=dataset + value_suffix, mode="append", file_format=file_format
)

@classmethod
def load(
Expand Down
22 changes: 13 additions & 9 deletions tests/dataframe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from shutil import rmtree

import pandas as pd # type: ignore
import pytest
from base_test import ArkoudaTest
from context import arkouda as ak

Expand Down Expand Up @@ -451,8 +450,10 @@ def test_copy(self):
df_copy.__setitem__("userID", ak.array([1, 2, 1, 3, 2, 1]))
self.assertEqual(df.__repr__(), df_copy.__repr__())

def test_save_table(self):
def test_save(self):
d = f"{os.getcwd()}/save_table_test"
if os.path.exists(d):
rmtree(d)
i = list(range(3))
c1 = [9, 7, 17]
c2 = [2, 4, 6]
Expand All @@ -470,21 +471,24 @@ def test_save_table(self):

# make directory to save to so pandas read works
os.mkdir(d)
akdf.save_table(f"{d}/testName", file_format="Parquet")
akdf.save(f"{d}/testName", file_format="Parquet")

ak_loaded = ak.DataFrame.load_table(f"{d}/testName")
ak_loaded = ak.DataFrame.load(f"{d}/testName")
self.assertTrue(validation_df.equals(ak_loaded.to_pandas()))

# test save with index true
akdf.save_table(f"{d}/testName_with_index.pq", file_format="Parquet", index=True)
akdf.save(f"{d}/testName_with_index.pq", file_format="Parquet", index=True)
self.assertTrue(len(glob.glob(f"{d}/testName_with_index*.pq")) == ak.get_config()["numLocales"])

# Commenting the read into pandas out because it requires optional libraries
# pddf = pd.read_parquet("save_table_test", engine='pyarrow')
# self.assertTrue(pddf.equals(validation_df))
# Test for df having seg array col
df = ak.DataFrame({"a": ak.arange(10), "b": ak.SegArray(ak.arange(10), ak.arange(10))})
df.save(f"{d}/seg_test.h5")
self.assertTrue(len(glob.glob(f"{d}/seg_test*.h5")) == ak.get_config()["numLocales"])
ak_loaded = ak.DataFrame.load(f"{d}/seg_test.h5")
self.assertTrue(df.to_pandas().equals(ak_loaded.to_pandas()))

# clean up test files
rmtree("save_table_test/")
rmtree(d)

def test_isin(self):
df = ak.DataFrame({"col_A": ak.array([7, 3]), "col_B": ak.array([1, 9])})
Expand Down
4 changes: 2 additions & 2 deletions tests/import_export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_export_hdf(self):
os.mkdir(f_base)

akdf = self.build_arkouda_dataframe()
akdf.save_table(f"{f_base}/ak_write")
akdf.save(f"{f_base}/ak_write")

pddf = ak.export(f"{f_base}/ak_write", write_file=f"{f_base}/pd_from_ak.h5", index=True)
self.assertTrue(len(glob.glob(f"{f_base}/pd_from_ak.h5")) == 1)
Expand Down Expand Up @@ -103,7 +103,7 @@ def test_export_parquet(self):
os.mkdir(f_base)

akdf = self.build_arkouda_dataframe()
akdf.save_table(f"{f_base}/ak_write", file_format="Parquet")
akdf.save(f"{f_base}/ak_write", file_format="Parquet")
print(akdf.__repr__())

pddf = ak.export(f"{f_base}/ak_write", write_file=f"{f_base}/pd_from_ak.parquet", index=True)
Expand Down

0 comments on commit 8db89d0

Please sign in to comment.