#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Apr 9 11:04:24 2021
INFO
####
This script downloads tweets of users from the Twitter full archive using the
academic access API (v2 full-archive search). For each user it downloads the
geotagged tweets (retweets excluded) from the given time range and saves the
results in chunks of users (see the -c option). Please note that the user list
CSV has to have the column name usr_id; an example layout is shown below.
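A minimal user list CSV could look roughly like this (the ids below are
made-up placeholders, one Twitter user id per row):
    usr_id
    123456789
    987654321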
REQUIREMENTS
############
Files:
.twitter_keys.yaml in the script directory
search_config.yaml in the script directory
(example sketches of both files are given after this list)
Installed:
Python 3.8 or newer
Python packages:
searchtweets-v2 (imported as searchtweets)
pandas
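Example configuration sketches with placeholder values, not working
credentials. The layout is an assumption based on the searchtweets-v2
documentation; only the keys shown below are actually read by this script,
so check the layout against your own setup:
.twitter_keys.yaml:
    search_tweets_v2:
        endpoint: https://api.twitter.com/2/tweets/search/all
        consumer_key: <CONSUMER_KEY>
        consumer_secret: <CONSUMER_SECRET>
        bearer_token: <BEARER_TOKEN>
search_config.yaml:
    search_params:
        results_per_call: 500
        max_tweets: 100000
    output_params:
        filename_prefix: timeline_tweets_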
USAGE
#####
Run the script by typing:
python timeline_tweets_to_file.py -ul /path/to/list.csv -sd YEAR-MO-DA -ed YEAR-MO-DA -o pkl -op ~/path/to/folder/ -c 50
Replace YEAR-MO-DA with the start and end dates you want in year-month-day
format, for example 2018-01-01.
NOTE
####
The collector retrieves tweets starting from 00:00:00 on the start date and
ends the collection at 23:59:59 on the day before the end date, i.e. the end
date itself is excluded.
@author: Tuomas Väisänen & Seija Sirkiä
"""
from util_functions import v2parser, daterange
from searchtweets import ResultStream, gen_request_parameters, load_credentials, read_config
from datetime import datetime
import time
import argparse
import os
import pandas as pd
import gc
# define function to iterate over users in chunks
def chunker(sequence, size):
    return (sequence[pos:pos + size] for pos in range(0, len(sequence), size))
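# example (illustration only): chunker([1, 2, 3, 4, 5], 2) yields
# [1, 2], [3, 4] and finally [5]; the last chunk may be shorter than size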
# Set up the argument parser
ap = argparse.ArgumentParser()
# Get userlist csv
ap.add_argument("-ul", "--userlist", required=True,
help="Path to userlist CSV file")
# get start date
ap.add_argument("-sd", "--startdate", required=True,
type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
help="Start date of the collection in the following form: "
" YEAR-MO-DA for example 2018-01-01")
# Get end date
ap.add_argument("-ed", "--enddate", required=True,
type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
help="End date of the collection in the following form: "
" YEAR-MO-DA for example 2018-02-18")
# get save format
ap.add_argument("-o", "--output", required=True, default='pkl',
help="Output file format, valid options are either pkl or csv. "
"Default: pkl")
# get output path
ap.add_argument("-op", "--outpath", required=False,
help="Path to output folder. For example: "
"~/Data/project/results/")
# get user chunk size
ap.add_argument("-c", "--chunksize", required=False, default=20,
help="The number of users to save into single dataframe."
"Default: 20")
# Parse arguments
args = vars(ap.parse_args())
# get output path
outpath = args['outpath']
# get chunk size
user_chunksize = args['chunksize']
# check if output filetypes are valid
if args['output'] == 'pkl':
    # save to pickle
    print('[INFO] - Output file set to pickle')
elif args['output'] == 'csv':
    # save to csv
    print('[INFO] - Output file set to csv')
else:
    print('[INFO] - Invalid output file format! Valid options are pkl or csv. Exiting...')
    exit()
# read user list here
users = pd.read_csv(args['userlist'])
# convert to list
users = users['usr_id'].values.tolist()
# load twitter keys
search_creds = load_credentials('.twitter_keys.yaml',
                                yaml_key = 'search_tweets_v2',
                                env_overwrite = False)
# load configuration for search query
config = read_config('search_config.yaml')
# fields for v2 api
tweetfields = ",".join(["attachments", "author_id", "conversation_id", "created_at",
"entities", "geo", "id", "in_reply_to_user_id", "lang",
"public_metrics", "possibly_sensitive", "referenced_tweets",
"reply_settings", "text", "withheld",])
userfields = ",".join(["created_at", "description", "entities", "location",
"name", "profile_image_url", "protected", "public_metrics",
"url", "username", "verified", "withheld"])
mediafields= ",".join(["media_key", "type", "url"])
placefields = ",".join(["contained_within", "country", "country_code", "full_name",
"geo", "id", "name", "place_type"])
expansions = ",".join(["attachments.media_keys", "author_id", "entities.mentions.username",
"geo.place_id", "in_reply_to_user_id", "referenced_tweets.id",
"referenced_tweets.id.author_id"])
# set interval to loop through
start_date = args['startdate'].date()
end_date = args['enddate'].date()
# get chunks of user list
for userchunk in chunker(users, user_chunksize):
    print('[INFO] - Users in current chunk: ' + str(len(userchunk)) + ', first user id: ' + str(userchunk[0]))
    # get list for user dataframes
    dflist = []
    # get first and last users
    fuser = userchunk[0]
    luser = userchunk[-1]
    # loop over users in chunk
    for user in userchunk:
        # form search query per user: rule out retweets and keep only geotagged tweets
        search_q = 'from:{} -is:retweet has:geo'.format(user)
        # payload rules for v2 api
        rule = gen_request_parameters(query = search_q,
                                      results_per_call = config['results_per_call'],
                                      start_time = start_date.isoformat(),
                                      end_time = end_date.isoformat(),
                                      tweet_fields = tweetfields,
                                      user_fields = userfields,
                                      media_fields = mediafields,
                                      place_fields = placefields,
                                      expansions = expansions,
                                      stringify = False)
        # result stream from twitter v2 api
        rs = ResultStream(request_parameters = rule,
                          max_results=100000,
                          max_pages=1,
                          max_tweets = config['max_tweets'],
                          **search_creds)
        # number of reconnection tries
        tries = 10
        # while loop to protect against 104 (connection reset) errors
        while True:
            tries -= 1
            # attempt retrieving tweets
            try:
                # indicate which date range is being retrieved
                print('[INFO] - Retrieving tweets between ' + str(start_date) + ' and ' + str(end_date))
                # get json response to list
                tweets = list(rs.stream())
                # adjust wait time to the size of the response
                if len(tweets) < 500:
                    # wait 7 seconds to stay clear of rate limits
                    time.sleep(7)
                else:
                    # wait a bit longer for larger responses
                    time.sleep(18)
                # break free from while loop
                break
            except Exception as err:
                if tries == 0:
                    raise err
                else:
                    print('[INFO] - Got connection error, waiting 15 seconds and trying again. ' + str(tries) + ' tries left.')
                    time.sleep(15)
        # inform how many tweets per user were collected
        print('[INFO] - Collected ' + str(len(tweets)) + ' tweets from user ' + str(user))
        # parse results to dataframe
        if len(tweets) != 0:
            try:
                # convert json to dataframe
                print('[INFO] - Parsing collected tweets of user ' + str(user) + ' from ' + str(start_date) + ' to ' + str(end_date))
                tweetdf = v2parser(tweets, config['results_per_call'])
            except Exception:
                print('[INFO] - User id ' + str(user) + ' tweets could not be converted to dataframe..')
        else:
            print('[INFO] - User id ' + str(user) + ' is missing or has no tweets. Moving on...')
        # try to order columns semantically
        try:
            tweetdf = tweetdf[['id', 'author_id', 'created_at', 'reply_settings', 'conversation_id',
                               'in_reply_to_user_id', 'text', 'possibly_sensitive',
                               'lang', 'referenced_tweets', 'referenced_tweets.id',
                               'referenced_tweets.author_id', 'referenced_tweets.type',
                               'public_metrics.retweet_count', 'public_metrics.reply_count',
                               'public_metrics.like_count', 'public_metrics.quote_count',
                               'entities.mentions', 'entities.urls', 'entities.hashtags',
                               'entities.annotations', 'attachments.media_keys',
                               'attachments.media_types', 'user.description', 'user.verified', 'user.id', 'user.protected',
                               'user.url', 'user.profile_image_url', 'user.location', 'user.name',
                               'user.created_at', 'user.username', 'user.public_metrics.followers_count',
                               'user.public_metrics.following_count', 'user.public_metrics.tweet_count',
                               'user.public_metrics.listed_count', 'user.entities.description.hashtags',
                               'user.entities.url.urls', 'user.entities.description.mentions',
                               'user.entities.description.urls', 'geo.place_id', 'geo.coordinates.type',
                               'geo.coordinates.coordinates', 'geo.coordinates.x', 'geo.coordinates.y',
                               'geo.full_name', 'geo.name', 'geo.place_type', 'geo.country',
                               'geo.country_code', 'geo.type', 'geo.bbox', 'geo.centroid',
                               'geo.centroid.x', 'geo.centroid.y']]
        except Exception:
            # not every user has all columns, keep the parsed order in that case
            pass
        # append dataframe to dataframe list
        try:
            dflist.append(tweetdf)
            gc.collect()
            # delete dataframe variable so it is not re-used for the next user
            del tweetdf
        except NameError:
            # tweetdf was never created for this user
            print('[INFO] - No tweets saved because the user has no content from the time period. Moving on..')
    # concatenate result dataframe
    if len(dflist) > 0:
        # concatenate
        results = pd.concat(dflist, ignore_index=True)
        # set up file prefix from config
        file_prefix_w_date = config['filename_prefix'] + start_date.isoformat()
        userstring = '_from_' + str(fuser) + '_to_' + str(luser)
        outpickle = file_prefix_w_date + userstring + '.pkl'
        outcsv = file_prefix_w_date + userstring + '.csv'
        # save to file
        if args['output'] == 'pkl':
            # save to pickle
            results.to_pickle(os.path.join(outpath, outpickle))
        elif args['output'] == 'csv':
            # save to csv
            results.to_csv(os.path.join(outpath, outcsv), sep=';', encoding='utf-8')
        # free up memory
        del results, dflist
        gc.collect()
    else:
        print('[INFO] - No data in current user chunk. Moving on...')
print('[INFO] - ... done!')