Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Spark 3.4: Adapt to hash function under clickhouse-core
Browse files Browse the repository at this point in the history
Yxang committed Jul 25, 2023

Verified

This commit was signed with the committer’s verified signature.
fmalcher Ferdinand Malcher
1 parent 85a025f commit 4e201d6
Showing 16 changed files with 60 additions and 493 deletions.
Original file line number Diff line number Diff line change
@@ -15,8 +15,12 @@
package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.sql.clickhouse.TestUtils.om
import xenon.clickhouse.func.{CompositeFunctionRegistry, DynamicFunctionRegistry, StaticFunctionRegistry}
import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64Shard
import xenon.clickhouse.func.{
ClickHouseXxHash64Shard,
CompositeFunctionRegistry,
DynamicFunctionRegistry,
StaticFunctionRegistry
}

import java.lang.{Long => JLong}

@@ -30,15 +34,6 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest {
new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry))
}

/**
 * Cartesian product of a sequence of sequences.
 *
 * Each element of the result picks exactly one value from every inner sequence,
 * in the same order as the inner sequences appear in `xs`. Combinations are
 * emitted with the first inner sequence varying slowest.
 *
 * @param xs the inner sequences to combine
 * @return every combination; `Seq(Seq())` when `xs` is empty, and an empty
 *         sequence when any inner sequence is empty
 */
def product[A](xs: Seq[Seq[A]]): Seq[Seq[A]] =
  // Fold from the right so the product of the remaining sequences is computed
  // once per level, instead of being recomputed for every element of the
  // current inner sequence.
  xs.foldRight(Seq(Seq.empty[A])) { (choices, suffixes) =>
    for {
      h <- choices
      t <- suffixes
    } yield h +: t
  }

