diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..cbb89a3 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/kafka-twitter-producer.iml b/kafka-twitter-producer.iml new file mode 100644 index 0000000..78b2cc5 --- /dev/null +++ b/kafka-twitter-producer.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..81e480e --- /dev/null +++ b/pom.xml @@ -0,0 +1,80 @@ + + + + 4.0.0 + + ml.dhoomilsheta.app + kafka-twitter-producer + 1.0-SNAPSHOT + + kafka-twitter-producer + + http://www.example.com + + + UTF-8 + 1.7 + 1.7 + + + + + junit + junit + 4.11 + test + + + org.apache.kafka + kafka-clients + 2.0.0 + + + com.twitter + hbc-core + 2.2.0 + + + com.google.code.gson + gson + 2.8.5 + + + + + + + + maven-clean-plugin + 3.0.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.7.0 + + + maven-surefire-plugin + 2.20.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + + diff --git a/src/main/java/ml/dhoomilsheta/app/App.java b/src/main/java/ml/dhoomilsheta/app/App.java new file mode 100644 index 0000000..4aac6f6 --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/App.java @@ -0,0 +1,14 @@ +package ml.dhoomilsheta.app; + +import ml.dhoomilsheta.app.producer.TwitterKafkaProducer; + +/** + * Hello world! + */ +public class App { + public static void main(String[] args) { + System.out.println("Hello World!"); + TwitterKafkaProducer producer = new TwitterKafkaProducer(); + producer.run(); + } +} diff --git a/src/main/java/ml/dhoomilsheta/app/config/KafkaConfiguration.java b/src/main/java/ml/dhoomilsheta/app/config/KafkaConfiguration.java new file mode 100644 index 0000000..c8b00f8 --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/config/KafkaConfiguration.java @@ -0,0 +1,7 @@ +package ml.dhoomilsheta.app.config; + +public class KafkaConfiguration { + public static final String SERVERS = "bigdata-1:9092, bigdata-2:9092, bigdata-3:9092"; + public static final String TOPIC = "bigdata-tweets"; + public static final long SLEEP_TIMER = 1000; +} diff --git a/src/main/java/ml/dhoomilsheta/app/config/TwitterConfiguration.java b/src/main/java/ml/dhoomilsheta/app/config/TwitterConfiguration.java new file mode 100644 index 0000000..df1a8bf --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/config/TwitterConfiguration.java @@ -0,0 +1,9 @@ +package ml.dhoomilsheta.app.config; + +public class TwitterConfiguration { + public static final String CONSUMER_KEY = "PbHuTRuSqSHTUzAJGJuqZAZoz"; + public static final String CONSUMER_SECRET = "BjpUlQLT7lVuXR7kI0PJCYS77b6hFDuQKYLLex7ETfKRYDiLpX"; + public static final String ACCESS_TOKEN = "319641898-eSXscJnl6aTyExpEZUCVi6RyQ8JzRPIQ4R9qFDA5"; + public static final String TOKEN_SECRET = "1AYcrO4wodtFxVZKWv5UB5v7eKgZq21wJ8xaTRYkvhCKU"; + public static final String HASHTAG = "#bigdata"; +} diff --git a/src/main/java/ml/dhoomilsheta/app/model/Tweet.java b/src/main/java/ml/dhoomilsheta/app/model/Tweet.java new file mode 100644 index 0000000..280a27b --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/model/Tweet.java @@ -0,0 +1,85 @@ +package ml.dhoomilsheta.app.model; + +import com.google.gson.annotations.SerializedName; + +public class Tweet { + private long id; + private String text; + private String lang; + private User user; + + @SerializedName("retweet_count") + private int retweetCount; + + @SerializedName("favorite_count") + private int favoriteCount; + + public Tweet(long id, String text, String lang, User user, int retweetCount, int favoriteCount) { + this.id = id; + this.text = text; + this.lang = lang; + this.user = user; + this.retweetCount = retweetCount; + this.favoriteCount = favoriteCount; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public String getLang() { + return lang; + } + + public void setLang(String lang) { + this.lang = lang; + } + + public User getUser() { + return user; + } + + public void setUser(User user) { + this.user = user; + } + + public int getRetweetCount() { + return retweetCount; + } + + public void setRetweetCount(int retweetCount) { + this.retweetCount = retweetCount; + } + + public int getFavoriteCount() { + return favoriteCount; + } + + public void setFavoriteCount(int favoriteCount) { + this.favoriteCount = favoriteCount; + } + + @Override + public String toString() { + return "Tweet{" + + "id=" + id + + ", text='" + text + '\'' + + ", lang='" + lang + '\'' + + ", user=" + user + + ", retweetCount=" + retweetCount + + ", favoriteCount=" + favoriteCount + + '}'; + } +} diff --git a/src/main/java/ml/dhoomilsheta/app/model/User.java b/src/main/java/ml/dhoomilsheta/app/model/User.java new file mode 100644 index 0000000..d9060a7 --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/model/User.java @@ -0,0 +1,74 @@ +package ml.dhoomilsheta.app.model; + +import com.google.gson.annotations.SerializedName; + +public class User { + private long id; + private String name; + + @SerializedName("screen_name") + private String screenName; + private String location; + + @SerializedName("followers_count") + private int followersCount; + + public User(long id, String name, String screenName, String location, int followersCount) { + this.id = id; + this.name = name; + this.screenName = screenName; + this.location = location; + this.followersCount = followersCount; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getScreenName() { + return screenName; + } + + public void setScreenName(String screenName) { + this.screenName = screenName; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public int getFollowersCount() { + return followersCount; + } + + public void setFollowersCount(int followersCount) { + this.followersCount = followersCount; + } + + @Override + public String toString() { + return "User{" + + "id=" + id + + ", name='" + name + '\'' + + ", screenName='" + screenName + '\'' + + ", location='" + location + '\'' + + ", followersCount=" + followersCount + + '}'; + } +} diff --git a/src/main/java/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.java b/src/main/java/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.java new file mode 100644 index 0000000..d3cc30c --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.java @@ -0,0 +1,84 @@ +package ml.dhoomilsheta.app.producer; + +import com.google.gson.Gson; +import com.twitter.hbc.ClientBuilder; +import com.twitter.hbc.core.Client; +import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; +import com.twitter.hbc.core.processor.StringDelimitedProcessor; +import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.OAuth1; +import ml.dhoomilsheta.app.config.KafkaConfiguration; +import ml.dhoomilsheta.app.config.TwitterConfiguration; +import ml.dhoomilsheta.app.model.Tweet; +import ml.dhoomilsheta.app.producer.callback.BasicCallback; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + +public class TwitterKafkaProducer { + private Client client; + private BlockingQueue queue; + private Gson gson; + private Callback callback; + + public TwitterKafkaProducer() { + // Configure auth + Authentication authentication = new OAuth1( + TwitterConfiguration.CONSUMER_KEY, + TwitterConfiguration.CONSUMER_SECRET, + TwitterConfiguration.ACCESS_TOKEN, + TwitterConfiguration.TOKEN_SECRET); + + // track the terms of your choice. here im only tracking #bigdata. + StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint(); + endpoint.trackTerms(Collections.singletonList(TwitterConfiguration.HASHTAG)); + + queue = new LinkedBlockingQueue<>(10000); + + client = new ClientBuilder() + .hosts(Constants.STREAM_HOST) + .authentication(authentication) + .endpoint(endpoint) + .processor(new StringDelimitedProcessor(queue)) + .build(); + gson = new Gson(); + callback = new BasicCallback(); + } + + private Producer getProducer() { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfiguration.SERVERS); + properties.put(ProducerConfig.ACKS_CONFIG, "1"); + properties.put(ProducerConfig.LINGER_MS_CONFIG, 500); + properties.put(ProducerConfig.RETRIES_CONFIG, 0); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + return new KafkaProducer<>(properties); + } + + public void run() { + client.connect(); + try (Producer producer = getProducer()) { + while (true) { + Tweet tweet = gson.fromJson(queue.take(), Tweet.class); + System.out.printf("Fetched tweet id %d\n", tweet.getId()); + long key = tweet.getId(); + String msg = tweet.toString(); + ProducerRecord record = new ProducerRecord<>(KafkaConfiguration.TOPIC, key, msg); + producer.send(record, callback); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + client.stop(); + } + } +} diff --git a/src/main/java/ml/dhoomilsheta/app/producer/callback/BasicCallback.java b/src/main/java/ml/dhoomilsheta/app/producer/callback/BasicCallback.java new file mode 100644 index 0000000..0cb6a3b --- /dev/null +++ b/src/main/java/ml/dhoomilsheta/app/producer/callback/BasicCallback.java @@ -0,0 +1,16 @@ +package ml.dhoomilsheta.app.producer.callback; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; + +public class BasicCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) { + System.out.printf("Message with offset %d acknowledged by partition %d\n", + metadata.offset(), metadata.partition()); + } else { + System.out.println(exception.getMessage()); + } + } +} diff --git a/src/main/resources/kafka.properties b/src/main/resources/kafka.properties new file mode 100644 index 0000000..e69de29 diff --git a/src/main/resources/twitter.properties b/src/main/resources/twitter.properties new file mode 100644 index 0000000..0294028 --- /dev/null +++ b/src/main/resources/twitter.properties @@ -0,0 +1,5 @@ +consumer.key=PbHuTRuSqSHTUzAJGJuqZAZoz +consumer.secret=BjpUlQLT7lVuXR7kI0PJCYS77b6hFDuQKYLLex7ETfKRYDiLpX +access.token=319641898-eSXscJnl6aTyExpEZUCVi6RyQ8JzRPIQ4R9qFDA5 +access.secret=1AYcrO4wodtFxVZKWv5UB5v7eKgZq21wJ8xaTRYkvhCKU +hashtag="#bigdata" \ No newline at end of file diff --git a/src/test/java/ml/dhoomilsheta/app/AppTest.java b/src/test/java/ml/dhoomilsheta/app/AppTest.java new file mode 100644 index 0000000..d5fb9a6 --- /dev/null +++ b/src/test/java/ml/dhoomilsheta/app/AppTest.java @@ -0,0 +1,20 @@ +package ml.dhoomilsheta.app; + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Unit test for simple App. + */ +public class AppTest +{ + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() + { + assertTrue( true ); + } +} diff --git a/target/classes/kafka.properties b/target/classes/kafka.properties new file mode 100644 index 0000000..e69de29 diff --git a/target/classes/ml/dhoomilsheta/app/App.class b/target/classes/ml/dhoomilsheta/app/App.class new file mode 100644 index 0000000..1effa82 Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/App.class differ diff --git a/target/classes/ml/dhoomilsheta/app/config/KafkaConfiguration.class b/target/classes/ml/dhoomilsheta/app/config/KafkaConfiguration.class new file mode 100644 index 0000000..7e6187d Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/config/KafkaConfiguration.class differ diff --git a/target/classes/ml/dhoomilsheta/app/config/TwitterConfiguration.class b/target/classes/ml/dhoomilsheta/app/config/TwitterConfiguration.class new file mode 100644 index 0000000..5225d68 Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/config/TwitterConfiguration.class differ diff --git a/target/classes/ml/dhoomilsheta/app/model/Tweet.class b/target/classes/ml/dhoomilsheta/app/model/Tweet.class new file mode 100644 index 0000000..641c97f Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/model/Tweet.class differ diff --git a/target/classes/ml/dhoomilsheta/app/model/User.class b/target/classes/ml/dhoomilsheta/app/model/User.class new file mode 100644 index 0000000..aeda244 Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/model/User.class differ diff --git a/target/classes/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.class b/target/classes/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.class new file mode 100644 index 0000000..26ddf51 Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.class differ diff --git a/target/classes/ml/dhoomilsheta/app/producer/callback/BasicCallback.class b/target/classes/ml/dhoomilsheta/app/producer/callback/BasicCallback.class new file mode 100644 index 0000000..e60cd89 Binary files /dev/null and b/target/classes/ml/dhoomilsheta/app/producer/callback/BasicCallback.class differ diff --git a/target/classes/twitter.properties b/target/classes/twitter.properties new file mode 100644 index 0000000..0294028 --- /dev/null +++ b/target/classes/twitter.properties @@ -0,0 +1,5 @@ +consumer.key=PbHuTRuSqSHTUzAJGJuqZAZoz +consumer.secret=BjpUlQLT7lVuXR7kI0PJCYS77b6hFDuQKYLLex7ETfKRYDiLpX +access.token=319641898-eSXscJnl6aTyExpEZUCVi6RyQ8JzRPIQ4R9qFDA5 +access.secret=1AYcrO4wodtFxVZKWv5UB5v7eKgZq21wJ8xaTRYkvhCKU +hashtag="#bigdata" \ No newline at end of file diff --git a/target/kafka-twitter-producer-1.0-SNAPSHOT.jar b/target/kafka-twitter-producer-1.0-SNAPSHOT.jar new file mode 100644 index 0000000..1f3e32e Binary files /dev/null and b/target/kafka-twitter-producer-1.0-SNAPSHOT.jar differ diff --git a/target/maven-archiver/pom.properties b/target/maven-archiver/pom.properties new file mode 100644 index 0000000..4e6d429 --- /dev/null +++ b/target/maven-archiver/pom.properties @@ -0,0 +1,4 @@ +#Created by Apache Maven 3.3.9 +version=1.0-SNAPSHOT +groupId=ml.dhoomilsheta.app +artifactId=kafka-twitter-producer diff --git a/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 0000000..0de74b1 --- /dev/null +++ b/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,7 @@ +ml/dhoomilsheta/app/model/Tweet.class +ml/dhoomilsheta/app/producer/callback/BasicCallback.class +ml/dhoomilsheta/app/App.class +ml/dhoomilsheta/app/config/KafkaConfiguration.class +ml/dhoomilsheta/app/producer/TwitterKafkaProducer.class +ml/dhoomilsheta/app/model/User.class +ml/dhoomilsheta/app/config/TwitterConfiguration.class diff --git a/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 0000000..9ed3018 --- /dev/null +++ b/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1,7 @@ +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/producer/TwitterKafkaProducer.java +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/model/User.java +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/producer/callback/BasicCallback.java +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/config/KafkaConfiguration.java +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/config/TwitterConfiguration.java +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/model/Tweet.java +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/main/java/ml/dhoomilsheta/app/App.java diff --git a/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst b/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst new file mode 100644 index 0000000..ddea187 --- /dev/null +++ b/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst @@ -0,0 +1 @@ +ml/dhoomilsheta/app/AppTest.class diff --git a/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst b/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst new file mode 100644 index 0000000..8063f32 --- /dev/null +++ b/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst @@ -0,0 +1 @@ +/Users/dhoomilbsheta/Personal/kafka-twitter-producer/src/test/java/ml/dhoomilsheta/app/AppTest.java diff --git a/target/surefire-reports/TEST-ml.dhoomilsheta.app.AppTest.xml b/target/surefire-reports/TEST-ml.dhoomilsheta.app.AppTest.xml new file mode 100644 index 0000000..0bcf4e1 --- /dev/null +++ b/target/surefire-reports/TEST-ml.dhoomilsheta.app.AppTest.xml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/target/surefire-reports/ml.dhoomilsheta.app.AppTest.txt b/target/surefire-reports/ml.dhoomilsheta.app.AppTest.txt new file mode 100644 index 0000000..d2add23 --- /dev/null +++ b/target/surefire-reports/ml.dhoomilsheta.app.AppTest.txt @@ -0,0 +1,4 @@ +------------------------------------------------------------------------------- +Test set: ml.dhoomilsheta.app.AppTest +------------------------------------------------------------------------------- +Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.015 s - in ml.dhoomilsheta.app.AppTest diff --git a/target/test-classes/ml/dhoomilsheta/app/AppTest.class b/target/test-classes/ml/dhoomilsheta/app/AppTest.class new file mode 100644 index 0000000..5408f8f Binary files /dev/null and b/target/test-classes/ml/dhoomilsheta/app/AppTest.class differ