Support for writes in FullAcid and InsertOnly tables. (#30)
[SPAR-3779]:[oss]: Insert into/overwrite support for orc full acid tables
[SPAR-3917] Fix OSS bugs

Approved-by: Rajkumar Iyer <[email protected]>
(cherry picked from commit dec9109)

[SPAR-3841][oss] Insert Into/Overwrite support for Insert Only tables
[SPAR-3780][oss] Update/Delete support for FullAcid non bucketed tables

Co-authored-by: prakharjain09 <[email protected]>

Approved-by: Amogh Margoor
(cherry picked from commit 797a741)

[SPAR-4039][oss] Restructure code for new anti-join based reader
(cherry picked from commit 07475effdd4ebe1cc170e88d889bbc68b6d48cbe)

[SPAR-3780][oss][bugfix] Fix delete to use conditional filter properly.
(cherry picked from commit 9f9bc84)

[SPAR-4039][oss] Restructure code for new anti-join based reader II
(cherry picked from commit 2b7d2ad)

[SPAR-3790][oss] Code changes to fix locking and snapshot semantics for read and write.
(cherry picked from commit 00209f9)
citrusraj authored Jan 16, 2020
1 parent 7c01883 commit 2e66f76
Showing 54 changed files with 3,780 additions and 1,443 deletions.
163 changes: 112 additions & 51 deletions README.md
@@ -2,16 +2,27 @@

A Datasource on top of Spark Datasource V1 APIs, that provides Spark support for [Hive ACID transactions](https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions).

This datasource provides the capability to work with Hive ACID V2 tables, both Full ACID tables as well as Insert-Only tables.

Functionality availability matrix:

Functionality | Full ACID table | Insert Only Table |
------------- | --------------- | ----------------- |
READ | >= v0.4.0 | >= v0.4.0 |
INSERT INTO / OVERWRITE | >= v0.4.3 | >= v0.4.5 |
CTAS | >= v0.4.3 | >= v0.4.5 |
UPDATE | >= v0.4.5 | Not Supported |
DELETE | >= v0.4.5 | Not Supported |
MERGE | Not Supported | Not Supported |

*Note: For write operations on Insert-Only tables, the Hive Metastore compatibility check needs to be disabled.*
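
A minimal sketch of how that might look from Scala, assuming the check in question is the Hive Metastore client capability check and that it can be turned off through the Spark/Hadoop configuration (`hive.metastore.client.capability.check` is an assumption, not taken from this repository):

import org.apache.spark.sql.SparkSession

// Build a session with the metastore client capability check disabled,
// so that writes to Insert-Only (MM) tables are not rejected by the metastore.
val spark = SparkSession.builder()
.appName("insert-only-write")
.config("spark.hadoop.hive.metastore.client.capability.check", "false")
.enableHiveSupport()
.getOrCreate()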

## Quick Start

These are the prerequisites for using this library:

1. You have a Hive Metastore DB with version 3.1.2 or higher. Please refer to [Hive Metastore](https://cwiki.apache.org/confluence/display/Hive/Design#Design-MetastoreArchitecture) for details.
2. You have a Hive Metastore Server running with version 3.1.1 or higher, as Hive ACID needs a standalone Hive Metastore Server to operate. Please refer to [Hive Configuration](https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions#HiveTransactions-Configuration) for configuration options.

### Config

@@ -32,47 +43,112 @@ Change configuration in `$SPARK_HOME/conf/hive-site.xml` to point to already con

There are a few ways to use the library while running spark-shell

1. Use the published package

spark-shell --packages qubole:spark-acid:0.4.2-s_2.11


2. If you built the jar yourself, copy `spark-acid-assembly-0.4.2.jar` into `$SPARK_HOME/assembly/target/scala-2.11/jars` and run

spark-shell

#### Scala/Python

To operate on a Hive ACID table from Scala / pySpark, the table can be accessed directly using this datasource. Note that the short name of this datasource is `HiveAcid`. Hive ACID tables are tables in the Hive Metastore, so any read or write operation needs `format("HiveAcid").option("table", "<table name>")`. _Direct reads and writes at the file level are not supported._

scala> val df = spark.read.format("HiveAcid").options(Map("table" -> "default.acidtbl")).load()
scala> df.collect()

#### SQL

There are two ways to read an existing Hive ACID table through pure SQL:

1. Create a dummy table that acts as a symlink to the original acid table. This symlink is required to instruct Spark to use this datasource against an existing table.

scala> spark.sql("create table symlinkacidtable using HiveAcid options ('table' 'default.acidtbl')")
To create the symlink table:

spark.sql("create table symlinkacidtable using HiveAcid options ('table' 'default.acidtbl')")

spark.sql("select * from symlinkacidtable")


_NB: This will produce a warning indicating that Hive does not understand this format_

WARN hive.HiveExternalCatalog: Couldn’t find corresponding Hive SerDe for data source provider com.qubole.spark.hiveacid.datasource.HiveAcidDataSource. Persisting data source table `default`.`sparkacidtbl` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

_Please ignore it, as this is a symlink table for Spark to operate on and it has no underlying storage._

2. Use the SparkSession extensions framework to add a new analyzer rule (HiveAcidAutoConvert) to the Spark Analyzer. This rule automatically converts a _HiveTableRelation_ representing an ACID table into a _LogicalRelation_ backed by HiveAcidRelation.

To use this, initialize SparkSession with the extension builder as mentioned below:

val spark = SparkSession.builder()
.appName("Hive-acid-test")
.config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
.enableHiveSupport()
.<OTHER OPTIONS>
.getOrCreate()

spark.sql("select * from default.acidtbl")

#### Example

##### Create Hive ACID Table

Drop the existing table:

spark.sql("Drop table if exists aciddemo.t_scala_simple")

Create the table:

spark.sql("CREATE TABLE aciddemo.t_scala_simple (status BOOLEAN, tweet ARRAY<STRING>, rank DOUBLE, username STRING) STORED AS ORC TBLPROPERTIES('TRANSACTIONAL' = 'true')")

Check that it is transactional:

spark.sql("DESCRIBE extended aciddemo.t_scala_simple").show()


##### Scala

Read an existing table and insert into the ACID table:

val df = spark.read.format("HiveAcid").options(Map("table" -> "aciddemo.acidtbl")).load()
df.write.format("HiveAcid").option("table", "aciddemo.t_scala_simple").mode("append").save()

Read an existing table and insert overwrite the ACID table:

val df = spark.read.format("HiveAcid").options(Map("table" -> "aciddemo.acidtbl")).load()
df.write.format("HiveAcid").option("table", "aciddemo.t_scala_simple").mode("overwrite").save()


_Note: Users cannot operate directly on file-level data, as a table is required when reading and writing transactionally.
`df.write.format("HiveAcid").mode("overwrite").save("s3n://aciddemo/api/warehouse/aciddemo.db/random")` won't work._

Read the ACID table:

val df = spark.read.format("HiveAcid").options(Map("table" -> "aciddemo.t_scala_simple")).load()
df.select("status", "rank").filter($"rank" > "20").show()

##### SQL

Insert into the table using a select:

spark.sql("INSERT INTO aciddemo.t_sql_simple select * from aciddemo.acidtbl")

Insert overwrite the table using a select:

spark.sql("INSERT OVERWRITE TABLE aciddemo.t_sql_simple select * from aciddemo.acidtbl")

Insert into"

spark.sql("INSERT INTO aciddemo.t_sql_simple VALUES(false, array("test"), 11.2, 'qubole')")

Read

spark.sql("SELECT status, rank from aciddemo.t_sql_simple where rank > 20")


## Latest Binaries

The ACID datasource is published to spark-packages.org. The latest version of the binary is `0.4.2`.

## Version Compatibility

### Compatibility with Apache Spark Versions
@@ -90,21 +166,18 @@ _NB: Hive ACID V2 is supported in Hive 3.0.0 onwards and for that hive Metastore
## Developer resources
### Build

1. First, build the dependencies and publish them locally. The *shaded-dependencies* sub-project is an sbt project that creates the shaded Hive Metastore and Hive exec jars combined into the fat jar `spark-acid-shaded-dependencies`. This is required due to our dependency on Hive 3 for Hive ACID, while Spark currently supports only Hive 1.2. To compile and publish the shaded dependencies jar:

cd shaded-dependencies
sbt clean publishLocal

2. Next, build the main project:

cd ../
sbt assembly

This will create `spark-acid-assembly-0.4.2.jar`, which can now be used in your application.

### Test
Tests are run against a standalone docker setup. Please refer to [Docker setup](docker/README.md) to build and start a container.

_NB: The container runs an HMS server, an HS2 server and HDFS, listening on ports 10000, 10001 and 9000 respectively. Stop any HMS or HDFS services running on the same ports on the host machine._
@@ -129,37 +202,25 @@ Read more about [sbt release](https://github.com/sbt/sbt-release)

### Design Constraints

1. When this datasource needs to read data, it talks to the Hive Metastore Server to get the list of committed transactions and, using that, the list of files it should read from the filesystem (_via listing_). Since the snapshot of files is built by listing, a consistency layer such as S3Guard should be used on cloud object stores like S3 to avoid reading an inconsistent copy of the data.

2. The snapshot of files is created at the RDD level. Because snapshots are per-RDD, even a single SQL query that uses the same table twice may operate on two different snapshots, for example:

spark.sql("select * from a join a)

3. The files in the snapshot need to be protected for as long as the RDD is in use. By design, concurrent reads and writes on Hive ACID tables work with the help of locks, where every client (across multiple engines) operating on ACID tables is expected to acquire locks for the duration of its reads and writes. Since the lifetime of an RDD can be very long, this datasource _DOES NOT_ acquire locks (to avoid blocking other operations such as inserts) and instead protects reads by making sure the files in the snapshot are not deleted while in use. For the current datasource, `Automatic Compaction` should be disabled on any table that Spark is operating on; this ensures that the cleaner does not remove any file in the snapshot. To disable automatic compaction on a table:

ALTER TABLE <> SET TBLPROPERTIES ("NO_AUTO_COMPACTION"="true")

When the table is not in use, the cleaner can be re-enabled and all the files that need cleaning will be queued up for the cleaner. Note that disabling compaction has performance implications for reads and writes, as a large number of delta files may need to be merged when performing a read.

4. Note that even though reads are protected, admin operations like `TRUNCATE`, `ALTER TABLE DROP COLUMN` and `DROP` have no such protection, as they clean files without intervention from the cleaner. These operations should be performed only when Spark is not using the table.


## Contributing

We use [Github Issues](https://github.com/qubole/spark-acid/issues) to track issues.

## Reporting bugs or feature requests

Please use the github issues for the spark-acid project to report issues or raise feature requests.
4 changes: 3 additions & 1 deletion build.sbt
@@ -81,7 +81,9 @@ excludeDependencies ++= Seq (

// orc
"org.apache.orc" % "orc-core",
"org.apache.orc" % "orc-mapreduce"
"org.apache.orc" % "orc-mapreduce",

"org.slf4j" % "slf4j-api"
)

// do not run test at assembly
8 changes: 8 additions & 0 deletions docker/beeline
@@ -0,0 +1,8 @@
#!/bin/bash

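# Open a beeline session against the HiveServer2 instance (port 10001) running inside the test container.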
name="spark-hiveacid-test-container"

docker exec -it $name bin/bash -c "\
. ~/.bashrc; \
export HADOOP_HOME=/hadoop; \
hive/bin/beeline -u jdbc:hive2://0.0.0.0:10001/default root root"
6 changes: 6 additions & 0 deletions docker/files/hive-site.xml
@@ -91,4 +91,10 @@
<name>hive.auto.convert.join</name>
<value>false</value>
</property>

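<!-- Disable automatic statistics gathering on inserts -->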
<property>
<name>hive.stats.autogather</name>
<value>false</value>
</property>

</configuration>
66 changes: 66 additions & 0 deletions docker/spark-shell
@@ -0,0 +1,66 @@
#!/bin/bash
if [ -z ${2} ]
then
echo "Specify the spark-acid jar location"
echo "spark-shell ~/codeline/TOT ~/codeline/TOT/acid-ds/target/scala-2.11/spark-acid-qds-assembly-0.4.3.jar"
exit
fi
if [ -z ${1} ]
then
echo "Specify and spark code base directory"
echo "spark-shell ~/codeline/TOT ~/codeline/TOT/acid-ds/target/scala-2.11/spark-acid-qds-assembly-0.4.3.jar"
exit
fi

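# Derive source-tree locations (Hadoop, Spark, Hive, etc.) from the codeline root passed as the first argument.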
shellenv() {
export QENV_LOCAL_CODELINE="${1}"
export QENV_LOCAL_CONF="${QENV_LOCAL}/conf"
export HADOOP_SRC="${QENV_LOCAL_CODELINE}/hadoop2"
export SPARK_SRC="${QENV_LOCAL_CODELINE}/spark"
export HUSTLER_SRC="${QENV_LOCAL_CODELINE}/hustler"
export HIVE_SRC="${QENV_LOCAL_CODELINE}/hive"
export ZEPPELIN_SRC="${QENV_LOCAL_CODELINE}/zeppelin"
}

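# Locate the locally built Hadoop SNAPSHOT distribution under hadoop-dist/target, if any.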
hsnapshot() {
HADOOP_SNAPSHOT=`ls ${HADOOP_SRC}/hadoop-dist/target/hadoop* | grep SNAPSHOT: | cut -d':' -f1`
}

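# Locate the locally built Hive binary package under packaging/target, if any.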
hivesnapshot() {
loc=`ls ${HIVE_SRC}/packaging/target/apache-hive* |grep bin |grep -v ':'`
HIVE_SNAPSHOT=${HIVE_SRC}/packaging/target/${loc}/${loc}/
}

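# Write spark-env.sh (classpath, Hadoop conf) based on the local builds found above, then launch spark-shell.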
run_spark_shelllocal() {

# Setup writest into spark-env file. Run spark-shell after it.
echo "Update Spark Conf based on Hadoop Build Version --> ${SPARK_SRC}/conf/spark-env.sh"
hsnapshot
hivesnapshot

str="export SPARK_YARN_USER_ENV=CLASSPATH=${QENV_LOCAL_CONF}/"
echo ${str} > ${SPARK_SRC}/conf/spark-env.sh

if [ -n "${HADOOP_SNAPSHOT}" ]
then

str="export SPARK_DIST_CLASSPATH=${QENV_LOCAL_CONF}/:${HADOOP_SNAPSHOT}/share/hadoop/common/lib/*:${HADOOP_SNAPSHOT}/share/hadoop/common/*:${HADOOP_SNAPSHOT}/share/hadoop/hdfs:${HADOOP_SNAPSHOT}/share/hadoop/hdfs/lib/*:${HADOOP_SNAPSHOT}/share/hadoop/hdfs/*:${HADOOP_SNAPSHOT}/share/hadoop/yarn/lib/*:${HADOOP_SNAPSHOT}/share/hadoop/yarn/*:${HADOOP_SNAPSHOT}/share/hadoop/mapreduce/*:/share/hadoop/tools:${HADOOP_SNAPSHOT}/share/hadoop/tools/lib/*:${HADOOP_SNAPSHOT}/share/hadoop/tools/*:/share/hadoop/qubole:${HADOOP_SNAPSHOT}/share/hadoop/qubole/*"
echo ${str} >> ${SPARK_SRC}/conf/spark-env.sh
fi

if [ -n "${HIVE_SNAPSHOT}" ]
then
str="export SPARK_DIST_CLASSPATH=\${SPARK_DIST_CLASSPATH}:${HIVE_SNAPSHOT}/lib/*"
echo ${str} >> ${SPARK_SRC}/conf/spark-env.sh
fi

str="export HADOOP_CONF_DIR=${QENV_LOCAL_CONF}/"
echo ${str} >> ${SPARK_SRC}/conf/spark-env.sh

$SPARK_SRC/bin/spark-shell $@
}


shellenv ${1}
shift
run_spark_shelllocal --jars $@ --conf spark.sql.extensions=com.qubole.spark.datasources.hiveacid.HiveAcidAutoConvertExtension --conf spark.hadoop.hive.metastore.uris=thrift://localhost:10000 --conf spark.sql.catalogImplementation=hive
17 changes: 15 additions & 2 deletions shaded-dependencies/build.sbt
@@ -46,14 +46,27 @@ libraryDependencies ++= Seq(
"org.apache.hive" % "hive-jdbc" % hive_version intransitive(),
"org.apache.hive" % "hive-service" % hive_version intransitive(),
"org.apache.hive" % "hive-serde" % hive_version intransitive(),
"org.apache.hive" % "hive-common" % hive_version intransitive()
"org.apache.hive" % "hive-common" % hive_version intransitive(),

// To deal with the hive3 metastore thrift library 0.9.3 vs the zeppelin thrift
// library version 0.9.1 conflict when running Notebooks.
"org.apache.thrift" % "libfb303" % "0.9.3",
"org.apache.thrift" % "libthrift" % "0.9.3"
)

assemblyShadeRules in assembly := Seq(
ShadeRule.rename("org.apache.hadoop.hive.**" -> "com.qubole.shaded.hadoop.hive.@1").inAll,
ShadeRule.rename("org.apache.hive.**" -> "com.qubole.shaded.hive.@1").inAll,
ShadeRule.rename("org.apache.orc.**" -> "com.qubole.shaded.orc.@1").inAll,
ShadeRule.rename("com.google.**" -> "com.qubole.shaded.@1").inAll
ShadeRule.rename("org.apache.commons.**" -> "com.qubole.shaded.commons.@1").inAll,
ShadeRule.rename("org.apache.avro.**" -> "com.qubole.shaded.avro.@1").inAll,
ShadeRule.rename("org.apache.parquet.**" -> "com.qubole.shaded.parquet.@1").inAll,
ShadeRule.rename("org.apache.http.**" -> "com.qubole.shaded.http.@1").inAll,
ShadeRule.rename("org.apache.tez.**" -> "com.qubole.shaded.tez.@1").inAll,

ShadeRule.rename("com.google.**" -> "com.qubole.shaded.@1").inAll,
ShadeRule.rename("com.facebook.fb303.**" -> "com.qubole.shaded.facebook.fb303.@1").inAll,
ShadeRule.rename("org.apache.thrift.**" -> "com.qubole.shaded.thrift.@1").inAll
)

import sbtassembly.AssemblyPlugin.autoImport.ShadeRule
@@ -17,4 +17,4 @@
# limitations under the License.
#

com.qubole.spark.hiveacid.datasource.HiveAcidDataSource
