-
Notifications
You must be signed in to change notification settings - Fork 47
/
yggdrasil.py
154 lines (129 loc) · 4.29 KB
/
yggdrasil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from __future__ import unicode_literals
import logging
try:
import simplejson as json
except ImportError:
import json
from six.moves.urllib.error import HTTPError
from six.moves.urllib.request import Request, urlopen
# Module-level logger.
# NOTE(review): getLogger() called without a name returns the root logger;
# logging.getLogger(__name__) is the usual per-module choice -- confirm no
# caller relies on the root logger before changing it.
logger = logging.getLogger()
class YggdrasilCore(object):
    """Minimal client for the Yggdrasil authentication HTTP API.

    The instance stores the credentials and tokens it is given and updates
    them in place as the endpoints are called.

    Attributes:
        username: Account name sent to ``/authenticate`` and ``/signout``.
        password: Account password sent alongside ``username``.
        client_token: Client token paired with ``access_token``.
        access_token: Access token obtained from the server (or supplied).
        available_profiles: Profiles reported by the last successful
            ``authenticate()``; empty list when none were reported.
        selected_profile: Profile reported by the last successful
            ``authenticate()``/``refresh()``; empty dict when none.
    """

    # Value sent as the agent version in the /authenticate payload.
    ygg_version = 1
    # Base URL that endpoint paths are appended to.
    ygg_url = 'https://authserver.mojang.com'

    def __init__(self, username='', password='', client_token='',
                 access_token=''):
        self.username = username
        self.password = password
        self.client_token = client_token
        self.access_token = access_token
        self.available_profiles = []
        self.selected_profile = {}

    def login(self):
        """
        Obtain a usable access token with the cheapest available method.

        Tries, in order: validating the current access token, refreshing
        the access/client token pair, and finally authenticating with the
        username and password.

        Returns:
            bool: True if one of the steps succeeded, False otherwise
        """
        if self.access_token and self.validate():
            return True
        if self.access_token and self.client_token and self.refresh():
            return True
        # bool() so that missing credentials yield False rather than ''.
        return bool(
            self.username and self.password and self.authenticate()
        )

    def logout(self):
        """
        Invalidate the current access/client token pair, if there is one.

        Returns:
            bool: True if tokens were present and invalidate() was called,
                False if either token is missing
        """
        # bool() so that missing tokens yield False rather than ''.
        return bool(
            self.access_token and self.client_token and self.invalidate()
        )

    def _ygg_req(self, endpoint, payload):
        """
        POST a JSON payload to an endpoint and decode the JSON reply.

        Args:
            endpoint: Path appended to ``ygg_url`` (e.g. '/validate').
            payload: JSON-serialisable object sent as the request body.

        Returns:
            dict: Decoded response body, or an empty dict if the body
                is empty
        """
        request = Request(
            url=self.ygg_url + endpoint,
            data=json.dumps(payload).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
        )
        try:
            resp = urlopen(request)
        except HTTPError as e:
            # HTTPError is file-like as well; its body is read and decoded
            # exactly like a normal response so callers see the error dict.
            resp = e
        try:
            data = resp.read().decode('utf-8')
        finally:
            # Always release the underlying connection.
            resp.close()
        return json.loads(data) if data else {}

    def authenticate(self):
        """
        Generate an access token using an username and password. Any existing
        client token is invalidated if not provided.

        Returns:
            bool: True on success (tokens and profiles are stored on the
                instance), False if the response is empty or an error
        """
        endpoint = '/authenticate'
        payload = {
            'agent': {
                'name': 'Minecraft',
                'version': self.ygg_version,
            },
            'username': self.username,
            'password': self.password,
            'clientToken': self.client_token,
        }
        rep = self._ygg_req(endpoint, payload)
        if not rep or 'error' in rep:
            return False
        self.access_token = rep['accessToken']
        self.client_token = rep['clientToken']
        # Profile fields may be absent from the response; fall back to the
        # same empty defaults __init__ uses instead of raising KeyError.
        self.available_profiles = rep.get('availableProfiles', [])
        self.selected_profile = rep.get('selectedProfile', {})
        return True

    def refresh(self):
        """
        Generate an access token with a client/access token pair. Used
        access token is invalidated.

        Returns:
            bool: True on success (new tokens are stored on the instance),
                False if the response is empty or an error
        """
        endpoint = '/refresh'
        payload = {
            'accessToken': self.access_token,
            'clientToken': self.client_token,
        }
        rep = self._ygg_req(endpoint, payload)
        if not rep or 'error' in rep:
            return False
        self.access_token = rep['accessToken']
        self.client_token = rep['clientToken']
        # May be absent from the response; avoid a KeyError in that case.
        self.selected_profile = rep.get('selectedProfile', {})
        return True

    def signout(self):
        """
        Invalidate access tokens with a username and password.

        Returns:
            bool: True if the server replied with an empty body (tokens
                and profiles are then cleared), False otherwise
        """
        endpoint = '/signout'
        payload = {
            'username': self.username,
            'password': self.password,
        }
        rep = self._ygg_req(endpoint, payload)
        # As in validate(), an empty reply means success; anything else is
        # treated as an error and the stored state is left untouched.
        if rep:
            return False
        self.client_token = ''
        self.access_token = ''
        self.available_profiles = []
        self.selected_profile = {}
        return True

    def invalidate(self):
        """
        Invalidate access tokens with a client/access token pair

        The server reply is ignored: local tokens and profiles are cleared
        regardless of what the server answered.

        Returns:
            bool: Always True
        """
        endpoint = '/invalidate'
        payload = {
            'accessToken': self.access_token,
            'clientToken': self.client_token,
        }
        self._ygg_req(endpoint, payload)
        self.client_token = ''
        self.access_token = ''
        self.available_profiles = []
        self.selected_profile = {}
        return True

    def validate(self):
        """
        Check if an access token is valid

        Returns:
            bool: True if the server replied with an empty body,
                False otherwise
        """
        endpoint = '/validate'
        payload = dict(accessToken=self.access_token)
        rep = self._ygg_req(endpoint, payload)
        return not bool(rep)