add source code
Malik Fajar committed Mar 19, 2019
1 parent 72d604d commit dde0b13
Showing 7 changed files with 45,631 additions and 0 deletions.
45,253 changes: 45,253 additions & 0 deletions humidity.json

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions pom.xml
@@ -8,5 +8,70 @@
    <artifactId>Spark-KafkaToHBase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
59 changes: 59 additions & 0 deletions src/main/java/com/malik/config/HBaseConfig.java
@@ -0,0 +1,59 @@
package com.malik.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HBaseConfig {

    private String outputTable;
    private String hbaseMaster;
    private String zookeeperQuorum;

    /**
     * Set variables in the constructor
     * @param outputTable -> String output table name
     * @param hbaseMaster -> HBase Master host
     * @param zookeeperQuorum -> ZooKeeper host
     */
    public HBaseConfig(String outputTable, String hbaseMaster, String zookeeperQuorum) {
        this.outputTable = outputTable;
        this.hbaseMaster = hbaseMaster;
        this.zookeeperQuorum = zookeeperQuorum;
    }

    /**
     * Set HBase configuration
     * @return HBase configuration
     */
    private Configuration hbaseConfiguration() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", hbaseMaster);
        configuration.set("zookeeper.znode.parent", "/hbase-unsecure");
        configuration.setInt("timeout", 120000);
        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
        configuration.set("hbase.client.keyvalue.maxsize", "0");
        configuration.set("hbase.client.scanner.timeout.period", "100000");
        configuration.set("hbase.rpc.timeout", "100000");
        configuration.set("mapred.output.dir", "/tmp");
        configuration.set("mapreduce.output.fileoutputformat.outputdir", "/tmp");

        return configuration;
    }

    /**
     * Set up the HBase job
     * @return HBase job
     * @throws IOException -> Exception
     */
    public Job hbaseJob() throws IOException {
        Job job = Job.getInstance(hbaseConfiguration());
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTable);
        job.setOutputFormatClass(TableOutputFormat.class);

        return job;
    }
}
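
For reference, a minimal sketch of how the same connection settings can be exercised with the plain hbase-client API (already a dependency in pom.xml) to write a single cell outside of Spark, e.g. to verify the ZooKeeper quorum and table before running the streaming job. The table name, host names, and cell values below are illustrative placeholders, not values taken from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBasePutCheck {
        public static void main(String[] args) throws Exception {
            // Same style of settings as HBaseConfig#hbaseConfiguration(); hosts are placeholders
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zookeeper-host:2181");
            conf.set("zookeeper.znode.parent", "/hbase-unsecure");

            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(TableName.valueOf("humidity_table"))) {
                // Row key = city, column family "humidity", qualifier = datetime, value = humidity reading
                Put put = new Put(Bytes.toBytes("jakarta"));
                put.addColumn(Bytes.toBytes("humidity"), Bytes.toBytes("2017-01-01_00:00:00"), Bytes.toBytes("82.0"));
                table.put(put);
            }
        }
    }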
95 changes: 95 additions & 0 deletions src/main/java/com/malik/config/KafkaConfig.java
@@ -0,0 +1,95 @@
package com.malik.config;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaConfig implements Serializable {

    private String bootstrapServer;
    private String kafkaGroupName;
    private String autoOffsetReset;
    private Boolean autoCommit;
    private Map<String, Integer> kafkaTopics;

    /**
     * Set variables in the constructor. For the Kafka producer.
     * @param bootstrapServer -> String Kafka broker (host:port)
     */
    public KafkaConfig(String bootstrapServer) {
        this.bootstrapServer = bootstrapServer;
        kafkaGroupName = "group1";
        autoOffsetReset = "earliest";
        autoCommit = false;
        kafkaTopics = new HashMap<>();
    }

    /**
     * Set variables in the constructor. For the Kafka consumer.
     * @param bootstrapServer -> String Kafka broker (host:port)
     * @param kafkaGroupName -> String Kafka consumer group
     * @param autoOffsetReset -> String Kafka auto offset reset
     * @param autoCommit -> Boolean Kafka auto commit
     * @param kafkaTopics -> Map of Kafka topics (String topic name, Integer partition count)
     */
    public KafkaConfig(String bootstrapServer, String kafkaGroupName, String autoOffsetReset, Boolean autoCommit, Map<String, Integer> kafkaTopics) {
        this.bootstrapServer = bootstrapServer;
        this.kafkaGroupName = kafkaGroupName;
        this.autoOffsetReset = autoOffsetReset;
        this.autoCommit = autoCommit;
        this.kafkaTopics = kafkaTopics;
    }

    /**
     * Set the Kafka configuration
     * @return Map of Kafka configuration properties
     */
    public Map<String, Object> kafkaConfiguration() {
        Map<String, Object> map = new HashMap<>();
        map.put("bootstrap.servers", bootstrapServer);
        map.put("request.timeout.ms", 120000);

        // consumer
        map.put("key.deserializer", StringDeserializer.class);
        map.put("value.deserializer", StringDeserializer.class);
        map.put("group.id", kafkaGroupName);
        map.put("auto.offset.reset", autoOffsetReset);
        map.put("enable.auto.commit", autoCommit);
        map.put("max.poll.records", 250);
        map.put("max.poll.interval.ms", 450000);
        map.put("session.timeout.ms", 30000);

        // producer
        map.put("key.serializer", StringSerializer.class);
        map.put("value.serializer", StringSerializer.class);
        map.put("retries", 3);
        map.put("compression.type", "snappy");
        map.put("batch.size", 24576);
        map.put("delivery.timeout.ms", 180000);
        map.put("linger.ms", 30000);
        map.put("acks", "all");

        return map;
    }

    /**
     * Set Kafka topics and their partitions. It is possible to consume multiple topics.
     * @return List of TopicPartition
     */
    public List<TopicPartition> kafkaTopics() {
        List<TopicPartition> topicPartitionList = new ArrayList<>();

        kafkaTopics.forEach((topicName, topicPartitions) -> {
            for (int partitionNumber = 0; partitionNumber < topicPartitions; partitionNumber++)
                topicPartitionList.add(new TopicPartition(topicName, partitionNumber));
        });

        return topicPartitionList;
    }
}
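
A short usage sketch of the consumer-side constructor, assuming a hypothetical topic named "humidity" with 3 partitions; it only demonstrates how the topic/partition map is expanded into the TopicPartition list that ConsumerMain later passes to ConsumerStrategies.Assign. Broker address and group name are placeholders.

    import com.malik.config.KafkaConfig;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class KafkaConfigExample {
        public static void main(String[] args) {
            // Hypothetical topic -> partition-count map; names and counts are placeholders
            Map<String, Integer> topics = new HashMap<>();
            topics.put("humidity", 3);

            KafkaConfig kafkaConfig = new KafkaConfig("kafka-host:9092", "group1", "earliest", false, topics);

            // Expands to humidity-0, humidity-1, humidity-2
            List<TopicPartition> partitions = kafkaConfig.kafkaTopics();
            partitions.forEach(tp -> System.out.println(tp.topic() + "-" + tp.partition()));

            // The same object also provides the consumer/producer property map
            Map<String, Object> props = kafkaConfig.kafkaConfiguration();
            System.out.println(props.get("group.id"));
        }
    }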
93 changes: 93 additions & 0 deletions src/main/java/com/malik/main/ConsumerMain.java
@@ -0,0 +1,93 @@
package com.malik.main;

import com.malik.config.HBaseConfig;
import com.malik.config.KafkaConfig;
import com.malik.util.ApplicationUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerMain {
    public static void main(String[] args) throws InterruptedException, IOException {

        // Instantiate Spark
        SparkSession sparkSession = SparkSession.builder()
                .appName("ConsumerMain")
                .getOrCreate();
        JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, new Duration(5000));

        // Instantiate config / util
        ApplicationUtil applicationUtil = new ApplicationUtil();

        Map<String, Object> mapConfig = applicationUtil.convertJSONToMap(args[0]);
        Map<String, Object> mapKafkaConfig = (Map<String, Object>) mapConfig.get("kafka");
        Map<String, Object> mapHbaseConfig = (Map<String, Object>) mapConfig.get("hbase");

        KafkaConfig kafkaConfig = new KafkaConfig((String) mapKafkaConfig.get("broker"), (String) mapKafkaConfig.get("group_name"), (String) mapKafkaConfig.get("auto_offset_reset"), (Boolean) mapKafkaConfig.get("auto_commit"), (Map<String, Integer>) mapKafkaConfig.get("kafka_topics"));
        HBaseConfig hBaseConfig = new HBaseConfig((String) mapHbaseConfig.get("output_table"), (String) mapHbaseConfig.get("master"), (String) mapHbaseConfig.get("quorum"));

        // Create a direct stream from Kafka
        JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Assign(kafkaConfig.kafkaTopics(), kafkaConfig.kafkaConfiguration()));

        directStream.foreachRDD(javaRDD -> {

            // Get the offset ranges
            OffsetRange[] offsetRanges = ((HasOffsetRanges) javaRDD.rdd()).offsetRanges();

            // Spark transformation
            JavaPairRDD<ImmutableBytesWritable, Put> pairRDD = javaRDD.flatMap(consumerRecord -> {
                Map<String, Object> mapValue = applicationUtil.convertJSONToMap(consumerRecord.value());
                List<Map<String, Object>> newValue = new ArrayList<>();

                mapValue.keySet()
                        .stream()
                        .filter(stringKey -> !stringKey.equals("datetime"))
                        .forEach(stringKey -> {
                            Map<String, Object> map = new HashMap<>();
                            map.put("city", stringKey);
                            map.put("humidity", mapValue.get(stringKey));
                            map.put("datetime", mapValue.get("datetime"));

                            newValue.add(map);
                        });

                return newValue.iterator();
            }).mapToPair(mapValue -> {

                // Set the HBase row key. Use the city name as the row key.
                Put put = new Put(Bytes.toBytes(((String) mapValue.get("city")).toLowerCase().replace(" ", "_")));

                // Set the HBase column family, column qualifier, and column value: "humidity" as the column family, the datetime value as the column qualifier, and the humidity value as the column value.
                put.addColumn(Bytes.toBytes("humidity"), Bytes.toBytes(((String) mapValue.get("datetime")).replace(" ", "_")), Bytes.toBytes((String) mapValue.get("humidity")));

                return new Tuple2<>(new ImmutableBytesWritable(), put);
            });

            // Spark action
            pairRDD.saveAsNewAPIHadoopDataset(hBaseConfig.hbaseJob().getConfiguration());

            // Commit offsets manually
            ((CanCommitOffsets) directStream.inputDStream()).commitAsync(offsetRanges);
        });

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
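
The consumer reads its settings from the JSON document passed as args[0]. Below is a sketch of the shape that document appears to need, based only on the keys read above; the host names, table name, and topic layout are placeholders. (ProducerMain additionally reads a singular "kafka_topic" key for the topic it publishes to.)

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigShapeExample {
        public static void main(String[] args) throws Exception {
            // Keys mirror what ConsumerMain reads; values are placeholders
            Map<String, Integer> kafkaTopics = new HashMap<>();
            kafkaTopics.put("humidity", 3);

            Map<String, Object> kafka = new HashMap<>();
            kafka.put("broker", "kafka-host:9092");
            kafka.put("group_name", "group1");
            kafka.put("auto_offset_reset", "earliest");
            kafka.put("auto_commit", false);
            kafka.put("kafka_topics", kafkaTopics);

            Map<String, Object> hbase = new HashMap<>();
            hbase.put("output_table", "humidity_table");
            hbase.put("master", "hbase-master:16000");
            hbase.put("quorum", "zookeeper-host:2181");

            Map<String, Object> config = new HashMap<>();
            config.put("kafka", kafka);
            config.put("hbase", hbase);

            // Printed JSON is the shape ConsumerMain expects in args[0]
            System.out.println(new ObjectMapper().writeValueAsString(config));
        }
    }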
47 changes: 47 additions & 0 deletions src/main/java/com/malik/main/ProducerMain.java
@@ -0,0 +1,47 @@
package com.malik.main;

import com.malik.config.KafkaConfig;
import com.malik.util.ApplicationUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
import java.util.Map;

public class ProducerMain {
    private static transient Producer<String, String> kafkaProducer;

    public static void main(String[] args) throws IOException {
        // Instantiate Spark
        SparkSession sparkSession = SparkSession.builder()
                .appName("ProducerMain")
                .getOrCreate();
        JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());

        // Instantiate config / util
        ApplicationUtil applicationUtil = new ApplicationUtil();

        Map<String, Object> mapConfig = applicationUtil.convertJSONToMap(args[0]);
        Map<String, Object> mapKafkaConfig = (Map<String, Object>) mapConfig.get("kafka");

        KafkaConfig kafkaConfig = new KafkaConfig((String) mapKafkaConfig.get("broker"));

        // Instantiate the Kafka producer
        kafkaProducer = new KafkaProducer<>(kafkaConfig.kafkaConfiguration());

        // Read the file, then send its lines to Kafka. Spark is used here to read the file.
        sparkContext.textFile("file://" + args[1])
                .foreach(stringJSON -> {

                    // Send the message (JSON string) to Kafka
                    kafkaProducer.send(new ProducerRecord<>((String) mapKafkaConfig.get("kafka_topic"), stringJSON));

                });

        // Close the Kafka producer
        kafkaProducer.close();
    }
}
19 changes: 19 additions & 0 deletions src/main/java/com/malik/util/ApplicationUtil.java
@@ -0,0 +1,19 @@
package com.malik.util;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

public class ApplicationUtil implements Serializable {

    /**
     * Convert a JSON string to a Map
     * @param json -> JSON string
     * @return Map
     * @throws IOException -> Exception on invalid JSON
     */
    public Map<String, Object> convertJSONToMap(String json) throws IOException {
        return new ObjectMapper().readValue(json, Map.class);
    }
}
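
For illustration, a small usage sketch of the helper above, with a variant that uses Jackson's TypeReference to keep the Map<String, Object> type without an unchecked cast; behaviour is otherwise equivalent. The sample record is a placeholder and only assumes the general shape the consumer parses (a "datetime" key plus city/humidity pairs).

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;
    import java.util.Map;

    public class ApplicationUtilExample {
        public static void main(String[] args) throws IOException {
            String json = "{\"datetime\":\"2017-01-01 00:00:00\",\"Jakarta\":\"82.0\"}"; // placeholder record

            // TypeReference preserves Map<String, Object> without an unchecked cast
            Map<String, Object> map = new ObjectMapper()
                    .readValue(json, new TypeReference<Map<String, Object>>() {});

            System.out.println(map.get("Jakarta"));
        }
    }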
