diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 61c45ab2..e8024b45 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -26,22 +26,4 @@ jobs:
restore-keys: ${{ runner.os }}-m2
- name: Build with Maven
run: ./mvnw -B package --file pom.xml -Pscala-2.12
- build-scala-11:
- runs-on: ubuntu-latest
-
- steps:
- - uses: actions/checkout@v2
- - name: Set up JDK 8
- uses: actions/setup-java@v1
- with:
- java-version: 8
- - name: Cache Maven packages
- uses: actions/cache@v2
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2
- - name: Build with Maven
- run: ./mvnw -B package --file pom.xml -Pscala-2.11
-
# vim: ts=2:sts=2:sw=2:expandtab
diff --git a/README.md b/README.md
index ab634c43..3f518316 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
- [Code of Conduct](#code-of-conduct)
- [License](#license)
-## Supported versions of Apache Spark
+## Supported versions of Apache Spark #TODO
| Apache Spark | Scala | Kotlin for Apache Spark |
|:------------:|:-----:|:-------------------------------:|
diff --git a/core/2.4/pom_2.11.xml b/core/2.4/pom_2.11.xml
deleted file mode 100644
index afab252a..00000000
--- a/core/2.4/pom_2.11.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-
-
- 4.0.0
-
- Kotlin Spark API: Scala core for Spark 2.4+ (Scala 2.11)
- core-2.4_2.11
- Scala-Spark 2.4+ compatibility layer for Kotlin for Apache Spark
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-parent_2.11
- 1.0.3-SNAPSHOT
- ../../pom_2.11.xml
-
-
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
-
-
- org.apache.spark
- spark-sql_${scala.compat.version}
- ${spark2-scala-2.11.version}
- provided
-
-
-
-
- src/main/scala
- src/test/scala
- target/${scala.compat.version}
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
- compile
-
- compile
- testCompile
-
-
-
- -dependencyfile
- ${project.build.directory}/.scala_dependencies
-
-
-
-
- docjar
-
- doc-jar
-
- pre-integration-test
-
-
-
-
- org.apache.maven.plugins
- maven-site-plugin
-
- true
-
-
-
-
-
diff --git a/core/2.4/pom_2.12.xml b/core/2.4/pom_2.12.xml
deleted file mode 100644
index 5c09d151..00000000
--- a/core/2.4/pom_2.12.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-
-
- 4.0.0
-
- Kotlin Spark API: Scala core for Spark 2.4+ (Scala 2.12)
- core-2.4_2.12
- Scala-Spark 2.4+ compatibility layer for Kotlin for Apache Spark
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-parent_2.12
- 1.0.3-SNAPSHOT
- ../../pom_2.12.xml
-
-
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
-
-
- org.apache.spark
- spark-sql_${scala.compat.version}
- ${spark2-scala-2.12.version}
- provided
-
-
-
-
- src/main/scala
- src/test/scala
- target/${scala.compat.version}
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
- compile
-
- compile
- testCompile
-
-
-
- -dependencyfile
- ${project.build.directory}/.scala_dependencies
-
-
-
-
- docjar
-
- doc-jar
-
- pre-integration-test
-
-
-
-
- org.apache.maven.plugins
- maven-site-plugin
-
- true
-
-
-
-
-
diff --git a/core/2.4/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala b/core/2.4/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
deleted file mode 100644
index 7f0e6c87..00000000
--- a/core/2.4/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*-
- * =LICENSE=
- * Kotlin Spark API: Examples
- * ----------
- * Copyright (C) 2019 - 2020 JetBrains
- * ----------
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * =LICENSEEND=
- */
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.types.{DataType, Metadata, StructField, StructType}
-
-
-trait DataTypeWithClass {
- val dt: DataType
- val cls: Class[_]
- val nullable: Boolean
-}
-
-trait ComplexWrapper extends DataTypeWithClass
-
-class KDataTypeWrapper(val dt: StructType
- , val cls: Class[_]
- , val nullable: Boolean = true) extends StructType with ComplexWrapper {
- override def fieldNames: Array[String] = dt.fieldNames
-
- override def names: Array[String] = dt.names
-
- override def equals(that: Any): Boolean = dt.equals(that)
-
- override def hashCode(): Int = dt.hashCode()
-
- override def add(field: StructField): StructType = dt.add(field)
-
- override def add(name: String, dataType: DataType): StructType = dt.add(name, dataType)
-
- override def add(name: String, dataType: DataType, nullable: Boolean): StructType = dt.add(name, dataType, nullable)
-
- override def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType = dt.add(name, dataType, nullable, metadata)
-
- override def add(name: String, dataType: DataType, nullable: Boolean, comment: String): StructType = dt.add(name, dataType, nullable, comment)
-
- override def add(name: String, dataType: String): StructType = dt.add(name, dataType)
-
- override def add(name: String, dataType: String, nullable: Boolean): StructType = dt.add(name, dataType, nullable)
-
- override def add(name: String, dataType: String, nullable: Boolean, metadata: Metadata): StructType = dt.add(name, dataType, nullable, metadata)
-
- override def add(name: String, dataType: String, nullable: Boolean, comment: String): StructType = dt.add(name, dataType, nullable, comment)
-
- override def apply(name: String): StructField = dt.apply(name)
-
- override def apply(names: Set[String]): StructType = dt.apply(names)
-
- override def fieldIndex(name: String): Int = dt.fieldIndex(name)
-
- override private[sql] def getFieldIndex(name: String) = dt.getFieldIndex(name)
-
- override protected[sql] def toAttributes: Seq[AttributeReference] = dt.toAttributes
-
- override def treeString: String = dt.treeString
-
- override def printTreeString(): Unit = dt.printTreeString()
-
- override private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = dt.buildFormattedString(prefix, builder)
-
- private[sql] override def jsonValue = dt.jsonValue
-
- override def apply(fieldIndex: Int): StructField = dt.apply(fieldIndex)
-
- override def length: Int = dt.length
-
- override def iterator: Iterator[StructField] = dt.iterator
-
- override def defaultSize: Int = dt.defaultSize
-
- override def simpleString: String = dt.simpleString
-
- override def catalogString: String = dt.catalogString
-
- override def sql: String = dt.sql
-
- override def toDDL: String = dt.toDDL
-
- private[sql] override def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
-
- override private[sql] def merge(that: StructType) = dt.merge(that)
-
- private[spark] override def asNullable = dt.asNullable
-
- private[spark] override def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
-
- override private[sql] lazy val interpretedOrdering = dt.interpretedOrdering
-}
-
-case class KComplexTypeWrapper(dt: DataType, cls: Class[_], nullable: Boolean) extends DataType with ComplexWrapper {
- override private[sql] def unapply(e: Expression) = dt.unapply(e)
-
- override def typeName: String = dt.typeName
-
- override private[sql] def jsonValue = dt.jsonValue
-
- override def json: String = dt.json
-
- override def prettyJson: String = dt.prettyJson
-
- override def simpleString: String = dt.simpleString
-
- override def catalogString: String = dt.catalogString
-
- override private[sql] def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
-
- override def sql: String = dt.sql
-
- override private[spark] def sameType(other: DataType) = dt.sameType(other)
-
- override private[spark] def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
-
- private[sql] override def defaultConcreteType = dt.defaultConcreteType
-
- private[sql] override def acceptsType(other: DataType) = dt.acceptsType(other)
-
- override def defaultSize: Int = dt.defaultSize
-
- override private[spark] def asNullable = dt.asNullable
-
-}
-
-case class KSimpleTypeWrapper(dt: DataType, cls: Class[_], nullable: Boolean) extends DataType with DataTypeWithClass {
- override private[sql] def unapply(e: Expression) = dt.unapply(e)
-
- override def typeName: String = dt.typeName
-
- override private[sql] def jsonValue = dt.jsonValue
-
- override def json: String = dt.json
-
- override def prettyJson: String = dt.prettyJson
-
- override def simpleString: String = dt.simpleString
-
- override def catalogString: String = dt.catalogString
-
- override private[sql] def simpleString(maxNumberFields: Int) = dt.simpleString(maxNumberFields)
-
- override def sql: String = dt.sql
-
- override private[spark] def sameType(other: DataType) = dt.sameType(other)
-
- override private[spark] def existsRecursively(f: DataType => Boolean) = dt.existsRecursively(f)
-
- private[sql] override def defaultConcreteType = dt.defaultConcreteType
-
- private[sql] override def acceptsType(other: DataType) = dt.acceptsType(other)
-
- override def defaultSize: Int = dt.defaultSize
-
- override private[spark] def asNullable = dt.asNullable
-}
-
-class KStructField(val getterName: String, val delegate: StructField) extends StructField {
- override private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = delegate.buildFormattedString(prefix, builder)
-
- override def toString(): String = f"KStructField(${delegate.toString()})"
-
- override private[sql] def jsonValue = delegate.jsonValue
-
- override def withComment(comment: String): StructField = delegate.withComment(comment)
-
- override def getComment(): Option[String] = delegate.getComment()
-
- override def toDDL: String = delegate.toDDL
-
- override def productElement(n: Int): Any = delegate.productElement(n)
-
- override def productArity: Int = delegate.productArity
-
- override def productIterator: Iterator[Any] = delegate.productIterator
-
- override def productPrefix: String = delegate.productPrefix
-
- override def canEqual(that: Any): Boolean = delegate.canEqual(that)
-
- override val dataType: DataType = delegate.dataType
- override val metadata: Metadata = delegate.metadata
- override val nullable: Boolean = delegate.nullable
- override val name: String = delegate.name
-}
-
-object helpme {
-
- def listToSeq(i: java.util.List[_]): Seq[_] = Seq(i.toArray: _*)
-}
\ No newline at end of file
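
The file removed above defined thin wrappers (DataTypeWithClass, KDataTypeWrapper, KSimpleTypeWrapper, KStructField) whose only job is to carry the originating JVM class and nullability alongside a Spark DataType while delegating every StructType/StructField call to the wrapped value. A minimal sketch of that idea, assuming only spark-sql on the classpath; the TypeWithClass name below is illustrative, not the library's API:

```kotlin
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DataTypes

// Illustrative stand-in for DataTypeWithClass: the Spark type plus the JVM class
// and nullability that the encoder machinery needs to rebuild Kotlin objects.
data class TypeWithClass(val dt: DataType, val cls: Class<*>, val nullable: Boolean)

fun main() {
    val field = TypeWithClass(DataTypes.StringType, String::class.java, nullable = true)
    // simpleString() is one of the DataType accessors the wrappers above delegate to.
    println("${field.cls.simpleName} -> ${field.dt.simpleString()} (nullable=${field.nullable})")
}
```
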
diff --git a/core/2.4/src/main/scala/org/apache/spark/sql/catalyst/KotlinReflection.scala b/core/2.4/src/main/scala/org/apache/spark/sql/catalyst/KotlinReflection.scala
deleted file mode 100644
index 89dc33ee..00000000
--- a/core/2.4/src/main/scala/org/apache/spark/sql/catalyst/KotlinReflection.scala
+++ /dev/null
@@ -1,702 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-import com.google.common.reflect.TypeToken
-import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.objects._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{ComplexWrapper, DataTypeWithClass, KDataTypeWrapper, KStructField}
-import org.apache.spark.unsafe.types.UTF8String
-
-import java.beans.{Introspector, PropertyDescriptor}
-import java.lang.reflect.Type
-import java.lang.{Iterable => JIterable}
-import java.time.LocalDate
-import java.util.{Iterator => JIterator, List => JList, Map => JMap}
-import scala.language.existentials
-
-/**
- * Type-inference utilities for POJOs and Java collections.
- */
-//noinspection UnstableApiUsage
-object KotlinReflection {
-
- private val iterableType = TypeToken.of(classOf[JIterable[_]])
- private val mapType = TypeToken.of(classOf[JMap[_, _]])
- private val listType = TypeToken.of(classOf[JList[_]])
- private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
- private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType
- private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType
- private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType
-
- /**
- * Infers the corresponding SQL data type of a JavaBean class.
- *
- * @param beanClass Java type
- * @return (SQL data type, nullable)
- */
- def inferDataType(beanClass: Class[_]): (DataType, Boolean) = {
- inferDataType(TypeToken.of(beanClass))
- }
-
- /**
- * Infers the corresponding SQL data type of a Java type.
- *
- * @param beanType Java type
- * @return (SQL data type, nullable)
- */
- private[sql] def inferDataType(beanType: Type): (DataType, Boolean) = {
- inferDataType(TypeToken.of(beanType))
- }
-
- /**
- * Infers the corresponding SQL data type of a Java type.
- *
- * @param typeToken Java type
- * @return (SQL data type, nullable)
- */
- private def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]] = Set.empty)
- : (DataType, Boolean) = {
- typeToken.getRawType match {
- case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
- (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
-
- case c: Class[_] if UDTRegistration.exists(c.getName) =>
- val udt = UDTRegistration.getUDTFor(c.getName).get.newInstance()
- .asInstanceOf[UserDefinedType[_ >: Null]]
- (udt, true)
-
- case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
- case c: Class[_] if c == classOf[Array[Byte]] => (BinaryType, true)
-
- case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
- case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
- case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
- case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
- case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
- case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
- case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
-
- case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
- case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
- case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
- case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
- case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
- case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
- case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
-
- case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType.SYSTEM_DEFAULT, true)
- case c: Class[_] if c == classOf[java.math.BigInteger] => (DecimalType.BigIntDecimal, true)
- case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
- case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
-
- case _ if typeToken.isArray =>
- val (dataType, nullable) = inferDataType(typeToken.getComponentType, seenTypeSet)
- (ArrayType(dataType, nullable), true)
-
- case _ if iterableType.isAssignableFrom(typeToken) =>
- val (dataType, nullable) = inferDataType(elementType(typeToken), seenTypeSet)
- (ArrayType(dataType, nullable), true)
-
- case _ if mapType.isAssignableFrom(typeToken) =>
- val (keyType, valueType) = mapKeyValueType(typeToken)
- val (keyDataType, _) = inferDataType(keyType, seenTypeSet)
- val (valueDataType, nullable) = inferDataType(valueType, seenTypeSet)
- (MapType(keyDataType, valueDataType, nullable), true)
-
- case other if other.isEnum =>
- (StringType, true)
-
- case other =>
- if (seenTypeSet.contains(other)) {
- throw new UnsupportedOperationException(
- "Cannot have circular references in bean class, but got the circular reference " +
- s"of class $other")
- }
-
- // TODO: we should only collect properties that have getter and setter. However, some tests
- // pass in scala case class as java bean class which doesn't have getter and setter.
- val properties = getJavaBeanReadableProperties(other)
- val fields = properties.map { property =>
- val returnType = typeToken.method(property.getReadMethod).getReturnType
- val (dataType, nullable) = inferDataType(returnType, seenTypeSet + other)
- new StructField(property.getName, dataType, nullable)
- }
- (new StructType(fields), true)
- }
- }
-
- def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
- val beanInfo = Introspector.getBeanInfo(beanClass)
- beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
- .filterNot(_.getName == "declaringClass")
- .filter(_.getReadMethod != null)
- }
-
- private def getJavaBeanReadableAndWritableProperties(
- beanClass: Class[_]): Array[PropertyDescriptor] = {
- getJavaBeanReadableProperties(beanClass).filter(_.getWriteMethod != null)
- }
-
- private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
- val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]]
- val iterableSuperType = typeToken2.getSupertype(classOf[JIterable[_]])
- val iteratorType = iterableSuperType.resolveType(iteratorReturnType)
- iteratorType.resolveType(nextReturnType)
- }
-
- private def mapKeyValueType(typeToken: TypeToken[_]): (TypeToken[_], TypeToken[_]) = {
- val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]]
- val mapSuperType = typeToken2.getSupertype(classOf[JMap[_, _]])
- val keyType = elementType(mapSuperType.resolveType(keySetReturnType))
- val valueType = elementType(mapSuperType.resolveType(valuesReturnType))
- keyType -> valueType
- }
-
- /**
- * Returns the Spark SQL DataType for a given java class. Where this is not an exact mapping
- * to a native type, an ObjectType is returned.
- *
- * Unlike `inferDataType`, this function doesn't do any massaging of types into the Spark SQL type
- * system. As a result, ObjectType will be returned for things like boxed Integers.
- */
- private def inferExternalType(cls: Class[_]): DataType = cls match {
- case c if c == java.lang.Boolean.TYPE => BooleanType
- case c if c == java.lang.Byte.TYPE => ByteType
- case c if c == java.lang.Short.TYPE => ShortType
- case c if c == java.lang.Integer.TYPE => IntegerType
- case c if c == java.lang.Long.TYPE => LongType
- case c if c == java.lang.Float.TYPE => FloatType
- case c if c == java.lang.Double.TYPE => DoubleType
- case c if c == classOf[Array[Byte]] => BinaryType
- case _ => ObjectType(cls)
- }
-
- /**
- * Returns an expression that can be used to deserialize an internal row to an object of java bean
- * `T` with a compatible schema. Fields of the row will be extracted using UnresolvedAttributes
- * of the same name as the constructor arguments. Nested classes will have their fields accessed
- * using UnresolvedExtractValue.
- */
- def deserializerFor(beanClass: Class[_], dt: DataTypeWithClass): Expression = {
- deserializerFor(TypeToken.of(beanClass), None, Some(dt))
- }
-
- private def deserializerFor(typeToken: TypeToken[_], path: Option[Expression], predefinedDt: Option[DataTypeWithClass] = None): Expression = {
- /** Returns the current path with a sub-field extracted. */
- def addToPath(part: String): Expression = path
- .map(p => UnresolvedExtractValue(p, expressions.Literal(part)))
- .getOrElse(UnresolvedAttribute(part))
-
- /** Returns the current path or `GetColumnByOrdinal`. */
- def getPath: Expression = path.getOrElse(GetColumnByOrdinal(0, inferDataType(typeToken)._1))
-
- typeToken.getRawType match {
- case c if !inferExternalType(c).isInstanceOf[ObjectType] => getPath
-
- case c if c == classOf[java.lang.Short] ||
- c == classOf[java.lang.Integer] ||
- c == classOf[java.lang.Long] ||
- c == classOf[java.lang.Double] ||
- c == classOf[java.lang.Float] ||
- c == classOf[java.lang.Byte] ||
- c == classOf[java.lang.Boolean] =>
- StaticInvoke(
- c,
- ObjectType(c),
- "valueOf",
- getPath :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.sql.Date] =>
- StaticInvoke(
- DateTimeUtils.getClass,
- ObjectType(c),
- "toJavaDate",
- getPath :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.sql.Timestamp] =>
- StaticInvoke(
- DateTimeUtils.getClass,
- ObjectType(c),
- "toJavaTimestamp",
- getPath :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.lang.String] =>
- Invoke(getPath, "toString", ObjectType(classOf[String]))
-
- case c if c == classOf[java.math.BigDecimal] =>
- Invoke(getPath, "toJavaBigDecimal", ObjectType(classOf[java.math.BigDecimal]))
-
- case c if c == classOf[java.time.LocalDate] =>
- StaticInvoke(
- KotlinReflection.getClass,
- ObjectType(classOf[java.time.LocalDate]),
- "daysToLocalDate",
- getPath :: Nil,
- returnNullable = false)
-
-
- case c if c.isArray =>
- val elementType = c.getComponentType
- val primitiveMethod = elementType match {
- case c if c == java.lang.Boolean.TYPE => Some("toBooleanArray")
- case c if c == java.lang.Byte.TYPE => Some("toByteArray")
- case c if c == java.lang.Short.TYPE => Some("toShortArray")
- case c if c == java.lang.Integer.TYPE => Some("toIntArray")
- case c if c == java.lang.Long.TYPE => Some("toLongArray")
- case c if c == java.lang.Float.TYPE => Some("toFloatArray")
- case c if c == java.lang.Double.TYPE => Some("toDoubleArray")
- case _ => None
- }
-
- val maybeType = predefinedDt.filter(_.dt.isInstanceOf[ArrayType]).map(_.dt.asInstanceOf[ArrayType].elementType)
- val reifiedElementType = maybeType match {
- case Some(dt: DataTypeWithClass) => dt.cls
- case _ => c.getComponentType
- }
- primitiveMethod.map { method =>
- Invoke(getPath, method, ObjectType(c))
- }.getOrElse {
- Invoke(
- MapObjects(
- p => {
- deserializerFor(TypeToken.of(reifiedElementType), Some(p), maybeType.filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper]))
- },
- getPath,
- maybeType.filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper].dt).getOrElse(inferDataType(reifiedElementType)._1)
- ),
- "array",
- ObjectType(c)
- )
- }
-
- case c if listType.isAssignableFrom(typeToken) && predefinedDt.isEmpty =>
- val et = elementType(typeToken)
- UnresolvedMapObjects(
- p => deserializerFor(et, Some(p)),
- getPath,
- customCollectionCls = Some(c))
-
- case _ if mapType.isAssignableFrom(typeToken) && predefinedDt.isEmpty =>
- val (keyType, valueType) = mapKeyValueType(typeToken)
- val keyDataType = inferDataType(keyType)._1
- val valueDataType = inferDataType(valueType)._1
-
- val keyData =
- Invoke(
- MapObjects(
- p => deserializerFor(keyType, Some(p)),
- Invoke(getPath, "keyArray", ArrayType(keyDataType)),
- keyDataType),
- "array",
- ObjectType(classOf[Array[Any]]))
-
- val valueData =
- Invoke(
- MapObjects(
- p => deserializerFor(valueType, Some(p)),
- Invoke(getPath, "valueArray", ArrayType(valueDataType)),
- valueDataType),
- "array",
- ObjectType(classOf[Array[Any]]))
-
- StaticInvoke(
- ArrayBasedMapData.getClass,
- ObjectType(classOf[JMap[_, _]]),
- "toJavaMap",
- keyData :: valueData :: Nil,
- returnNullable = false)
-
- case other if other.isEnum =>
- StaticInvoke(
- other,
- ObjectType(other),
- "valueOf",
- Invoke(getPath, "toString", ObjectType(classOf[String]), returnNullable = false) :: Nil,
- returnNullable = false)
-
- case _ if predefinedDt.isDefined =>
- predefinedDt.get match {
- case wrapper: KDataTypeWrapper =>
- val structType = wrapper.dt
- val cls = wrapper.cls
- val arguments: Seq[Expression] = structType
- .fields
- .map(field => {
- val dataType = field.asInstanceOf[KStructField].delegate.dataType.asInstanceOf[DataTypeWithClass]
- val nullable = dataType.nullable
- val fieldCls = dataType.cls
- val clsName = fieldCls.getName
- val fieldName = field.asInstanceOf[KStructField].delegate.name
- val newPath = addToPath(fieldName)
- deserializerFor(TypeToken.of(fieldCls), Some(newPath), Some(dataType).filter(_.isInstanceOf[ComplexWrapper]))
-
- })
- val newInstance = NewInstance(cls, arguments, ObjectType(cls), propagateNull = false)
-
- if (path.nonEmpty) {
- expressions.If(
- IsNull(getPath),
- expressions.Literal.create(null, ObjectType(cls)),
- newInstance
- )
- } else {
- newInstance
- }
-
- case t: ComplexWrapper =>
- t.dt match {
- case MapType(kt, vt, _) =>
- val Seq(keyType, valueType) = Seq(kt, vt).map(_.asInstanceOf[DataTypeWithClass].cls).map(TypeToken.of(_))
- val Seq(keyDT, valueDT) = Seq(kt, vt).map(_.asInstanceOf[DataTypeWithClass])
- val keyData =
- Invoke(
- MapObjects(
- p => deserializerFor(keyType, Some(p), Some(keyDT.dt).filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper])),
- Invoke(getPath, "keyArray", ArrayType(keyDT.dt, keyDT.nullable)),
- keyDT.dt),
- "array",
- ObjectType(classOf[Array[Any]]))
-
- val valueData =
- Invoke(
- MapObjects(
- p => deserializerFor(valueType, Some(p), Some(valueDT.dt).filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper])),
- Invoke(getPath, "valueArray", ArrayType(valueDT.dt, containsNull = valueDT.nullable)),
- valueDT.dt),
- "array",
- ObjectType(classOf[Array[Any]]))
-
- StaticInvoke(
- ArrayBasedMapData.getClass,
- ObjectType(classOf[JMap[_, _]]),
- "toJavaMap",
- keyData :: valueData :: Nil,
- returnNullable = false)
-
-
- case ArrayType(elementType, containsNull) =>
- val dt = elementType.asInstanceOf[DataTypeWithClass]
- val et = TypeToken.of(dt.cls)
- UnresolvedMapObjects(
- p => deserializerFor(et, Some(p), Some(dt).filter(_.isInstanceOf[ComplexWrapper])),
- getPath,
- customCollectionCls = Some(predefinedDt.get.cls))
-
- case StructType(elementType: Array[StructField]) =>
- val cls = t.cls
-
- val arguments = elementType.map { field =>
- val dataType = field.dataType.asInstanceOf[DataTypeWithClass]
- val nullable = dataType.nullable
- val clsName = dataType.cls.getName
- val fieldName = field.asInstanceOf[KStructField].delegate.name
- val newPath = addToPath(fieldName)
-
- deserializerFor(
- TypeToken.of(dataType.cls),
- Some(newPath),
- Some(dataType).filter(_.isInstanceOf[ComplexWrapper])
- )
- }
- val newInstance = NewInstance(cls, arguments, ObjectType(cls), propagateNull = false)
-
-
- if (path.nonEmpty) {
- expressions.If(
- IsNull(getPath),
- expressions.Literal.create(null, ObjectType(cls)),
- newInstance
- )
- } else {
- newInstance
- }
-
- case _ =>
- throw new UnsupportedOperationException(
- s"No Encoder found for $typeToken in deserializerFor\n" + path)
- }
- }
-
-
- case other =>
- val properties = getJavaBeanReadableAndWritableProperties(other)
- val setters = properties.map { p =>
- val fieldName = p.getName
- val fieldType = typeToken.method(p.getReadMethod).getReturnType
- val (_, nullable) = inferDataType(fieldType)
- val constructor = deserializerFor(fieldType, Some(addToPath(fieldName)))
- val setter = if (nullable) {
- constructor
- } else {
- AssertNotNull(constructor, Seq("currently no type path record in java"))
- }
- p.getWriteMethod.getName -> setter
- }.toMap
-
- val newInstance = NewInstance(other, Nil, ObjectType(other), propagateNull = false)
- val result = InitializeJavaBean(newInstance, setters)
-
- if (path.nonEmpty) {
- expressions.If(
- IsNull(getPath),
- expressions.Literal.create(null, ObjectType(other)),
- result
- )
- } else {
- result
- }
- }
- }
-
- def deserializerForWithNullSafetyAndUpcast(
- expr: Expression,
- dataType: DataType,
- nullable: Boolean,
- funcForCreatingDeserializer: (Expression) => Expression): Expression = {
- expressionWithNullSafety(funcForCreatingDeserializer(expr), nullable)
- }
-
- def expressionWithNullSafety(
- expr: Expression,
- nullable: Boolean): Expression = {
- if (nullable) {
- expr
- } else {
- AssertNotNull(expr)
- }
- }
-
-
- /**
- * Returns an expression for serializing an object of the given type to an internal row.
- */
- def serializerFor(beanClass: Class[_], dt: DataTypeWithClass): CreateNamedStruct = {
- val inputObject = BoundReference(0, ObjectType(beanClass), nullable = true)
- val nullSafeInput = AssertNotNull(inputObject, Seq("top level input bean"))
- serializerFor(nullSafeInput, TypeToken.of(beanClass), Some(dt)) match {
- case expressions.If(_, _, s: CreateNamedStruct) => s
- case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
- }
- }
-
- private def serializerFor(inputObject: Expression, typeToken: TypeToken[_], optionalDt: Option[DataTypeWithClass] = None): Expression = {
-
- def toCatalystArray(input: Expression, elementType: TypeToken[_], predefinedDt: Option[DataTypeWithClass] = None): Expression = {
- val (dataType, nullable) = predefinedDt.map(x => (x.dt, x.nullable)).getOrElse(inferDataType(elementType))
- if (ScalaReflection.isNativeType(dataType)) {
- NewInstance(
- classOf[GenericArrayData],
- input :: Nil,
- dataType = ArrayType(dataType, nullable))
- } else {
- val next = predefinedDt.filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper])
- MapObjects(serializerFor(_, elementType, next), input, ObjectType(elementType.getRawType))
- }
- }
-
- if (!inputObject.dataType.isInstanceOf[ObjectType]) {
- inputObject
- } else {
- typeToken.getRawType match {
- case c if c == classOf[String] =>
- StaticInvoke(
- classOf[UTF8String],
- StringType,
- "fromString",
- inputObject :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.sql.Timestamp] =>
- StaticInvoke(
- DateTimeUtils.getClass,
- TimestampType,
- "fromJavaTimestamp",
- inputObject :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.sql.Date] =>
- StaticInvoke(
- DateTimeUtils.getClass,
- DateType,
- "fromJavaDate",
- inputObject :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.math.BigDecimal] =>
- StaticInvoke(
- Decimal.getClass,
- DecimalType.SYSTEM_DEFAULT,
- "apply",
- inputObject :: Nil,
- returnNullable = false)
-
- case c if c == classOf[java.lang.Boolean] =>
- Invoke(inputObject, "booleanValue", BooleanType)
- case c if c == classOf[java.lang.Byte] =>
- Invoke(inputObject, "byteValue", ByteType)
- case c if c == classOf[java.lang.Short] =>
- Invoke(inputObject, "shortValue", ShortType)
- case c if c == classOf[java.lang.Integer] =>
- Invoke(inputObject, "intValue", IntegerType)
- case c if c == classOf[java.lang.Long] =>
- Invoke(inputObject, "longValue", LongType)
- case c if c == classOf[java.lang.Float] =>
- Invoke(inputObject, "floatValue", FloatType)
- case c if c == classOf[java.lang.Double] =>
- Invoke(inputObject, "doubleValue", DoubleType)
-
- case c if c == classOf[LocalDate] =>
- StaticInvoke(
- KotlinReflection.getClass,
- DateType,
- "localDateToDays",
- inputObject :: Nil,
- returnNullable = false)
-
- case _ if typeToken.isArray && optionalDt.isEmpty =>
- toCatalystArray(inputObject, typeToken.getComponentType)
-
- case _ if listType.isAssignableFrom(typeToken) && optionalDt.isEmpty =>
- toCatalystArray(inputObject, elementType(typeToken))
-
- case _ if mapType.isAssignableFrom(typeToken) && optionalDt.isEmpty =>
- val (keyType, valueType) = mapKeyValueType(typeToken)
-
- ExternalMapToCatalyst(
- inputObject,
- ObjectType(keyType.getRawType),
- serializerFor(_, keyType),
- keyNullable = true,
- ObjectType(valueType.getRawType),
- serializerFor(_, valueType),
- valueNullable = true
- )
-
- case other if other.isEnum =>
- StaticInvoke(
- classOf[UTF8String],
- StringType,
- "fromString",
- Invoke(inputObject, "name", ObjectType(classOf[String]), returnNullable = false) :: Nil,
- returnNullable = false)
-
- case _ if optionalDt.isDefined =>
- optionalDt.get match {
- case dataType: KDataTypeWrapper =>
- val cls = dataType.cls
- val properties = getJavaBeanReadableProperties(cls)
- val structFields = dataType.dt.fields.map(_.asInstanceOf[KStructField])
- val fields = structFields.map { structField =>
- val maybeProp = properties.find(it => it.getReadMethod.getName == structField.getterName)
- if (maybeProp.isEmpty) throw new IllegalArgumentException(s"Field ${structField.name} is not found among available props, which are: ${properties.map(_.getName).mkString(", ")}")
- val fieldName = structField.delegate.name
- val propClass = structField.delegate.dataType.asInstanceOf[DataTypeWithClass].cls
- val propDt = structField.delegate.dataType.asInstanceOf[DataTypeWithClass]
- val fieldValue = Invoke(
- inputObject,
- maybeProp.get.getReadMethod.getName,
- inferExternalType(propClass))
-
- expressions.Literal(fieldName) :: serializerFor(fieldValue, TypeToken.of(propClass), propDt match { case c: ComplexWrapper => Some(c) case _ => None }) :: Nil
- }
- val nonNullOutput = CreateNamedStruct(fields.flatten.seq)
- val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType)
- expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)
- case otherTypeWrapper: ComplexWrapper =>
- otherTypeWrapper.dt match {
- case MapType(kt, vt, _) =>
- val Seq(keyType, valueType) = Seq(kt, vt).map(_.asInstanceOf[DataTypeWithClass].cls).map(TypeToken.of(_))
- val Seq(keyDT, valueDT) = Seq(kt, vt).map(_.asInstanceOf[DataTypeWithClass])
- ExternalMapToCatalyst(
- inputObject,
- ObjectType(keyType.getRawType),
- serializerFor(_, keyType, keyDT match { case c: ComplexWrapper => Some(c) case _ => None }),
- keyNullable = true,
- ObjectType(valueType.getRawType),
- serializerFor(_, valueType, valueDT match { case c: ComplexWrapper => Some(c) case _ => None }),
- valueNullable = true
- )
- case ArrayType(elementType, _) =>
- toCatalystArray(inputObject, TypeToken.of(elementType.asInstanceOf[DataTypeWithClass].cls), Some(elementType.asInstanceOf[DataTypeWithClass]))
-
- case StructType(elementType: Array[StructField]) =>
- val cls = otherTypeWrapper.cls
- val names = elementType.map(_.name)
-
- val beanInfo = Introspector.getBeanInfo(cls)
- val methods = beanInfo.getMethodDescriptors.filter(it => names.contains(it.getName))
-
- val fields = elementType.map { structField =>
-
- val maybeProp = methods.find(it => it.getName == structField.name)
- if (maybeProp.isEmpty) throw new IllegalArgumentException(s"Field ${structField.name} is not found among available props, which are: ${methods.map(_.getName).mkString(", ")}")
- val fieldName = structField.name
- val propClass = structField.dataType.asInstanceOf[DataTypeWithClass].cls
- val propDt = structField.dataType.asInstanceOf[DataTypeWithClass]
- val fieldValue = Invoke(
- inputObject,
- maybeProp.get.getName,
- inferExternalType(propClass),
- returnNullable = propDt.nullable
- )
- expressions.Literal(fieldName) :: serializerFor(fieldValue, TypeToken.of(propClass), propDt match { case c: ComplexWrapper => Some(c) case _ => None }) :: Nil
- }
- val nonNullOutput = CreateNamedStruct(fields.flatten.seq)
- val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType)
- expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)
-
- case _ =>
- throw new UnsupportedOperationException(s"No Encoder found for $typeToken in serializerFor. $otherTypeWrapper")
-
- }
-
- }
-
-
- case other =>
- val properties = getJavaBeanReadableAndWritableProperties(other)
- val nonNullOutput = CreateNamedStruct(properties.flatMap { p =>
- val fieldName = p.getName
- val fieldType = typeToken.method(p.getReadMethod).getReturnType
- val fieldValue = Invoke(
- inputObject,
- p.getReadMethod.getName,
- inferExternalType(fieldType.getRawType))
- expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType) :: Nil
- })
-
- val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType)
- expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)
- }
- }
- }
-
- def localDateToDays(localDate: LocalDate): Int = {
- Math.toIntExact(localDate.toEpochDay)
- }
-
- def daysToLocalDate(days: Int): LocalDate = LocalDate.ofEpochDay(days)
-
-}
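
The deleted KotlinReflection object ends with two small date helpers, localDateToDays and daysToLocalDate, which the serializer and deserializer use for java.time.LocalDate. A self-contained sketch of that round-trip (plain JDK, no Spark needed):

```kotlin
import java.time.LocalDate

// Same conversion the deleted helpers performed: a LocalDate is stored as an
// Int count of days since 1970-01-01 and restored with LocalDate.ofEpochDay.
fun localDateToDays(localDate: LocalDate): Int = Math.toIntExact(localDate.toEpochDay())

fun daysToLocalDate(days: Int): LocalDate = LocalDate.ofEpochDay(days.toLong())

fun main() {
    val date = LocalDate.of(2020, 6, 1)
    val days = localDateToDays(date)
    check(daysToLocalDate(days) == date)   // round-trip is lossless within Int range
    println("$date -> $days days since the epoch")
}
```
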
diff --git a/core/2.4/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala b/core/2.4/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
deleted file mode 100644
index 390dee73..00000000
--- a/core/2.4/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*-
- * =LICENSE=
- * Kotlin Spark API: Examples
- * ----------
- * Copyright (C) 2019 - 2020 JetBrains
- * ----------
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * =LICENSEEND=
- */
-package org.jetbrains.kotlinx.spark.extensions
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-
-import java.util
-import scala.collection.JavaConverters._
-
-object KSparkExtensions {
- def col(d: Dataset[_], name: String): Column = d.col(name)
-
- def col(name: String): Column = functions.col(name)
-
- def lit(literal: Any): Column = functions.lit(literal)
-
- def collectAsList[T](ds: Dataset[T]): util.List[T] = ds.collect().toSeq.asJava
-
-
- def debugCodegen(df: Dataset[_]): Unit = {
- import org.apache.spark.sql.execution.debug._
- df.debugCodegen()
- }
-
- def debug(df: Dataset[_]): Unit = {
- import org.apache.spark.sql.execution.debug._
- df.debug()
- }
-
- def sparkContext(s: SparkSession): SparkContext = s.sparkContext
-}
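
KSparkExtensions is a small Scala bridge the Kotlin API calls into; because it is a top-level Scala object, its methods are visible from Kotlin as plain static calls. A minimal usage sketch, assuming spark-sql and the extensions artifact are on the classpath and a local session can be started (names of the session and app are illustrative):

```kotlin
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions

fun main() {
    val spark = SparkSession.builder().master("local[*]").appName("extensions-demo").getOrCreate()
    val ds = spark.range(3)                         // Dataset<java.lang.Long> with an "id" column
    println(KSparkExtensions.collectAsList(ds))     // Scala Array collected into a java.util.List
    println(KSparkExtensions.col(ds, "id"))         // Column lookup without Scala implicits
    println(KSparkExtensions.sparkContext(spark))   // underlying SparkContext
    spark.stop()
}
```
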
diff --git a/core/3.0/pom_2.12.xml b/core/3.2/pom_2.12.xml
similarity index 91%
rename from core/3.0/pom_2.12.xml
rename to core/3.2/pom_2.12.xml
index c3c2e972..16f77500 100644
--- a/core/3.0/pom_2.12.xml
+++ b/core/3.2/pom_2.12.xml
@@ -2,9 +2,9 @@
4.0.0
- Kotlin Spark API: Scala core for Spark 3.0+ (Scala 2.12)
- Scala-Spark 3.0+ compatibility layer for Kotlin for Apache Spark
- core-3.0_2.12
+ Kotlin Spark API: Scala core for Spark 3.2+ (Scala 2.12)
+ Scala-Spark 3.2+ compatibility layer for Kotlin for Apache Spark
+ core-3.2_2.12
org.jetbrains.kotlinx.spark
kotlin-spark-api-parent_2.12
@@ -39,7 +39,7 @@
net.alchim31.maven
scala-maven-plugin
- 4.4.0
+ ${scala-maven-plugin.version}
compile
diff --git a/core/3.0/src/main/scala/org/apache/spark/sql/KotlinReflection.scala b/core/3.2/src/main/scala/org/apache/spark/sql/KotlinReflection.scala
similarity index 100%
rename from core/3.0/src/main/scala/org/apache/spark/sql/KotlinReflection.scala
rename to core/3.2/src/main/scala/org/apache/spark/sql/KotlinReflection.scala
diff --git a/core/3.0/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala b/core/3.2/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
similarity index 97%
rename from core/3.0/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
rename to core/3.2/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
index fb022d6f..675110be 100644
--- a/core/3.0/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
+++ b/core/3.2/src/main/scala/org/apache/spark/sql/KotlinWrappers.scala
@@ -70,7 +70,7 @@ class KDataTypeWrapper(val dt: StructType
override private[sql] def getFieldIndex(name: String) = dt.getFieldIndex(name)
- override private[sql] def findNestedField(fieldNames: Seq[String], includeCollections: Boolean, resolver: Resolver) = dt.findNestedField(fieldNames, includeCollections, resolver)
+ private[sql] def findNestedField(fieldNames: Seq[String], includeCollections: Boolean, resolver: Resolver) = dt.findNestedField(fieldNames, includeCollections, resolver)
override private[sql] def buildFormattedString(prefix: String, stringConcat: StringUtils.StringConcat, maxDepth: Int): Unit = dt.buildFormattedString(prefix, stringConcat, maxDepth)
diff --git a/core/3.0/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/core/3.2/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
similarity index 100%
rename from core/3.0/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
rename to core/3.2/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
diff --git a/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala b/core/3.2/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
similarity index 100%
rename from core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
rename to core/3.2/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
diff --git a/dummy/pom.xml b/dummy/pom.xml
index 67861af7..41044133 100644
--- a/dummy/pom.xml
+++ b/dummy/pom.xml
@@ -11,27 +11,12 @@
Module to workaround https://issues.sonatype.org/browse/NEXUS-9138
dummy
-
- scala-2.11
-
-
- org.jetbrains.kotlinx.spark
- examples-2.4_2.11
- ${project.parent.version}
-
-
-
scala-2.12
org.jetbrains.kotlinx.spark
- examples-2.4_2.12
- ${project.parent.version}
-
-
- org.jetbrains.kotlinx.spark
- examples-3.0_2.12
+ examples-3.2_2.12
${project.parent.version}
diff --git a/examples/pom-2.4_2.11.xml b/examples/pom-2.4_2.11.xml
deleted file mode 100644
index 30a42d10..00000000
--- a/examples/pom-2.4_2.11.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-
-
-
- 4.0.0
-
- Kotlin Spark API: Examples for Spark 2.4+ (Scala 2.11)
- Example of usage
- examples-2.4_2.11
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-parent_2.11
- 1.0.3-SNAPSHOT
- ../pom_2.11.xml
-
-
-
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-2.4_${scala.compat.version}
- ${project.version}
-
-
- org.apache.spark
- spark-sql_${scala.compat.version}
- ${spark2-scala-2.11.version}
-
-
-
-
- src/main/kotlin
- src/test/kotlin
- target/2.4/${scala.compat.version}
-
-
- org.jetbrains.kotlin
- kotlin-maven-plugin
-
-
- org.apache.maven.plugins
- maven-assembly-plugin
- ${maven-assembly-plugin.version}
-
-
- jar-with-dependencies
-
-
-
- org.jetbrains.spark.api.examples.WordCountKt
-
-
-
-
-
- org.apache.maven.plugins
- maven-site-plugin
-
- true
-
-
-
- org.apache.maven.plugins
- maven-deploy-plugin
-
- true
-
-
-
- org.sonatype.plugins
- nexus-staging-maven-plugin
-
- true
-
-
-
-
-
diff --git a/examples/pom-2.4_2.12.xml b/examples/pom-2.4_2.12.xml
deleted file mode 100644
index 95045820..00000000
--- a/examples/pom-2.4_2.12.xml
+++ /dev/null
@@ -1,75 +0,0 @@
-
-
-
- 4.0.0
-
- Kotlin Spark API: Examples for Spark 2.4+ (Scala 2.12)
- Example of usage
- examples-2.4_2.12
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-parent_2.12
- 1.0.3-SNAPSHOT
- ../pom_2.12.xml
-
-
-
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-2.4_${scala.compat.version}
- ${project.version}
-
-
- org.apache.spark
- spark-sql_${scala.compat.version}
- ${spark2-scala-2.12.version}
-
-
-
-
- src/main/kotlin
- target/2.4/${scala.compat.version}
-
-
- org.jetbrains.kotlin
- kotlin-maven-plugin
-
-
- org.apache.maven.plugins
- maven-assembly-plugin
- ${maven-assembly-plugin.version}
-
-
- jar-with-dependencies
-
-
-
- org.jetbrains.spark.api.examples.WordCountKt
-
-
-
-
-
- org.apache.maven.plugins
- maven-site-plugin
-
- true
-
-
-
- org.apache.maven.plugins
- maven-deploy-plugin
-
- true
-
-
-
- org.sonatype.plugins
- nexus-staging-maven-plugin
-
- true
-
-
-
-
-
diff --git a/examples/pom-3.0_2.12.xml b/examples/pom-3.2_2.12.xml
similarity index 80%
rename from examples/pom-3.0_2.12.xml
rename to examples/pom-3.2_2.12.xml
index 866b53d9..668d6ced 100644
--- a/examples/pom-3.0_2.12.xml
+++ b/examples/pom-3.2_2.12.xml
@@ -3,9 +3,9 @@
4.0.0
- Kotlin Spark API: Examples for Spark 3.0+ (Scala 2.12)
+ Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
Example of usage
- examples-3.0_2.12
+ examples-3.2_2.12
org.jetbrains.kotlinx.spark
kotlin-spark-api-parent_2.12
@@ -16,7 +16,7 @@
org.jetbrains.kotlinx.spark
- kotlin-spark-api-3.0
+ kotlin-spark-api-3.2
${project.version}
@@ -29,11 +29,25 @@
src/main/kotlin
src/test/kotlin
- target/3.0/${scala.compat.version}
+ target/3.2/${scala.compat.version}
org.jetbrains.kotlin
kotlin-maven-plugin
+
+
+ compile
+
+ compile
+
+
+
+ test-compile
+
+ test-compile
+
+
+
org.apache.maven.plugins
diff --git a/kotlin-spark-api/2.4/pom_2.11.xml b/kotlin-spark-api/2.4/pom_2.11.xml
deleted file mode 100644
index c5a8f2bd..00000000
--- a/kotlin-spark-api/2.4/pom_2.11.xml
+++ /dev/null
@@ -1,120 +0,0 @@
-
-
-
- 4.0.0
-
- Kotlin Spark API: API for Spark 2.4+ (Scala 2.11)
- Kotlin API compatible with Spark 2.4+ Kotlin for Apache Spark
- kotlin-spark-api-2.4_2.11
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-parent_2.11
- 1.0.3-SNAPSHOT
- ../../pom_2.11.xml
-
- jar
-
-
-
- org.jetbrains.kotlin
- kotlin-stdlib-jdk8
-
-
- org.jetbrains.kotlin
- kotlin-reflect
-
-
- org.jetbrains.kotlinx.spark
- core-2.4_${scala.compat.version}
-
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-common
-
-
-
-
- org.apache.spark
- spark-sql_${scala.compat.version}
- ${spark2-scala-2.11.version}
- provided
-
-
-
-
- io.kotest
- kotest-runner-junit5-jvm
- ${kotest.version}
- test
-
-
- io.kotest.extensions
- kotest-extensions-allure
- ${kotest-extension-allure.version}
- test
-
-
- com.beust
- klaxon
- ${klaxon.version}
- test
-
-
- ch.tutteli.atrium
- atrium-fluent-en_GB
- ${atrium.version}
- test
-
-
-
-
- src/main/kotlin
- src/test/kotlin
- target/${scala.compat.version}
-
-
- org.jetbrains.kotlin
- kotlin-maven-plugin
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
-
- org.jetbrains.dokka
- dokka-maven-plugin
- ${dokka.version}
-
- 8
-
-
-
- dokka
-
- dokka
-
- pre-site
-
-
- javadocjar
-
- javadocJar
-
- pre-integration-test
-
-
-
-
- io.qameta.allure
- allure-maven
-
- ${project.basedir}/allure-results/${scala.compat.version}
-
-
-
- org.jacoco
- jacoco-maven-plugin
-
-
-
-
diff --git a/kotlin-spark-api/2.4/pom_2.12.xml b/kotlin-spark-api/2.4/pom_2.12.xml
deleted file mode 100644
index 66796d40..00000000
--- a/kotlin-spark-api/2.4/pom_2.12.xml
+++ /dev/null
@@ -1,120 +0,0 @@
-
-
-
- 4.0.0
-
- Kotlin Spark API: API for Spark 2.4+ (Scala 2.12)
- Kotlin API compatible with Spark 2.4+ Kotlin for Apache Spark
- kotlin-spark-api-2.4_2.12
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-parent_2.12
- 1.0.3-SNAPSHOT
- ../../pom_2.12.xml
-
- jar
-
-
-
- org.jetbrains.kotlin
- kotlin-stdlib-jdk8
-
-
- org.jetbrains.kotlin
- kotlin-reflect
-
-
- org.jetbrains.kotlinx.spark
- core-2.4_${scala.compat.version}
-
-
- org.jetbrains.kotlinx.spark
- kotlin-spark-api-common
-
-
-
-
- org.apache.spark
- spark-sql_${scala.compat.version}
- ${spark2-scala-2.12.version}
- provided
-
-
-
-
- io.kotest
- kotest-runner-junit5-jvm
- ${kotest.version}
- test
-
-
- io.kotest.extensions
- kotest-extensions-allure
- ${kotest-extension-allure.version}
- test
-
-
- com.beust
- klaxon
- ${klaxon.version}
- test
-
-
- ch.tutteli.atrium
- atrium-fluent-en_GB
- ${atrium.version}
- test
-
-
-
-
- src/main/kotlin
- src/test/kotlin
- target/${scala.compat.version}
-
-
- org.jetbrains.kotlin
- kotlin-maven-plugin
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
-
- org.jetbrains.dokka
- dokka-maven-plugin
- ${dokka.version}
-
- 8
-
-
-
- dokka
-
- dokka
-
- pre-site
-
-
- javadocjar
-
- javadocJar
-
- pre-integration-test
-
-
-
-
- io.qameta.allure
- allure-maven
-
- ${project.basedir}/allure-results/${scala.compat.version}
-
-
-
- org.jacoco
- jacoco-maven-plugin
-
-
-
-
diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
deleted file mode 100644
index 15fcae93..00000000
--- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*-
- * =LICENSE=
- * Kotlin Spark API
- * ----------
- * Copyright (C) 2019 - 2020 JetBrains
- * ----------
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * =LICENSEEND=
- */
-@file:Suppress("HasPlatformType", "unused", "FunctionName")
-
-package org.jetbrains.kotlinx.spark.api
-
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.api.java.function.*
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.*
-import org.apache.spark.sql.Encoders.*
-import org.apache.spark.sql.catalyst.JavaTypeInference
-import org.apache.spark.sql.catalyst.KotlinReflection
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.streaming.GroupState
-import org.apache.spark.sql.streaming.GroupStateTimeout
-import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.*
-import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions
-import scala.Product
-import scala.Tuple2
-import scala.collection.Seq
-import scala.reflect.`ClassTag$`
-import java.beans.PropertyDescriptor
-import java.math.BigDecimal
-import java.sql.Date
-import java.sql.Timestamp
-import java.time.Instant
-import java.time.LocalDate
-import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-import kotlin.Any
-import kotlin.Array
-import kotlin.Boolean
-import kotlin.BooleanArray
-import kotlin.Byte
-import kotlin.ByteArray
-import kotlin.Deprecated
-import kotlin.DeprecationLevel
-import kotlin.Double
-import kotlin.DoubleArray
-import kotlin.ExperimentalStdlibApi
-import kotlin.Float
-import kotlin.FloatArray
-import kotlin.IllegalArgumentException
-import kotlin.Int
-import kotlin.IntArray
-import kotlin.Long
-import kotlin.LongArray
-import kotlin.OptIn
-import kotlin.Pair
-import kotlin.ReplaceWith
-import kotlin.Short
-import kotlin.ShortArray
-import kotlin.String
-import kotlin.Suppress
-import kotlin.Triple
-import kotlin.Unit
-import kotlin.also
-import kotlin.apply
-import kotlin.invoke
-import kotlin.reflect.*
-import kotlin.reflect.full.findAnnotation
-import kotlin.reflect.full.isSubclassOf
-import kotlin.reflect.full.primaryConstructor
-import kotlin.to
-
-@JvmField
-val ENCODERS = mapOf, Encoder<*>>(
- Boolean::class to BOOLEAN(),
- Byte::class to BYTE(),
- Short::class to SHORT(),
- Int::class to INT(),
- Long::class to LONG(),
- Float::class to FLOAT(),
- Double::class to DOUBLE(),
- String::class to STRING(),
- BigDecimal::class to DECIMAL(),
- Date::class to DATE(),
- Timestamp::class to TIMESTAMP(),
- ByteArray::class to BINARY()
-)
-
-/**
- * Broadcast a read-only variable to the cluster, returning a
- * [org.apache.spark.broadcast.Broadcast] object for reading it in distributed functions.
- * The variable will be sent to each cluster only once.
- *
- * @param value value to broadcast to the Spark nodes
- * @return `Broadcast` object, a read-only variable cached on each machine
- */
-inline fun <reified T> SparkSession.broadcast(value: T): Broadcast<T> = try {
- sparkContext.broadcast(value, encoder().clsTag())
-} catch (e: ClassNotFoundException) {
- JavaSparkContext(sparkContext).broadcast(value)
-}
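
A minimal usage sketch of the broadcast extension defined above, assuming a local SparkSession; the broadcast value is illustrative:

```kotlin
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.spark.api.broadcast

fun main() {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-demo").getOrCreate()
    // Ships the map to every node once; value() reads the cached, read-only copy.
    val lookup = spark.broadcast(mapOf(1 to "one", 2 to "two"))
    println(lookup.value()[1])   // "one"
    spark.stop()
}
```
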
-
-/**
- * Broadcast a read-only variable to the cluster, returning a
- * [org.apache.spark.broadcast.Broadcast] object for reading it in distributed functions.
- * The variable will be sent to each cluster only once.
- *
- * @param value value to broadcast to the Spark nodes
- * @return `Broadcast` object, a read-only variable cached on each machine
- * @see broadcast
- */
-@Deprecated("You can now use `spark.broadcast()` instead.",
- ReplaceWith("spark.broadcast(value)"),
- DeprecationLevel.WARNING)
-inline fun <reified T> SparkContext.broadcast(value: T): Broadcast<T> = try {
- broadcast(value, encoder().clsTag())
-} catch (e: ClassNotFoundException) {
- JavaSparkContext(this).broadcast(value)
-}
-
-/**
- * Utility method to create dataset from list
- */
-inline fun <reified T> SparkSession.toDS(list: List<T>): Dataset<T> =
- createDataset(list, encoder())
-
-/**
- * Utility method to create dataset from list
- */
-inline fun <reified T> SparkSession.dsOf(vararg t: T): Dataset<T> =
- createDataset(listOf(*t), encoder())
-
-/**
- * Utility method to create dataset from list
- */
-inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
- spark.createDataset(this, encoder())
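
A short sketch of the list-to-Dataset helpers above; the Person data class and session settings are illustrative:

```kotlin
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.spark.api.*

data class Person(val name: String, val age: Int)

fun main() {
    val spark = SparkSession.builder().master("local[*]").appName("ds-demo").getOrCreate()
    val ds1 = spark.dsOf(Person("Ann", 23), Person("Bob", 31))   // vararg helper
    val ds2 = listOf(Person("Cat", 40)).toDS(spark)              // List extension
    ds1.union(ds2).show()                                        // plain Dataset API from here on
    spark.stop()
}
```
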
-
-/**
- * Main method of API, which gives you seamless integration with Spark:
- * It creates encoder for any given supported type T
- *
- * Supported types are data classes, primitives, and Lists, Maps and Arrays containing them
- * @param T type, supported by Spark
- * @return generated encoder
- */
-@OptIn(ExperimentalStdlibApi::class)
-inline fun <reified T> encoder(): Encoder<T> = generateEncoder(typeOf<T>(), T::class)
-
-fun generateEncoder(type: KType, cls: KClass<*>): Encoder {
- @Suppress("UNCHECKED_CAST")
- return when {
- isSupportedClass(cls) -> kotlinClassEncoder(memoizedSchema(type), cls)
- else -> ENCODERS[cls] as? Encoder? ?: bean(cls.java)
- } as Encoder
-}
-
-private fun isSupportedClass(cls: KClass<*>): Boolean = cls.isData
- || cls.isSubclassOf(Map::class)
- || cls.isSubclassOf(Iterable::class)
- || cls.isSubclassOf(Product::class)
- || cls.java.isArray
-
-@Suppress("UNCHECKED_CAST")
-private fun kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder {
- KotlinReflection.inferDataType(kClass.java)
- val serializer = if (schema is DataTypeWithClass) KotlinReflection.serializerFor(kClass.java,
- schema) else JavaTypeInference.serializerFor(kClass.java)
- return ExpressionEncoder(
- serializer.dataType(),
- false,
- serializer.flatten() as Seq,
- if (schema is DataTypeWithClass) KotlinReflection.deserializerFor(kClass.java,
- schema) else JavaTypeInference.deserializerFor(kClass.java),
- `ClassTag$`.`MODULE$`.apply(kClass.java)
- )
-}
-
-inline fun Dataset.map(noinline func: (T) -> R): Dataset =
- map(MapFunction(func), encoder())
-
-inline fun Dataset.flatMap(noinline func: (T) -> Iterator): Dataset =
- flatMap(func, encoder())
-
-inline fun > Dataset.flatten(): Dataset =
- flatMap(FlatMapFunction { it.iterator() }, encoder())
-
-inline fun Dataset.groupByKey(noinline func: (T) -> R): KeyValueGroupedDataset =
- groupByKey(MapFunction(func), encoder())
-
-inline fun Dataset.mapPartitions(noinline func: (Iterator) -> Iterator): Dataset =
- mapPartitions(func, encoder())
-
-fun Dataset.filterNotNull() = filter { it != null }
-
-inline fun KeyValueGroupedDataset.mapValues(noinline func: (VALUE) -> R): KeyValueGroupedDataset =
- mapValues(MapFunction(func), encoder())
-
-inline fun KeyValueGroupedDataset.mapGroups(noinline func: (KEY, Iterator) -> R): Dataset =
- mapGroups(MapGroupsFunction(func), encoder())
-
-inline fun KeyValueGroupedDataset.reduceGroupsK(noinline func: (VALUE, VALUE) -> VALUE): Dataset> =
- reduceGroups(ReduceFunction(func))
- .map { t -> t._1 to t._2 }
-
-/**
- * (Kotlin-specific)
- * Reduces the elements of this Dataset using the specified binary function. The given `func`
- * must be commutative and associative or the result may be non-deterministic.
- */
-inline fun <reified T> Dataset<T>.reduceK(noinline func: (T, T) -> T): T =
- reduce(ReduceFunction(func))
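
A tiny, self-contained sketch of reduceK, which simply wraps Dataset.reduce in a Kotlin lambda; as the comment above notes, the reducer must be commutative and associative:

```kotlin
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.spark.api.*

fun main() {
    val spark = SparkSession.builder().master("local[*]").appName("reduce-demo").getOrCreate()
    val sum = spark.dsOf(1, 2, 3, 4).reduceK { a, b -> a + b }
    println(sum)   // 10
    spark.stop()
}
```
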
-
-@JvmName("takeKeysTuple2")
-inline fun Dataset>.takeKeys(): Dataset = map { it._1() }
-
-inline fun Dataset>.takeKeys(): Dataset = map { it.first }
-
-@JvmName("takeKeysArity2")
-inline fun Dataset>.takeKeys(): Dataset = map { it._1 }
-
-@JvmName("takeValuesTuple2")
-inline fun Dataset>.takeValues(): Dataset = map { it._2() }
-
-inline fun Dataset>.takeValues(): Dataset = map { it.second }
-
-@JvmName("takeValuesArity2")
-inline fun Dataset>.takeValues(): Dataset = map { it._2 }
-
-inline fun <K, V, reified U> KeyValueGroupedDataset<K, V>.flatMapGroups(
-    noinline func: (key: K, values: Iterator<V>) -> Iterator<U>,
-): Dataset<U> = flatMapGroups(
-    FlatMapGroupsFunction(func),
-    encoder()
-)
-
-fun <S> GroupState<S>.getOrNull(): S? = if (exists()) get() else null
-
-operator fun <S> GroupState<S>.getValue(thisRef: Any?, property: KProperty<*>): S? = getOrNull()
-operator fun <S> GroupState<S>.setValue(thisRef: Any?, property: KProperty<*>, value: S?): Unit = update(value)
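-
-// Illustrative usage sketch (not part of the original file): inside a stateful group function, the
-// GroupState delegate above lets the state be read and written as a plain nullable local property.
-// The function name and the counting logic are hypothetical.
-fun countPerKeyExample(grouped: KeyValueGroupedDataset<String, Int>): Dataset<Pair<String, Int>> =
-    grouped.mapGroupsWithState { key, values, state: GroupState<Int> ->
-        var count: Int? by state                       // delegated read/write of the group state
-        count = (count ?: 0) + values.asSequence().count()
-        key to count!!
-    }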
-
-
-inline fun <K, V, reified S, reified U> KeyValueGroupedDataset<K, V>.mapGroupsWithState(
-    noinline func: (key: K, values: Iterator<V>, state: GroupState<S>) -> U,
-): Dataset<U> = mapGroupsWithState(
-    MapGroupsWithStateFunction(func),
-    encoder<S>(),
-    encoder<U>()
-)
-
-inline fun <K, V, reified S, reified U> KeyValueGroupedDataset<K, V>.mapGroupsWithState(
-    timeoutConf: GroupStateTimeout,
-    noinline func: (key: K, values: Iterator<V>, state: GroupState<S>) -> U,
-): Dataset<U> = mapGroupsWithState(
-    MapGroupsWithStateFunction(func),
-    encoder<S>(),
-    encoder<U>(),
-    timeoutConf
-)
-
-inline fun <K, V, reified S, reified U> KeyValueGroupedDataset<K, V>.flatMapGroupsWithState(
-    outputMode: OutputMode,
-    timeoutConf: GroupStateTimeout,
-    noinline func: (key: K, values: Iterator<V>, state: GroupState<S>) -> Iterator<U>,
-): Dataset<U> = flatMapGroupsWithState(
-    FlatMapGroupsWithStateFunction(func),
-    outputMode,
-    encoder<S>(),
-    encoder<U>(),
-    timeoutConf
-)
-
-inline fun <K, V, U, reified R> KeyValueGroupedDataset<K, V>.cogroup(
-    other: KeyValueGroupedDataset<K, U>,
-    noinline func: (key: K, left: Iterator<V>, right: Iterator<U>) -> Iterator<R>,
-): Dataset<R> = cogroup(
-    other,
-    CoGroupFunction(func),
-    encoder()
-)
-
-inline fun <T, reified R : T> Dataset<T>.downcast(): Dataset<R> = `as`(encoder())
-inline fun <reified R> Dataset<*>.`as`(): Dataset<R> = `as`(encoder())
-inline fun <reified R> Dataset<*>.to(): Dataset<R> = `as`(encoder())
-
-inline fun <T> Dataset<T>.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func))
-
-inline fun <T> Dataset<T>.forEachPartition(noinline func: (Iterator<T>) -> Unit) =
-    foreachPartition(ForeachPartitionFunction(func))
-
-/**
- * It's hard to call `Dataset.debugCodegen` from Kotlin, so here is a utility for that
- */
-fun <T> Dataset<T>.debugCodegen() = also { KSparkExtensions.debugCodegen(it) }
-
-val SparkSession.sparkContext
- get() = KSparkExtensions.sparkContext(this)
-
-/**
- * It's hard to call `Dataset.debug` from Kotlin, so here is a utility for that
- */
-fun <T> Dataset<T>.debug() = also { KSparkExtensions.debug(it) }
-
-@Suppress("FunctionName")
-@Deprecated("Changed to \"`===`\" to better reflect Scala API.", ReplaceWith("this `===` c"))
-infix fun Column.`==`(c: Column) = `$eq$eq$eq`(c)
-
-/**
- * Unary minus, i.e. negate the expression.
- * ```
- * // Scala: selects the amount column and negates all values.
- * df.select( -df("amount") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.select( -df("amount") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.select( negate(col("amount") );
- * ```
- */
-operator fun Column.unaryMinus(): Column = `unary_$minus`()
-
-/**
- * Inversion of boolean expression, i.e. NOT.
- * ```
- * // Scala: selects rows that are not active (isActive === false)
- * df.filter( !df("isActive") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.filter( !df("isActive") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.filter( not(df.col("isActive")) );
- * ```
- */
-operator fun Column.not(): Column = `unary_$bang`()
-
-/**
- * Equality test.
- * ```
- * // Scala:
- * df.filter( df("colA") === df("colB") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.filter( df("colA") eq df("colB") )
- * // or
- * df.filter( df("colA") `===` df("colB") )
- *
- * // Java
- * import static org.apache.spark.sql.functions.*;
- * df.filter( col("colA").equalTo(col("colB")) );
- * ```
- */
-infix fun Column.eq(other: Any): Column = `$eq$eq$eq`(other)
-
-/**
- * Equality test.
- * ```
- * // Scala:
- * df.filter( df("colA") === df("colB") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.filter( df("colA") eq df("colB") )
- * // or
- * df.filter( df("colA") `===` df("colB") )
- *
- * // Java
- * import static org.apache.spark.sql.functions.*;
- * df.filter( col("colA").equalTo(col("colB")) );
- * ```
- */
-infix fun Column.`===`(other: Any): Column = `$eq$eq$eq`(other)
-
-/**
- * Inequality test.
- * ```
- * // Scala:
- * df.select( df("colA") =!= df("colB") )
- * df.select( !(df("colA") === df("colB")) )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.select( df("colA") neq df("colB") )
- * df.select( !(df("colA") eq df("colB")) )
- * // or
- * df.select( df("colA") `=!=` df("colB") )
- * df.select( !(df("colA") `===` df("colB")) )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.filter( col("colA").notEqual(col("colB")) );
- * ```
- */
-infix fun Column.neq(other: Any): Column = `$eq$bang$eq`(other)
-
-/**
- * Inequality test.
- * ```
- * // Scala:
- * df.select( df("colA") =!= df("colB") )
- * df.select( !(df("colA") === df("colB")) )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.select( df("colA") neq df("colB") )
- * df.select( !(df("colA") eq df("colB")) )
- * // or
- * df.select( df("colA") `=!=` df("colB") )
- * df.select( !(df("colA") `===` df("colB")) )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.filter( col("colA").notEqual(col("colB")) );
- * ```
- */
-infix fun Column.`=!=`(other: Any): Column = `$eq$bang$eq`(other)
-
-/**
- * Greater than.
- * ```
- * // Scala: The following selects people older than 21.
- * people.select( people("age") > 21 )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.select( people("age") gt 21 )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("age").gt(21) );
- * ```
- */
-infix fun Column.gt(other: Any): Column = `$greater`(other)
-
-/**
- * Less than.
- * ```
- * // Scala: The following selects people younger than 21.
- * people.select( people("age") < 21 )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.select( people("age") lt 21 )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("age").lt(21) );
- * ```
- */
-infix fun Column.lt(other: Any): Column = `$less`(other)
-
-/**
- * Less than or equal to.
- * ```
- * // Scala: The following selects people age 21 or younger.
- * people.select( people("age") <= 21 )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.select( people("age") leq 21 )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("age").leq(21) );
- * ```
- */
-infix fun Column.leq(other: Any): Column = `$less$eq`(other)
-
-/**
- * Greater than or equal to an expression.
- * ```
- * // Scala: The following selects people age 21 or older.
- * people.select( people("age") >= 21 )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.select( people("age") geq 21 )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("age").geq(21) );
- * ```
- */
-infix fun Column.geq(other: Any): Column = `$greater$eq`(other)
-
-/**
- * True if the current column is in the given [range].
- * ```
- * // Scala:
- * df.where( df("colA").between(1, 5) )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.where( df("colA") inRangeOf 1..5 )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.where( df.col("colA").between(1, 5) );
- * ```
- */
-infix fun Column.inRangeOf(range: ClosedRange<*>): Column = between(range.start, range.endInclusive)
-
-/**
- * Boolean OR.
- * ```
- * // Scala: The following selects people that are in school or employed.
- * people.filter( people("inSchool") || people("isEmployed") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.filter( people("inSchool") or people("isEmployed") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.filter( people.col("inSchool").or(people.col("isEmployed")) );
- * ```
- */
-infix fun Column.or(other: Any): Column = `$bar$bar`(other)
-
-/**
- * Boolean AND.
- * ```
- * // Scala: The following selects people that are in school and employed at the same time.
- * people.select( people("inSchool") && people("isEmployed") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.filter( people("inSchool") and people("isEmployed") )
- * // or
- * people.filter( people("inSchool") `&&` people("isEmployed") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("inSchool").and(people.col("isEmployed")) );
- * ```
- */
-infix fun Column.and(other: Any): Column = `$amp$amp`(other)
-
-/**
- * Boolean AND.
- * ```
- * // Scala: The following selects people that are in school and employed at the same time.
- * people.select( people("inSchool") && people("isEmployed") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.filter( people("inSchool") and people("isEmployed") )
- * // or
- * people.filter( people("inSchool") `&&` people("isEmployed") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("inSchool").and(people.col("isEmployed")) );
- * ```
- */
-infix fun Column.`&&`(other: Any): Column = `$amp$amp`(other)
-
-/**
- * Multiplication of this expression and another expression.
- * ```
- * // Scala: The following multiplies a person's height by their weight.
- * people.select( people("height") * people("weight") )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * people.select( people("height") * people("weight") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("height").multiply(people.col("weight")) );
- * ```
- */
-operator fun Column.times(other: Any): Column = `$times`(other)
-
-/**
- * Division of this expression by another expression.
- * ```
- * // Scala: The following divides a person's height by their weight.
- * people.select( people("height") / people("weight") )
- *
- * // Kotlin
- * import org.jetbrains.kotlinx.spark.api.*
- * people.select( people("height") / people("weight") )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * people.select( people.col("height").divide(people.col("weight")) );
- * ```
- */
-operator fun Column.div(other: Any): Column = `$div`(other)
-
-/**
- * Modulo (a.k.a. remainder) expression.
- * ```
- * // Scala:
- * df.where( df("colA") % 2 === 0 )
- *
- * // Kotlin:
- * import org.jetbrains.kotlinx.spark.api.*
- * df.where( df("colA") % 2 eq 0 )
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.where( df.col("colA").mod(2).equalTo(0) );
- * ```
- */
-operator fun Column.rem(other: Any): Column = `$percent`(other)
-
-/**
- * An expression that gets an item at position `ordinal` out of an array,
- * or gets a value by key `key` in a `MapType`.
- * ```
- * // Scala:
- * df.where( df("arrayColumn").getItem(0) === 5 )
- *
- * // Kotlin
- * import org.jetbrains.kotlinx.spark.api.*
- * df.where( df("arrayColumn")[0] eq 5 )
- *
- * // Java
- * import static org.apache.spark.sql.functions.*;
- * df.where( df.col("arrayColumn").getItem(0).equalTo(5) );
- * ```
- */
-operator fun Column.get(key: Any): Column = getItem(key)
-
-fun lit(a: Any) = functions.lit(a)
-
-/**
- * Provides a type hint about the expected return value of this column. This information can
- * be used by operations such as `select` on a [Dataset] to automatically convert the
- * results into the correct JVM types.
- *
- * ```
- * val df: Dataset<Row> = ...
- * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Int>() )
- * ```
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T> Column.`as`(): TypedColumn<Any, T> = `as`(encoder<T>())
-
-
-/**
- * Alias for [Dataset.joinWith] which passes the "left" argument
- * and respects the fact that, as a result of a left join, the right relation is nullable
- *
- * @receiver left dataset
- * @param right right dataset
- * @param col join condition
- *
- * @return dataset of [Pair] where the right element is forced nullable
- */
-inline fun <reified L, reified R : Any?> Dataset<L>.leftJoin(right: Dataset<R>, col: Column): Dataset<Pair<L, R?>> {
-    return joinWith(right, col, "left").map { it._1 to it._2 }
-}
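-
-// Illustrative usage sketch (not part of the original file): `Customer` and `Order` are
-// hypothetical data classes; the right side of the resulting pairs is nullable after a left join.
-data class Customer(val id: Int, val name: String)
-data class Order(val customerId: Int, val total: Double)
-
-fun ordersPerCustomer(customers: Dataset<Customer>, orders: Dataset<Order>): Dataset<Pair<Customer, Order?>> =
-    customers.leftJoin(orders, customers("id") eq orders("customerId"))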
-
-/**
- * Alias for [Dataset.joinWith] which passes the "right" argument
- * and respects the fact that, as a result of a right join, the left relation is nullable
- *
- * @receiver left dataset
- * @param right right dataset
- * @param col join condition
- *
- * @return dataset of [Pair] where the left element is forced nullable
- */
-inline fun <reified L : Any?, reified R> Dataset<L>.rightJoin(right: Dataset<R>, col: Column): Dataset<Pair<L?, R>> {
-    return joinWith(right, col, "right").map { it._1 to it._2 }
-}
-
-/**
- * Alias for [Dataset.joinWith] which passes the "inner" argument
- *
- * @receiver left dataset
- * @param right right dataset
- * @param col join condition
- *
- * @return resulting dataset of [Pair]
- */
-inline fun <reified L, reified R> Dataset<L>.innerJoin(right: Dataset<R>, col: Column): Dataset<Pair<L, R>> {
-    return joinWith(right, col, "inner").map { it._1 to it._2 }
-}
-
-/**
- * Alias for [Dataset.joinWith] which passes the "full" argument
- * and respects the fact that, as a result of a full join, either element of the resulting tuple is nullable
- *
- * @receiver left dataset
- * @param right right dataset
- * @param col join condition
- *
- * @return dataset of [Pair] where both elements are forced nullable
- */
-inline fun <reified L : Any?, reified R : Any?> Dataset<L>.fullJoin(
-    right: Dataset<R>,
-    col: Column,
-): Dataset<Pair<L?, R?>> {
-    return joinWith(right, col, "full").map { it._1 to it._2 }
-}
-
-/**
- * Alias for [Dataset.sort] which forces the user to provide sort columns from the source dataset
- *
- * @receiver source [Dataset]
- * @param columns producer of sort columns
- * @return sorted [Dataset]
- */
-inline fun <T> Dataset<T>.sort(columns: (Dataset<T>) -> Array<Column>) = sort(*columns(this))
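-
-// Illustrative usage sketch (not part of the original file): the sort columns are produced from
-// the dataset itself, so column references stay tied to the source. `Employee` is hypothetical.
-data class Employee(val name: String, val age: Int)
-
-fun oldestFirst(ds: Dataset<Employee>): Dataset<Employee> =
-    ds.sort { arrayOf(it("age").desc()) }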
-
-/**
- * This function creates a block in which one can run further computations on an already cached dataset.
- * The data will be unpersisted automatically at the end of the computation.
- *
- * It may be useful in many situations, for example, when one needs to write data to several targets
- * ```kotlin
- * ds.withCached {
- *     write()
- *         .also { it.orc("First destination") }
- *         .also { it.avro("Second destination") }
- * }
- * ```
- *
- * @param blockingUnpersist whether execution should block until everything persisted has been deleted
- * @param executeOnCached block which should be executed on the cached dataset
- * @return result of the block execution for further usage. It may be anything, including the source or a new dataset
- */
-inline fun <T, R> Dataset<T>.withCached(
-    blockingUnpersist: Boolean = false,
-    executeOnCached: Dataset<T>.() -> R,
-): R {
-    val cached = this.cache()
-    return cached.executeOnCached().also { cached.unpersist(blockingUnpersist) }
-}
-
-inline fun <reified T> Dataset<T>.toList() = KSparkExtensions.collectAsList(to<T>())
-inline fun <reified T> Dataset<*>.toArray(): Array<T> = to<T>().collect() as Array<T>
-
-/**
- * Selects a column based on the column name and returns it as a [Column].
- *
- * @note The column name can also reference a nested column like `a.b`.
- */
-operator fun <T> Dataset<T>.invoke(colName: String): Column = col(colName)
-
-/**
- * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
- * ```kotlin
- * val dataset: Dataset<YourClass> = ...
- * val columnA: TypedColumn<YourClass, TypeOfA> = dataset.col(YourClass::a)
- * ```
- * @see invoke
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`<U>() as TypedColumn<T, U>
-
-/**
- * Returns a [Column] based on the given class attribute, not connected to a dataset.
- * ```kotlin
- * val dataset: Dataset<YourClass> = ...
- * val new: Dataset<Tuple2<TypeOfA, TypeOfB>> = dataset.select( col(YourClass::a), col(YourClass::b) )
- * ```
- * TODO: change example to [Pair]s when merged
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    functions.col(column.name).`as`<U>() as TypedColumn<T, U>
-
-/**
- * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
- * ```kotlin
- * val dataset: Dataset<YourClass> = ...
- * val columnA: TypedColumn<YourClass, TypeOfA> = dataset(YourClass::a)
- * ```
- * @see col
- */
-inline operator fun <reified T, reified U> Dataset<T>.invoke(column: KProperty1<T, U>): TypedColumn<T, U> = col(column)
-
-/**
- * Allows sorting a data class dataset on one or more of the properties of the data class.
- * ```kotlin
- * val sorted: Dataset<YourClass> = unsorted.sort(YourClass::a)
- * val sorted2: Dataset<YourClass> = unsorted.sort(YourClass::a, YourClass::b)
- * ```
- */
-fun <T> Dataset<T>.sort(col: KProperty1<T, *>, vararg cols: KProperty1<T, *>): Dataset<T> =
-    sort(col.name, *cols.map { it.name }.toTypedArray())
-
-/**
- * Alternative to [Dataset.show] which returns the source dataset.
- * Useful for debugging when you need to view the content of a dataset as an intermediate operation
- */
-fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) }
-
-/**
- * Returns a new Dataset by computing the given [Column] expressions for each element.
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T, reified U1> Dataset<T>.selectTyped(
-    c1: TypedColumn<out Any, U1>,
-): Dataset<U1> = select(c1 as TypedColumn<T, U1>)
-
-/**
- * Returns a new Dataset by computing the given [Column] expressions for each element.
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T, reified U1, reified U2> Dataset<T>.selectTyped(
-    c1: TypedColumn<out Any, U1>,
-    c2: TypedColumn<out Any, U2>,
-): Dataset<Pair<U1, U2>> =
-    select(
-        c1 as TypedColumn<T, U1>,
-        c2 as TypedColumn<T, U2>,
-    ).map { Pair(it._1(), it._2()) }
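-
-// Illustrative usage sketch (not part of the original file): combines the property-based `col`
-// helper with `selectTyped` to project two fields in a refactor-safe way. `Book` is hypothetical.
-data class Book(val title: String, val pages: Int)
-
-fun titlesAndPages(books: Dataset<Book>): Dataset<Pair<String, Int>> =
-    books.selectTyped(col(Book::title), col(Book::pages))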
-
-/**
- * Returns a new Dataset by computing the given [Column] expressions for each element.
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T, reified U1, reified U2, reified U3> Dataset<T>.selectTyped(
-    c1: TypedColumn<out Any, U1>,
-    c2: TypedColumn<out Any, U2>,
-    c3: TypedColumn<out Any, U3>,
-): Dataset<Triple<U1, U2, U3>> =
-    select(
-        c1 as TypedColumn<T, U1>,
-        c2 as TypedColumn<T, U2>,
-        c3 as TypedColumn<T, U3>,
-    ).map { Triple(it._1(), it._2(), it._3()) }
-
-/**
- * Returns a new Dataset by computing the given [Column] expressions for each element.
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <reified T, reified U1, reified U2, reified U3, reified U4> Dataset<T>.selectTyped(
-    c1: TypedColumn<out Any, U1>,
-    c2: TypedColumn<out Any, U2>,
-    c3: TypedColumn<out Any, U3>,
-    c4: TypedColumn<out Any, U4>,
-): Dataset