There are two ways to configure the project:
- via the configuration file (application.conf)
- via arguments to the spark-submit command
The configuration file (application.conf) template is as follows:
```
// spark conf
spark {
  streaming.trigger.time.interval.msec = 1000
  streaming.future.task.timeout.msec = 300000
  show.table.numRows = 100
  show.table.truncate = true
  redis.host = 127.0.0.1
  redis.port = 6379
  redis.db = 4
  // redis.auth =
  // redis.timeout =
  // redis.max.pipeline.size =
  // redis.scan.count =
}

sqlalarm {
  // event sources, can be more than one
  sources = "kafka,redis"

  // alarm event input source conf
  input {
    kafka {
      topic = "sqlalarm_event"
      subscribe.topic.pattern = 1
      bootstrap.servers = "127.0.0.1:9092"
      group = "sqlalarm_group"
    }
  }

  // alarm sinks, can be more than one
  sinks = "console,kafka,jdbc"

  // alarm record sink channel conf
  output {
    kafka {
    }
    jdbc {
      url = "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
      driver = "com.mysql.jdbc.Driver"
      user = "xxx"
      password = "xxx"
    }
  }

  checkpointLocation = "checkpoint"

  // alarm alert conf, usually a REST API
  alert {
    pigeonApi = "https://dt.sqlclub/api/pigeon"
  }
}
```
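Since the template uses HOCON syntax, here is a minimal sketch of reading it with the Typesafe Config library. It assumes application.conf sits on the classpath and is loaded with `ConfigFactory.load()`; the object and variable names are illustrative, not taken from the project's source:

```scala
import com.typesafe.config.ConfigFactory

// Minimal sketch, assuming application.conf is on the classpath and is
// read with Typesafe Config (HOCON); the key paths match the template above.
object ConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load() // loads application.conf by default

    val redisHost = conf.getString("spark.redis.host")
    val redisPort = conf.getInt("spark.redis.port")
    val sources   = conf.getString("sqlalarm.sources").split(",").map(_.trim)

    println(s"redis=$redisHost:$redisPort sources=${sources.mkString(",")}")
  }
}
```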
Alternatively, the same settings can be passed on the spark-submit command line as follows:
--conf "spark.streaming.trigger.time.interval.msec=1000"
--conf "spark.streaming.future.task.timeout.msec=300000"
--conf "spark.show.table.numRows=100"
--conf "spark.show.table.truncate=true"
--conf "spark.redis.host=127.0.0.1"
--conf "spark.redis.port=6379"
--conf "spark.redis.db=4"
-sqlalarm.name sqlalarm
-sqlalarm.sources kafka
-sqlalarm.input.kafka.topic sqlalarm_event
-sqlalarm.input.kafka.subscribe.topic.pattern 1
-sqlalarm.input.kafka.bootstrap.servers "127.0.0.1:9092"
-sqlalarm.sinks "console,kafka,jdbc"
-sqlalarm.output.kafka.topic sqlalarm_output
-sqlalarm.output.kafka.bootstrap.servers "127.0.0.1:9092"
-sqlalarm.alert.pigeonApi "https://dt.sqlclub/api/pigeon"
Complete example:
```bash
spark-submit --class dt.sql.alarm.SQLAlarmBoot \
  --driver-memory 2g \
  --master "local[4]" \
  --name SQLALARM \
  --conf "spark.sql.hive.thriftServer.singleSession=true" \
  --conf "spark.kryoserializer.buffer=256k" \
  --conf "spark.kryoserializer.buffer.max=1024m" \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.scheduler.mode=FAIR" \
  --conf "spark.show.table.numRows=100" \
  --conf "spark.show.table.truncate=true" \
  --conf "spark.streaming.trigger.time.interval.msec=1000" \
  --conf "spark.redis.host=127.0.0.1" \
  --conf "spark.redis.port=6379" \
  --conf "spark.redis.db=4" \
  sa-core-1.0-SNAPSHOT.jar \
  -sqlalarm.name sqlalarm \
  -sqlalarm.sources kafka \
  -sqlalarm.input.kafka.topic sqlalarm_event \
  -sqlalarm.input.kafka.subscribe.topic.pattern 1 \
  -sqlalarm.input.kafka.bootstrap.servers "127.0.0.1:9092" \
  -sqlalarm.sinks console
```
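The application arguments after the jar follow a simple `-key value` convention. Purely as an illustration (this is not the project's actual parser), such pairs could be folded into a map like this:

```scala
// Hypothetical sketch, not the project's actual parser: collect "-key value"
// pairs from the program arguments into a Map.
object ArgsSketch {
  def parse(args: Array[String]): Map[String, String] =
    args.sliding(2, 2).collect {
      case Array(key, value) if key.startsWith("-") =>
        key.stripPrefix("-") -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    val demo = Array(
      "-sqlalarm.name", "sqlalarm",
      "-sqlalarm.sources", "kafka",
      "-sqlalarm.sinks", "console"
    )
    // Map(sqlalarm.name -> sqlalarm, sqlalarm.sources -> kafka, sqlalarm.sinks -> console)
    println(parse(demo))
  }
}
```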
Notes:
- `spark.streaming.trigger.time.interval.msec`: the periodic trigger interval of the stream, in milliseconds
- `spark.streaming.future.task.timeout.msec`: the timeout for sink and alert tasks, in milliseconds
- `spark.show.table.numRows`: the number of rows the console sink displays
- `spark.show.table.truncate`: whether the console sink truncates long strings
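To make these settings concrete, below is a minimal Structured Streaming sketch of how such values typically map onto a processing-time trigger and the console sink's `numRows`/`truncate` options. It uses Spark's built-in rate source and is an assumption about the mechanics, not SQLAlarm's actual wiring:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Minimal sketch, not SQLAlarm's actual code: a rate source printed to the
// console, triggered at the configured interval.
object TriggerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("trigger-sketch")
      .config("spark.streaming.trigger.time.interval.msec", "1000")
      .getOrCreate()

    val intervalMs =
      spark.conf.get("spark.streaming.trigger.time.interval.msec").toLong

    val query = spark.readStream
      .format("rate") // Spark's built-in test source, one row per second
      .load()
      .writeStream
      .format("console")
      .option("numRows", 100)   // analogous to spark.show.table.numRows
      .option("truncate", true) // analogous to spark.show.table.truncate
      .trigger(Trigger.ProcessingTime(s"$intervalMs milliseconds"))
      .start()

    query.awaitTermination()
  }
}
```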