diff --git a/integration_tests/src/main/python/hashing_test.py b/integration_tests/src/main/python/hashing_test.py
index e3ef67bc9f9..5180418c8e0 100644
--- a/integration_tests/src/main/python/hashing_test.py
+++ b/integration_tests/src/main/python/hashing_test.py
@@ -36,7 +36,7 @@
 _struct_of_xxhash_gens = StructGen([(f"c{i}", g) for i, g in enumerate(_atomic_gens)])
 
-# will be used by HyperLogLogPlusPLus(approx_count_distinct)
+# This is also used by HyperLogLogPlusPlus (approx_count_distinct)
 xxhash_gens = (_atomic_gens + [_struct_of_xxhash_gens] + single_level_array_gens +
                nested_array_gens_sample + [
     all_basic_struct_gen,
diff --git a/integration_tests/src/main/python/hyper_log_log_plus_plus_test.py b/integration_tests/src/main/python/hyper_log_log_plus_plus_test.py
new file mode 100644
index 00000000000..3b9ebfe927e
--- /dev/null
+++ b/integration_tests/src/main/python/hyper_log_log_plus_plus_test.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_sql
+from data_gen import *
+from hashing_test import xxhash_gens
+from marks import ignore_order
+
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', xxhash_gens, ids=idfn)
+def test_hllpp_groupby(data_gen):
+    assert_gpu_and_cpu_are_equal_sql(
+        lambda spark: gen_df(spark, [("c1", int_gen), ("c2", data_gen)]),
+        "tab",
+        "select c1, APPROX_COUNT_DISTINCT(c2) from tab group by c1")
+
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', xxhash_gens, ids=idfn)
+def test_hllpp_reduction(data_gen):
+    assert_gpu_and_cpu_are_equal_sql(
+        lambda spark: unary_op_df(spark, data_gen),
+        "tab",
+        "select APPROX_COUNT_DISTINCT(a) from tab")
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index c1ede7554a6..9a1b1dfc2bb 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3988,6 +3988,36 @@
         GpuDynamicPruningExpression(child)
       }
     }),
+    expr[HyperLogLogPlusPlus](
+      "Aggregation approximate count distinct",
+      ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
+        // HyperLogLogPlusPlus hashes its input with XxHash64, so it supports
+        // exactly the types that XxHash64 supports
+        Seq(ParamCheck("input", XxHash64Shims.supportedTypes, TypeSig.all))),
+      (a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) {
+
+        // The same stack-depth check as for XxHash64
+        override def tagExprForGpu(): Unit = {
+          val maxDepth = a.children.map(
+            c => XxHash64Utils.computeMaxStackSize(c.dataType)).max
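+          // Worked example (illustration only, not from the PR): for the
+          // hypothetical type MAP<INT, ARRAY<STRUCT<a: INT>>> the depth is
+          // 2 + max(1, 1 + (1 + 1)) = 5, applying rules 5 (map), 2 (array of
+          // struct), 4 (struct) and 1 (primitive) from the message below.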
" + + "The algorithm to calculate stack depth: " + + "1: Primitive type counts 1 depth; " + + "2: Array of Structure counts: 1 + depthOf(Structure); " + + "3: Array of Other counts: depthOf(Other); " + + "4: Structure counts: 1 + max of depthOf(child); " + + "5: Map counts: 2 + max(depthOf(key), depthOf(value)); " + ) + } + } + + override def convertToGpu(child: Expression): GpuExpression = { + GpuHyperLogLogPlusPlus(child, a.relativeSD) + } + } + ), SparkShimImpl.ansiCastRule ).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala new file mode 100644 index 00000000000..f78ef47a5e4 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.aggregate + +import scala.collection.immutable.Seq + +import ai.rapids.cudf +import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation} +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm.withResourceIfAllowed +import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression +import com.nvidia.spark.rapids.jni.HyperLogLogPlusPlusHostUDF +import com.nvidia.spark.rapids.shims.ShimExpression + +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.rapids.{GpuCreateNamedStruct, GpuGetStructField} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class CudfHLLPP(override val dataType: DataType, + precision: Int) extends CudfAggregate { + override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = + (input: cudf.ColumnVector) => input.reduce( + ReductionAggregation.hostUDF( + HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF( + HyperLogLogPlusPlusHostUDF.AggregationType.Reduction, precision)), + DType.STRUCT) + override lazy val groupByAggregate: GroupByAggregation = + GroupByAggregation.hostUDF( + HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF( + HyperLogLogPlusPlusHostUDF.AggregationType.GroupBy, precision) + ) + override val name: String = "CudfHyperLogLogPlusPlus" +} + +case class CudfMergeHLLPP(override val dataType: DataType, + precision: Int) + extends CudfAggregate { + override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = + (input: cudf.ColumnVector) => input.reduce( + ReductionAggregation.hostUDF( + HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF( + HyperLogLogPlusPlusHostUDF.AggregationType.Reduction_MERGE, precision)), + DType.STRUCT) + override lazy val groupByAggregate: GroupByAggregation = + GroupByAggregation.hostUDF( + HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF( + HyperLogLogPlusPlusHostUDF.AggregationType.GroupByMerge, 
+
+  // Spark agg buffer type: an array of longs
+  private lazy val sparkAggBufferAttributes: Seq[AttributeReference] = {
+    Seq.tabulate(numWords) { i =>
+      AttributeReference(s"MS[$i]", LongType)()
+    }
+  }
+
+  /**
+   * Spark stores the agg buffer in long columns, e.g. long[52];
+   * each long packs multiple registers to save memory.
+   */
+  override val aggBufferAttributes: Seq[AttributeReference] = sparkAggBufferAttributes
+
+  /**
+   * Initialize the long array to all zeros.
+   */
+  override lazy val initialValues: Seq[Expression] = Seq.tabulate(numWords) { _ =>
+    GpuLiteral(0L, LongType)
+  }
+
+  override lazy val inputProjection: Seq[Expression] = Seq(childExpr)
+
+  /**
+   * The cuDF HLLPP sketch type: a struct of longs.
+   */
+  private lazy val cuDFBufferType: DataType = StructType.fromAttributes(aggBufferAttributes)
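+
+  // Overview of the column flow (summary comment, not code from the PR):
+  // Spark keeps the agg buffer as `numWords` long columns, while the cuDF host
+  // UDF consumes and produces a single struct-of-longs column, so each phase
+  // converts between the two layouts:
+  //   update:   input -> CudfHLLPP -> struct -> postUpdate -> longs
+  //   merge:    longs -> preMerge (struct) -> CudfMergeHLLPP -> postMerge -> longs
+  //   evaluate: longs -> struct -> GpuHyperLogLogPlusPlusEvaluation -> long count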
+
+  /**
+   * cuDF runs the aggregation on a single struct column.
+   */
+  override lazy val updateAggregates: Seq[CudfAggregate] =
+    Seq(CudfHLLPP(cuDFBufferType, precision))
+
+  /**
+   * Convert the long columns to a single struct column.
+   */
+  private def genStruct: Seq[Expression] = {
+    val names = Seq.tabulate(numWords) { i => GpuLiteral(s"MS[$i]", StringType) }
+    Seq(GpuCreateNamedStruct(names.zip(aggBufferAttributes).flatMap {
+      case (a, b) => List(a, b)
+    }))
+  }
+
+  /**
+   * Convert the struct column back to long columns.
+   */
+  override lazy val postUpdate: Seq[Expression] = Seq.tabulate(numWords) {
+    i => GpuGetStructField(postUpdateAttr.head, i)
+  }
+
+  /**
+   * Convert the long columns to a struct column before merging.
+   */
+  override lazy val preMerge: Seq[Expression] = genStruct
+
+  override lazy val mergeAggregates: Seq[CudfAggregate] =
+    Seq(CudfMergeHLLPP(cuDFBufferType, precision))
+
+  /**
+   * Convert the struct column back to long columns.
+   */
+  override lazy val postMerge: Seq[Expression] = Seq.tabulate(numWords) {
+    i => GpuGetStructField(postMergeAttr.head, i)
+  }
+
+  override lazy val evaluateExpression: Expression =
+    GpuHyperLogLogPlusPlusEvaluation(genStruct.head, precision)
+
+  override def dataType: DataType = LongType
+
+  // Spark's APPROX_COUNT_DISTINCT returns zero for NULLs, so never nullable
+  override def nullable: Boolean = false
+
+  override def prettyName: String = "approx_count_distinct"
+
+  override def children: Seq[Expression] = Seq(childExpr)
+}
diff --git a/tools/generated_files/330/operatorsScore.csv b/tools/generated_files/330/operatorsScore.csv
index e86e30e606c..6707e7a0c00 100644
--- a/tools/generated_files/330/operatorsScore.csv
+++ b/tools/generated_files/330/operatorsScore.csv
@@ -139,6 +139,7 @@ HiveGenericUDF,4
 HiveHash,4
 HiveSimpleUDF,4
 Hour,4
+HyperLogLogPlusPlus,4
 Hypot,4
 If,4
 In,4
diff --git a/tools/generated_files/330/supportedExprs.csv b/tools/generated_files/330/supportedExprs.csv
index aa744638220..9d0268b7f33 100644
--- a/tools/generated_files/330/supportedExprs.csv
+++ b/tools/generated_files/330/supportedExprs.csv
@@ -713,6 +713,10 @@ First,S,`first_value`; `first`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,N
 First,S,`first_value`; `first`,None,reduction,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
 First,S,`first_value`; `first`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
 First,S,`first_value`; `first`,None,window,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
+HyperLogLogPlusPlus,S,`approx_count_distinct`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,NS,NS,NS,NS,NS,NS
+HyperLogLogPlusPlus,S,`approx_count_distinct`,None,aggregation,result,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+HyperLogLogPlusPlus,S,`approx_count_distinct`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,NS,NS,NS,NS,NS,NS
+HyperLogLogPlusPlus,S,`approx_count_distinct`,None,reduction,result,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
 Last,S,`last_value`; `last`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
 Last,S,`last_value`; `last`,None,aggregation,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
 Last,S,`last_value`; `last`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS