-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgetRecentTweet.py
94 lines (78 loc) · 3.05 KB
/
getRecentTweet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import requests
import os
import json
# To set your environment variables in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'
class getRecentTweet:
user_params = ['handle', 'username']
def __init__(self, input):
self.id = input.get('id', '1')
self.request_data = input.get('data')
def validate_request_data(self):
if self.request_data is None:
return False
if self.request_data == {}:
return False
return True
def auth(self):
return os.environ.get("TWITTER_BEARER_TOKEN")
def set_params(self):
for param in self.user_params:
self.user_param = self.request_data.get(param)
if self.user_param is not None:
break
def create_url(self):
query = "from:" + self.user_param
# Tweet fields are adjustable.
# Options include:
# attachments, author_id, context_annotations,
# conversation_id, created_at, entities, geo, id,
# in_reply_to_user_id, lang, non_public_metrics, organic_metrics,
# possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets,
# source, text, and withheld
tweet_fields = "tweet.fields=author_id"
url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}".format(
query, tweet_fields
)
return url
def create_headers(self, bearer_token):
headers = {"Authorization": "Bearer {}".format(bearer_token)}
return headers
def connect_to_endpoint(self, url, headers):
response = requests.request("GET", url, headers=headers)
print(response.status_code)
if response.status_code != 200:
raise Exception(response.status_code, response.text)
return response.json()
def result_success(self, data):
self.result = {
'jobRunID': self.id,
'data': self.result, # was data
'result': self.result,
'statusCode': 200,
}
def extractAddress(self, tweet):
address = ""
for i in range(0, len(tweet) - 1):
if tweet[i] == "0" and tweet[i+1] == "x":
address = tweet[i:]
break
if address == "":
return "No address found"
else:
return address
def grabRecent(self):
bearer_token = self.auth()
print(bearer_token)
self.set_params()
url = self.create_url()
headers = self.create_headers(bearer_token)
json_response = self.connect_to_endpoint(url, headers)
# print(json.dumps(json_response, indent=4, sort_keys=True))
# self.result = json_response['data'][0]['text']
try:
address = self.extractAddress(json_response['data'][0]['text'])
self.result = int(address, 0) # add error check if python can't convert tweet to a number then have it return 0 which we know no user has the zero address
except:
self.result = 0
self.result_success(json_response)