From 9890201d586726f88df70cb5ee66ce0ff4191b6d Mon Sep 17 00:00:00 2001
From: Theodore Vasiloudis
Date: Thu, 5 Jan 2023 04:37:28 -0800
Subject: [PATCH] [Dist] Allow reading and writing single-column vector Parquet files. (#5098)

* Allow reading and writing single-column vector Parquet files.

These files are commonly produced by Spark ML's feature processing code.

* [Dist] Only write single-column vector files for Parquet in tests.
---
 tests/tools/test_dist_part.py       |  7 +++++
 tests/tools/utils.py                | 28 ++++++++++++++---
 .../array_readwriter/parquet.py     | 31 +++++++++++++++----
 3 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py
index 7b866bc3b341..b431ef55947a 100644
--- a/tests/tools/test_dist_part.py
+++ b/tests/tools/test_dist_part.py
@@ -82,6 +82,7 @@ def _test_chunk_graph(
     num_chunks,
     data_fmt = 'numpy',
     edges_fmt = 'csv',
+    vector_rows = False,
     num_chunks_nodes = None,
     num_chunks_edges = None,
     num_chunks_node_data = None,
@@ -91,6 +92,7 @@
         g = create_chunked_dataset(root_dir, num_chunks,
             data_fmt=data_fmt,
             edges_fmt=edges_fmt,
+            vector_rows=vector_rows,
             num_chunks_nodes=num_chunks_nodes,
             num_chunks_edges=num_chunks_edges,
             num_chunks_node_data=num_chunks_node_data,
@@ -191,6 +193,11 @@ def test_data(
 def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt):
     _test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt)
 
+@pytest.mark.parametrize("num_chunks", [1, 8])
+@pytest.mark.parametrize("vector_rows", [True, False])
+def test_chunk_graph_vector_rows(num_chunks, vector_rows):
+    _test_chunk_graph(num_chunks, data_fmt='parquet', edges_fmt='parquet', vector_rows=vector_rows)
+
 
 @pytest.mark.parametrize(
     "num_chunks, "
diff --git a/tests/tools/utils.py b/tests/tools/utils.py
index 331ba16ea7b6..a2a46a1f7817 100644
--- a/tests/tools/utils.py
+++ b/tests/tools/utils.py
@@ -6,18 +6,25 @@
 import dgl
 
 from distpartitioning import array_readwriter
+from distpartitioning.array_readwriter.parquet import ParquetArrayParser
 from files import setdir
 
 
-def _chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
+def _chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt, vector_rows=False):
     paths = []
     offset = 0
 
     for j, n in enumerate(chunk_sizes):
         path = os.path.abspath(path_fmt % j)
         arr_chunk = arr[offset: offset + n]
+        shape = arr_chunk.shape
         logging.info("Chunking %d-%d" % (offset, offset + n))
-        array_readwriter.get_array_parser(**fmt_meta).write(path, arr_chunk)
+        # If requested we write multi-column arrays as single-column vector Parquet files
+        array_parser = array_readwriter.get_array_parser(**fmt_meta)
+        if isinstance(array_parser, ParquetArrayParser) and len(shape) > 1 and shape[1] > 1:
+            array_parser.write(path, arr_chunk, vector_rows=vector_rows)
+        else:
+            array_parser.write(path, arr_chunk)
         offset += n
         paths.append(path)
 
@@ -76,7 +83,8 @@ def _init(g, num_chunks, key, kwargs=None):
 
 
 def _chunk_graph(
-    g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_format, **kwargs
+    g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_format,
+    vector_rows=False, **kwargs
 ):
     # First deal with ndata and edata that are homogeneous
     # (i.e. not a dict-of-dict)
@@ -190,6 +198,7 @@ def _chunk_graph(
                 writer_fmt_meta,
                 chunk_sizes,
                 key + "-%d." + file_suffix,
+                vector_rows=vector_rows,
             )
             ndata_meta[key] = ndata_key_meta
 
@@ -230,6 +239,7 @@ def _chunk_graph(
                 writer_fmt_meta,
                 chunk_sizes,
                 key + "-%d." + file_suffix,
+                vector_rows=vector_rows,
             )
             edata_meta[key] = edata_key_meta
 
@@ -250,6 +260,7 @@ def chunk_graph(
     output_path,
     data_fmt="numpy",
     edges_fmt='csv',
+    vector_rows=False,
     **kwargs,
 ):
     """
@@ -276,6 +287,10 @@
         The output directory saving the chunked graph.
     data_fmt : str
         Format of node/edge data: 'numpy' or 'parquet'.
+    edges_fmt : str
+        Format of the edge files: 'csv' or 'parquet'.
+    vector_rows : bool
+        When True, write Parquet data as single-column vector files (one vector per row).
     kwargs : dict
         Key word arguments to control chunk details.
     """
@@ -287,12 +302,14 @@ def chunk_graph(
         edata[key] = os.path.abspath(edata[key])
     with setdir(output_path):
         _chunk_graph(
-            g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_fmt, **kwargs
+            g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_fmt,
+            vector_rows, **kwargs
         )
 
 
 def create_chunked_dataset(
-    root_dir, num_chunks, data_fmt="numpy", edges_fmt='csv', **kwargs):
+    root_dir, num_chunks, data_fmt="numpy", edges_fmt='csv',
+    vector_rows=False, **kwargs):
     """
     This function creates a sample dataset, based on MAG240 dataset.
 
@@ -531,6 +548,7 @@ def rand_edges(num_src, num_dst, num_edges):
         output_path=output_dir,
         data_fmt=data_fmt,
         edges_fmt=edges_fmt,
+        vector_rows=vector_rows,
         **kwargs,
     )
     print("Done with creating chunked graph")
diff --git a/tools/distpartitioning/array_readwriter/parquet.py b/tools/distpartitioning/array_readwriter/parquet.py
index f6bc2b353e04..cbb0dd24517c 100644
--- a/tools/distpartitioning/array_readwriter/parquet.py
+++ b/tools/distpartitioning/array_readwriter/parquet.py
@@ -1,5 +1,6 @@
 import logging
 
+import numpy as np
 import pandas as pd
 import pyarrow
 import pyarrow.parquet
@@ -16,26 +17,44 @@ def read(self, path):
         logging.info("Reading from %s using parquet format" % path)
         metadata = pyarrow.parquet.read_metadata(path)
         metadata = metadata.schema.to_arrow_schema().metadata
+
         # As parquet data are tabularized, we assume the dim of ndarray is 2.
         # If not, it should be explictly specified in the file as metadata.
-        shape = metadata.get(b"shape", None)
+        if metadata:
+            shape = metadata.get(b"shape", None)
+        else:
+            shape = None
         table = pyarrow.parquet.read_table(path, memory_map=True)
-        logging.info("Done reading from %s" % path)
-        arr = table.to_pandas().to_numpy()
+
+        data_types = table.schema.types
+        # Spark ML feature processing produces single-column parquet files where each row is a vector object
+        if len(data_types) == 1 and isinstance(data_types[0], pyarrow.ListType):
+            arr = np.array(table.to_pandas().iloc[:, 0].to_list())
+            logging.debug(f"Parquet data under {path} converted from single vector per row to ndarray")
+        else:
+            arr = table.to_pandas().to_numpy()
         if not shape:
             logging.warning(
                 "Shape information not found in the metadata, read the data as "
                 "a 2 dim array."
             )
+        logging.info("Done reading from %s" % path)
         shape = tuple(eval(shape.decode())) if shape else arr.shape
         return arr.reshape(shape)
 
-    def write(self, path, array):
+    def write(self, path, array, vector_rows=False):
         logging.info("Writing to %s using parquet format" % path)
         shape = array.shape
         if len(shape) > 2:
             array = array.reshape(shape[0], -1)
-        table = pyarrow.Table.from_pandas(pd.DataFrame(array))
-        table = table.replace_schema_metadata({"shape": str(shape)})
+        if vector_rows:
+            table = pyarrow.table(
+                [pyarrow.array(array.tolist())],
+                names=["vector"])
+            logging.info("Writing to %s using single-vector rows..." % path)
+        else:
+            table = pyarrow.Table.from_pandas(pd.DataFrame(array))
+            table = table.replace_schema_metadata({"shape": str(shape)})
+
         pyarrow.parquet.write_table(table, path)
         logging.info("Done writing to %s" % path)
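
The "single-column vector" layout handled above is easy to reproduce outside DGL. The sketch below is illustrative only and not part of the patch: it uses plain pyarrow, pandas and numpy to write a small matrix in both the tabular layout (the previous behaviour) and the vector-row layout that vector_rows=True now produces, then reads the vector-row file back into a 2-D ndarray the same way the updated ParquetArrayParser.read() does. The file names and the "vector" column name are assumptions chosen for the example.

# Illustrative sketch, not part of the patch. Mirrors the reader/writer logic
# above with plain pyarrow; file names and the "vector" column name are
# assumptions chosen for this example.
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet

array = np.arange(12, dtype=np.float32).reshape(4, 3)

# Tabular layout (previous behaviour): one Parquet column per feature and the
# original shape stored in the schema metadata.
tabular = pyarrow.Table.from_pandas(pd.DataFrame(array))
tabular = tabular.replace_schema_metadata({"shape": str(array.shape)})
pyarrow.parquet.write_table(tabular, "tabular.parquet")

# Vector-row layout (what Spark ML feature pipelines commonly emit and what
# vector_rows=True now writes): a single list-typed column, one vector per row.
vectors = pyarrow.table([pyarrow.array(array.tolist())], names=["vector"])
pyarrow.parquet.write_table(vectors, "vector_rows.parquet")

# Reading the vector-row file back the way the updated read() path does:
table = pyarrow.parquet.read_table("vector_rows.parquet", memory_map=True)
types = table.schema.types
if len(types) == 1 and isinstance(types[0], pyarrow.ListType):
    restored = np.array(table.to_pandas().iloc[:, 0].to_list())
else:
    restored = table.to_pandas().to_numpy()
assert restored.shape == array.shape

Note that the vector-row file carries no "shape" metadata, so read() falls back to the 2-D shape of the reconstructed array (and still logs the existing warning); that is sufficient here because each row is already a complete feature vector.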