def runTest(funcSparkName: String, funcCkName: String, stringVal: String): Unit = {
val sparkResult = spark.sql(
s"""SELECT
Original file line number Diff line number Diff line change
@@ -26,8 +26,7 @@ import xenon.clickhouse.Constants._
import xenon.clickhouse.client.NodeClient
import xenon.clickhouse.exception.CHClientException
import xenon.clickhouse.exception.ClickHouseErrCode._
import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64Shard
import xenon.clickhouse.func.{FunctionRegistry, _}
import xenon.clickhouse.func.{ClickHouseXxHash64Shard, FunctionRegistry, _}
import xenon.clickhouse.spec._

import java.time.ZoneId
Original file line number Diff line number Diff line change
@@ -12,29 +12,15 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import io.netty.buffer.{ByteBuf, Unpooled}
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.MultiArgsHash
import xenon.clickhouse.func.clickhouse.cityhash.{CityHash_v1_0_2, UInt128}
import xenon.clickhouse.hash

object CityHash64 extends MultiArgsHash {
object CityHash64 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L694

override protected def funcName: String = "clickhouse_cityHash64"
override val ckFuncNames: Array[String] = Array("cityHash64")

def convertToByteBuf(array: Array[Byte]): ByteBuf = {
val byteBuf = Unpooled.buffer(array.length).writeBytes(array)
byteBuf
}

override def invokeBase(value: UTF8String): Long = {
// ignore UInt64 vs Int64
val data = value.getBytes
CityHash_v1_0_2.CityHash64(convertToByteBuf(data), 0, data.length)
}

override def combineHashes(v1: Long, v2: Long): Long = CityHash_v1_0_2.Hash128to64(new UInt128(v1, v2))
override def applyHash(input: Array[Any]): Long = hash.CityHash64(input)
}
Original file line number Diff line number Diff line change
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.time.LocalDate
import java.time.format.DateTimeFormatter
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import xenon.clickhouse.func.clickhouse._

import scala.collection.mutable

Original file line number Diff line number Diff line change
@@ -12,13 +12,12 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.sql.{Date, Timestamp}
import java.sql.Timestamp
import java.text.SimpleDateFormat

object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
Original file line number Diff line number Diff line change
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction {

Original file line number Diff line number Diff line change
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.time.LocalDate
import java.time.format.DateTimeFormatter
Original file line number Diff line number Diff line change
@@ -19,32 +19,41 @@ import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFu
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

abstract class MultiArgsHash extends UnboundFunction with ClickhouseEquivFunction {
abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction {

def applyHash(input: Array[Any]): Long

protected def funcName: String

override val ckFuncNames: Array[String]

override def description: String = s"$name: (value: string, ...) => hash_value: long"

private def isExceptedType(dt: DataType): Boolean =
dt.isInstanceOf[StringType]

final override def name: String = funcName

final override def bind(inputType: StructType): BoundFunction = {
val inputDataTypes = inputType.fields.map(_.dataType)
if (inputDataTypes.forall(isExceptedType)) new ScalarFunction[Long] {
override def inputTypes(): Array[DataType] = inputDataTypes
override def name: String = funcName
override def canonicalName: String = s"clickhouse.$name"
override def resultType: DataType = LongType
override def toString: String = name
override def produceResult(input: InternalRow): Long = {
val inputStrings: Seq[UTF8String] =
input.toSeq(Seq.fill(input.numFields)(StringType)).asInstanceOf[Seq[UTF8String]]
inputStrings.map(invokeBase).reduce(combineHashes)
if (inputDataTypes.forall(isExceptedType)) {
// A new ScalarFunction instance must be created on every bind,
// because the number of arguments is not known in advance.
new ScalarFunction[Long] {
override def inputTypes(): Array[DataType] = inputDataTypes
override def name: String = funcName
override def canonicalName: String = s"clickhouse.$name"
override def resultType: DataType = LongType
override def toString: String = name
override def produceResult(input: InternalRow): Long = {
val inputStrings: Array[Any] =
input.toSeq(Seq.fill(input.numFields)(StringType)).asInstanceOf[Seq[UTF8String]].toArray
.map(_.getBytes)
applyHash(inputStrings)
}
}
}
else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description")
} else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description")

}

protected def funcName: String
override val ckFuncNames: Array[String]
override def description: String = s"$name: (value: string, ...) => hash_value: long"
def invokeBase(value: UTF8String): Long
def combineHashes(v1: Long, v2: Long): Long
}
Original file line number Diff line number Diff line change
@@ -12,40 +12,25 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.commons.codec.digest.{MurmurHash2, MurmurHash3}
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.{ClickhouseEquivFunction, MultiArgsHash, Util}
import xenon.clickhouse.hash
import xenon.clickhouse.hash.HashUtils

object MurmurHash2_64 extends MultiArgsHash {
object MurmurHash2_64 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L460

override protected def funcName: String = "clickhouse_murmurHash2_64"
override val ckFuncNames: Array[String] = Array("murmurHash2_64")

override def invokeBase(value: UTF8String): Long = {
// ignore UInt64 vs Int64
val data = value.getBytes
MurmurHash2.hash64(data, data.length, 0)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.intHash64Impl(v1) ^ v2
override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input)
}

object MurmurHash2_32 extends MultiArgsHash {
object MurmurHash2_32 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519

override protected def funcName: String = "clickhouse_murmurHash2_32"
override val ckFuncNames: Array[String] = Array("murmurHash2_32")

override def invokeBase(value: UTF8String): Long = {
val data = value.getBytes
val v = MurmurHash2.hash32(data, data.length, 0)
Util.toUInt32Range(v)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.toUInt32Range(Util.int32Impl(v1) ^ v2)
override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input))
}
Original file line number Diff line number Diff line change
@@ -12,41 +12,25 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.commons.codec.digest.MurmurHash3
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.{ClickhouseEquivFunction, MultiArgsHash, Util}
import xenon.clickhouse.hash
import xenon.clickhouse.hash.HashUtils

object MurmurHash3_64 extends MultiArgsHash {
object MurmurHash3_64 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L543

override protected def funcName: String = "clickhouse_murmurHash3_64"
override val ckFuncNames: Array[String] = Array("murmurHash3_64")

override def invokeBase(value: UTF8String): Long = {
// ignore UInt64 vs Int64
val data = value.getBytes
val hashes = MurmurHash3.hash128x64(data, 0, data.length, 0)
hashes(0) ^ hashes(1)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.intHash64Impl(v1) ^ v2
override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input)
}

object MurmurHash3_32 extends MultiArgsHash {
object MurmurHash3_32 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519

override protected def funcName: String = "clickhouse_murmurHash3_32"
override val ckFuncNames: Array[String] = Array("murmurHash3_32")

override def invokeBase(value: UTF8String): Long = {
val data = value.getBytes
val v = MurmurHash3.hash32x86(data, 0, data.length, 0)
Util.toUInt32Range(v)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.toUInt32Range(Util.int32Impl(v1) ^ v2)
override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input))
}
Original file line number Diff line number Diff line change
@@ -12,13 +12,12 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.catalyst.expressions.XxHash64Function
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.ClickhouseEquivFunction
import xenon.clickhouse.spec.{ClusterSpec, ShardUtils}

/**
Original file line number Diff line number Diff line change
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.time.LocalDate
import java.time.format.DateTimeFormatter

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -17,13 +17,7 @@ package org.apache.spark.sql.clickhouse
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.scalatest.funsuite.AnyFunSuite
import xenon.clickhouse.ClickHouseHelper
import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64
import xenon.clickhouse.func.{
ClickhouseEquivFunction,
CompositeFunctionRegistry,
DynamicFunctionRegistry,
StaticFunctionRegistry
}
import xenon.clickhouse.func.{ClickHouseXxHash64, ClickhouseEquivFunction, CompositeFunctionRegistry, DynamicFunctionRegistry, StaticFunctionRegistry}

import scala.collection.JavaConverters._

0 comments on commit 4e201d6

Please sign in to comment.