feat: support read and write from hive datasource #100

Merged · 3 commits · Aug 19, 2024
Changes from 1 commit
41 changes: 38 additions & 3 deletions nebula-algorithm/src/main/resources/application.conf
@@ -11,14 +11,49 @@
}

data: {
# data source. optional of nebula,nebula-ngql,csv,json
# data source. optional of nebula,nebula-ngql,csv,json,hive
source: csv
# data sink, means the algorithm result will be written into this sink. optional of nebula,csv,text
# data sink, means the algorithm result will be written into this sink. optional of nebula,csv,text,hive
sink: csv
# if your algorithm needs weight
hasWeight: false
}

# Hive related config
hive: {
# algo's data source from hive
read: {
#[Optional] hive metastore uris; set this when Spark and Hive are deployed on different clusters
metaStoreUris: "thrift://hive-metastore-server-01:9083"
#spark sql used to query the source data
sql: "select column_1,column_2,column_3 from database_01.table_01 "
#[Optional] column of the sql result that maps to the graph source vid
srcId: "column_1"
#[Optional] column of the sql result that maps to the graph dest vid
dstId: "column_2"
#[Optional] column of the sql result that maps to the graph weight
weight: "column_3"
}

# algo result sink into hive
write: {
#[Optional] hive metastore uris; set this when Spark and Hive are deployed on different clusters
metaStoreUris: "thrift://hive-metastore-server-02:9083"
#save result to hive table
dbTableName: "database_02.table_02"
#[Optional] spark dataframe save mode, optional of Append, Overwrite, ErrorIfExists, Ignore. Default is Overwrite
saveMode: "Overwrite"
#[Optional] whether to auto-create the hive table. Default is true
autoCreateTable: true
#[Optional] mapping from algorithm result columns to hive table column names. Defaults to the column names of the algo result dataframe
resultTableColumnMapping: {
# Note: different algorithms have different output fields; take the pagerank algorithm as an example:
_id: "column_1"
pagerank: "pagerank_value"
}
}
}
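Editor's note: before wiring this hive.read block into the algorithm, the settings can be smoke-tested from a Spark shell with Hive support. A minimal sketch, assuming the placeholder metastore uri and query above (the app name is illustrative and not part of this PR):

import org.apache.spark.sql.SparkSession

// Quick check that the configured metastore and SQL are reachable,
// mirroring what the HiveReader added later in this diff does with the same settings.
val spark = SparkSession.builder()
  .appName("hive-read-smoke-test") // illustrative name
  .config("hive.metastore.uris", "thrift://hive-metastore-server-01:9083")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("select column_1,column_2,column_3 from database_01.table_01").printSchema()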

# NebulaGraph related config
nebula: {
# algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
@@ -78,7 +113,7 @@
# the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent,
# labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
# betweenness, graphtriangleCount, clusteringcoefficient, bfs, hanp, closeness, jaccard, node2vec]
executeAlgo: graphtrianglecount
executeAlgo: pagerank

# PageRank parameter
pagerank: {
@@ -55,7 +55,7 @@ object Main {
val algoTime = System.currentTimeMillis()

// writer
saveAlgoResult(algoResult, configs)
saveAlgoResult(sparkConfig.spark, algoResult, configs)
val endTime = System.currentTimeMillis()

sparkConfig.spark.stop()
@@ -149,8 +149,8 @@
}
}

private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = {
private[this] def saveAlgoResult(spark: SparkSession, algoResult: DataFrame, configs: Configs): Unit = {
val writer = AlgoWriter.make(configs)
writer.write(algoResult, configs)
writer.write(spark, algoResult, configs)
}
}
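Editor's note: the hive writer itself is not shown in this excerpt, but `writer.write(spark, algoResult, configs)` now receives the SparkSession so a hive sink can use it. A minimal, hypothetical sketch of what such a sink could do, assuming the HiveWriteConfigEntry fields introduced below (function name and SaveMode handling are illustrative, not the PR's actual writer):

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical hive sink, for illustration only; not the PR's actual writer.
def writeToHive(spark: SparkSession, result: DataFrame, write: HiveWriteConfigEntry): Unit = {
  if (write.metaStoreUris.trim.nonEmpty) {
    // Point the session at the target metastore when it differs from the read side.
    spark.conf.set("hive.metastore.uris", write.metaStoreUris)
  }
  // Apply resultTableColumnMapping, e.g. rename _id to column_1.
  val renamed = write.resultColumnMapping.foldLeft(result) { case (df, (from, to)) =>
    if (df.columns.contains(from)) df.withColumnRenamed(from, to) else df
  }
  val mode = if (write.saveMode.trim.isEmpty) SaveMode.Overwrite else SaveMode.valueOf(write.saveMode)
  // saveAsTable creates the table when it does not exist, which lines up with autoCreateTable = true.
  renamed.write.mode(mode).saveAsTable(write.dbTableName)
}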
@@ -12,6 +12,7 @@ import org.apache.log4j.Logger
import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory}
import com.vesoft.nebula.algorithm.config.Configs.readConfig
import com.vesoft.nebula.algorithm.config.Configs.getOrElse

import scala.collection.mutable

@@ -129,6 +130,46 @@ object LocalConfigEntry {
}
}


object HiveConfigEntry {
def apply(config: Config): HiveConfigEntry = {
// sql to execute
val sql: String = getOrElse(config,"hive.read.sql","")
// source vid column name
val srcIdCol: String = getOrElse(config,"hive.read.srcId","")
// destination vid column name
val dstIdCol: String = getOrElse(config,"hive.read.dstId","")
// weight column name
val weightCol: String = getOrElse(config,"hive.read.weight","")
// hive metastore uris
val readMetaStoreUris: String = getOrElse(config,"hive.read.metaStoreUris","")
val readConfigEntry = HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol, readMetaStoreUris)

// hive table to write to, in db.table format
val dbTableName: String = getOrElse(config,"hive.write.dbTableName","")
// save mode, see Spark's SaveMode
val saveMode: String = getOrElse(config,"hive.write.saveMode","")
// whether to auto-create the hive table
val autoCreateTable: Boolean = getOrElse(config,"hive.write.autoCreateTable",true)
// hive metastore uris
val writeMetaStoreUris: String = getOrElse(config,"hive.write.metaStoreUris","")
// mapping from algorithm result columns to table columns, e.g. map _id in the result to user_id
Review comment (Contributor): please update the comment to English~

val resultColumnMapping = mutable.Map[String, String]()
val mappingKey = "hive.write.resultTableColumnMapping"
if (config.hasPath(mappingKey)) {
val mappingConfig = config.getObject(mappingKey)
for (subkey <- mappingConfig.unwrapped().keySet().asScala) {
val key = s"${mappingKey}.${subkey}"
val value = config.getString(key)
resultColumnMapping += subkey -> value
}
}
val writeConfigEntry = HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping, writeMetaStoreUris)

HiveConfigEntry(readConfigEntry, writeConfigEntry)
}
}

/**
* SparkConfigEntry support key-value pairs for spark session.
*
@@ -173,6 +214,35 @@ case class LocalConfigEntry(filePath: String,
}
}

case class HiveConfigEntry(hiveReadConfigEntry:HiveReadConfigEntry,
hiveWriteConfigEntry:HiveWriteConfigEntry) {
override def toString: String = {
s"HiveConfigEntry: {read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
}
}

case class HiveReadConfigEntry(sql: String,
srcIdCol: String = "srcId",
dstIdCol: String = "dstId",
weightCol: String,
metaStoreUris: String) {
override def toString: String = {
s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " +
s"weightCol:$weightCol, metaStoreUris:$metaStoreUris}"
}
}

case class HiveWriteConfigEntry(dbTableName: String,
saveMode: String,
autoCreateTable: Boolean,
resultColumnMapping: mutable.Map[String, String],
metaStoreUris: String) {
override def toString: String = {
s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " +
s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping, metaStoreUris=$metaStoreUris}"
}
}
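Editor's note: a hedged usage example of the parsing above, building a Config from an inline HOCON string (the wrapping object and all values are illustrative, not taken from this PR):

import com.typesafe.config.ConfigFactory

object HiveConfigEntryExample {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString(
      """
        |hive: {
        |  read: { sql: "select src, dst, w from db.edges", srcId: "src", dstId: "dst", weight: "w" }
        |  write: { dbTableName: "db.pagerank_result", resultTableColumnMapping: { _id: "user_id" } }
        |}
      """.stripMargin)
    val entry = HiveConfigEntry(config)
    // Unset keys fall back to their defaults, e.g. autoCreateTable = true and an empty saveMode.
    println(entry)
  }
}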

/**
* NebulaConfigEntry
* @param readConfigEntry config for nebula-spark-connector reader
Expand Down Expand Up @@ -218,6 +288,7 @@ case class Configs(sparkConfig: SparkConfigEntry,
dataSourceSinkEntry: DataSourceSinkEntry,
nebulaConfig: NebulaConfigEntry,
localConfigEntry: LocalConfigEntry,
hiveConfigEntry: HiveConfigEntry,
algorithmConfig: AlgorithmConfigEntry)

object Configs {
@@ -237,10 +308,11 @@
val dataSourceEntry = DataSourceSinkEntry(config)
val localConfigEntry = LocalConfigEntry(config)
val nebulaConfigEntry = NebulaConfigEntry(config)
val sparkEntry = SparkConfigEntry(config)
val algorithmEntry = AlgorithmConfigEntry(config)
val hiveConfigEntry = HiveConfigEntry(config)
val sparkEntry = SparkConfigEntry(config)
val algorithmEntry = AlgorithmConfigEntry(config)

Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, algorithmEntry)
Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, hiveConfigEntry, algorithmEntry)
}

/**
@@ -277,15 +349,15 @@
}

/**
* Get the value from config by the path. If the path not exist, return the default value.
*
* @param config The config.
* @param path The path of the config.
* @param defaultValue The default value for the path.
*
* @return
*/
private[this] def getOrElse[T](config: Config, path: String, defaultValue: T): T = {
* Get the value from config by the path. If the path not exist, return the default value.
*
* @param config The config.
* @param path The path of the config.
* @param defaultValue The default value for the path.
*
* @return
*/
def getOrElse[T](config: Config, path: String, defaultValue: T): T = {
if (config.hasPath(path)) {
config.getAnyRef(path).asInstanceOf[T]
} else {
@@ -5,6 +5,8 @@

package com.vesoft.nebula.algorithm.config

import com.vesoft.nebula.algorithm.reader.ReaderType
import com.vesoft.nebula.algorithm.writer.WriterType
import org.apache.spark.sql.SparkSession

case class SparkConfig(spark: SparkSession, partitionNum: Int)
@@ -20,6 +22,13 @@ object SparkConfig {
sparkConfigs.foreach { case (key, value) =>
session.config(key, value)
}

val dataSource = configs.dataSourceSinkEntry
if (dataSource.source.equals(ReaderType.hive.stringify)
|| dataSource.sink.equals(WriterType.hive.stringify)) {
session.enableHiveSupport()
}

val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
val spark = session.getOrCreate()
validate(spark.version, "2.4.*")
@@ -25,6 +25,7 @@ object DataReader {
case ReaderType.nebulaNgql => new NebulaNgqlReader
case ReaderType.nebula => new NebulaReader
case ReaderType.csv => new CsvReader
case ReaderType.hive => new HiveReader
}
.getOrElse(throw new UnsupportedOperationException("unsupported reader"))
}
@@ -179,3 +180,39 @@ final class JsonReader extends DataReader {
data
}
}
final class HiveReader extends DataReader {

override val tpe: ReaderType = ReaderType.hive
override def read(spark: SparkSession, configs: Configs, partitionNum: Int): DataFrame = {
val readConfig = configs.hiveConfigEntry.hiveReadConfigEntry
val sql = readConfig.sql
val srcIdCol = readConfig.srcIdCol
val dstIdCol = readConfig.dstIdCol
val weightCol = readConfig.weightCol

println(s"""hiveDataReader, srcIdCol:$srcIdCol, dstIdCol:$dstIdCol, weightCol:$weightCol""")

if (readConfig.metaStoreUris != null && readConfig.metaStoreUris.trim.nonEmpty) {
spark.conf.set("hive.metastore.schema.verification", false)
spark.conf.set("hive.metastore.uris", readConfig.metaStoreUris)
}

var data = spark.sql(sql)

if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) {
if (configs.dataSourceSinkEntry.hasWeight && weightCol != null && weightCol.trim.nonEmpty) {
data = data.select(srcIdCol, dstIdCol, weightCol)
} else {
data = data.select(srcIdCol, dstIdCol)
}
}

if (partitionNum != 0) {
data = data.repartition(partitionNum)
}

data.show(3)
Review comment (Contributor): no need to show.


data
}
}
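Editor's note: combined with the SparkConfig change above, a hypothetical sketch of how the hive read path fits together (the helper and app name are illustrative; in Main the session and reader are obtained through SparkConfig and DataReader.make rather than built directly):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wiring of the pieces in this PR: enable Hive support on the
// session, then let HiveReader run the configured SQL and project the columns.
def readEdgesFromHive(configs: Configs, partitionNum: Int): DataFrame = {
  val spark = SparkSession
    .builder()
    .appName("nebula-algorithm-hive") // illustrative app name
    .enableHiveSupport()
    .getOrCreate()
  new HiveReader().read(spark, configs, partitionNum)
}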
@@ -17,17 +17,20 @@ sealed trait ReaderType {
case ReaderType.nebulaNgql => "nebula-ngql"
case ReaderType.nebula => "nebula"
case ReaderType.csv => "csv"
case ReaderType.hive => "hive"
}
}
object ReaderType {
lazy val mapping: Map[String, ReaderType] = Map(
json.stringify -> json,
nebulaNgql.stringify -> nebulaNgql,
nebula.stringify -> nebula,
csv.stringify -> csv
csv.stringify -> csv,
hive.stringify -> hive
)
object json extends ReaderType
object nebulaNgql extends ReaderType
object nebula extends ReaderType
object csv extends ReaderType
object hive extends ReaderType
}