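"""Entry point for the Reddit & News sentiment analysis pipeline.

Starts the collectors, preprocessor, and classifier as background threads
and serves the dashboard, or runs individual stages via command-line flags.
"""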
import os
import time
import threading
import argparse
import nltk
from src.data.reddit_collector import RedditCollector
from src.data.news_collector import NewsCollector
from src.data.preprocessor import TextPreprocessor
from src.models.classifier import SentimentClassifier
from src.visualization.dashboard import SentimentDashboard


def setup_dependencies():
    """Download required NLTK data."""
    print("Setting up dependencies...")
    # Download all necessary NLTK data packages
    nltk.download('punkt')
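    # Assumption: newer NLTK releases (3.8.2+) ship the punkt models as
    # 'punkt_tab'; downloading it as well keeps tokenization working there
    # (on older NLTK versions this download is simply skipped with a warning).
    nltk.download('punkt_tab')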
    nltk.download('stopwords')
    nltk.download('wordnet')
    print("Setup complete.")


def run_reddit_collector(stop_event):
    """Run Reddit collection in a separate thread."""
    try:
        collector = RedditCollector()
        collector.start_collection()
        # Keep running until the stop event is set
        while not stop_event.is_set():
            time.sleep(1)
        print("Reddit collection stopped")
    except Exception as e:
        print(f"Error in Reddit collector: {e}")


def run_news_collector(stop_event):
    """Run news collection in a separate thread."""
    try:
        collector = NewsCollector()
        while not stop_event.is_set():
            collector.collect_news()
            # Wait up to an hour before fetching news again. Using
            # stop_event.wait() instead of time.sleep() lets the thread
            # exit promptly on shutdown rather than blocking for the
            # full interval.
            stop_event.wait(60 * 60)  # Collect news every hour
    except Exception as e:
        print(f"Error in News collector: {e}")


def run_preprocessor(stop_event):
    """Run post preprocessing in a separate thread."""
    try:
        preprocessor = TextPreprocessor()
        while not stop_event.is_set():
            # Process a batch of posts (the preprocessor API still uses
            # 'tweets' naming)
            processed = preprocessor.process_unprocessed_tweets(batch_size=50)
            # If no posts were processed, wait a bit before polling again
            if processed == 0:
                time.sleep(10)
            else:
                time.sleep(2)
    except Exception as e:
        print(f"Error in preprocessor: {e}")


def run_classifier(stop_event):
    """Run sentiment classification in a separate thread."""
    try:
        classifier = SentimentClassifier()
        while not stop_event.is_set():
            # Classify a batch of preprocessed posts (the classifier API
            # also uses 'tweets' naming)
            classified = classifier.classify_processed_tweets(batch_size=50)
            # If nothing was classified, wait a bit before polling again
            if classified == 0:
                time.sleep(10)
            else:
                time.sleep(2)
    except FileNotFoundError:
        print("Error: Sentiment model not found. Please train a model first.")
        return


def run_dashboard():
    """Run the visualization dashboard."""
    dashboard = SentimentDashboard()
    dashboard.run(debug=False)


def run_pipeline():
    """Run the complete sentiment analysis pipeline."""
    stop_event = threading.Event()
    try:
        # Start Reddit collector thread (daemon threads exit automatically
        # if the main process dies)
        reddit_thread = threading.Thread(target=run_reddit_collector, args=(stop_event,))
        reddit_thread.daemon = True
        reddit_thread.start()
        print("Reddit collector started")

        # Start News collector thread
        news_thread = threading.Thread(target=run_news_collector, args=(stop_event,))
        news_thread.daemon = True
        news_thread.start()
        print("News collector started")

        # Start preprocessor thread
        preprocessor_thread = threading.Thread(
            target=run_preprocessor, args=(stop_event,)
        )
        preprocessor_thread.daemon = True
        preprocessor_thread.start()
        print("Content preprocessor started")

        # Check that a trained model exists before starting the classifier
        if os.path.exists("models/saved_models/sentiment_model.pkl"):
            classifier_thread = threading.Thread(
                target=run_classifier, args=(stop_event,)
            )
            classifier_thread.daemon = True
            classifier_thread.start()
            print("Sentiment classifier started")
        else:
            print("Warning: No sentiment model found. Classification will not run.")
            print("Train a model using the trainer module first.")

        # Start dashboard (this blocks until the dashboard is closed)
        print("Starting dashboard...")
        run_dashboard()
    except KeyboardInterrupt:
        print("\nStopping all processes...")
    finally:
        # Signal worker threads to exit and give them time to clean up
        stop_event.set()
        time.sleep(2)
        print("Pipeline stopped")


def train_model():
    """Train a sentiment analysis model using sample data."""
    # First ensure all dependencies are set up
    setup_dependencies()
    from src.models.trainer import SentimentModelTrainer

    print("Model training requires a labeled dataset.")
    print("Please download a dataset like Sentiment140 from Kaggle:")
    print("https://www.kaggle.com/datasets/kazanova/sentiment140")
    print("and place it in the 'data/raw' directory.")

    # Prefer the compressed archive if present; otherwise fall back to the
    # extracted CSV. os.path.join keeps the paths portable across platforms.
    file_path = os.path.join("data", "raw", "training.1600000.processed.noemoticon.csv")
    zip_path = file_path + ".zip"
    if os.path.exists(zip_path):
        file_path = zip_path
    elif not os.path.exists(file_path):
        print(f"Error: File not found at {file_path} or {zip_path}")
        return

    print(f"Using dataset: {file_path}")
    # Sentiment140 layout:
    #   Column 0: sentiment (0=negative, 4=positive)
    #   Column 5: tweet text
    text_column = 5
    label_column = 0
    trainer = SentimentModelTrainer()
    trainer.train_from_file(
        file_path=file_path, text_column=text_column, label_column=label_column
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Reddit & News Sentiment Analysis Pipeline")
    parser.add_argument(
        "--train", action="store_true", help="Train the sentiment model"
    )
    parser.add_argument(
        "--dashboard-only", action="store_true", help="Run only the dashboard"
    )
    parser.add_argument(
        "--setup", action="store_true", help="Only download dependencies"
    )
    args = parser.parse_args()

    if args.setup:
        setup_dependencies()
    elif args.train:
        train_model()
    elif args.dashboard_only:
        run_dashboard()
    else:
        # Ensure dependencies are set up before running the full pipeline
        setup_dependencies()
        run_pipeline()
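
# Usage examples (based on the flags defined above):
#   python app.py --setup            # download NLTK data only
#   python app.py --train            # train the sentiment model
#   python app.py --dashboard-only   # run just the dashboard
#   python app.py                    # set up dependencies and run everything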