Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dbsheta committed Nov 5, 2018
0 parents commit 4805c5f
Show file tree
Hide file tree
Showing 31 changed files with 505 additions and 0 deletions.
13 changes: 13 additions & 0 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions kafka-twitter-producer.iml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
80 changes: 80 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>ml.dhoomilsheta.app</groupId>
<artifactId>kafka-twitter-producer</artifactId>
<version>1.0-SNAPSHOT</version>

<name>kafka-twitter-producer</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>

<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
14 changes: 14 additions & 0 deletions src/main/java/ml/dhoomilsheta/app/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ml.dhoomilsheta.app;

import ml.dhoomilsheta.app.producer.TwitterKafkaProducer;

/**
* Hello world!
*/
public class App {
public static void main(String[] args) {
System.out.println("Hello World!");
TwitterKafkaProducer producer = new TwitterKafkaProducer();
producer.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ml.dhoomilsheta.app.config;

public class KafkaConfiguration {
public static final String SERVERS = "bigdata-1:9092, bigdata-2:9092, bigdata-3:9092";
public static final String TOPIC = "bigdata-tweets";
public static final long SLEEP_TIMER = 1000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ml.dhoomilsheta.app.config;

public class TwitterConfiguration {
public static final String CONSUMER_KEY = "PbHuTRuSqSHTUzAJGJuqZAZoz";
public static final String CONSUMER_SECRET = "BjpUlQLT7lVuXR7kI0PJCYS77b6hFDuQKYLLex7ETfKRYDiLpX";
public static final String ACCESS_TOKEN = "319641898-eSXscJnl6aTyExpEZUCVi6RyQ8JzRPIQ4R9qFDA5";
public static final String TOKEN_SECRET = "1AYcrO4wodtFxVZKWv5UB5v7eKgZq21wJ8xaTRYkvhCKU";
public static final String HASHTAG = "#bigdata";
}
85 changes: 85 additions & 0 deletions src/main/java/ml/dhoomilsheta/app/model/Tweet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package ml.dhoomilsheta.app.model;

import com.google.gson.annotations.SerializedName;

public class Tweet {
private long id;
private String text;
private String lang;
private User user;

@SerializedName("retweet_count")
private int retweetCount;

@SerializedName("favorite_count")
private int favoriteCount;

public Tweet(long id, String text, String lang, User user, int retweetCount, int favoriteCount) {
this.id = id;
this.text = text;
this.lang = lang;
this.user = user;
this.retweetCount = retweetCount;
this.favoriteCount = favoriteCount;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public String getText() {
return text;
}

public void setText(String text) {
this.text = text;
}

public String getLang() {
return lang;
}

public void setLang(String lang) {
this.lang = lang;
}

public User getUser() {
return user;
}

public void setUser(User user) {
this.user = user;
}

public int getRetweetCount() {
return retweetCount;
}

public void setRetweetCount(int retweetCount) {
this.retweetCount = retweetCount;
}

public int getFavoriteCount() {
return favoriteCount;
}

public void setFavoriteCount(int favoriteCount) {
this.favoriteCount = favoriteCount;
}

@Override
public String toString() {
return "Tweet{" +
"id=" + id +
", text='" + text + '\'' +
", lang='" + lang + '\'' +
", user=" + user +
", retweetCount=" + retweetCount +
", favoriteCount=" + favoriteCount +
'}';
}
}
74 changes: 74 additions & 0 deletions src/main/java/ml/dhoomilsheta/app/model/User.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package ml.dhoomilsheta.app.model;

import com.google.gson.annotations.SerializedName;

public class User {
private long id;
private String name;

@SerializedName("screen_name")
private String screenName;
private String location;

@SerializedName("followers_count")
private int followersCount;

public User(long id, String name, String screenName, String location, int followersCount) {
this.id = id;
this.name = name;
this.screenName = screenName;
this.location = location;
this.followersCount = followersCount;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getScreenName() {
return screenName;
}

public void setScreenName(String screenName) {
this.screenName = screenName;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}

public int getFollowersCount() {
return followersCount;
}

public void setFollowersCount(int followersCount) {
this.followersCount = followersCount;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", screenName='" + screenName + '\'' +
", location='" + location + '\'' +
", followersCount=" + followersCount +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package ml.dhoomilsheta.app.producer;

import com.google.gson.Gson;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import ml.dhoomilsheta.app.config.KafkaConfiguration;
import ml.dhoomilsheta.app.config.TwitterConfiguration;
import ml.dhoomilsheta.app.model.Tweet;
import ml.dhoomilsheta.app.producer.callback.BasicCallback;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class TwitterKafkaProducer {
private Client client;
private BlockingQueue<String> queue;
private Gson gson;
private Callback callback;

public TwitterKafkaProducer() {
// Configure auth
Authentication authentication = new OAuth1(
TwitterConfiguration.CONSUMER_KEY,
TwitterConfiguration.CONSUMER_SECRET,
TwitterConfiguration.ACCESS_TOKEN,
TwitterConfiguration.TOKEN_SECRET);

// track the terms of your choice. here im only tracking #bigdata.
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
endpoint.trackTerms(Collections.singletonList(TwitterConfiguration.HASHTAG));

queue = new LinkedBlockingQueue<>(10000);

client = new ClientBuilder()
.hosts(Constants.STREAM_HOST)
.authentication(authentication)
.endpoint(endpoint)
.processor(new StringDelimitedProcessor(queue))
.build();
gson = new Gson();
callback = new BasicCallback();
}

private Producer<Long, String> getProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfiguration.SERVERS);
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.LINGER_MS_CONFIG, 500);
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

return new KafkaProducer<>(properties);
}

public void run() {
client.connect();
try (Producer<Long, String> producer = getProducer()) {
while (true) {
Tweet tweet = gson.fromJson(queue.take(), Tweet.class);
System.out.printf("Fetched tweet id %d\n", tweet.getId());
long key = tweet.getId();
String msg = tweet.toString();
ProducerRecord<Long, String> record = new ProducerRecord<>(KafkaConfiguration.TOPIC, key, msg);
producer.send(record, callback);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
client.stop();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ml.dhoomilsheta.app.producer.callback;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BasicCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.printf("Message with offset %d acknowledged by partition %d\n",
metadata.offset(), metadata.partition());
} else {
System.out.println(exception.getMessage());
}
}
}
Empty file.
Loading

0 comments on commit 4805c5f

Please sign in to comment.