-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter_scraper.py
398 lines (325 loc) · 11.2 KB
/
twitter_scraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
import os
import sys
import pandas as pd
from datetime import datetime
from fake_headers import Headers
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import (
NoSuchElementException,
StaleElementReferenceException,
WebDriverException,
)
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
from webdriver_manager.firefox import GeckoDriverManager
# URL of the Twitter login flow; login() opens it before typing credentials.
TWITTER_LOGIN_URL = "https://twitter.com/i/flow/login"
# Fill in the values below before running the script.
# - mail:     typed into the username field by _input_username().
# - username: typed into the extra text prompt handled by
#             _input_unusual_activity().
# - password: typed into the password field by _input_password().
# - path:     appended to "https://twitter.com/" by go_to_path(), i.e. the
#             page whose tweets are scraped (not a webdriver install path).
mail = ""
username = ""
password = ""
path = ""
# String keys (str(card)) of tweet cards already processed by scrape_tweets().
tweet_ids = set()
# Tuples returned by get_tweet() for every non-ad tweet kept so far.
data = []
# NOTE(review): appears unused at module scope; get_tweet_cards() and
# scrape_tweets() each bind a local of the same name instead.
tweet_cards = []
# Driver
def _get_driver():
    """Create and return a headless Firefox WebDriver.

    A first attempt starts Firefox with the configured options only. If that
    raises ``WebDriverException``, geckodriver is downloaded through
    ``GeckoDriverManager`` and a second attempt is made with an explicit
    service. If the second attempt fails for any reason, the error is printed
    and the process exits with status 1.
    """
    user_agent = Headers().generate()["User-Agent"]

    # Firefox is the browser in use here.
    options = FirefoxOptions()
    # NOTE(review): most of these look like Chromium-style switches; confirm
    # that Firefox/geckodriver actually honours them (the user agent included).
    flags = (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--ignore-certificate-errors",
        "--disable-gpu",
        "--log-level=3",
        "--disable-notifications",
        "--disable-popup-blocking",
        f"--user-agent={user_agent}",
        # Run without a visible browser window.
        "--headless",
    )
    for flag in flags:
        options.add_argument(flag)

    try:
        return webdriver.Firefox(options=options)
    except WebDriverException:
        # Firefox could not be started directly: fetch geckodriver and retry.
        try:
            print("Downloading FirefoxDriver...")
            gecko_path = GeckoDriverManager().install()
            service = FirefoxService(executable_path=gecko_path)
            print("Initializing FirefoxDriver...")
            return webdriver.Firefox(service=service, options=options)
        except Exception as exc:
            print(f"Error setting up WebDriver: {exc}")
            sys.exit(1)
# Create the shared browser session when the module is loaded. _get_driver()
# downloads geckodriver automatically if Firefox cannot be started directly.
driver = _get_driver()
###
def _input_username():
    """Type ``mail`` into the login form's username field and submit it.

    The field is looked up at most three times. After a failed lookup a retry
    notice is printed and the function waits two seconds; after the third
    failure an explanation is printed, the driver is quit and the process
    exits with status 1.
    """
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            field = driver.find_element(
                "xpath", "//input[@autocomplete='username']"
            )
            field.send_keys(mail)
            field.send_keys(Keys.RETURN)
            sleep(3)  # give the next step of the login flow time to load
            return
        except NoSuchElementException:
            if attempt < max_attempts:
                print("Re-attempting to input username...")
                sleep(2)
                continue
            print()
            print(
                "There was an error inputting the username.\n"
                "It may be due to the following:\n"
                "- Internet connection is unstable\n"
                "- Username is incorrect\n"
                "- Twitter is experiencing unusual activity"
            )
            driver.quit()
            sys.exit(1)
def _input_unusual_activity():
    """Answer the optional extra text prompt with ``username``.

    The prompt's input is looked up at most three times, back to back. If it
    is found, ``username`` is typed and submitted; if it is never found the
    function returns silently, so the prompt is treated as optional.
    """
    for _ in range(3):
        try:
            prompt = driver.find_element(
                "xpath", "//input[@data-testid='ocfEnterTextTextInput']"
            )
        except NoSuchElementException:
            continue
        try:
            prompt.send_keys(username)
            prompt.send_keys(Keys.RETURN)
            sleep(3)
        except NoSuchElementException:
            continue
        return
