-
Notifications
You must be signed in to change notification settings - Fork 0
/
authenticator.py
137 lines (99 loc) · 4.59 KB
/
authenticator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import flask
import google.oauth2.credentials
import google_auth_oauthlib.flow
import urllib.parse
import json
from my_library import mThreading
import requests
class Authenticate:
    """Run a Google OAuth 2.0 authorization-code flow through a local Flask app.

    Intended sequence: construct the object, call :meth:`start` to register
    the routes, call :meth:`run` to serve them, open ``/`` in a browser to
    grant consent, then call :meth:`exchange_code` to trade the stored
    authorization code for tokens.

    Files written to the current working directory:

    * ``code.json``  -- query parameters received on ``/oauth2callback``
      plus the full callback URL.
    * ``token.json`` -- raw body of the token endpoint response.
    """

    def __init__(self, credentials_json='client_secret.json', permissions=['https://www.googleapis.com/auth/youtube.upload'], redirect_uri=None):
        """Store the flow configuration and create the Flask application.

        The OAuth client must allow the redirect URI that points at the
        ``/oauth2callback`` route of this app (for the script entry point of
        this file that is ``http://localhost:5000/oauth2callback``).

        :param credentials_json: path to the client secret file downloaded
            from the Google API console. It is expected to contain a ``web``
            section.
        :param permissions: OAuth scopes to request.
        :param redirect_uri: redirect URI sent to Google. When ``None`` the
            first entry of ``web.redirect_uris`` in ``credentials_json`` is
            used.
        """
        self.credentials_json = credentials_json
        # Copy list arguments so that mutating ``self.permissions`` can never
        # alter the shared default list (or a list owned by the caller).
        if isinstance(permissions, list):
            self.permissions = list(permissions)
        else:
            self.permissions = permissions
        self.redirect_uri = redirect_uri
        if self.redirect_uri is None:
            with open(self.credentials_json, "r") as credentials_json_file:
                self.redirect_uri = json.load(credentials_json_file)["web"]["redirect_uris"][0]
        self.app = flask.Flask(__name__)

    def start(self):
        """Build the authorization URL and register the two Flask routes.

        ``/`` redirects the browser to the Google consent page.
        ``/oauth2callback`` receives the redirect back, verifies the
        ``state`` value and stores the query parameters in ``code.json``.
        """
        authorization_url, state = self.get_authorization_url()

        @self.app.route('/')
        def bar():
            return flask.redirect(authorization_url)

        @self.app.route('/oauth2callback')
        def oauth2callback():
            args = flask.request.args.to_dict()
            # The state value ties the callback to the authorization request
            # issued above; refuse callbacks that do not carry it so a forged
            # request cannot plant an arbitrary code in code.json.
            if args.get('state') != state:
                flask.abort(400, description='OAuth state mismatch')
            args['url'] = flask.request.url
            with open("code.json", "w") as token_json:
                token_json.write(json.dumps(args))
            return flask.request.args

    def get_authorization_url(self):
        """Create the consent-page URL for the configured client and scopes.

        The URL is also printed so it can be opened manually.

        :return: ``(authorization_url, state)`` as produced by
            ``Flow.authorization_url``.
        """
        flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
            self.credentials_json,
            self.permissions)
        flow.redirect_uri = self.redirect_uri
        authorization_url, state = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true')
        print(authorization_url)
        return authorization_url, state

    @staticmethod
    def credentials_to_dict(credentials):
        """Copy the token-related attributes of ``credentials`` into a dict.

        :param credentials: object exposing ``token``, ``refresh_token``,
            ``token_uri``, ``client_id``, ``client_secret`` and ``scopes``.
        :return: dict with those six keys.
        """
        return {'token': credentials.token,
                'refresh_token': credentials.refresh_token,
                'token_uri': credentials.token_uri,
                'client_id': credentials.client_id,
                'client_secret': credentials.client_secret,
                'scopes': credentials.scopes}

    def exchange_code(self):
        """Exchange the authorization code in ``code.json`` for tokens.

        Posts the code together with the client id/secret from
        ``credentials_json`` to the Google token endpoint and, on success,
        writes the raw response body to ``token.json``.

        :return: the raw response body (``bytes``).
        :raises KeyError: if ``code.json`` has no ``code`` entry or the
            credentials file lacks the expected ``web`` fields.
        :raises requests.HTTPError: if the token endpoint answers with an
            error status; ``token.json`` is left untouched in that case.
        """
        with open("code.json", "r") as token_json:
            code = json.load(token_json)["code"]
        with open(self.credentials_json, "r") as credentials:
            web_settings = json.load(credentials)["web"]
        # Must be the same redirect URI that was sent with the authorization
        # request, otherwise the token endpoint rejects the exchange.
        redirect_uri = self.redirect_uri
        print(redirect_uri)
        data = dict(
            grant_type="authorization_code",
            code=code,
            client_id=web_settings["client_id"],
            client_secret=web_settings["client_secret"],
            redirect_uri=redirect_uri
        )
        # A timeout keeps a stalled connection from blocking forever.
        req = requests.post("https://accounts.google.com/o/oauth2/token", data=data, timeout=30)
        # Do not persist an error payload as if it were a token.
        req.raise_for_status()
        with open("token.json", "wb") as token:
            token.write(req.content)
        return req.content

    # NOTE(review): ``mThreading.thread`` comes from ``my_library``; presumably
    # it runs the call in a separate thread -- confirm against that module.
    @mThreading.thread
    def run(self, host, port):
        """Serve the Flask app on ``host``:``port``."""
        self.app.run(host=host, port=port)
if __name__ == '__main__':
    # Script entry point: build the authenticator with its default settings,
    # register the OAuth routes, then serve them on localhost port 5000.
    authenticator = Authenticate()
    authenticator.start()
    authenticator.run('localhost', 5000)