Commit

Add Monitor
nailixing committed Jun 10, 2020
1 parent 1850906 commit fd9afc3
Showing 28 changed files with 1,167 additions and 2 deletions.
39 changes: 39 additions & 0 deletions dockerfiles/logstash.Dockerfile
@@ -0,0 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
FROM logstash:7.7.0

MAINTAINER NailiXing <[email protected]>


RUN /usr/share/logstash/bin/logstash-plugin install logstash-input-http_poller
RUN /usr/share/logstash/bin/logstash-plugin install logstash-input-exec
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-json_encode


EXPOSE 9600 5044

ARG LOGSTASH_DOCKER_WORKDIR_PATH

WORKDIR $LOGSTASH_DOCKER_WORKDIR_PATH

COPY scripts/config/logstash.conf $LOGSTASH_DOCKER_WORKDIR_PATH/logstash.conf

COPY ./logs $LOGSTASH_DOCKER_WORKDIR_PATH/logs

CMD bin/logstash -f logstash.conf
23 changes: 23 additions & 0 deletions dockerfiles/spark.Dockerfile
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
FROM gcr.io/spark-operator/spark:v2.4.5

ARG SPAEK_DOCKER_JARS_PATH
COPY log_minitor/target/log_minitor-jar-with-dependencies.jar $SPAEK_DOCKER_JARS_PATH/log_minitor-jar-with-dependencies.jar
COPY log_minitor/src/main/resources/log4j.properties /opt/spark/conf/log4j.properties
2 changes: 2 additions & 0 deletions log_minitor/log_minitor.iml
@@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
111 changes: 111 additions & 0 deletions log_minitor/pom.xml
@@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>log_minitor</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.4.5</spark.version>
<scala_spark_core.version>2.11</scala_spark_core.version>
<scala.version>2.11.12</scala.version>
<elasticsearch.version>7.7.1</elasticsearch.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala_spark_core.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala_spark_core.version}</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala_spark_core.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

</dependencies>

<build>
<finalName>log_minitor</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>LogStreamProcess</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>


</plugins>
</build>


</project>
8 changes: 8 additions & 0 deletions log_minitor/src/main/resources/log4j.properties
@@ -0,0 +1,8 @@
log4j.rootLogger=ERROR, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
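# Note: the logfile appender below is declared but not attached to rootLogger, so only stdout output is active.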
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
48 changes: 48 additions & 0 deletions log_minitor/src/main/scala/com/singa/auto/Config.scala
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.singa.auto

object Config {

// Spark SQL view name for the log records
val LogViewTable = "logs"
// run mode (currently unused)
// val SPARK_APP_NAME = "logs_process"
// val RUNNING_MODEL="local[*]"

// kafka config
val TOPIC = "sa-logs"
val BROKER = "singa-auto-kafka:9092"
val CONSUME_GROUP = "log_consumer";

// es config
val ES_HOST = "elasticsearch";
val ES_PORT = "9200";
val ES_INDEX = "spark/logs"

// Spark Streaming batch interval between two pulls
val TIME = "2"

// log messages containing these markers are not stored in ES
val FilterMsg:Array[String] = Array("kafka.conn INFO", "kafka.consumer.subscription_state INFO")



}
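For context, a minimal sketch (not part of this commit) of how the ES settings above could be passed to the elasticsearch-hadoop connector; the es.* keys are standard connector options, while the surrounding builder code is an assumption:

import org.apache.spark.SparkConf
import com.singa.auto.Config

// Point the elasticsearch-hadoop connector at the cluster defined in Config.
val esConf = new SparkConf()
  .set("es.nodes", Config.ES_HOST)
  .set("es.port", Config.ES_PORT)
  .set("es.index.auto.create", "true")
// Config.ES_INDEX ("spark/logs") would then be used as the target resource when saving documents.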
33 changes: 33 additions & 0 deletions log_minitor/src/main/scala/com/singa/auto/common/configuration/SparkCfg.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.singa.auto.common.configuration

import org.apache.spark.SparkConf

object SparkCfg {

def setConf(sparkConf: SparkConf):SparkConf={
sparkConf.set("spark.dynamicAllocation.enabled", "false");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "2000");
sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.cleaner.ttl", "7200s")
sparkConf
}
}
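A minimal usage sketch, assuming the streaming entry point builds its context roughly like this (the app name is taken from the commented-out constant in Config and is an assumption, not part of this commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.singa.auto.Config
import com.singa.auto.common.configuration.SparkCfg

// Apply the shared tuning, then create the streaming context with the configured batch interval.
val sparkConf = SparkCfg.setConf(new SparkConf().setAppName("logs_process"))
val ssc = new StreamingContext(sparkConf, Seconds(Config.TIME.toLong))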
40 changes: 40 additions & 0 deletions log_minitor/src/main/scala/com/singa/auto/common/store/Kafka.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.singa.auto.common.store

import org.apache.kafka.common.serialization.StringDeserializer

object Kafka {

def getKafkaParams(brokers: String, kafkaConsumerGroup: String): Map[String, Object] = {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest", // earliest,latest
"group.id" -> kafkaConsumerGroup,
"session.timeout.ms" -> (120 * 1000: java.lang.Integer),
"request.timeout.ms" -> (150 * 1000: java.lang.Integer),
"enable.auto.commit" -> (true: java.lang.Boolean))

kafkaParams
}

}
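A sketch of consuming the topic with these parameters through the spark-streaming-kafka-0-10 direct stream API; it assumes the StreamingContext `ssc` from the SparkCfg sketch above and is not part of this commit:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.singa.auto.Config
import com.singa.auto.common.store.Kafka

// Subscribe to the log topic using the consumer parameters built by Kafka.getKafkaParams.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array(Config.TOPIC), Kafka.getKafkaParams(Config.BROKER, Config.CONSUME_GROUP))
)
// Only the log payload is needed downstream.
val messages = stream.map((record: ConsumerRecord[String, String]) => record.value)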
58 changes: 58 additions & 0 deletions log_minitor/src/main/scala/com/singa/auto/common/udf/SparkUdfs.scala
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.singa.auto.common.udf

import com.singa.auto.Config
import org.apache.spark.sql.SparkSession

class SparkUdfs(spark: SparkSession) extends Serializable {

def regist(): SparkSession = {

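// parserMsg: currently returns the message unchanged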
spark.udf.register("parserMsg",
(message: String)=>
{
message
}
)

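// parserName: extracts the file name (last path segment) from a log path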
spark.udf.register("parserName",
(path: String)=>
{
path.split("/").last
}
)

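// filterMsg: marks a message as "invalid" if it contains any pattern in Config.FilterMsg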
spark.udf.register("filterMsg",
(message: String) => {
var isvalid: String = "valid";
for (msg <- Config.FilterMsg) {
if (message.contains(msg)) {
isvalid = "invalid"
}
}
isvalid
}
)

spark

}
}
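A sketch of how the registered UDFs might be applied over the view named in Config.LogViewTable; the column names `path` and `message`, and the `sparkConf` from the earlier sketch, are assumptions rather than code from this commit:

import org.apache.spark.sql.SparkSession
import com.singa.auto.Config
import com.singa.auto.common.udf.SparkUdfs

val spark = SparkSession.builder().config(sparkConf).getOrCreate()
new SparkUdfs(spark).regist()

// Keep only messages that pass the filter, and derive a readable source name from the log path.
val cleaned = spark.sql(
  s"SELECT parserName(path) AS source, parserMsg(message) AS message " +
  s"FROM ${Config.LogViewTable} WHERE filterMsg(message) = 'valid'"
)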