def _input_password():
    """Type ``password`` into the login form's password field and submit it.

    The field is looked up at most three times. After a failed lookup a retry
    notice is printed and the function waits two seconds; after the third
    failure an explanation is printed, the driver is quit and the process
    exits with status 1.
    """
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            field = driver.find_element(
                "xpath", "//input[@autocomplete='current-password']"
            )
            field.send_keys(password)
            field.send_keys(Keys.RETURN)
            sleep(3)  # give the post-login page time to load
            return
        except NoSuchElementException:
            if attempt < max_attempts:
                print("Re-attempting to input password...")
                sleep(2)
                continue
            print()
            print(
                "There was an error inputting the password.\n"
                "It may be due to the following:\n"
                "- Internet connection is unstable\n"
                "- Password is incorrect\n"
                "- Twitter is experiencing unusual activity"
            )
            driver.quit()
            sys.exit(1)
# Logs in to Twitter.
def login():
    """Log in to Twitter with the module-level credentials.

    Opens ``TWITTER_LOGIN_URL``, submits the username, the optional extra
    prompt and the password, then checks that the session carries an
    ``auth_token`` cookie.

    On any failure the reason is printed, the browser session is closed and
    the process exits with status 1.
    """
    print()
    print("Logging in to Twitter...")

    try:
        driver.maximize_window()
        driver.get(TWITTER_LOGIN_URL)
        sleep(5)  # wait for the login page to load

        _input_username()
        _input_unusual_activity()
        _input_password()

        # A successful login is recognised by the presence of this cookie.
        auth_token = next(
            (
                cookie["value"]
                for cookie in driver.get_cookies()
                if cookie["name"] == "auth_token"
            ),
            None,
        )
        if auth_token is None:
            raise ValueError(
                "This may be due to the following:\n"
                "- Internet connection is unstable\n"
                "- Username is incorrect\n"
                "- Password is incorrect\n"
            )

        print()
        print("Login Successful")
        print()
    except Exception as e:
        print()
        print(f"Login Failed: {e}")
        # Close the browser before exiting, as the credential helpers do,
        # so a failed login does not leave a headless session behind.
        driver.quit()
        sys.exit(1)
def go_to_path():
    """Navigate to the Twitter page named by ``path`` and wait for it to load."""
    target_url = "https://twitter.com/{}".format(path)
    driver.get(target_url)
    sleep(3)
def get_tweet_cards():
    """Return the tweet ``<article>`` elements on the page that are not disabled."""
    enabled_tweets_xpath = '//article[@data-testid="tweet" and not(@disabled)]'
    return driver.find_elements("xpath", enabled_tweets_xpath)
def remove_hidden_cards():
    """Remove disabled tweet articles from the page; any error is ignored.

    Only the matches between the first and the last two are removed; for each
    one the element three levels above the article is detached from the DOM.
    """
    disabled_tweets_xpath = '//article[@data-testid="tweet" and @disabled]'
    detach_script = "arguments[0].parentNode.parentNode.parentNode.remove();"
    try:
        disabled_cards = driver.find_elements("xpath", disabled_tweets_xpath)
        for article in disabled_cards[1:-2]:
            driver.execute_script(detach_script, article)
    except Exception:
        # Best-effort clean-up: failures are deliberately swallowed.
        return
def get_tweet(card):
    """Extract the handle, timestamp and text from one tweet card.

    Args:
        card: Element exposing ``find_element`` / ``find_elements`` for the
            tweet ``<article>``.

    Returns:
        ``None`` when the card has no handle span or no ``<time>`` element;
        otherwise the tuple ``(handle, date_time, content, is_ad)``.
        ``date_time`` is the ``datetime`` attribute of the ``<time>`` element.
        When that attribute is missing, ``date_time`` is ``None`` and
        ``is_ad`` is ``True``, so that ``scrape_tweets`` skips the card.
    """
    try:
        handle = card.find_element(
            "xpath", './/span[contains(text(), "@")]'
        ).text
    except NoSuchElementException:
        return None

    try:
        date_time = card.find_element("xpath", ".//time").get_attribute(
            "datetime"
        )
    except NoSuchElementException:
        # No timestamp element: the card is not a regular tweet.
        return None

    # Always bind is_ad. Previously it stayed undefined when the attribute
    # lookup returned None, and the return statement raised UnboundLocalError.
    is_ad = date_time is None

    # The text is split across the spans and links of the first tweetText div.
    parts = card.find_elements(
        "xpath",
        '(.//div[@data-testid="tweetText"])[1]/span | (.//div[@data-testid="tweetText"])[1]/a',
    )
    content = "".join(part.text for part in parts)

    return (
        handle,
        date_time,
        content,
        is_ad,
    )
