Fix issue of 24.12 RC [skip ci] (#826)
Additional changes to fix the 24.12 RC issue, based on
#822

NOTE: this PR must be merged using `create a merge commit`.
YanxuanLiu authored Jan 15, 2025
2 parents 7c4686c + f9624be commit 5ec5020
Showing 2 changed files with 66 additions and 18 deletions.
43 changes: 27 additions & 16 deletions python/src/spark_rapids_ml/umap.py
@@ -790,8 +790,9 @@ class UMAP(UMAPClass, _CumlEstimatorSupervised, _UMAPCumlParams):
     sample_fraction : float (optional, default=1.0)
         The fraction of the dataset to be used for fitting the model. Since fitting is done on a single node, very large
-        datasets must be subsampled to fit within the node's memory and execute in a reasonable time. Smaller fractions
-        will result in faster training, but may result in sub-optimal embeddings.
+        datasets must be subsampled to fit within the node's memory. Smaller fractions will result in faster training, but
+        may decrease embedding quality. Note: this is not guaranteed to provide exactly the fraction specified of the total
+        count of the given DataFrame.

     featuresCol: str or List[str]
         The feature column names, spark-rapids-ml supports vector, array and columnar as the input.\n
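
For reference, the updated sample_fraction docstring above is about bounding single-node memory use during fit. A minimal usage sketch (the DataFrame and column name are illustrative, not part of this PR):

```python
from spark_rapids_ml.umap import UMAP

# Illustrative only: fit on roughly half of the rows to keep the single-node fit
# within memory. Per the updated docstring, the realized fraction is approximate.
umap = UMAP(sample_fraction=0.5, featuresCol="features")
model = umap.fit(train_df)                 # train_df: a hypothetical Spark DataFrame of feature vectors
embeddings_df = model.transform(train_df)  # embeddings for the same rows
```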
@@ -1463,22 +1464,30 @@ def write_sparse_array(array: scipy.sparse.spmatrix, df_dir: str) -> None:
                 schema=indices_data_schema,
             )

-            indptr_df.write.parquet(
-                os.path.join(df_dir, "indptr.parquet"), mode="overwrite"
-            )
-            indices_data_df.write.parquet(
-                os.path.join(df_dir, "indices_data.parquet"), mode="overwrite"
-            )
+            indptr_df.write.parquet(os.path.join(df_dir, "indptr.parquet"))
+            indices_data_df.write.parquet(os.path.join(df_dir, "indices_data.parquet"))

         def write_dense_array(array: np.ndarray, df_path: str) -> None:
+            assert (
+                spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "true"
+            ), "spark.sql.execution.arrow.pyspark.enabled must be set to true to persist array attributes"
+
             schema = StructType(
                 [
-                    StructField(f"_{i}", FloatType(), False)
-                    for i in range(1, array.shape[1] + 1)
+                    StructField("row_id", LongType(), False),
+                    StructField("data", ArrayType(FloatType(), False), False),
                 ]
             )
-            data_df = spark.createDataFrame(pd.DataFrame(array), schema=schema)
-            data_df.write.parquet(df_path, mode="overwrite")
+            data_df = spark.createDataFrame(
+                pd.DataFrame(
+                    {
+                        "row_id": range(array.shape[0]),
+                        "data": list(array),
+                    }
+                ),
+                schema=schema,
+            )
+            data_df.write.parquet(df_path)

         DefaultParamsWriter.saveMetadata(
             self.instance,
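
The rewritten write_dense_array stores each row as an explicit row_id plus an array-valued data column, rather than one float column per dimension, and drops mode="overwrite" from the parquet writes. A standalone sketch of that layout outside the writer class (SparkSession setup, the array, and the output path are illustrative):

```python
import os

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, LongType, StructField, StructType

spark = SparkSession.builder.master("local[2]").getOrCreate()
# The writer above asserts this is enabled; Arrow speeds up the pandas -> Spark conversion.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

array = np.random.rand(100, 8).astype(np.float32)  # stand-in for embedding_ / raw_data_

schema = StructType(
    [
        StructField("row_id", LongType(), False),
        StructField("data", ArrayType(FloatType(), False), False),
    ]
)
df = spark.createDataFrame(
    pd.DataFrame({"row_id": range(array.shape[0]), "data": list(array)}),
    schema=schema,
)
out_path = os.path.join("/tmp", "dense_array.parquet")  # illustrative path
df.write.parquet(out_path)  # default write mode: fails if out_path already exists
```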
@@ -1491,12 +1500,12 @@ def write_dense_array(array: np.ndarray, df_path: str) -> None:
             },
         )

+        # get a copy, since we're going to modify the array attributes
         model_attributes = self.instance._get_model_attributes()
         assert model_attributes is not None
+        model_attributes = model_attributes.copy()

         data_path = os.path.join(path, "data")
-        if not os.path.exists(data_path):
-            os.makedirs(data_path)

         for key in ["embedding_", "raw_data_"]:
             array = model_attributes[key]
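
The added copy() is defensive: per the new comment, the writer goes on to modify the array attributes, and without the copy those modifications would also be visible through the dict held by the in-memory model. A generic sketch of the aliasing hazard (names and values are illustrative, not from this file):

```python
import numpy as np

# Stand-in for the model's attribute dict.
attrs = {"embedding_": np.zeros((2, 2), dtype=np.float32)}

# Without a copy, modifying the "saved" view also mutates the live model's dict:
aliased = attrs
aliased["embedding_"] = "moved-to-parquet"
assert isinstance(attrs["embedding_"], str)  # original dict was changed too

# With a shallow copy, the original keeps its array:
attrs = {"embedding_": np.zeros((2, 2), dtype=np.float32)}
copied = attrs.copy()
copied["embedding_"] = "moved-to-parquet"
assert isinstance(attrs["embedding_"], np.ndarray)  # unchanged
```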
@@ -1547,8 +1556,10 @@ def read_sparse_array(
             return scipy.sparse.csr_matrix((data, indices, indptr), shape=csr_shape)

         def read_dense_array(df_path: str) -> np.ndarray:
-            data_df = spark.read.parquet(df_path)
-            return np.array(data_df.collect(), dtype=np.float32)
+            data_df = spark.read.parquet(df_path).orderBy("row_id")
+            pdf = data_df.toPandas()
+            assert type(pdf) == pd.DataFrame
+            return np.array(list(pdf.data), dtype=np.float32)

         metadata = DefaultParamsReader.loadMetadata(path, self.sc)
         data_path = os.path.join(path, "data")
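
Parquet does not guarantee row order on read, which is presumably why the new read_dense_array sorts on the row_id column written above before rebuilding the NumPy array. A round-trip sketch continuing the illustrative write example (same assumed path):

```python
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Read back the illustrative file written above and restore the original row order.
read_df = spark.read.parquet("/tmp/dense_array.parquet").orderBy("row_id")
pdf = read_df.toPandas()
restored = np.array(list(pdf["data"]), dtype=np.float32)  # shape: (num_rows, dim)
```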
41 changes: 39 additions & 2 deletions python/tests/test_umap.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2024, NVIDIA CORPORATION.
+# Copyright (c) 2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
 from typing import Any, Dict, List, Optional, Tuple, Union

 import cupy as cp
-import cupyx
 import numpy as np
 import pytest
 import scipy
@@ -415,6 +414,9 @@ def test_umap_copy() -> None:
 def test_umap_model_persistence(
     sparse_fit: bool, gpu_number: int, tmp_path: str
 ) -> None:
+    import os
+    import re
+
     import pyspark
     from packaging import version

@@ -459,7 +461,42 @@ def test_umap_model_persistence(
     path = tmp_path + "/umap_tests"
     model_path = f"{path}/umap_model"
     umap_model.write().overwrite().save(model_path)
+
+    try:
+        umap_model.write().save(model_path)
+        assert False, "Overwriting should not be permitted"
+    except Exception as e:
+        assert re.search(r"Output directory .* already exists", str(e))
+
+    # double check expected files/directories
+    model_dir_contents = os.listdir(model_path)
+    data_dir_contents = os.listdir(f"{model_path}/data")
+    assert set(model_dir_contents) == {"data", "metadata"}
+    if sparse_fit:
+        assert set(data_dir_contents) == {
+            "metadata.json",
+            "embedding_.parquet",
+            "raw_data_csr",
+        }
+        assert set(os.listdir(f"{model_path}/data/raw_data_csr")) == {
+            "indptr.parquet",
+            "indices_data.parquet",
+        }
+    else:
+        assert set(data_dir_contents) == {
+            "metadata.json",
+            "embedding_.parquet",
+            "raw_data_.parquet",
+        }
+
+    # make sure we can overwrite
+    umap_model._cuml_params["n_neighbors"] = 10
+    umap_model._cuml_params["set_op_mix_ratio"] = 0.4
+    umap_model.write().overwrite().save(model_path)
+
     umap_model_loaded = UMAPModel.load(model_path)
+    assert umap_model_loaded._cuml_params["n_neighbors"] == 10
+    assert umap_model_loaded._cuml_params["set_op_mix_ratio"] == 0.4
     _assert_umap_model(umap_model_loaded, input_raw_data)

