forked from SpockBotMC/SpockBot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth.py
150 lines (125 loc) · 4.75 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
Provides authorization functions for Mojang's login and session servers
"""
import hashlib
import json
# This is for python2 compatibility
try:
import urllib.request as request
from urllib.error import URLError
except ImportError:
import urllib2 as request
from urllib2 import URLError
import logging
import os
from spockbot.mcp.yggdrasil import YggdrasilCore
from spockbot.plugins.base import PluginBase, pl_announce
logger = logging.getLogger('spockbot')
# This function courtesy of barneygale
def java_hex_digest(digest):
    """Format a hash object as a signed hexadecimal string.

    The digest is interpreted as a big-endian two's complement integer,
    mirroring how Java's ``new BigInteger(bytes).toString(16)`` renders a
    digest: when the top bit is set the result is ``-`` followed by the hex
    magnitude, and leading zeros are never printed.

    The bit width comes from ``digest.digest_size`` so the sign bit is
    located correctly for any fixed-size hash, not only 160-bit SHA-1.

    :param digest: hashlib-style object exposing ``hexdigest()`` and
        ``digest_size``
    :returns: the signed hexadecimal string (``str``)
    """
    bits = digest.digest_size * 8
    value = int(digest.hexdigest(), 16)
    if value >> (bits - 1):
        # Top bit set: the two's complement value is negative, so print the
        # magnitude (2**bits - value) behind a minus sign.
        return "-%x" % ((-value) & ((1 << bits) - 1))
    return "%x" % value
class AuthCore(object):
    """Credential holder and client for Mojang's login and session servers.

    Credentials are stored on a ``YggdrasilCore`` instance (``self.ygg``).
    The ``password``, ``client_token`` and ``auth_token`` properties read
    back as booleans (whether a value is set), never the value itself.
    """

    def __init__(self, event, online_mode, auth_timeout):
        """
        :param event: object whose ``emit(name)`` is called on login failure
        :param online_mode: when false, no login request is made
        :param auth_timeout: timeout passed to ``urlopen`` for the session
            server request
        """
        self.online_mode = online_mode
        self.auth_timeout = auth_timeout
        self.__event = event
        self.ygg = YggdrasilCore()
        self._shared_secret = None
        self._username = None

    def get_username(self):
        """Return the name resolved by ``start_session`` (None before)."""
        return self._username

    def set_username(self, username):
        """Store the login name on the Yggdrasil client.

        This does not change what ``get_username`` returns; that value is
        only assigned by ``start_session``.
        """
        self.ygg.username = username

    username = property(get_username, set_username)

    def set_password(self, password):
        """Store the password, warning if it is set while offline."""
        if password and not self.online_mode:
            logger.warning("PASSWORD PROVIDED WITH ONLINE_MODE == FALSE")
            logger.warning("YOU PROBABLY DIDN'T WANT TO DO THAT")
        self.ygg.password = password

    password = property(lambda x: bool(x.ygg.password), set_password)

    def set_client_token(self, client_token):
        """Store the client token, warning if it is set while offline."""
        # Only warn for an actual value, matching set_password; assigning
        # None/empty in offline mode is not a misconfiguration.
        if client_token and not self.online_mode:
            logger.warning("CLIENT TOKEN PROVIDED WITH ONLINE_MODE == FALSE")
            logger.warning("YOU PROBABLY DIDN'T WANT TO DO THAT")
        self.ygg.client_token = client_token

    client_token = property(
        lambda x: bool(x.ygg.client_token), set_client_token
    )

    def set_auth_token(self, auth_token):
        """Store the auth token, warning if it is set while offline."""
        # Only warn for an actual value, matching set_password.
        if auth_token and not self.online_mode:
            logger.warning("AUTH TOKEN PROVIDED WITH ONLINE_MODE == FALSE")
            logger.warning("YOU PROBABLY DIDN'T WANT TO DO THAT")
        self.ygg.auth_token = auth_token

    auth_token = property(
        lambda x: bool(x.ygg.auth_token), set_auth_token
    )

    def get_shared_secret(self):
        """Return the 16-byte shared secret, generating it on first use."""
        self._shared_secret = self._shared_secret or os.urandom(16)
        return self._shared_secret

    shared_secret = property(get_shared_secret)

    def start_session(self):
        """Resolve the username for this session.

        In offline mode the configured login name is used as-is. Otherwise
        ``self.ygg.login()`` is called and, on a truthy result, the name is
        taken from ``self.ygg.selected_profile``.

        :returns: True on success; False after emitting
            ``'auth_session_error'`` when the login fails
        """
        if not self.online_mode:
            self._username = self.ygg.username
            return True
        if self.ygg.login():
            self._username = self.ygg.selected_profile['name']
            return True
        self.__event.emit('auth_session_error')
        return False

    def send_session_auth(self, pubkey_raw, server_id_raw):
        """POST a join request to Mojang's session server.

        The ``serverId`` sent is the ``java_hex_digest`` of the SHA-1 over
        the ASCII-encoded server id, the shared secret and the raw public
        key. The outcome is only logged; nothing is returned.

        :param pubkey_raw: public key bytes, hashed as given
        :param server_id_raw: server id string, ASCII-encoded before hashing
        """
        server_id = java_hex_digest(hashlib.sha1(
            server_id_raw.encode('ascii') + self.shared_secret + pubkey_raw
        ))
        logger.info('Attempting to authenticate with Mojang session server')
        url = "https://sessionserver.mojang.com/session/minecraft/join"
        data = json.dumps({
            'accessToken': self.ygg.access_token,
            'selectedProfile': self.ygg.selected_profile,
            'serverId': server_id,
        }).encode('utf-8')
        headers = {'Content-Type': 'application/json'}
        req = request.Request(url, data, headers)
        try:
            response = request.urlopen(req, timeout=self.auth_timeout)
            try:
                rep = response.read().decode('ascii')
            finally:
                # Always release the connection, even if read/decode fails.
                response.close()
        except URLError as err:
            # URLError also covers HTTPError (e.g. a 403 rejection), so log
            # the actual reason and do not claim success afterwards.
            logger.warning('Mojang session auth failed: %s', err)
            return
        if rep:
            logger.warning('Mojang session auth response: %s', rep)
        logger.info('Session authentication successful')
@pl_announce('Auth')
class AuthPlugin(PluginBase):
    """Plugin that publishes an ``AuthCore`` under the name 'Auth'.

    It also reacts to the two auth error events by calling
    ``self.event.kill()`` when the matching ``*_quit`` setting is enabled.
    """
    requires = 'Event'
    defaults = {
        'online_mode': True,
        'auth_timeout': 3,  # No idea how long this should be, 3s seems good
        'auth_quit': True,
        'sess_quit': True,
    }
    events = {
        'auth_login_error': 'handle_auth_error',
        'auth_session_error': 'handle_session_error',
    }

    def __init__(self, ploader, settings):
        """Read the quit flags and register the AuthCore with the loader."""
        super(AuthPlugin, self).__init__(ploader, settings)
        # self.settings / self.event are presumably populated by PluginBase
        # from `defaults` and `requires` -- confirm against the base class.
        cfg = self.settings
        self.sess_quit = cfg['sess_quit']
        self.auth_quit = cfg['auth_quit']
        core = AuthCore(self.event, cfg['online_mode'], cfg['auth_timeout'])
        ploader.provides('Auth', core)

    def handle_auth_error(self, name, data):
        """Kill the event loop on 'auth_login_error' if auth_quit is set."""
        if not self.auth_quit:
            return
        logger.error('AUTH: Session authentication error, calling kill')
        self.event.kill()

    def handle_session_error(self, name, data):
        """Kill the event loop on 'auth_session_error' if sess_quit is set."""
        if not self.sess_quit:
            return
        logger.error('AUTH: Session start error, calling kill')
        self.event.kill()