-
Notifications
You must be signed in to change notification settings - Fork 130
/
Copy pathgithub.py
111 lines (96 loc) · 4.44 KB
/
github.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
OAuth backend for LinkedIn
"""
import json
import logging
import requests
from oic.utils.authn.authn_context import UNSPECIFIED
from oic.oauth2.consumer import stateID
from oic.oauth2.message import AuthorizationResponse
from satosa.backends.oauth import _OAuthBackend
from satosa.internal import AuthenticationInformation
from satosa.internal import InternalData
from satosa.response import Redirect
from satosa.util import rndstr
logger = logging.getLogger(__name__)
class GitHubBackend(_OAuthBackend):
"""GitHub OAuth 2.0 backend"""
def __init__(self, outgoing, internal_attributes, config, base_url, name):
"""GitHub backend constructor
:param outgoing: Callback should be called by the module after the
authorization in the backend is done.
:param internal_attributes: Mapping dictionary between SATOSA internal
attribute names and the names returned by underlying IdP's/OP's as
well as what attributes the calling SP's and RP's expects namevice.
:param config: configuration parameters for the module.
:param base_url: base url of the service
:param name: name of the plugin
:type outgoing:
(satosa.context.Context, satosa.internal.InternalData) ->
satosa.response.Response
:type internal_attributes: dict[string, dict[str, str | list[str]]]
:type config: dict[str, dict[str, str] | list[str] | str]
:type base_url: str
:type name: str
"""
config.setdefault('response_type', 'code')
config['verify_accesstoken_state'] = False
super().__init__(
outgoing, internal_attributes, config, base_url, name, 'github',
'id')
def start_auth(self, context, internal_request, get_state=stateID, **kwargs):
"""
:param get_state: Generates a state to be used in authentication call
:type get_state: Callable[[str, bytes], str]
:type context: satosa.context.Context
:type internal_request: satosa.internal.InternalData
:rtype satosa.response.Redirect
"""
oauth_state = get_state(self.config["base_url"], rndstr().encode())
context.state[self.name] = dict(state=oauth_state)
request_args = dict(
client_id=self.config['client_config']['client_id'],
redirect_uri=self.redirect_url,
state=oauth_state,
allow_signup=self.config.get('allow_signup', False))
scope = ' '.join(self.config['scope'])
if scope:
request_args['scope'] = scope
cis = self.consumer.construct_AuthorizationRequest(
request_args=request_args)
return Redirect(cis.request(self.consumer.authorization_endpoint))
def auth_info(self, requrest):
return AuthenticationInformation(
UNSPECIFIED, None,
self.config['server_info']['authorization_endpoint'])
def _authn_response(self, context, **kwargs):
state_data = context.state[self.name]
aresp = self.consumer.parse_response(
AuthorizationResponse, info=json.dumps(context.request))
self._verify_state(aresp, state_data, context.state)
url = self.config['server_info']['token_endpoint']
data = dict(
code=aresp['code'],
redirect_uri=self.redirect_url,
client_id=self.config['client_config']['client_id'],
client_secret=self.config['client_secret'], )
headers = {'Accept': 'application/json'}
r = requests.post(url, data=data, headers=headers)
response = r.json()
if self.config.get('verify_accesstoken_state', True):
self._verify_state(response, state_data, context.state)
user_info = self.user_information(response["access_token"])
auth_info = self.auth_info(context.request)
internal_response = InternalData(auth_info=auth_info)
internal_response.attributes = self.converter.to_internal(
self.external_type, user_info)
internal_response.subject_id = str(user_info[self.user_id_attr])
del context.state[self.name]
return self.auth_callback_func(context, internal_response)
def user_information(self, access_token):
url = self.config['server_info']['user_info']
headers = {'Authorization': 'token {}'.format(access_token)}
r = requests.get(url, headers=headers)
ret = r.json()
ret['id'] = str(ret['id'])
return r.json()