def scrape_tweets():
    """Open the target page and collect its tweets into ``data``.

    Each pass looks at the last 15 tweet cards on the page, scrolls every
    card not seen before into view and stores its tuple unless it is flagged
    as an ad. A pass that adds nothing clicks a "Retry" button while one is
    present (up to 15 clicks, about a minute apart). The loop stops after
    enough consecutive empty passes.
    """
    go_to_path()

    # Dismiss the cookie banner, if shown, by refusing non-essential cookies.
    try:
        driver.find_element(
            "xpath", "//span[text()='Refuse non-essential cookies']/../../.."
        ).click()
    except NoSuchElementException:
        pass

    refresh_count = 0
    empty_count = 0
    retry_cnt = 0

    # Main loop: walk the visible cards and store their data in ``data``.
    while True:
        try:
            cards = get_tweet_cards()
            added_tweets = 0

            for card in cards[-15:]:
                try:
                    card_key = str(card)
                    if card_key in tweet_ids:
                        continue
                    tweet_ids.add(card_key)

                    # Scrolling to the card makes the page load further tweets.
                    driver.execute_script(
                        "arguments[0].scrollIntoView();", card
                    )

                    tweet = get_tweet(card)
                    # Keep only parsed tweets whose last field (is_ad) is falsy.
                    if tweet and not tweet[-1]:
                        data.append(tweet)
                        added_tweets += 1
                except NoSuchElementException:
                    continue

            if added_tweets:
                empty_count = 0
                refresh_count = 0
                continue

            # Nothing new: while a "Retry" button is present, click it at
            # intervals, up to a fixed number of tries.
            try:
                while retry_cnt < 15:
                    retry_button = driver.find_element(
                        "xpath", "//span[text()='Retry']/../../.."
                    )
                    sleep(58)
                    retry_button.click()
                    retry_cnt += 1
                    sleep(2)
            except NoSuchElementException:
                # No Retry button on the page, so the counter starts over.
                retry_cnt = 0

            if empty_count >= 5 and refresh_count >= 3:
                print()
                print("No more tweets to scrape")
                break
            if empty_count >= 5:
                refresh_count += 1
            empty_count += 1
            sleep(1)
        except StaleElementReferenceException:
            # The page re-rendered under us; wait briefly and rescan.
            sleep(2)
            continue

    print("")
# Save the scraped data in CSV format.
def save_to_csv():
    """Write the tweets in ``data`` to a timestamped CSV under ``./tweets/``.

    The folder is created when missing. The file has the columns Handle,
    Timestamp and Content, taken from the first three fields of each tuple.
    """
    print("Saving Tweets to CSV...")
    current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    folder_path = "./tweets/"

    # Output goes to a "tweets" folder relative to the working directory.
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Created Folder: {folder_path}")

    # One column per stored field: handle, timestamp and tweet text.
    column_names = ("Handle", "Timestamp", "Content")
    csv_data = {
        column: [tweet[index] for tweet in data]
        for index, column in enumerate(column_names)
    }
    frame = pd.DataFrame(csv_data)

    file_path = f"{folder_path}{current_time}_tweets.csv"
    pd.set_option("display.max_colwidth", None)
    frame.to_csv(file_path, index=False, encoding="utf-8")

    print(f"CSV Saved: {file_path}")
# Main routine of the script.
def main():
    """Run the full pipeline: log in, scrape the tweets and save them to CSV.

    The browser session is closed when the pipeline ends, whether it finished
    normally, raised, or exited through ``sys.exit``.
    """
    try:
        login()
        scrape_tweets()
        save_to_csv()
    finally:
        # Best-effort clean-up. Some failure paths (_input_username and
        # _input_password) already quit the driver before exiting, so a second
        # quit here must not mask the original exit.
        try:
            driver.quit()
        except Exception:
            pass


# Run main() only when the file is executed as a script.
if __name__ == "__main__":
    main()