-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweet_consumer.py
73 lines (51 loc) · 1.72 KB
/
tweet_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import json
import os
import time
from multiprocessing.dummy import Pool as ThreadPool

import boto3
from kafka import KafkaConsumer
from textblob import TextBlob
# SNS handle used by getkafkadata to publish enriched tweets.
# Credentials are resolved through boto3's default credential chain
# (environment variables, shared credentials/config files, instance role)
# rather than being hard-coded in source.
sns = boto3.resource('sns')
# Target topic ARN, taken from the SNS_TOPIC_ARN environment variable.
# NOTE(review): falls back to '' (the original placeholder) -- set it before running.
topic = sns.Topic(os.environ.get('SNS_TOPIC_ARN', ''))
def getkafkadata(n, consumer_timeout_ms=float('inf')):
    """Consume tweets from Kafka, attach a sentiment score, and publish to SNS.

    Processing of each record on the ``'tweet'`` topic:

    1. The record is UTF-8 decoded and parsed as JSON twice. The first parse
       must yield a JSON string, which the second parse turns into the tweet
       object. NOTE(review): this implies the producer double-encodes its
       payload -- confirm against the producer.
    2. The tweet's ``'text'`` field is scored with TextBlob, and the polarity
       is stored under the ``'sentiment'`` key.
    3. The tweet is printed and, after a 0.2 s pause, published as JSON to the
       module-level SNS ``topic``.

    Records that lack a ``'coordinates'`` key, or that fail at any step, have
    the exception printed and are skipped.

    Args:
        n: Unused. Accepted so the function can be handed to ``Pool.map``.
        consumer_timeout_ms: Milliseconds to wait for a new record before the
            consumer stops iterating. The default (infinity, kafka-python's
            own default) consumes forever, so the function only returns when
            a finite timeout is given.

    Returns:
        The most recently decoded record (normally the last tweet dict
        handled), or ``None`` if no record was received.
    """
    # NOTE(review): no group_id is set and offsets start at 'earliest', so each
    # call re-reads the whole topic; when run on several pool workers the same
    # tweet is likely published once per worker.
    consumer = KafkaConsumer('tweet',
                             bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest',
                             enable_auto_commit=False,
                             consumer_timeout_ms=consumer_timeout_ms)
    text = None
    try:
        for message in consumer:
            try:
                # Decode inside the per-record handler (rather than through a
                # value_deserializer) so one malformed record is reported and
                # skipped instead of propagating out of the loop. UTF-8 accepts
                # every ASCII payload and also non-ASCII tweet text.
                text = json.loads(message.value.decode('utf-8'))
                text = json.loads(text)
                sentiment = TextBlob(text['text'])
                # Records without a 'coordinates' key are skipped (the
                # KeyError is reported by the handler below).
                if 'coordinates' not in text:
                    raise KeyError('coordinates')
                text['sentiment'] = sentiment.sentiment.polarity
                print(text)
                time.sleep(0.2)  # throttle publishing to SNS
                response = topic.publish(
                    Message=json.dumps(text),
                    MessageAttributes={},
                )
                print(response)
            except Exception as e:
                # Best-effort per record: report the failure and move on.
                print(e)
    finally:
        # Release the broker connection even if iteration is interrupted.
        consumer.close()
    return text
def calculateParallel(numbers, threads):
    """Call getkafkadata once per element of ``numbers`` on a thread pool.

    Args:
        numbers: Iterable of work items; each one is passed to getkafkadata.
        threads: Number of worker threads in the pool.

    Returns:
        List of getkafkadata results, in the same order as ``numbers``.
    """
    # multiprocessing.dummy exposes the Pool API backed by threads.
    pool = ThreadPool(threads)
    try:
        results = pool.map(getkafkadata, numbers)
    except BaseException:
        # On failure (including Ctrl-C) shut the pool down instead of leaking
        # it; a graceful join could block on workers that are still running.
        pool.terminate()
        raise
    pool.close()
    pool.join()
    return results
if __name__ == "__main__":
    # Fan six work items out over a ten-thread pool, for 50 rounds, printing
    # the round index after each round finishes.
    # NOTE(review): getkafkadata iterates its KafkaConsumer with no timeout
    # unless one is passed, so a round may never finish in practice.
    worker_ids = list(range(1, 7))
    for round_index in range(50):
        calculateParallel(worker_ids, 10)
        print(round_index)