
Commit

[Dist] Allow reading and writing single-column vector Parquet files. (dmlc#5098)

* Allow reading and writing single-column vector Parquet files.

These files are commonly produced by Spark ML's feature processing code.

* [Dist] Only write single-column vector files for Parquet in tests.
thvasilo authored Jan 5, 2023
1 parent 7ee550f commit 9890201
Showing 3 changed files with 55 additions and 11 deletions.
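
For context, the difference between the two Parquet layouts can be seen in the following sketch. It is illustrative only (not part of the commit) and assumes pyarrow, pandas, and numpy are available; the file names and the 4x3 array are arbitrary.

# Illustrative sketch: standard multi-column Parquet layout vs. the
# Spark-ML-style single-column "vector rows" layout this commit supports.
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet

arr = np.arange(12, dtype=np.float32).reshape(4, 3)  # 4 rows, 3 feature columns

# Standard layout: one Parquet column per feature dimension.
multi_col = pyarrow.Table.from_pandas(pd.DataFrame(arr))
pyarrow.parquet.write_table(multi_col, "multi_column.parquet")

# Vector-rows layout: a single list-typed column, one vector per row.
vector_col = pyarrow.table([pyarrow.array(arr.tolist())], names=["vector"])
pyarrow.parquet.write_table(vector_col, "vector_rows.parquet")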
7 changes: 7 additions & 0 deletions tests/tools/test_dist_part.py
@@ -82,6 +82,7 @@ def _test_chunk_graph(
num_chunks,
data_fmt = 'numpy',
edges_fmt = 'csv',
vector_rows = False,
num_chunks_nodes = None,
num_chunks_edges = None,
num_chunks_node_data = None,
@@ -91,6 +92,7 @@ def _test_chunk_graph(

g = create_chunked_dataset(root_dir, num_chunks,
data_fmt=data_fmt, edges_fmt=edges_fmt,
vector_rows=vector_rows,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
@@ -191,6 +193,11 @@ def test_data(
def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt):
_test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt)

@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("vector_rows", [True, False])
def test_chunk_graph_vector_rows(num_chunks, vector_rows):
_test_chunk_graph(num_chunks, data_fmt='parquet', edges_fmt='parquet', vector_rows=vector_rows)


@pytest.mark.parametrize(
"num_chunks, "
28 changes: 23 additions & 5 deletions tests/tools/utils.py
@@ -6,18 +6,25 @@

import dgl
from distpartitioning import array_readwriter
from distpartitioning.array_readwriter.parquet import ParquetArrayParser
from files import setdir


def _chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
def _chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt, vector_rows=False):
paths = []
offset = 0

for j, n in enumerate(chunk_sizes):
path = os.path.abspath(path_fmt % j)
arr_chunk = arr[offset: offset + n]
shape = arr_chunk.shape
logging.info("Chunking %d-%d" % (offset, offset + n))
array_readwriter.get_array_parser(**fmt_meta).write(path, arr_chunk)
# If requested, write multi-column arrays as single-column vector Parquet files.
array_parser = array_readwriter.get_array_parser(**fmt_meta)
if isinstance(array_parser, ParquetArrayParser) and len(shape) > 1 and shape[1] > 1:
array_parser.write(path, arr_chunk, vector_rows=vector_rows)
else:
array_parser.write(path, arr_chunk)
offset += n
paths.append(path)

@@ -76,7 +83,8 @@ def _init(g, num_chunks, key, kwargs=None):


def _chunk_graph(
g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_format, **kwargs
g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_format,
vector_rows=False, **kwargs
):
# First deal with ndata and edata that are homogeneous
# (i.e. not a dict-of-dict)
@@ -190,6 +198,7 @@ def _chunk_graph(
writer_fmt_meta,
chunk_sizes,
key + "-%d." + file_suffix,
vector_rows=vector_rows,
)
ndata_meta[key] = ndata_key_meta

@@ -230,6 +239,7 @@ def _chunk_graph(
writer_fmt_meta,
chunk_sizes,
key + "-%d." + file_suffix,
vector_rows=vector_rows,
)
edata_meta[key] = edata_key_meta

@@ -250,6 +260,7 @@ def chunk_graph(
output_path,
data_fmt="numpy",
edges_fmt='csv',
vector_rows=False,
**kwargs,
):
"""
@@ -276,6 +287,10 @@
The output directory saving the chunked graph.
data_fmt : str
Format of node/edge data: 'numpy' or 'parquet'.
edges_fmt : str
Format of edges files: 'csv' or 'parquet'.
vector_rows : bool
When True, Parquet files are written as single-column vector row files.
kwargs : dict
Key word arguments to control chunk details.
"""
@@ -287,12 +302,14 @@
edata[key] = os.path.abspath(edata[key])
with setdir(output_path):
_chunk_graph(
g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_fmt, **kwargs
g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_fmt,
vector_rows, **kwargs
)


def create_chunked_dataset(
root_dir, num_chunks, data_fmt="numpy", edges_fmt='csv', **kwargs):
root_dir, num_chunks, data_fmt="numpy", edges_fmt='csv',
vector_rows=False, **kwargs):
"""
This function creates a sample dataset based on the MAG240 dataset.
@@ -531,6 +548,7 @@ def rand_edges(num_src, num_dst, num_edges):
output_path=output_dir,
data_fmt=data_fmt,
edges_fmt=edges_fmt,
vector_rows=vector_rows,
**kwargs,
)
print("Done with creating chunked graph")
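
A hedged usage sketch of the new flag (not part of the commit). It mirrors the new test and assumes it is run from tests/tools/ so that utils.py is importable; the chunk count and output directory are arbitrary.

# Usage sketch: build the sample chunked dataset with Parquet node/edge data
# written as single-column vector rows (assumes tests/tools/ is the CWD).
import tempfile

from utils import create_chunked_dataset

with tempfile.TemporaryDirectory() as root_dir:
    g = create_chunked_dataset(
        root_dir,
        num_chunks=4,          # arbitrary chunk count
        data_fmt="parquet",
        edges_fmt="parquet",
        vector_rows=True,      # flag introduced by this commit
    )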
31 changes: 25 additions & 6 deletions tools/distpartitioning/array_readwriter/parquet.py
@@ -1,5 +1,6 @@
import logging

import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet
@@ -16,26 +17,44 @@ def read(self, path):
logging.info("Reading from %s using parquet format" % path)
metadata = pyarrow.parquet.read_metadata(path)
metadata = metadata.schema.to_arrow_schema().metadata

# As Parquet data are tabular, we assume the ndarray is 2-dimensional.
# If not, the shape should be explicitly specified in the file metadata.
shape = metadata.get(b"shape", None)
if metadata:
shape = metadata.get(b"shape", None)
else:
shape = None
table = pyarrow.parquet.read_table(path, memory_map=True)
logging.info("Done reading from %s" % path)
arr = table.to_pandas().to_numpy()

data_types = table.schema.types
# Spark ML feature processing produces single-column parquet files where each row is a vector object
if len(data_types) == 1 and isinstance(data_types[0], pyarrow.ListType):
arr = np.array(table.to_pandas().iloc[:, 0].to_list())
logging.debug(f"Parquet data under {path} converted from single vector per row to ndarray")
else:
arr = table.to_pandas().to_numpy()
if not shape:
logging.warning(
"Shape information not found in the metadata, read the data as "
"a 2 dim array."
)
logging.info("Done reading from %s" % path)
shape = tuple(eval(shape.decode())) if shape else arr.shape
return arr.reshape(shape)

def write(self, path, array):
def write(self, path, array, vector_rows=False):
logging.info("Writing to %s using parquet format" % path)
shape = array.shape
if len(shape) > 2:
array = array.reshape(shape[0], -1)
table = pyarrow.Table.from_pandas(pd.DataFrame(array))
table = table.replace_schema_metadata({"shape": str(shape)})
if vector_rows:
table = pyarrow.table(
[pyarrow.array(array.tolist())],
names=["vector"])
logging.info("Writing to %s using single-vector rows..." % path)
else:
table = pyarrow.Table.from_pandas(pd.DataFrame(array))
table = table.replace_schema_metadata({"shape": str(shape)})

pyarrow.parquet.write_table(table, path)
logging.info("Done writing to %s" % path)
