From fd9afc31f6a783891461b344cef89e7f405acf24 Mon Sep 17 00:00:00 2001 From: nailixing Date: Wed, 10 Jun 2020 22:43:54 +0800 Subject: [PATCH] Add Monitor --- dockerfiles/logstash.Dockerfile | 39 ++++ dockerfiles/spark.Dockerfile | 23 ++ log_minitor/log_minitor.iml | 2 + log_minitor/pom.xml | 111 ++++++++++ .../src/main/resources/log4j.properties | 8 + .../main/scala/com/singa/auto/Config.scala | 48 +++++ .../auto/common/configuration/SparkCfg.scala | 33 +++ .../com/singa/auto/common/store/Kafka.scala | 40 ++++ .../com/singa/auto/common/udf/SparkUdfs.scala | 58 +++++ .../auto/monitor/processer/BaseProcess.scala | 63 ++++++ .../auto/monitor/processer/FiledsParse.scala | 52 +++++ .../auto/monitor/processer/LogEvents.scala | 56 +++++ .../auto/monitor/stream/BaseStream.scala | 85 ++++++++ .../monitor/stream/LogStreamProcess.scala | 73 +++++++ scripts/config/logstash.conf | 18 ++ scripts/docker_swarm/.env.sh | 13 ++ scripts/docker_swarm/build_images.sh | 4 + scripts/docker_swarm/start_monitor.sh | 58 +++++ scripts/docker_swarm/stop.sh | 9 + scripts/kubernetes/.env.sh | 17 ++ scripts/kubernetes/build_images.sh | 8 + scripts/kubernetes/create_config.py | 201 ++++++++++++++++++ scripts/kubernetes/generate_config.sh | 17 +- scripts/kubernetes/remove_config.sh | 9 +- scripts/kubernetes/replace_services.sh | 14 ++ scripts/kubernetes/spark-rbac.yaml | 33 +++ scripts/kubernetes/start.sh | 3 + scripts/kubernetes/start_monitor.sh | 74 +++++++ 28 files changed, 1167 insertions(+), 2 deletions(-) create mode 100644 dockerfiles/logstash.Dockerfile create mode 100644 dockerfiles/spark.Dockerfile create mode 100644 log_minitor/log_minitor.iml create mode 100644 log_minitor/pom.xml create mode 100644 log_minitor/src/main/resources/log4j.properties create mode 100644 log_minitor/src/main/scala/com/singa/auto/Config.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/common/configuration/SparkCfg.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/common/store/Kafka.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/common/udf/SparkUdfs.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/monitor/processer/BaseProcess.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/monitor/processer/FiledsParse.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/monitor/processer/LogEvents.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/monitor/stream/BaseStream.scala create mode 100644 log_minitor/src/main/scala/com/singa/auto/monitor/stream/LogStreamProcess.scala create mode 100644 scripts/config/logstash.conf create mode 100644 scripts/docker_swarm/start_monitor.sh create mode 100644 scripts/kubernetes/spark-rbac.yaml create mode 100644 scripts/kubernetes/start_monitor.sh diff --git a/dockerfiles/logstash.Dockerfile b/dockerfiles/logstash.Dockerfile new file mode 100644 index 00000000..2b20f51a --- /dev/null +++ b/dockerfiles/logstash.Dockerfile @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +FROM logstash:7.7.0 + +MAINTAINER NailiXing + + +RUN /usr/share/logstash/bin/logstash-plugin install logstash-input-http_poller +RUN /usr/share/logstash/bin/logstash-plugin install logstash-input-exec +RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-json_encode + + +EXPOSE 9600 5044 + +ARG LOGSTASH_DOCKER_WORKDIR_PATH + +WORKDIR $LOGSTASH_DOCKER_WORKDIR_PATH + +COPY scripts/config/logstash.conf $LOGSTASH_DOCKER_WORKDIR_PATH/logstash.conf + +COPY ./logs $LOGSTASH_DOCKER_WORKDIR_PATH/logs + +CMD bin/logstash -f logstash.conf diff --git a/dockerfiles/spark.Dockerfile b/dockerfiles/spark.Dockerfile new file mode 100644 index 00000000..70a49a1b --- /dev/null +++ b/dockerfiles/spark.Dockerfile @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +FROM gcr.io/spark-operator/spark:v2.4.5 + +ARG SPAEK_DOCKER_JARS_PATH +COPY log_minitor/target/log_minitor-jar-with-dependencies.jar $SPAEK_DOCKER_JARS_PATH/log_minitor-jar-with-dependencies.jar +COPY log_minitor/src/main/resources/log4j.properties /opt/spark/conf/log4j.properties diff --git a/log_minitor/log_minitor.iml b/log_minitor/log_minitor.iml new file mode 100644 index 00000000..78b2cc53 --- /dev/null +++ b/log_minitor/log_minitor.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/log_minitor/pom.xml b/log_minitor/pom.xml new file mode 100644 index 00000000..9782b182 --- /dev/null +++ b/log_minitor/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + org.example + log_minitor + 1.0-SNAPSHOT + + + UTF-8 + 2.4.5 + 2.11 + 2.11.12 + 7.7.1 + + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.apache.spark + spark-streaming_${scala_spark_core.version} + ${spark.version} + + + + org.apache.spark + spark-sql_${scala_spark_core.version} + ${spark.version} + compile + + + + org.apache.spark + spark-streaming-kafka-0-10_${scala_spark_core.version} + ${spark.version} + + + + org.elasticsearch + elasticsearch-hadoop + ${elasticsearch.version} + + + + + + log_minitor + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + compile-scala + compile + + add-source + compile + + + + test-compile-scala + test-compile + + add-source + testCompile + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.5.5 + + + jar-with-dependencies + + + + LogStreamProcess + log4j.properties + + + + + make-assembly + package + + single + + + + + + + + + + + diff --git a/log_minitor/src/main/resources/log4j.properties b/log_minitor/src/main/resources/log4j.properties new file mode 100644 index 00000000..159e1eaa --- /dev/null +++ b/log_minitor/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +log4j.rootLogger=ERROR, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=target/spring.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n \ No newline at end of file diff --git a/log_minitor/src/main/scala/com/singa/auto/Config.scala b/log_minitor/src/main/scala/com/singa/auto/Config.scala new file mode 100644 index 00000000..bb6ee2dd --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/Config.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.singa.auto + +object Config { + + // spark table view + val LogViewTable = "logs" + // model +// val SPARK_APP_NAME = "logs_process" +// val RUNNING_MODEL="local[*]" + + // kafka config + val TOPIC = "sa-logs" + val BROKER = "singa-auto-kafka:9092" + val CONSUME_GROUP = "log_consumer"; + + // es config + val ES_HOST = "elasticsearch"; + val ES_PORT = "9200"; + val ES_INDEX = "spark/logs" + + // spark stream time between 2 pull + val TIME = "2" + + // don't store below log message at es + val FilterMsg:Array[String] = Array("kafka.conn INFO", "kafka.consumer.subscription_state INFO") + + + +} diff --git a/log_minitor/src/main/scala/com/singa/auto/common/configuration/SparkCfg.scala b/log_minitor/src/main/scala/com/singa/auto/common/configuration/SparkCfg.scala new file mode 100644 index 00000000..2a4e34cd --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/common/configuration/SparkCfg.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.singa.auto.common.configuration + +import org.apache.spark.SparkConf + +object SparkCfg { + + def setConf(sparkConf: SparkConf):SparkConf={ + sparkConf.set("spark.dynamicAllocation.enabled", "false"); + sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "2000"); + sparkConf.set("spark.streaming.unpersist", "true") + sparkConf.set("spark.cleaner.ttl", "7200s") + sparkConf + } +} diff --git a/log_minitor/src/main/scala/com/singa/auto/common/store/Kafka.scala b/log_minitor/src/main/scala/com/singa/auto/common/store/Kafka.scala new file mode 100644 index 00000000..d6d8f62b --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/common/store/Kafka.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.singa.auto.common.store + +import org.apache.kafka.common.serialization.StringDeserializer + +object Kafka { + + def getKafkaParams(brokers: String, kafkaConsumerGroup: String): Map[String, Object] = { + val kafkaParams = Map[String, Object]( + "bootstrap.servers" -> brokers, + "key.deserializer" -> classOf[StringDeserializer], + "value.deserializer" -> classOf[StringDeserializer], + "auto.offset.reset" -> "latest", // earliest,latest + "group.id" -> kafkaConsumerGroup, + "session.timeout.ms" -> (120 * 1000: java.lang.Integer), + "request.timeout.ms" -> (150 * 1000: java.lang.Integer), + "enable.auto.commit" -> (true: java.lang.Boolean)) + + kafkaParams + } + +} diff --git a/log_minitor/src/main/scala/com/singa/auto/common/udf/SparkUdfs.scala b/log_minitor/src/main/scala/com/singa/auto/common/udf/SparkUdfs.scala new file mode 100644 index 00000000..11dd5562 --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/common/udf/SparkUdfs.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.singa.auto.common.udf + +import com.singa.auto.Config +import org.apache.spark.sql.SparkSession + +class SparkUdfs(spark: SparkSession) extends Serializable { + + def regist(): SparkSession = { + + spark.udf.register("parserMsg", + (message: String)=> + { + message + } + ) + + spark.udf.register("parserName", + (path: String)=> + { + path.split("/").last + } + ) + + spark.udf.register("filterMsg", + (message: String) => { + var isvalid: String = "valid"; + for (msg <- Config.FilterMsg) { + if (message.contains(msg)) { + isvalid = "invalid" + } + } + isvalid + } + ) + + spark + + } +} diff --git a/log_minitor/src/main/scala/com/singa/auto/monitor/processer/BaseProcess.scala b/log_minitor/src/main/scala/com/singa/auto/monitor/processer/BaseProcess.scala new file mode 100644 index 00000000..2fe7a2ed --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/monitor/processer/BaseProcess.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.singa.auto.monitor.processer +import com.singa.auto.Config +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.elasticsearch.spark.sql._ + +abstract class BaseProcess extends Serializable { + + def filter(k: String):Boolean={ + true + } + + def sparkSQl(dataFrame: DataFrame):String ={ + "" + } + + def tempView(table: String): String={ + (table.replace(".", "_") + "_VIEW").toUpperCase; + + } + + def process(spark: SparkSession, dataFrame: DataFrame, table: String, data:String):Unit={ + try{ + + dataFrame.createOrReplaceTempView(tempView(table)); + + val sql = sparkSQl(dataFrame: DataFrame) + + val datas = spark.sql(sql) + + println("-------------Saving Dataframe to ES-------------") + datas.show(false) + datas.saveToEs(Config.ES_INDEX) + + } + catch { + case ex: Exception => + println("\n Exception in processing SparkSQL:" + sparkSQl(dataFrame: DataFrame) + "\n") + println("\n Exception in processing SparkSQL:" + ex.getMessage + "\n") + + } + + } + +} diff --git a/log_minitor/src/main/scala/com/singa/auto/monitor/processer/FiledsParse.scala b/log_minitor/src/main/scala/com/singa/auto/monitor/processer/FiledsParse.scala new file mode 100644 index 00000000..32c2db32 --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/monitor/processer/FiledsParse.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.singa.auto.monitor.processer + +import org.apache.spark.sql.DataFrame + +import scala.util.Try + +object FiledsParse { + + def getFileName(dataFrame: DataFrame):String = { + + if (hasColumn(dataFrame, "path")){ + val filedSql = "parserName(path) as path" + filedSql + } + else "path" + + } + + def getMessage(dataFrame: DataFrame):String = { + if (hasColumn(dataFrame, "message")){ + val filedSql = "parserMsg(message) as message" + filedSql + } + else "message" + } + + def filterMessage(dataFrame: DataFrame):String ={ + "filterMsg(message) = 'valid' " + } + + def hasColumn(df: DataFrame, path: String):Boolean = Try(df(path)).isSuccess + + +} diff --git a/log_minitor/src/main/scala/com/singa/auto/monitor/processer/LogEvents.scala b/log_minitor/src/main/scala/com/singa/auto/monitor/processer/LogEvents.scala new file mode 100644 index 00000000..9ad2bf5b --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/monitor/processer/LogEvents.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.singa.auto.monitor.processer + +import com.singa.auto.Config +import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.{DataFrame, SparkSession} + +class LogEvents(spark: SparkSession, dataFrame: DataFrame) extends BaseProcess { + + override def sparkSQl(dataFrame: DataFrame) :String = { + + var sqlBuffer = new StringBuffer() + + sqlBuffer.append("SELECT ") + sqlBuffer.append(FiledsParse.getFileName(dataFrame)).append(",") + sqlBuffer.append(FiledsParse.getMessage(dataFrame)) + sqlBuffer.append(" FROM ") + sqlBuffer.append(tempView(LogEvents.TABLE)) + sqlBuffer.append(" WHERE ") + sqlBuffer.append(FiledsParse.filterMessage(dataFrame)) + + println("Using SQL",sqlBuffer.toString) + + sqlBuffer.toString + } +} + + +object LogEvents { + val TABLE:String = Config.LogViewTable; + + def stream(spark: SparkSession, dataFrame: DataFrame): Unit = { + println("-------------stream-Events-------------") + val app = new LogEvents(spark, dataFrame); + app.process(spark, dataFrame,TABLE, null) + + } +} + diff --git a/log_minitor/src/main/scala/com/singa/auto/monitor/stream/BaseStream.scala b/log_minitor/src/main/scala/com/singa/auto/monitor/stream/BaseStream.scala new file mode 100644 index 00000000..6d021fe6 --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/monitor/stream/BaseStream.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.singa.auto.monitor.stream +import com.singa.auto.common.store.Kafka +import com.singa.auto.common.udf.SparkUdfs +import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent +import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils} + + +abstract class BaseStream extends Serializable { + + def createSparkStreamContext( + sparkConf: SparkConf, + topics: String, + brokers:String, + kafkaConsumerGroup:String, + batchTime: Long): Unit = { + val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(batchTime)) + + val topicsSet = topics.split(",").toSet + val kafkaParams = Kafka.getKafkaParams(brokers, kafkaConsumerGroup) + + // Create kafka stream. + val messages = KafkaUtils.createDirectStream[String, String]( + ssc, + PreferConsistent, + Subscribe[String, String](topicsSet, kafkaParams) + ) + + // Create spark session for each instance + var spark = SparkSession.builder.config(sparkConf).getOrCreate() + spark = new SparkUdfs(spark).regist() + spark.sparkContext.setLogLevel("WARN") + + val dStream = messages.map(_.value()) + + consumer(spark, dStream) + + // update offset + messages.foreachRDD { + rdd => + val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + offsetRanges.foreach { x => + println("kafka message ==> partition:" + x.partition + "--from: " + x.fromOffset + "--to: " + x.untilOffset) + } + // some time later, after outputs have completed + messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) + } + + ssc.start() + ssc.awaitTermination() + } + + /** + * consumer method + * @param spark:SparkSession + * @param dStream: stream message + */ + def consumer(spark:SparkSession, dStream: DStream[String]): Unit={ + + } + +} diff --git a/log_minitor/src/main/scala/com/singa/auto/monitor/stream/LogStreamProcess.scala b/log_minitor/src/main/scala/com/singa/auto/monitor/stream/LogStreamProcess.scala new file mode 100644 index 00000000..229797bc --- /dev/null +++ b/log_minitor/src/main/scala/com/singa/auto/monitor/stream/LogStreamProcess.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.singa.auto.monitor.stream +import com.singa.auto.Config +import com.singa.auto.common.configuration.SparkCfg +import com.singa.auto.monitor.processer.LogEvents +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.streaming.dstream.DStream + +class LogStreamProcess extends BaseStream { + + override + def consumer(spark:SparkSession, dStream: DStream[String]): Unit= { + println("LogToDwStream ---- consumer") + dStream.print() + dStream.foreachRDD{ + rdd => + if (!rdd.isEmpty()){ + val logs: DataFrame = spark.read.json(rdd) + logs.show(false) + LogEvents.stream(spark, logs) + } + } + } +} + +object LogStreamProcess { + +// val SPARK_APP_NAME:String = Config.SPARK_APP_NAME +// val RUNNING_MODEL:String = Config.RUNNING_MODEL + + val TOPIC:String = Config.TOPIC + val BROKER:String = Config.BROKER + val CONSUME_GROUP:String = Config.CONSUME_GROUP + val ES_HOST:String = Config.ES_HOST + val ES_PORT:String = Config.ES_PORT + val TIME:String = Config.TIME + + def main(args: Array[String]):Unit ={ + + println("In LogStreamProcess") +// var sparkConf = new SparkConf().setMaster(RUNNING_MODEL).setAppName(SPARK_APP_NAME); + var sparkConf = new SparkConf() + + sparkConf.set("spark.es.nodes",ES_HOST) + sparkConf.set("spark.es.port",ES_PORT) + + sparkConf = SparkCfg.setConf(sparkConf) + + // ssc + val logsStream = new LogStreamProcess + logsStream.createSparkStreamContext(sparkConf, TOPIC, BROKER, CONSUME_GROUP, Integer.parseInt(TIME)); + + } +} diff --git a/scripts/config/logstash.conf b/scripts/config/logstash.conf new file mode 100644 index 00000000..a2643d36 --- /dev/null +++ b/scripts/config/logstash.conf @@ -0,0 +1,18 @@ + + +input { + file { + path => "${LOGSTASH_DOCKER_WORKDIR_PATH}/logs/*" + start_position => "beginning" + ignore_older => 0 + } +} +output { + stdout{} + kafka { + bootstrap_servers => "${KAFKA_HOST}:${KAFKA_PORT}" + topic_id => "sa-logs" + compression_type => "none" + codec => "json" + } +} diff --git a/scripts/docker_swarm/.env.sh b/scripts/docker_swarm/.env.sh index 44bfd506..19b14c98 100644 --- a/scripts/docker_swarm/.env.sh +++ b/scripts/docker_swarm/.env.sh @@ -58,6 +58,13 @@ export ZOOKEEPER_HOST=singa_auto_zookeeper export ZOOKEEPER_PORT=2181 export KAFKA_HOST=singa_auto_kafka export KAFKA_PORT=9092 +export LOGSTASH_HOST=singa_auto_logstash +export LOGSTASH_PORT=9600 +export ES_HOST=elasticsearch +export ES_PORT=9200 +export ES_NODE_PORT=9300 +export KIBANA_HOST=singa_auto_kibana +export KIBANA_PORT=5601 export DOCKER_WORKDIR_PATH=/root export DB_DIR_PATH=db/data @@ -66,12 +73,15 @@ export LOGS_DIR_PATH=logs # Shares a folder with containers that stores componen export PARAMS_DIR_PATH=params # Shares a folder with containers that stores model parameters, relative to workdir export CONDA_ENVIORNMENT=singa_auto export WORKDIR_PATH=$HOST_WORKDIR_PATH # Specifying workdir if Python programs are run natively +export LOGSTASH_DOCKER_WORKDIR_PATH=/usr/share/logstash +export KIBANA_DOCKER_WORKDIR_PATH=/usr/share/kibana # Docker images for SINGA-Auto's custom components export SINGA_AUTO_IMAGE_ADMIN=singa_auto/singa_auto_admin export SINGA_AUTO_IMAGE_WEB_ADMIN=singa_auto/singa_auto_admin_web export SINGA_AUTO_IMAGE_WORKER=singa_auto/singa_auto_worker export SINGA_AUTO_IMAGE_PREDICTOR=singa_auto/singa_auto_predictor +export SINGA_AUTO_IMAGE_LOGSTASH=singa_auto/singa_auto_logstash # Docker images for dependent services export IMAGE_POSTGRES=postgres:10.5-alpine @@ -79,6 +89,9 @@ export 
IMAGE_REDIS=redis:5.0.3-alpine3.8 export IMAGE_ZOOKEEPER=zookeeper:3.5 export IMAGE_KAFKA=wurstmeister/kafka:2.12-2.1.1 +export IMAGE_KIBANA=kibana:7.7.0 +export IMAGE_ES=docker.elastic.co/elasticsearch/elasticsearch:7.7.0 + # Utility configuration export PYTHONPATH=$PWD # Ensures that `singa_auto` module can be imported at project root export PYTHONUNBUFFERED=1 # Ensures logs from Python appear instantly diff --git a/scripts/docker_swarm/build_images.sh b/scripts/docker_swarm/build_images.sh index 45b4a135..1f3d8ccc 100644 --- a/scripts/docker_swarm/build_images.sh +++ b/scripts/docker_swarm/build_images.sh @@ -63,4 +63,8 @@ title "Building SINGA-Auto Web Admin's image..." docker build -t $SINGA_AUTO_IMAGE_WEB_ADMIN:$SINGA_AUTO_VERSION -f ./dockerfiles/web_admin.Dockerfile \ --build-arg DOCKER_WORKDIR_PATH=$DOCKER_WORKDIR_PATH $PWD || exit 1 +title "Building SINGA-Auto LogStash's image..." +docker build -t $SINGA_AUTO_IMAGE_LOGSTASH:$SINGA_AUTO_VERSION -f ./dockerfiles/logstash.Dockerfile \ + --build-arg LOGSTASH_DOCKER_WORKDIR_PATH=$LOGSTASH_DOCKER_WORKDIR_PATH $PWD + echo "Finished building all SINGA-Auto's images successfully!" diff --git a/scripts/docker_swarm/start_monitor.sh b/scripts/docker_swarm/start_monitor.sh new file mode 100644 index 00000000..fde796e7 --- /dev/null +++ b/scripts/docker_swarm/start_monitor.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +source ./scripts/docker_swarm/.env.sh +source ./scripts/docker_swarm/utils.sh + +ES_LOG_FILE_PATH=$PWD/$LOGS_DIR_PATH/es_monitor.log +KIBANA_LOG_FILE_PATH=$PWD/$LOGS_DIR_PATH/kibana_monitor.log +LOGSTADH_LOG_FILE_PATH=$PWD/$LOGS_DIR_PATH/logstash_monitor.log + +title "Starting SINGA-Auto's Monitor..." 
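+
+# Note: this script runs the monitoring stack (Logstash, Elasticsearch, Kibana) as
+# standalone containers on the shared Docker network ($DOCKER_NETWORK). Logstash tails
+# the component logs under $LOGS_DIR_PATH and ships them to Kafka (see scripts/config/logstash.conf).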
+
+# start logstash
+
+(docker run --rm --name $LOGSTASH_HOST \
+    --network $DOCKER_NETWORK \
+    -e LOGSTASH_DOCKER_WORKDIR_PATH=$LOGSTASH_DOCKER_WORKDIR_PATH \
+    -e KAFKA_HOST=$KAFKA_HOST \
+    -e KAFKA_PORT=$KAFKA_PORT \
+    -p $LOGSTASH_PORT:$LOGSTASH_PORT \
+    -v $HOST_WORKDIR_PATH/$LOGS_DIR_PATH:$LOGSTASH_DOCKER_WORKDIR_PATH/$LOGS_DIR_PATH \
+    -v $HOST_WORKDIR_PATH/scripts/config/logstash.conf:$LOGSTASH_DOCKER_WORKDIR_PATH/logstash.conf \
+    -d $SINGA_AUTO_IMAGE_LOGSTASH:$SINGA_AUTO_VERSION \
+    &> $LOGSTADH_LOG_FILE_PATH) &
+
+# start es
+(docker run --rm --name $ES_HOST \
+    --network $DOCKER_NETWORK \
+    -p $ES_PORT:$ES_PORT \
+    -p $ES_NODE_PORT:$ES_NODE_PORT \
+    -e "discovery.type=single-node" \
+    -d $IMAGE_ES \
+    &> $ES_LOG_FILE_PATH) &
+
+# start kibana
+(docker run --rm --name $KIBANA_HOST \
+    --network $DOCKER_NETWORK \
+    -p $KIBANA_PORT:$KIBANA_PORT \
+    -d $IMAGE_KIBANA \
+    &> $KIBANA_LOG_FILE_PATH) &
diff --git a/scripts/docker_swarm/stop.sh b/scripts/docker_swarm/stop.sh
index 4ad010c2..c41282e0 100644
--- a/scripts/docker_swarm/stop.sh
+++ b/scripts/docker_swarm/stop.sh
@@ -56,4 +56,13 @@ docker rm -f $ADMIN_HOST || echo "Failed to stop SINGA-Auto's Admin"
 title "Stopping SINGA-Auto's Web Admin..."
 docker rm -f $WEB_ADMIN_HOST || echo "Failed to stop SINGA-Auto's Web Admin"
+title "Stopping SINGA-Auto's Kibana..."
+docker rm -f $KIBANA_HOST || echo "Failed to stop SINGA-Auto's Kibana"
+
+title "Stopping SINGA-Auto's Logstash..."
+docker rm -f $LOGSTASH_HOST || echo "Failed to stop SINGA-Auto's Logstash"
+
+title "Stopping SINGA-Auto's Elasticsearch..."
+docker rm -f $ES_HOST || echo "Failed to stop SINGA-Auto's Elasticsearch"
+
 echo "You'll need to destroy your machine's Docker swarm manually"
diff --git a/scripts/kubernetes/.env.sh b/scripts/kubernetes/.env.sh
index c4e1c590..1e8f52fb 100644
--- a/scripts/kubernetes/.env.sh
+++ b/scripts/kubernetes/.env.sh
@@ -40,6 +40,7 @@ export POSTGRES_EXT_PORT=5433
 export REDIS_EXT_PORT=6380
 export ZOOKEEPER_EXT_PORT=2181
 export KAFKA_EXT_PORT=9092
+export KIBANA_EXT_PORT=31009
 export HOST_WORKDIR_PATH=$PWD
 export APP_MODE=DEV # DEV or PROD
 export POSTGRES_DUMP_FILE_PATH=$PWD/db_dump.sql # PostgreSQL database dump file
@@ -65,6 +66,14 @@ export ZOOKEEPER_HOST=singa-auto-zookeeper
 export ZOOKEEPER_PORT=2181
 export KAFKA_HOST=singa-auto-kafka
 export KAFKA_PORT=9092
+export LOGSTASH_HOST=singa-auto-logstash
+export LOGSTASH_PORT=9600
+export ES_HOST=elasticsearch
+export ES_PORT=9200
+export ES_NODE_PORT=9300
+export KIBANA_HOST=singa-auto-kibana
+export KIBANA_PORT=5601
+
 export DOCKER_WORKDIR_PATH=/root
 export DB_DIR_PATH=db/data
 export DATA_DIR_PATH=data # Shares a data folder with containers, relative to workdir
@@ -72,6 +81,9 @@ export LOGS_DIR_PATH=logs # Shares a folder with containers that stores componen
 export PARAMS_DIR_PATH=params # Shares a folder with containers that stores model parameters, relative to workdir
 export CONDA_ENVIORNMENT=singa_auto
 export WORKDIR_PATH=$HOST_WORKDIR_PATH # Specifying workdir if Python programs are run natively
+export LOGSTASH_DOCKER_WORKDIR_PATH=/usr/share/logstash
+export KIBANA_DOCKER_WORKDIR_PATH=/usr/share/kibana
+export SPAEK_DOCKER_JARS_PATH=/opt/spark/examples
 
 # Docker images for SINGA-Auto's custom components
 export SINGA_AUTO_IMAGE_ADMIN=singa_auto/singa_auto_admin
@@ -80,6 +92,8 @@ export SINGA_AUTO_IMAGE_WORKER=singa_auto/singa_auto_worker
 export SINGA_AUTO_IMAGE_PREDICTOR=singa_auto/singa_auto_predictor
 export SINGA_AUTO_IMAGE_STOLON=sorintlab/stolon:master-pg10
 export SINGA_AUTO_IMAGE_TEST=singa_auto/singa_auto_test
+export SINGA_AUTO_IMAGE_LOGSTASH=singa_auto/singa_auto_logstash
+export SINGA_AUTO_IMAGE_SPARKAPP=singa_auto/singa_auto_sparkapp
 
 #
Docker images for dependent services export IMAGE_POSTGRES=postgres:10.5-alpine @@ -87,6 +101,9 @@ export IMAGE_REDIS=redis:5.0.3-alpine3.8 export IMAGE_ZOOKEEPER=zookeeper:3.5 export IMAGE_KAFKA=wurstmeister/kafka:2.12-2.1.1 +export IMAGE_KIBANA=kibana:7.7.0 +export IMAGE_ES=docker.elastic.co/elasticsearch/elasticsearch:7.7.0 + # Utility configuration export PYTHONPATH=$PWD # Ensures that `singa_auto` module can be imported at project root export PYTHONUNBUFFERED=1 # Ensures logs from Python appear instantly diff --git a/scripts/kubernetes/build_images.sh b/scripts/kubernetes/build_images.sh index 960a5752..00c14265 100644 --- a/scripts/kubernetes/build_images.sh +++ b/scripts/kubernetes/build_images.sh @@ -32,6 +32,14 @@ title "Using K8S" echo "using $APP_MODE docker files" +title "Building SINGA-Auto LogStash's image..." +docker build -t $SINGA_AUTO_IMAGE_LOGSTASH:$SINGA_AUTO_VERSION -f ./dockerfiles/logstash.Dockerfile \ + --build-arg LOGSTASH_DOCKER_WORKDIR_PATH=$LOGSTASH_DOCKER_WORKDIR_PATH $PWD + +title "Building SINGA-Auto SparkApp's image..." +docker build -t $SINGA_AUTO_IMAGE_SPARKAPP:$SINGA_AUTO_VERSION -f ./dockerfiles/spark.Dockerfile \ + --build-arg SPAEK_DOCKER_JARS_PATH=$SPAEK_DOCKER_JARS_PATH $PWD + if [[ $APP_MODE = "DEV" ]] then title "Building SINGA-Auto Admin's image..." diff --git a/scripts/kubernetes/create_config.py b/scripts/kubernetes/create_config.py index 37789b13..2fdae4e2 100644 --- a/scripts/kubernetes/create_config.py +++ b/scripts/kubernetes/create_config.py @@ -85,6 +85,22 @@ INGRESS_EXT_PORT = sys.argv[53] REDIS_PASSWORD = sys.argv[54] + LOGSTASH_HOST = sys.argv[55] + LOGSTASH_PORT = sys.argv[56] + ES_HOST = sys.argv[57] + ES_PORT = sys.argv[58] + ES_NODE_PORT = sys.argv[59] + KIBANA_HOST = sys.argv[60] + KIBANA_PORT = sys.argv[61] + LOGSTASH_DOCKER_WORKDIR_PATH = sys.argv[62] + KIBANA_DOCKER_WORKDIR_PATH = sys.argv[63] + SINGA_AUTO_IMAGE_LOGSTASH = sys.argv[64] + IMAGE_KIBANA = sys.argv[65] + IMAGE_ES = sys.argv[66] + KIBANA_EXT_PORT = sys.argv[67] + SINGA_AUTO_IMAGE_SPARKAPP = sys.argv[68] + SPAEK_DOCKER_JARS_PATH = sys.argv[69] + #zk service content = {} content.setdefault('apiVersion', 'v1') @@ -425,3 +441,188 @@ container.setdefault('env', env) with open('{}/scripts/kubernetes/start_web_admin_deployment.json'.format(PYTHONPATH), 'w') as f: f.write(json.dumps(content, indent=4)) + + # LogStash deployment + content = {} + content.setdefault('apiVersion', 'apps/v1') + content.setdefault('kind', 'Deployment') + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', LOGSTASH_HOST) + labels = metadata.setdefault('labels', {}) + labels.setdefault('name', LOGSTASH_HOST) + spec = content.setdefault('spec', {}) + spec.setdefault('replicas', 1) + spec.setdefault('selector', {'matchLabels': {'name': LOGSTASH_HOST}}) + template = spec.setdefault('template', {}) + template.setdefault('metadata', {'labels': {'name': LOGSTASH_HOST}}) + container = {} + container.setdefault('name', LOGSTASH_HOST) + container.setdefault('image', '{}:{}'.format(SINGA_AUTO_IMAGE_LOGSTASH, SINGA_AUTO_VERSION)) + + template.setdefault('spec', {'containers': [container]}) + env = [] + env.append({'name': 'LOGSTASH_DOCKER_WORKDIR_PATH', 'value': LOGSTASH_DOCKER_WORKDIR_PATH}) + env.append({'name': 'KAFKA_HOST', 'value': KAFKA_HOST}) + env.append({'name': 'KAFKA_PORT', 'value': KAFKA_PORT}) + container.setdefault('env', env) + + container.setdefault('volumeMounts', + [{'name': 'conf-path', 'mountPath': '{}/logstash.conf'.format(LOGSTASH_DOCKER_WORKDIR_PATH)}, \ + 
{'name': 'log-path', 'mountPath': '{}/{}'.format(LOGSTASH_DOCKER_WORKDIR_PATH, LOGS_DIR_PATH)}, \ + {'name': 'docker-path', 'mountPath': '/var/run/docker.sock'}]) + template['spec']['volumes'] = [ + {'name': 'conf-path', 'hostPath': {'path': '{}/scripts/config/logstash.conf'.format(HOST_WORKDIR_PATH)}}, \ + {'name': 'log-path', 'hostPath': {'path': '{}/{}'.format(HOST_WORKDIR_PATH, LOGS_DIR_PATH)}}, \ + {'name': 'docker-path', 'hostPath': {'path': '/var/run/docker.sock'}}] + + with open('{}/scripts/kubernetes/start_logstash_deployment.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) + + # LogStash service + content = {} + content.setdefault('apiVersion', 'v1') + content.setdefault('kind', 'Service') + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', LOGSTASH_HOST) + labels = metadata.setdefault('labels', {}) + labels.setdefault('name', LOGSTASH_HOST) + spec = content.setdefault('spec', {}) + spec.setdefault('type', 'NodePort') + ports = spec.setdefault('ports', []) + ports.append({ + 'port': int(LOGSTASH_PORT), + 'targetPort': int(LOGSTASH_PORT) + }) + spec.setdefault('selector', {'name': LOGSTASH_HOST}) + with open('{}/scripts/kubernetes/start_logstash_service.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) + + # kibana deployment + content = {} + content.setdefault('apiVersion', 'apps/v1') + content.setdefault('kind', 'Deployment') + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', KIBANA_HOST) + labels = metadata.setdefault('labels', {}) + labels.setdefault('name', KIBANA_HOST) + spec = content.setdefault('spec', {}) + spec.setdefault('replicas', 1) + spec.setdefault('selector', {'matchLabels': {'name': KIBANA_HOST}}) + template = spec.setdefault('template', {}) + template.setdefault('metadata', {'labels': {'name': KIBANA_HOST}}) + container = {} + container.setdefault('name', KIBANA_HOST) + container.setdefault('image', '{}'.format(IMAGE_KIBANA)) + + template.setdefault('spec', {'containers': [container]}) + + with open('{}/scripts/kubernetes/start_kibana_deployment.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) + + # kibana service + content = {} + content.setdefault('apiVersion', 'v1') + content.setdefault('kind', 'Service') + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', KIBANA_HOST) + labels = metadata.setdefault('labels', {}) + labels.setdefault('name', KIBANA_HOST) + spec = content.setdefault('spec', {}) + spec.setdefault('type', 'NodePort') + ports = spec.setdefault('ports', []) + ports.append({ + 'port': int(KIBANA_PORT), + 'targetPort': int(KIBANA_PORT), + 'nodePort': int(KIBANA_EXT_PORT) + }) + spec.setdefault('selector', {'name': KIBANA_HOST}) + with open('{}/scripts/kubernetes/start_kibana_service.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) + + # es deployment + content = {} + content.setdefault('apiVersion', 'apps/v1') + content.setdefault('kind', 'Deployment') + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', ES_HOST) + labels = metadata.setdefault('labels', {}) + labels.setdefault('name', ES_HOST) + spec = content.setdefault('spec', {}) + spec.setdefault('replicas', 1) + spec.setdefault('selector', {'matchLabels': {'name': ES_HOST}}) + template = spec.setdefault('template', {}) + template.setdefault('metadata', {'labels': {'name': ES_HOST}}) + container = {} + container.setdefault('name', ES_HOST) + container.setdefault('image', 
'{}'.format(IMAGE_ES)) + + template.setdefault('spec', {'containers': [container]}) + env = [] + env.append({'name': 'discovery.type', 'value': 'single-node'}) + container.setdefault('env', env) + + with open('{}/scripts/kubernetes/start_es_deployment.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) + + # es service + content = {} + content.setdefault('apiVersion', 'v1') + content.setdefault('kind', 'Service') + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', ES_HOST) + labels = metadata.setdefault('labels', {}) + labels.setdefault('name', ES_HOST) + spec = content.setdefault('spec', {}) + spec.setdefault('type', 'NodePort') + ports = spec.setdefault('ports', []) + ports.append({ + 'port': int(ES_PORT), + 'targetPort': int(ES_PORT), + 'name': "{}".format(ES_PORT) + }) + ports.append({ + 'port': int(ES_NODE_PORT), + 'targetPort': int(ES_NODE_PORT), + 'name': "{}".format(ES_NODE_PORT) + }) + + spec.setdefault('selector', {'name': ES_HOST}) + with open('{}/scripts/kubernetes/start_es_service.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) + + # spark configure + content = {} + content.setdefault('apiVersion', "sparkoperator.k8s.io/v1beta2") + content.setdefault('kind', 'SparkApplication') + + metadata = content.setdefault('metadata', {}) + metadata.setdefault('name', "singa-auto-monitor") + metadata.setdefault('namespace', "default") + + spec = content.setdefault('spec', {}) + spec.setdefault('type', 'Scala') + spec.setdefault('mode', 'cluster') + spec.setdefault('image', '{}:{}'.format(SINGA_AUTO_IMAGE_SPARKAPP, SINGA_AUTO_VERSION)) + spec.setdefault('imagePullPolicy', 'IfNotPresent') + spec.setdefault('mainClass', "com.singa.auto.monitor.stream.LogStreamProcess") + spec.setdefault('mainApplicationFile', "local://{}/log_minitor-jar-with-dependencies.jar".format(SPAEK_DOCKER_JARS_PATH)) + spec.setdefault('sparkVersion', "2.4.5") + + restartPolicy = spec.setdefault('restartPolicy', {}) + restartPolicy.setdefault('type', "Always") + + driver = spec.setdefault('driver', {}) + driver.setdefault('cores', 1) + driver.setdefault('coreLimit', "1000m") + driver.setdefault('memory', "512m") + driver.setdefault('labels', {"version": "2.4.5"}) + driver.setdefault('serviceAccount', "spark") + + executor = spec.setdefault('executor', {}) + executor.setdefault('cores', 1) + executor.setdefault('instances', 2) + executor.setdefault('memory', "512m") + executor.setdefault('labels', {"version": "2.4.5"}) + + with open('{}/scripts/kubernetes/spark-app.json'.format(PYTHONPATH), 'w') as f: + f.write(json.dumps(content, indent=4)) diff --git a/scripts/kubernetes/generate_config.sh b/scripts/kubernetes/generate_config.sh index 99e24400..6d0ba6f7 100644 --- a/scripts/kubernetes/generate_config.sh +++ b/scripts/kubernetes/generate_config.sh @@ -73,4 +73,19 @@ $CLUSTER_MODE \ $DB_DIR_PATH \ $INGRESS_NAME \ $INGRESS_EXT_PORT \ -$REDIS_PASSWORD +$REDIS_PASSWORD \ +$LOGSTASH_HOST \ +$LOGSTASH_PORT \ +$ES_HOST \ +$ES_PORT \ +$ES_NODE_PORT \ +$KIBANA_HOST \ +$KIBANA_PORT \ +$LOGSTASH_DOCKER_WORKDIR_PATH \ +$KIBANA_DOCKER_WORKDIR_PATH \ +$SINGA_AUTO_IMAGE_LOGSTASH \ +$IMAGE_KIBANA \ +$IMAGE_ES \ +$KIBANA_EXT_PORT \ +$SINGA_AUTO_IMAGE_SPARKAPP \ +$SPAEK_DOCKER_JARS_PATH diff --git a/scripts/kubernetes/remove_config.sh b/scripts/kubernetes/remove_config.sh index 791620e8..682d73a3 100644 --- a/scripts/kubernetes/remove_config.sh +++ b/scripts/kubernetes/remove_config.sh @@ -23,10 +23,17 @@ rm -f 
./scripts/kubernetes/start_web_admin_deployment.json rm -f ./scripts/kubernetes/start_db_deployment.json rm -f ./scripts/kubernetes/start_zookeeper_deployment.json rm -f ./scripts/kubernetes/start_kafka_deployment.json +rm -f ./scripts/kubernetes/start_kibana_deployment.json +rm -f ./scripts/kubernetes/start_logstash_deployment.json +rm -f ./scripts/kubernetes/start_es_deployment.json rm -f ./scripts/kubernetes/start_admin_service.json rm -f ./scripts/kubernetes/start_redis_service.json rm -f ./scripts/kubernetes/start_web_admin_service.json rm -f ./scripts/kubernetes/start_db_service.json rm -f ./scripts/kubernetes/start_zookeeper_service.json -rm -f ./scripts/kubernetes/start_kafka_service.json \ No newline at end of file +rm -f ./scripts/kubernetes/start_kafka_service.json +rm -f ./scripts/kubernetes/start_kibana_service.json +rm -f ./scripts/kubernetes/start_logstash_service.json +rm -f ./scripts/kubernetes/start_es_service.json +rm -f ./scripts/kubernetes/spark-app.json diff --git a/scripts/kubernetes/replace_services.sh b/scripts/kubernetes/replace_services.sh index 34c973b6..b19ac7bd 100644 --- a/scripts/kubernetes/replace_services.sh +++ b/scripts/kubernetes/replace_services.sh @@ -44,4 +44,18 @@ then kubectl replace --force -f scripts/kubernetes/start_web_admin_service.json fi + +if [[ $1 = "monitor" ]] +then + kubectl replace --force -f scripts/kubernetes/start_logstash_deployment.json + kubectl replace --force -f scripts/kubernetes/start_logstash_service.json + + kubectl replace --force -f scripts/kubernetes/start_es_deployment.json + kubectl replace --force -f scripts/kubernetes/start_es_service.json + + kubectl replace --force -f scripts/kubernetes/start_kibana_deployment.json + kubectl replace --force -f scripts/kubernetes/start_kibana_service.json + + kubectl replace --force -f scripts/kubernetes/spark-app.json +fi bash ./scripts/kubernetes/remove_config.sh diff --git a/scripts/kubernetes/spark-rbac.yaml b/scripts/kubernetes/spark-rbac.yaml new file mode 100644 index 00000000..bd46eee8 --- /dev/null +++ b/scripts/kubernetes/spark-rbac.yaml @@ -0,0 +1,33 @@ + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + namespace: default + name: spark-role +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["*"] +- apiGroups: [""] + resources: ["services"] + verbs: ["*"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-role-binding + namespace: default +subjects: +- kind: ServiceAccount + name: spark + namespace: default +roleRef: + kind: Role + name: spark-role + apiGroup: rbac.authorization.k8s.io diff --git a/scripts/kubernetes/start.sh b/scripts/kubernetes/start.sh index 6949a8c9..db46cb90 100644 --- a/scripts/kubernetes/start.sh +++ b/scripts/kubernetes/start.sh @@ -55,6 +55,9 @@ fi bash ./scripts/kubernetes/start_admin.sh || exit 1 bash ./scripts/kubernetes/start_web_admin.sh || exit 1 +echo "Deploy monitor plugin" +bash ./scripts/kubernetes/start_monitor.sh + bash ./scripts/kubernetes/remove_config.sh || exit 1 echo "Deploy ingress-nginx" diff --git a/scripts/kubernetes/start_monitor.sh b/scripts/kubernetes/start_monitor.sh new file mode 100644 index 00000000..3c89470b --- /dev/null +++ b/scripts/kubernetes/start_monitor.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+source ./scripts/kubernetes/.env.sh
+source ./scripts/kubernetes/utils.sh
+
+title "Starting SINGA-Auto's Monitor..."
+
+# start logstash
+LOGSTADH_LOG_FILE_PATH=$PWD/logs/start_logstash_service.log
+(kubectl create -f scripts/kubernetes/start_logstash_service.json \
+&> $LOGSTADH_LOG_FILE_PATH) &
+ensure_stable "SINGA-Auto's logstash Service" $LOGSTADH_LOG_FILE_PATH 1
+
+LOGSTADH_LOG_FILE_PATH=$PWD/logs/start_logstash_deployment.log
+(kubectl create -f scripts/kubernetes/start_logstash_deployment.json \
+&> $LOGSTADH_LOG_FILE_PATH) &
+ensure_stable "SINGA-Auto's logstash Deployment" $LOGSTADH_LOG_FILE_PATH 1
+
+# start es
+
+ES_LOG_FILE_PATH=$PWD/logs/start_es_service.log
+(kubectl create -f scripts/kubernetes/start_es_service.json \
+&> $ES_LOG_FILE_PATH) &
+ensure_stable "SINGA-Auto's es Service" $ES_LOG_FILE_PATH 1
+
+ES_LOG_FILE_PATH=$PWD/logs/start_es_deployment.log
+(kubectl create -f scripts/kubernetes/start_es_deployment.json \
+&> $ES_LOG_FILE_PATH) &
+ensure_stable "SINGA-Auto's es Deployment" $ES_LOG_FILE_PATH 1
+
+# start kibana
+
+KIBANA_LOG_FILE_PATH=$PWD/logs/start_kibana_service.log
+(kubectl create -f scripts/kubernetes/start_kibana_service.json \
+&> $KIBANA_LOG_FILE_PATH) &
+ensure_stable "SINGA-Auto's kibana Service" $KIBANA_LOG_FILE_PATH 1
+
+KIBANA_LOG_FILE_PATH=$PWD/logs/start_kibana_deployment.log
+(kubectl create -f scripts/kubernetes/start_kibana_deployment.json \
+&> $KIBANA_LOG_FILE_PATH) &
+ensure_stable "SINGA-Auto's kibana Deployment" $KIBANA_LOG_FILE_PATH 1
+
+
+# start sparkapp
+
+helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
+helm install incubator/sparkoperator --namespace default --set operatorVersion=v1beta2-1.1.2-2.4.5 --generate-name
+
+SPARK_LOG_FILE_PATH=$PWD/logs/start_spark.log
+(kubectl apply -f scripts/kubernetes/spark-rbac.yaml \
+&> $SPARK_LOG_FILE_PATH) &
+
+ensure_stable "SINGA-Auto's sparkoperator" $SPARK_LOG_FILE_PATH 20
+
+(kubectl apply -f scripts/kubernetes/spark-app.json \
+&> $SPARK_LOG_FILE_PATH) &
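+
+# Once these components are up, logs flow as follows:
+#   component logs -> Logstash (file input) -> Kafka topic "sa-logs"
+#   -> Spark streaming job (LogStreamProcess) -> Elasticsearch index "spark/logs" -> Kibana.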