Commit 4177681

Merge branch 'master' into kubernetes_state
2 parents 9d567ed + ee677a6

File tree

14 files changed: +142 −164 lines


docs/client/python/index.rst

Lines changed: 0 additions & 1 deletion
@@ -21,5 +21,4 @@ Python
    :maxdepth: 2

    pyhive
-   pyspark
    jaydebeapi

docs/client/python/pyspark.md

Lines changed: 0 additions & 133 deletions
This file was deleted.

docs/extensions/engines/spark/jdbc-dialect.md

Lines changed: 117 additions & 13 deletions
@@ -15,27 +15,131 @@
 - limitations under the License.
 -->

-# Hive Dialect Support
+# Hive JDBC Data Source Dialect

-Hive Dialect plugin aims to provide Hive Dialect support to Spark's JDBC source.
+The Hive JDBC Data Source dialect plugin aims to provide Hive Dialect support to [Spark's JDBC Data Source](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html).
 It is automatically registered to Spark and applied to JDBC data sources with a URL prefix of `jdbc:hive2://` or `jdbc:kyuubi://`.

-Hive Dialect helps to solve failures access Kyuubi. It fails and unexpected results when querying data from Kyuubi as JDBC source with Hive JDBC Driver or Kyuubi Hive JDBC Driver in Spark, as Spark JDBC provides no Hive Dialect support out of box and quoting columns and other identifiers in ANSI as "table.column" rather than in HiveSQL style as \`table\`.\`column\`.
+The Hive Dialect helps to avoid failures when accessing Kyuubi. Without it, querying data from Kyuubi as a
+JDBC data source with the Hive JDBC Driver or Kyuubi Hive JDBC Driver in Spark fails or returns unexpected
+results, because Spark JDBC provides no Hive Dialect support out of the box and quotes columns and other
+identifiers in ANSI style as "table.column" rather than in HiveSQL style as \`table\`.\`column\`.
+
+Note: this is an inefficient way to access data stored in the Hive warehouse; see more discussion at [SPARK-47482](https://github.com/apache/spark/pull/45609).

 ## Features

-- quote identifier in Hive SQL style
+- Quote identifiers in Hive SQL style
+
+  e.g. quote `table.column` as \`table\`.\`column\` (illustrated by the sketch after this file's diff)
+
+- Adapt to Hive data type definitions
+
+  Reference: https://cwiki.apache.org/confluence/display/hive/languagemanual+types
+
+## Preparation
+
+### Prepare the JDBC driver
+
+Prepare the JDBC driver jar file. The supported Hive-compatible JDBC drivers are listed below:
+
+| Driver                                                    | Driver Class Name                       | Remarks                                                                                                 |
+|-----------------------------------------------------------|-----------------------------------------|---------------------------------------------------------------------------------------------------------|
+| Kyuubi Hive JDBC Driver ([doc](../jdbc/kyuubi_jdbc.html)) | org.apache.kyuubi.jdbc.KyuubiHiveDriver | Use v1.6.1 or later versions, which include [KYUUBI #3484](https://github.com/apache/kyuubi/pull/3485). |
+| Hive JDBC Driver ([doc](../jdbc/hive_jdbc.html))          | org.apache.hive.jdbc.HiveDriver         | The Hive JDBC driver is already included in the official Spark binary distribution.                     |
+
+Refer to the driver's docs to prepare the JDBC driver jar file.
+
+### Prepare the JDBC Hive Dialect extension
+
+Prepare the plugin jar file `kyuubi-extension-spark-jdbc-dialect_-*.jar`.
+
+Get the Kyuubi Hive Dialect Extension jar from Maven Central:
+
+```xml
+<dependency>
+    <groupId>org.apache.kyuubi</groupId>
+    <artifactId>kyuubi-extension-spark-jdbc-dialect_2.12</artifactId>
+    <version>{latest-version}</version>
+</dependency>
+```
+
+Or compile the extension by executing
+
+```shell
+build/mvn clean package -pl :kyuubi-extension-spark-jdbc-dialect_2.12 -DskipTests
+```

-eg. Quote `table.column` in \`table\`.\`column\`
+and then get the extension jar under `extensions/spark/kyuubi-extension-spark-jdbc-dialect/target`.
+
+If you like, you can compile the extension jar with the corresponding Maven profile in your compile command,
+e.g. you can get the extension jar for Spark 3.5 by compiling with `-Pspark-3.5`.
+
+### Including the jars of the JDBC driver and Hive Dialect extension
+
+Choose one of the following ways to include the jar files in Spark.
+
+- Put the jar files of the JDBC driver and Hive Dialect extension into the `$SPARK_HOME/jars` directory to make them visible to all Spark applications, and add `spark.sql.extensions = org.apache.spark.sql.dialect.KyuubiSparkJdbcDialectExtension` to `$SPARK_HOME/conf/spark-defaults.conf`.
+
+- With each `spark-submit` (or `spark-sql`, `pyspark`, etc.) command, include the JDBC driver with `--packages` and the Hive Dialect plugin with `--jars`:
+
+  ```shell
+  $SPARK_HOME/bin/spark-submit \
+    --packages org.apache.hive:hive-jdbc:x.y.z \
+    --jars /path/kyuubi-extension-spark-jdbc-dialect_-*.jar \
+    ...
+  ```
+
+- Set the jars and config with the SparkSession builder:
+
+  ```scala
+  val spark = SparkSession.builder
+    .config("spark.jars", "/path/hive-jdbc-x.y.z.jar,/path/kyuubi-extension-spark-jdbc-dialect_-*.jar")
+    .config("spark.sql.extensions", "org.apache.spark.sql.dialect.KyuubiSparkJdbcDialectExtension")
+    .getOrCreate()
+  ```

 ## Usage

-1. Get the Kyuubi Hive Dialect Extension jar
-   1. compile the extension by executing `build/mvn clean package -pl :kyuubi-extension-spark-jdbc-dialect_2.12 -DskipTests`
-   2. get the extension jar under `extensions/spark/kyuubi-extension-spark-jdbc-dialect/target`
-   3. If you like, you can compile the extension jar with the corresponding Maven's profile on you compile command, i.e. you can get extension jar for Spark 3.5 by compiling with `-Pspark-3.5`
-2. Put the Kyuubi Hive Dialect Extension jar `kyuubi-extension-spark-jdbc-dialect_-*.jar` into `$SPARK_HOME/jars`
-3. Enable `KyuubiSparkJdbcDialectExtension`, by setting `spark.sql.extensions=org.apache.spark.sql.dialect.KyuubiSparkJdbcDialectExtension`, i.e.
-   - add a config into `$SPARK_HOME/conf/spark-defaults.conf`
-   - or add setting config in SparkSession builder
+### Using as a JDBC data source programmatically
+
+```scala
+// Loading data from Kyuubi via HiveDriver as a JDBC data source.
+// Note: `dbtable` and `query` are mutually exclusive; use one of them.
+val jdbcDF = spark.read
+  .format("jdbc")
+  .option("driver", "org.apache.hive.jdbc.HiveDriver")
+  .option("url", "jdbc:hive2://kyuubi_server_ip:port")
+  .option("query", "select * from testdb.src_table")
+  .option("user", "username")
+  .option("password", "password")
+  .load()
+```
+
+### Using as a JDBC data source table with SQL
+
+Since Spark 3.2.0, [`CREATE DATASOURCE TABLE`](https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-datasource.html)
+supports creating a JDBC source table with SQL.
+
+```sql
+-- create a JDBC data source table
+CREATE TABLE kyuubi_table
+USING JDBC
+OPTIONS (
+  driver='org.apache.hive.jdbc.HiveDriver',
+  url='jdbc:hive2://kyuubi_server_ip:port',
+  user='user',
+  password='password',
+  dbtable='testdb.some_table'
+);
+
+-- query data
+SELECT * FROM kyuubi_table;
+
+-- write data in overwrite mode
+INSERT OVERWRITE kyuubi_table SELECT ...;
+
+-- write data in append mode
+INSERT INTO kyuubi_table SELECT ...;
+```

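The dialect's core behavior is small enough to sketch. Below is an illustrative REPL-style example (e.g. for `spark-shell`) of what a Hive-style dialect for Spark's JDBC data source looks like; the object name is invented here and is not the plugin's actual class, which ships inside `KyuubiSparkJdbcDialectExtension`.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Illustrative only: the plugin's real implementation differs, but it builds
// on this Spark API and produces the backtick quoting described above.
object HiveStyleDialectSketch extends JdbcDialect {
  // Apply the dialect to connections aimed at HiveServer2 or Kyuubi.
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:hive2://") || url.startsWith("jdbc:kyuubi://")

  // Quote identifiers HiveSQL-style (`col`) rather than ANSI-style ("col"),
  // doubling any embedded backticks.
  override def quoteIdentifier(colName: String): String =
    s"`${colName.replace("`", "``")}`"
}

// Registering makes Spark pick this dialect for matching JDBC URLs; the
// Kyuubi extension performs the equivalent registration automatically.
JdbcDialects.registerDialect(HiveStyleDialectSketch)
```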
kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiBaseResultSet.java

Lines changed: 3 additions & 0 deletions
@@ -67,6 +67,9 @@ public int findColumn(String columnName) throws SQLException {
   @Override
   public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
     final Object val = getObject(columnIndex);
+    if (val == null) {
+      return null;
+    }
     if (val instanceof BigDecimal) {
       return (BigDecimal) val;
     }

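The guard above brings `getBigDecimal` in line with the JDBC convention that SQL NULL maps to a Java `null`. A hedged REPL-style usage sketch (server address and credentials are placeholders):

```scala
import java.sql.DriverManager

// With the null guard above, a SQL NULL decimal is returned as null, as the
// JDBC spec requires, instead of failing in the value-conversion path.
val conn = DriverManager.getConnection(
  "jdbc:kyuubi://kyuubi_server_ip:port/default", "user", "password")
val rs = conn.createStatement().executeQuery(
  "SELECT CAST(NULL AS DECIMAL(10, 2)) AS d")
while (rs.next()) {
  val d = rs.getBigDecimal(1) // null rather than an exception
  println(if (d == null) "NULL" else d.toPlainString)
}
conn.close()
```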
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala

Lines changed: 2 additions & 1 deletion
@@ -26,13 +26,14 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.server.metadata.MetadataManager

 trait ApplicationOperation {

   /**
    * Step for initializing the instance.
    */
-  def initialize(conf: KyuubiConf): Unit
+  def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit

   /**
    * Step to clean up the instance

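The signature change above threads the `MetadataManager` through `initialize` instead of having implementations fetch it from global server state. A self-contained sketch of the pattern, using simplified stand-in types rather than Kyuubi's real classes:

```scala
// Dependency-injection sketch: the operation no longer reaches into a global
// server object; the metadata manager is passed down through initialize.
trait OperationSketch {
  def initialize(conf: Map[String, String], metadataManager: Option[String]): Unit
}

class ExampleOperation extends OperationSketch {
  private var metadataManager: Option[String] = None

  override def initialize(
      conf: Map[String, String],
      metadataManager: Option[String]): Unit = {
    this.metadataManager = metadataManager // keep the injected dependency
    println(s"initialized with metadataManager=$metadataManager")
  }
}

object InjectionDemo extends App {
  val operations = Seq(new ExampleOperation)
  // Some("metadata-manager") stands in for Option[MetadataManager].
  operations.foreach(_.initialize(Map.empty, Some("metadata-manager")))
}
```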
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala

Lines changed: 2 additions & 1 deletion
@@ -22,13 +22,14 @@ import java.nio.file.Paths
 import scala.sys.process._

 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.metadata.MetadataManager

 class JpsApplicationOperation extends ApplicationOperation {
   import ApplicationOperation._

   private var runner: String = _

-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = {
     val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
       .map(Paths.get(_, "bin", "jps").toString)
       .getOrElse("jps")

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

Lines changed: 4 additions & 5 deletions
@@ -37,9 +37,8 @@ import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{A
 import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
 import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
 import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.server.metadata.api.KubernetesEngineInfo
-import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}

 class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@@ -81,8 +80,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo))
   }

-  private def metadataManager = KyuubiServer.kyuubiServer.backendService
-    .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
+  private var metadataManager: Option[MetadataManager] = _

   // Visible for testing
   private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): Unit = {
@@ -120,8 +118,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     }
   }

-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = {
     kyuubiConf = conf
+    this.metadataManager = metadataManager
     info("Start initializing Kubernetes application operation.")
     submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
     // Defer cleaning terminated application information

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala

Lines changed: 4 additions & 2 deletions
@@ -31,10 +31,12 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.util.reflect.ReflectUtils._

-class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") {
+class KyuubiApplicationManager(metadataManager: Option[MetadataManager])
+  extends AbstractService("KyuubiApplicationManager") {

   // TODO: maybe add a configuration is better
   private val operations =
@@ -43,7 +45,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
   override def initialize(conf: KyuubiConf): Unit = {
     operations.foreach { op =>
       try {
-        op.initialize(conf)
+        op.initialize(conf, metadataManager)
       } catch {
         case NonFatal(e) => warn(s"Error starting ${op.getClass.getSimpleName}: ${e.getMessage}")
       }

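A hedged sketch of the call-site change this constructor implies (the actual server wiring is outside this diff): whoever constructs the manager now supplies the `MetadataManager`, which `initialize` fans out to every operation.

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.server.metadata.MetadataManager

object WiringSketch {
  // Construction-time injection replaces the old lookup through
  // KyuubiServer.kyuubiServer.backendService.sessionManager.
  def buildApplicationManager(
      conf: KyuubiConf,
      metadataManager: Option[MetadataManager]): KyuubiApplicationManager = {
    val manager = new KyuubiApplicationManager(metadataManager)
    manager.initialize(conf) // each operation receives (conf, metadataManager)
    manager
  }
}
```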
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala

Lines changed: 2 additions & 1 deletion
@@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
 import org.apache.kyuubi.engine.ApplicationOperation._
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
 import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.util.KyuubiHadoopUtils

 class YarnApplicationOperation extends ApplicationOperation with Logging {
@@ -40,7 +41,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
   @volatile private var adminYarnClient: Option[YarnClient] = None
   private var submitTimeout: Long = _

-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = {
     submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
     yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
