From c59795af8541682b8f06d2398c6a856b2aa0bc3a Mon Sep 17 00:00:00 2001 From: Jinfeng Li Date: Thu, 9 May 2024 09:39:45 -0700 Subject: [PATCH] add notebook for ann and remove exactNearestNeighborsJoin API from ANN class (#650) Signed-off-by: Jinfeng --- notebooks/approx-nearest-neighbors.ipynb | 466 +++++++++++++++++++++++ python/src/spark_rapids_ml/knn.py | 58 +-- 2 files changed, 499 insertions(+), 25 deletions(-) create mode 100644 notebooks/approx-nearest-neighbors.ipynb diff --git a/notebooks/approx-nearest-neighbors.ipynb b/notebooks/approx-nearest-neighbors.ipynb new file mode 100644 index 00000000..2f6838d5 --- /dev/null +++ b/notebooks/approx-nearest-neighbors.ipynb @@ -0,0 +1,466 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "db673eda-86ee-47a7-975d-aa3d36c2f407", + "metadata": {}, + "source": [ + "# Approximate Nearest Neighbors" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eeb78fae-ae08-4b64-8daa-b579af1d9ba6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from spark_rapids_ml.knn import ApproximateNearestNeighbors\n", + "from pyspark.sql.functions import col" + ] + }, + { + "cell_type": "markdown", + "id": "17955bd7-b911-4da6-ad1b-02ae4a5f0a2b", + "metadata": {}, + "source": [ + "### Create synthetic dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "38c671ba-b6de-4414-992f-957e28f3a8be", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "dim = 2000\n", + "dtype = 'float32'\n", + "np.random.seed(1)\n", + "\n", + "# items\n", + "num_vecs = 100000\n", + "vec = np.random.randn(dim).reshape([1,dim])\n", + "arr = np.random.randn(num_vecs).reshape([num_vecs,1])\n", + "items = arr * vec\n", + "items = items.astype(dtype)\n", + "\n", + "# items extra data\n", + "items_extra = np.random.randn(num_vecs)\n", + "\n", + "# queries\n", + "num_vecs = 50\n", + "vec = np.random.randn(dim).reshape([1,dim])\n", + "arr = np.random.randn(num_vecs).reshape([num_vecs,1])\n", + "queries = arr * vec\n", + "queries = queries.astype(dtype)\n", + "\n", + "# queries extra data\n", + "queries_extra = np.random.randn(num_vecs)" + ] + }, + { + "cell_type": "markdown", + "id": "4fb6683a", + "metadata": {}, + "source": [ + "### Configure Spark\n", + "It is highly recommend to increase the spark.sql.execution.arrow.maxRecordsPerBatch from the default 10000 to a larger value. Spark Rapids ML applies cuML approximate nearest neighbor search on every data batch independently, and some algorithms have requirements on the batch size. For example, the ivfflat algorithm requires that the number of vectors in a batch must be more than the number of kmeans centroids (specified by 'nlist')." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "901aab99", + "metadata": {}, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.sql.execution.arrow.maxRecordsPerBatch\", 0) # set to unlimited" + ] + }, + { + "cell_type": "markdown", + "id": "01d6b576-460a-41d1-a22b-0eb303afcccc", + "metadata": {}, + "source": [ + "### Convert dataset to Spark DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0be00961-aac8-4bb1-bcc8-8971971ff78f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "pd_items = pd.DataFrame({\"features\": list(items), \"extra\": items_extra})\n", + "item_df = spark.createDataFrame(pd_items, \"features array, extra float\")\n", + "\n", + "pd_queries = pd.DataFrame({\"features\": list(queries), \"extra\": queries_extra})\n", + "query_df = spark.createDataFrame(pd_queries, \"features array, extra float\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "05c0155a-b72f-4db3-a9aa-51522b18ee61", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "item_df.show(5, truncate=80)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aafd06b2-8150-49d4-840f-3271e9914e76", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "query_df.show(5, truncate=80)" + ] + }, + { + "cell_type": "markdown", + "id": "204b7e72-737e-4a8d-81ce-5a275cb7446a", + "metadata": {}, + "source": [ + "## Spark RAPIDS ML (GPU)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c105d965-cec5-430b-be88-3a9d1476147c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "knn = ApproximateNearestNeighbors(k=2)\n", + "knn.setInputCol(\"features\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "731d2f06-7ca4-4395-a856-e5a8ca403d49", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "knn_model = knn.fit(item_df)" + ] + }, + { + "cell_type": "markdown", + "id": "2d1680c4-fde6-4016-a06c-1db89b53db43", + "metadata": {}, + "source": [ + "Note: `fit` just stores a reference to the `item_df` in the returned model. As such, saving the estimator or model is not supported, since their only state is the referenced dataset. Instead, just re-create and re-fit the estimator on the dataset, as needed." + ] + }, + { + "cell_type": "markdown", + "id": "7210e792-f7aa-4581-8b84-aae2b52d6baf", + "metadata": {}, + "source": [ + "#### kneighbors\n", + "\n", + "This API takes a DataFrame of query vectors, and returns the `k` approximate nearest item vectors for each query vector, represented by their unique ids and distances. The unique ids are automatically generated if not provided, so the input datasets are also returned with their unique ids." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "61d09a34-15a1-456a-a6dd-fc9abad2716a", + "metadata": {}, + "outputs": [], + "source": [ + "item_id_df, query_id_df, neighbor_df = knn_model.kneighbors(query_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fde68c05-5cde-4b1e-8ed3-ae5fe2d7015b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# original item_df is returned with unique identifiers\n", + "item_id_df.show(5, truncate=80)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "281c204b-5804-4ae8-9064-e6fe08355618", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# original query_df is returned with unique identifiers\n", + "query_id_df.show(5, truncate=80)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "457b702d-1af0-4c38-b1b5-dd1a1f5f1dec", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# neighbor_df shows the nearest item vectors for each query vector, represented by their unique ids and distances.\n", + "neighbor_df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "62f81b49-6ff3-4656-8b0c-27b32a3f60de", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# change the value of 'k'\n", + "knn_model.setK(3)\n", + "_, _, neighbor_df = knn_model.kneighbors(query_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a52e7e03-0bac-4378-b219-eda5d00523da", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "neighbor_df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "21d0f88c-8ba0-4f25-a42f-45c864972f54", + "metadata": {}, + "source": [ + "#### approxNearestNeighborsJoin\n", + "\n", + "This API returns a join of the query vectors and their `k` approximate nearest item vectors." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6dc0fd29-f25e-4d89-95fe-ebd2efbfe9ea", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "result_df = knn_model.approxSimilarityJoin(query_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5196d375-db08-4292-a865-61fecd07fe41", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "result_df.orderBy(\"query_df\", \"item_df\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "7d1d41c5-c6c5-4f37-942a-cecb882a7862", + "metadata": {}, + "source": [ + "For each returned query or item vector, all columns from the original input DataFrame will be returned as a single struct column." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27993cac-bc6d-4f57-b6da-549857c9218f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "result_df.select(\"query_df.*\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "8cd56670-7633-4fe6-ab75-0fd680c63baa", + "metadata": {}, + "source": [ + "# PySpark\n", + "\n", + "PySpark does not have an exact kNN implementation, but it does have an LSH-based Approximate Nearest Neighbors implementation, shown here to illustrate the similarity between the APIs. However, the algorithms are very different, so their results are only roughly comparable, and it would require elaborate tuning of parameters to produce similar results." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a21c783f-c4ed-43b3-a869-7395b94152f3", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from pyspark.ml.feature import BucketedRandomProjectionLSH\n", + "from pyspark.ml.functions import array_to_vector\n", + "from pyspark.ml.linalg import Vectors\n", + "from pyspark.sql.functions import col\n", + "\n", + "item_vector_df = item_df.select(array_to_vector(item_df.features).alias(\"features\"))\n", + "query_vector_df = query_df.select(array_to_vector(query_df.features).alias(\"features\"))\n", + "key = Vectors.dense([1.0] * dim)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4288d38-6426-41c0-87a2-a75bc9d4bbda", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "item_vector_df.show(5, truncate=80)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "655f8071-c90e-464b-aa25-58d96bbeebea", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "query_vector_df.show(5, truncate=80)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8712f3f-4662-409e-8172-29308ec84e0c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "brp = BucketedRandomProjectionLSH(inputCol=\"features\", outputCol=\"hashes\", bucketLength=2.0, numHashTables=3)\n", + "model = brp.fit(item_vector_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4aa5813b-41b9-4b51-9977-721e6f90118e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Feature Transformation\n", + "print(\"The hashed dataset where hashed values are stored in the column 'hashes':\")\n", + "model.transform(item_vector_df).show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90f7049a-5ed1-44a8-b0d0-be63f6458e0a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Compute the locality sensitive hashes for the input rows, then perform approximate similarity join.\n", + "# We could avoid computing hashes by passing in the already-transformed dataset, e.g.\n", + "# `model.approxSimilarityJoin(transformed_item_vector_df, transformed_query_vector_df, 3.0)`\n", + "print(\"Approximately joining items and queries on Euclidean distance smaller than 3.0:\")\n", + "model.approxSimilarityJoin(item_vector_df, query_vector_df, 3.0, distCol=\"EuclideanDistance\")\\\n", + " .select(col(\"datasetA.features\").alias(\"item\"),\n", + " col(\"datasetB.features\").alias(\"query\"),\n", + " col(\"EuclideanDistance\")).orderBy(\"query\", \"item\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e70d24b7-db70-4041-9794-7cd5451bad76", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Compute the locality sensitive hashes for the input rows, then perform approximate nearest neighbor search.\n", + "# We could avoid computing hashes by passing in the already-transformed dataset, e.g.\n", + "# `model.approxNearestNeighbors(transformed_item_vector_df, key, 2)`\n", + "print(\"Approximately searching item vectors for 2 nearest neighbors of the key:\")\n", + "model.approxNearestNeighbors(item_vector_df, key, 2).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "79a8dc90-379a-409b-8c8a-a26f33910c0d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# saves the LSH hashes for the input rows\n", + "model.write().overwrite().save(\"/tmp/ann_model\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/python/src/spark_rapids_ml/knn.py b/python/src/spark_rapids_ml/knn.py index eb720689..97709f9c 100644 --- a/python/src/spark_rapids_ml/knn.py +++ b/python/src/spark_rapids_ml/knn.py @@ -394,34 +394,11 @@ def _get_cuml_transform_func( def kneighbors(self, query_df: DataFrame) -> Tuple[DataFrame, DataFrame, DataFrame]: raise NotImplementedError() - def exactNearestNeighborsJoin( + def _nearest_neighbors_join( self, query_df: DataFrame, distCol: str = "distCol", ) -> DataFrame: - """ - This function returns the k exact nearest neighbors (knn) in item_df of each query vector in query_df. - item_df is the dataframe passed to the fit function of the NearestNeighbors estimator. - Note that the knn relationship is asymmetric with respect to the input datasets (e.g., if x is a knn of y - , y is not necessarily a knn of x). - - Parameters - ---------- - query_df: pyspark.sql.DataFrame - the query_df dataframe. Each row represents a query vector. - - distCol: str - the name of the output distance column - - Returns - ------- - knnjoin_df: pyspark.sql.DataFrame - the result dataframe that has three columns (item_df, query_df, distCol). - item_df column is of struct type that includes as fields all the columns of input item dataframe. - Similarly, query_df column is of struct type that includes as fields all the columns of input query dataframe. - distCol is the distance column. A row in knnjoin_df is in the format (v1, v2, dist(v1, v2)), - where item_vector v1 is one of the k nearest neighbors of query_vector v2 and their distance is dist(v1, v2). - """ id_col_name = self._getIdColOrDefault() @@ -723,6 +700,37 @@ async def do_allGather() -> List[str]: return _cuml_fit + def exactNearestNeighborsJoin( + self, + query_df: DataFrame, + distCol: str = "distCol", + ) -> DataFrame: + """ + This function returns the k exact nearest neighbors (knn) in item_df of each query vector in query_df. + item_df is the dataframe passed to the fit function of the NearestNeighbors estimator. + Note that the knn relationship is asymmetric with respect to the input datasets (e.g., if x is a knn of y + , y is not necessarily a knn of x). + + Parameters + ---------- + query_df: pyspark.sql.DataFrame + the query_df dataframe. Each row represents a query vector. + + distCol: str + the name of the output distance column + + Returns + ------- + knnjoin_df: pyspark.sql.DataFrame + the result dataframe that has three columns (item_df, query_df, distCol). + item_df column is of struct type that includes as fields all the columns of input item dataframe. + Similarly, query_df column is of struct type that includes as fields all the columns of input query dataframe. + distCol is the distance column. A row in knnjoin_df is in the format (v1, v2, dist(v1, v2)), + where item_vector v1 is one of the k nearest neighbors of query_vector v2 and their distance is dist(v1, v2). + """ + + return self._nearest_neighbors_join(query_df=query_df, distCol=distCol) + class ApproximateNearestNeighborsClass(_CumlClass): @@ -1323,4 +1331,4 @@ def approxSimilarityJoin( where item_vector v1 is one of the k nearest neighbors of query_vector v2 and their distance is dist(v1, v2). """ - return self.exactNearestNeighborsJoin(query_df, distCol) + return self._nearest_neighbors_join(query_df, distCol)