Merged in SPAR-4319 (pull request qubole#25)
fix: dev: SPAR-4319: Add a blobstore commit marker when doing insert overwrite. Also set the Hive conf when reading from Spark.

Approved-by: Amogh Margoor <[email protected]>
Sourabh Goyal committed Jun 12, 2020
1 parent d07a726 commit e76547a
Showing 6 changed files with 263 additions and 30 deletions.
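At a high level, the change below makes the writer drop a blobstore commit marker into each base_<writeId> directory it produces during an INSERT OVERWRITE, and makes the readers pass the matching Hive settings through the Hadoop configuration. A minimal sketch of the flow the new tests exercise, assuming an existing SparkSession named spark and an illustrative full-ACID table testdb.acid_tbl created through the connector:

    // Writer side: an INSERT OVERWRITE should leave the commit marker next to the
    // ORC files under the freshly written base_<writeId> directory.
    spark.sql("INSERT OVERWRITE TABLE testdb.acid_tbl SELECT * FROM testdb.staging_tbl")

    // Reader side: with spark.sql.hiveAcid.reader.useBlobStoreCommitMarker=true (the default),
    // directories are checked for the marker before being considered eligible for reads.
    spark.sql("SELECT count(*) FROM testdb.acid_tbl").show()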
@@ -0,0 +1,161 @@
package com.qubole.spark.hiveacid

import com.qubole.shaded.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.log4j.{Level, LogManager, Logger}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

class BlobstoreCommitMarkerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {

  val log: Logger = LogManager.getLogger(this.getClass)
  log.setLevel(Level.INFO)

  var helper: TestHelper = _
  val isDebug = true

  val DEFAULT_DBNAME = "BlobstoreCommitMarkerDB"
  val cols: Map[String, String] = Map(
    ("intCol", "int"),
    ("doubleCol", "double"),
    ("floatCol", "float"),
    ("booleanCol", "boolean")
  )

  override def beforeAll() {
    try {
      helper = new TestHelper
      if (isDebug) {
        log.setLevel(Level.DEBUG)
      }
      helper.init(isDebug)

      // DB
      helper.hiveExecute("DROP DATABASE IF EXISTS " + DEFAULT_DBNAME + " CASCADE")
      helper.hiveExecute("CREATE DATABASE " + DEFAULT_DBNAME)
    } catch {
      case NonFatal(e) => log.info("failed " + e)
    }
  }

  override protected def afterAll(): Unit = {
    helper.hiveExecute("DROP DATABASE IF EXISTS " + DEFAULT_DBNAME + " CASCADE")
    helper.destroy()
  }

  test("Check for blobstore marker in insert overwrite into full acid partitioned table") {
    val partitionedTableName = "partitionedTbl"
    val partitionedTable = new Table(DEFAULT_DBNAME, partitionedTableName, cols, Table.orcPartitionedFullACIDTable,
      isPartitioned = true)

    def code(): Unit = {
      helper.recreate(partitionedTable)
      helper.sparkSQL(partitionedTable.insertOverwriteSparkTableKeyRange(1, 1))
      val metadata = helper.getTableMetadata(partitionedTable)
      val basePath = metadata.rootPath
      val fs = FileSystem.get(basePath.toUri, helper.spark.sessionState.newHadoopConf())
      for (i <- metadata.getRawPartitions()) {
        val partitionPath = new Path(basePath, i.getName)
        log.info(s"partition path to check for blobstore commit marker: ${partitionPath}")
        val iter = fs.listFiles(partitionPath, true)
        val pathStatuses = new ArrayBuffer[Path]
        while (iter.hasNext) {
          pathStatuses.append(iter.next.getPath)
        }
        val filteredPaths = fs.listStatus(pathStatuses.toArray, new PathFilter {
          override def accept(path: Path): Boolean = {
            log.info(s"path to filter is: ${path}")
            val filter = (path.getParent.getName.startsWith("base_") &&
              path.getName == AcidUtils.BlobstoreCommitMarker.BLOBSTORE_COMMIT_MARKER)
            filter
          }
        })
        val filteredPathString = filteredPaths.mkString(",")
        log.info(s"filteredPaths: ${filteredPathString}")

        assert(filteredPaths.size > 0, s"blobstore commit marker was not written in " +
          s"${partitionPath}")
      }
    }
    helper.myRun(s"check for blob store commit marker" +
      s" when insert overwrite into partitioned table: ${partitionedTable}", code)

  }

  test("check for blobstore marker for insert overwrite for non partitioned table") {
    val nonPartitionedTableName = "nonPartitionedTbl"
    val nonPartitionedTable = new Table(DEFAULT_DBNAME, nonPartitionedTableName, cols, Table.orcFullACIDTable,
      isPartitioned = false)

    def code(): Unit = {
      helper.recreate(nonPartitionedTable)
      helper.sparkSQL(nonPartitionedTable.insertOverwriteSparkTableKeyRange(1, 1))
      val metadata = helper.getTableMetadata(nonPartitionedTable)
      val basePath = metadata.rootPath
      val fs = FileSystem.get(basePath.toUri, helper.spark.sessionState.newHadoopConf())

      log.info(s"table path to check for blobstore commit marker: ${basePath}")
      val iter = fs.listFiles(basePath, true)
      val pathStatuses = new ArrayBuffer[Path]
      while (iter.hasNext) {
        pathStatuses.append(iter.next.getPath)
      }
      val filteredPaths = fs.listStatus(pathStatuses.toArray, new PathFilter {
        override def accept(path: Path): Boolean = {
          log.info(s"path to filter is: ${path}")
          val filter = (path.getParent.getName.startsWith("base_") &&
            path.getName == AcidUtils.BlobstoreCommitMarker.BLOBSTORE_COMMIT_MARKER)
          filter
        }
      })
      val filteredPathString = filteredPaths.mkString(",")
      log.info(s"filteredPaths: ${filteredPathString}")

      assert(filteredPaths.size > 0, s"blobstore commit marker was not written in ${basePath}")
    }
    helper.myRun(s"check for blob store commit marker" +
      s" when insert overwrite into non partitioned table: ${nonPartitionedTableName}", code)
  }


  test("Check blobstore marker not present in insert into full acid partitioned table") {
    val partitionedTableName = "partitionedTbl"
    val partitionedTable = new Table(DEFAULT_DBNAME, partitionedTableName, cols, Table.orcPartitionedFullACIDTable,
      isPartitioned = true)

    def code(): Unit = {
      helper.recreate(partitionedTable)
      helper.sparkSQL(partitionedTable.insertIntoSparkTableKeyRange(1, 1))
      val metadata = helper.getTableMetadata(partitionedTable)
      val basePath = metadata.rootPath
      val fs = FileSystem.get(basePath.toUri, helper.spark.sessionState.newHadoopConf())
      for (i <- metadata.getRawPartitions()) {
        val partitionPath = new Path(basePath, i.getName)
        log.info(s"partition path to check for blobstore commit marker: ${partitionPath}")
        val iter = fs.listFiles(partitionPath, true)
        val pathStatuses = new ArrayBuffer[Path]
        while (iter.hasNext) {
          pathStatuses.append(iter.next.getPath)
        }
        val filteredPaths = fs.listStatus(pathStatuses.toArray, new PathFilter {
          override def accept(path: Path): Boolean = {
            log.info(s"path to filter is: ${path}")
            val filter = (path.getParent.getName.startsWith("delta_") &&
              path.getName == AcidUtils.BlobstoreCommitMarker.BLOBSTORE_COMMIT_MARKER)
            filter
          }
        })
        val filteredPathString = filteredPaths.mkString(",")
        log.info(s"filteredPaths: ${filteredPathString}")

        assert(filteredPaths.size == 0, s"blobstore commit marker should not be written in " +
          s"${filteredPathString}")
      }
    }
    helper.myRun(s"check that blob store commit marker is not present" +
      s" when insert into partitioned table: ${partitionedTable}", code)

  }
}
3 changes: 3 additions & 0 deletions src/it/scala/com/qubole/spark/hiveacid/Table.scala
@@ -19,6 +19,7 @@

package com.qubole.spark.hiveacid

import com.qubole.spark.hiveacid.hive.HiveAcidMetadata
import org.joda.time.DateTime

import scala.collection.mutable
@@ -55,6 +56,8 @@ class Table (
    }
  }

  val fullyQualifiedTableName: String = s"${dbName}.${tName}"

  // NB Add date column as well apparently always in the end
  private def getRow(key: Int): String = colMapWithPartitionedCols.map(x => {
    x._2 match {
5 changes: 5 additions & 0 deletions src/it/scala/com/qubole/spark/hiveacid/TestHelper.scala
@@ -22,6 +22,7 @@ package com.qubole.spark.hiveacid
import java.net.URLClassLoader
import java.net.URL

import com.qubole.spark.hiveacid.hive.HiveAcidMetadata
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
@@ -349,6 +350,10 @@ class TestHelper extends SQLImplicits {
    log.info(s"Class: $className found in following ${requiredJars.length} jars:")
    requiredJars.foreach(uri => log.info(uri.toString))
  }

  def getTableMetadata(table: Table): HiveAcidMetadata = {
    HiveAcidMetadata.fromSparkSession(spark, table.fullyQualifiedTableName)
  }
}

object TestHelper {
10 changes: 10 additions & 0 deletions src/main/scala/com/qubole/spark/hiveacid/SparkAcidConf.scala
@@ -64,6 +64,7 @@ case class SparkAcidConf(@transient sparkSession: SparkSession, @transient param
  val lockNumRetries = getConf(SparkAcidConf.LOCK_NUM_RETRIES)
  val metastorePartitionPruningEnabled = sparkSession.sessionState.conf.metastorePartitionPruning
  val includeRowIds = parameters.getOrElse("includeRowIds", "false").toBoolean
  val readerUseBlobStoreCommitMarker = getConf(SparkAcidConf.READER_USE_BLOBSTORE_COMMIT_MARKER)

  def getConf[T](configEntry: SparkAcidConfigEntry[T]): T = {
    val value = configMap.getOrElse(configEntry.configName, configEntry.defaultValue)
@@ -89,6 +90,15 @@ object SparkAcidConf {
      " When enabled, Spark readers will be used to read the Hive table")
    .create()

  val READER_USE_BLOBSTORE_COMMIT_MARKER = SparkAcidConfigBuilder[Boolean](
    "spark.sql.hiveAcid.reader.useBlobStoreCommitMarker")
    .defaultValue("true")
    .converter(toBoolean)
    .description("When enabled, the ACID reader (whether the native Spark reader or the " +
      "Hive ACID reader) checks for the presence of the blobstore commit marker in a " +
      "directory to decide whether that directory is eligible for reading")
    .create()

  val MAX_SLEEP_BETWEEN_LOCK_RETRIES = SparkAcidConfigBuilder[Long]("spark.hiveAcid.lock.max.sleep.between.retries")
    .defaultValue("60000")
    .converter(toLong)
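As a usage sketch for the new reader option above (assuming the option is picked up from the Spark session configuration like the other spark.sql.hiveAcid settings, and with an illustrative table name):

    // The default (true) makes readers require the blobstore commit marker; it can be
    // turned off, for example for data written before markers were introduced.
    spark.conf.set("spark.sql.hiveAcid.reader.useBlobStoreCommitMarker", "false")
    spark.sql("SELECT * FROM testdb.acid_tbl LIMIT 10").show()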
@@ -115,6 +115,15 @@ extends CastSupport with Reader with Logging {
      hiveAcidMetadata,
      readerOptions.requiredNonPartitionedColumns)

    if (readerOptions.readConf.readerUseBlobStoreCommitMarker) {
      readerOptions.hadoopConf.setBoolean(
        ConfVars.HIVE_QUBOLE_ACID_READ_BLOBSTORE_COMMIT_MARKER.varname, true)
      // Besides HIVE_QUBOLE_ACID_READ_BLOBSTORE_COMMIT_MARKER, the Hive library currently
      // expects "hive.allow.move.on.s3" to be false in order to use the blobstore commit
      // marker, so it is explicitly set to false here irrespective of what is set in Spark.
      readerOptions.hadoopConf.setBoolean("hive.allow.move.on.s3", false)
    }

    logDebug(s"sarg.pushdown: " +
      s"${readerOptions.hadoopConf.get("sarg.pushdown")}," +
      s"hive.io.file.readcolumn.names: " +
@@ -138,6 +147,11 @@
  def makeRDDForPartitionedTable(hiveAcidMetadata: HiveAcidMetadata,
                                 partitions: Seq[ReaderPartition]): RDD[InternalRow] = {

    if (readerOptions.readConf.readerUseBlobStoreCommitMarker) {
      readerOptions.hadoopConf.setBoolean(
        ConfVars.HIVE_QUBOLE_ACID_READ_BLOBSTORE_COMMIT_MARKER.varname, true)
      readerOptions.hadoopConf.setBoolean("hive.allow.move.on.s3", false)
    }
    val partitionToDeserializer = partitions.map(p => p.ptn.asInstanceOf[HiveJarPartition]).map {
      part =>
        val deserializerClassName = part.getTPartition.getSd.getSerdeInfo.getSerializationLib
@@ -124,6 +124,53 @@ abstract private[writer] class HiveAcidWriter(val options: WriterOptions,
    fileSinkConf.setDirName(new Path(HiveAcidOptions.rootPath))
    fileSinkConf
  }

  lazy val acidOutputFormatOptions = new AcidOutputFormat.Options(jobConf)
    .writingBase(options.operationType == HiveAcidOperation.INSERT_OVERWRITE)
    .minimumWriteId(fileSinkConf.getTableWriteId)
    .maximumWriteId(fileSinkConf.getTableWriteId)
    .statementId(fileSinkConf.getStatementId)

  private def retry(retryCount: Int)(fn: => Unit, fnName: String): Unit = {
    try {
      logInfo(s"Executing: ${fnName}")
      fn
    } catch {
      case e: Exception =>
        if (retryCount > 1) {
          logWarning(s"Exception occurred while executing: ${fnName}, retrying")
          retry(retryCount - 1)(fn, fnName)
        } else {
          throw e
        }
    }
  }

  def close(): Unit = {
    if (options.operationType == HiveAcidOperation.INSERT_OVERWRITE) {
      writers.keys.foreach { case (pathString, _, bucketId) =>
        // create a blobstore commit marker
        val path = new Path(pathString)
        val outputFormatOptions = acidOutputFormatOptions.bucket(bucketId)
        val fs = path.getFileSystem(jobConf)
        val blobStoreCommitMarkerBasePath = AcidUtils.
          createFilename(path, outputFormatOptions).getParent

        def f(): Unit = {
          val blobStoreCommitMarker = new Path(blobStoreCommitMarkerBasePath,
            AcidUtils.BlobstoreCommitMarker.BLOBSTORE_COMMIT_MARKER)
          if (!fs.exists(blobStoreCommitMarker)) {
            AcidUtils.BlobstoreCommitMarker.
              createBlobstoreCommitMarker(
                blobStoreCommitMarkerBasePath, fs)
          }
        }

        retry(3)(f, "write blob store commit marker to" +
          s" ${blobStoreCommitMarkerBasePath.toString}")
      }
    }
  }
}

/**
@@ -191,13 +238,6 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,
        Reporter.NULL,
        rowIdColNum)

-    val acidOutputFormatOptions = new AcidOutputFormat.Options(jobConf)
-      .writingBase(options.operationType == HiveAcidOperation.INSERT_OVERWRITE)
-      .bucket(acidBucketId)
-      .minimumWriteId(fileSinkConf.getTableWriteId)
-      .maximumWriteId(fileSinkConf.getTableWriteId)
-      .statementId(fileSinkConf.getStatementId)
-
    val (createDelta, createDeleteDelta) = options.operationType match {
      case HiveAcidOperation.INSERT_INTO | HiveAcidOperation.INSERT_OVERWRITE => (true, false)
      case HiveAcidOperation.UPDATE => (true, true)
@@ -228,6 +268,7 @@
    if (createDeleteDelta) {
      createVersionFile(acidOutputFormatOptions.writingDeleteDelta(true))
    }

    recordUpdater
  }

@@ -280,15 +321,18 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,
    }
  }

-  def close(): Unit = {
-    writers.foreach( x => try {
-      // TODO: Seems the boolean value passed into close does not matter.
-      x._2.asInstanceOf[RecordUpdater].close(false)
+  override def close(): Unit = {
+    try {
+      super.close()
+    } finally {
+      writers.foreach( x => try {
+        // TODO: Seems the boolean value passed into close does not matter.
+        x._2.asInstanceOf[RecordUpdater].close(false)
+      } catch {
+        case e: Exception =>
+          logError("Unable to close " + x._2 + " due to: " + e.getMessage)
+      })
    }
-    catch {
-      case e: Exception =>
-        logError("Unable to close " + x._2 + " due to: " + e.getMessage)
-    })
  }
}

@@ -308,13 +352,6 @@ private[writer] class HiveAcidInsertOnlyWriter(options: WriterOptions,
  override protected def createWriter(path: Path, acidBucketId: Int): Any = {
    val outputClass = sparkHiveRowConverter.serializer.getSerializedClass

-    val acidOutputFormatOptions = new AcidOutputFormat.Options(jobConf)
-      .writingBase(options.operationType == HiveAcidOperation.INSERT_OVERWRITE)
-      .bucket(acidBucketId)
-      .minimumWriteId(fileSinkConf.getTableWriteId)
-      .maximumWriteId(fileSinkConf.getTableWriteId)
-      .statementId(fileSinkConf.getStatementId)
-
    // FIXME: Hack to remove bucket prefix for Insert only table.
    var fullPathStr = AcidUtils.createFilename(path,
      acidOutputFormatOptions).toString.replace("bucket_", "")
@@ -357,15 +394,18 @@
    }
  }

-  def close(): Unit = {
-    writers.foreach( x => try {
-      // TODO: Seems the boolean value passed into close does not matter.
-      x._2.asInstanceOf[RecordWriter].close(false)
+  override def close(): Unit = {
+    try {
+      super.close()
+    } finally {
+      writers.foreach(x => try {
+        // TODO: Seems the boolean value passed into close does not matter.
+        x._2.asInstanceOf[RecordWriter].close(false)
+      } catch {
+        case e: Exception =>
+          logError("Unable to close " + x._2 + " due to: " + e.getMessage)
+      })
    }
-    catch {
-      case e: Exception =>
-        logError("Unable to close " + x._2 + " due to: " + e.getMessage)
-    })
  }

}
