forked from 0xn0ne/weblogicScanner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
125 lines (105 loc) · 4.24 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from typing import Tuple, Dict
import requests
import re
from urllib.parse import quote
__SCHEME_TO_PORT__ = {
'ftp': '21',
'ssh': '22',
'telnet': '23',
'tftp': '69',
'socks4': '1080',
'socks5': '1080',
'http': '80',
'pop2': '109',
'pop3': '110',
'sftp': '115',
'https': '443',
'sqlserver': '1433',
'mysql': '3306',
'postgresql': '5432',
'redis': '6379',
}
class DictString(dict):
def __setitem__(self, key, value):
super().__setitem__(key, str(value))
class Url:
def __init__(self, url: str):
'''
:param url: 需要解析的url
https://example.com:8952/nothing.py;param1=v1;param2=v2?query1=v1&query2=v2#frag
scheme=>https, netloc=>example.com:8952, path=>/nothing.py, params=>param1=v1;param2=v2,
query=>query1=v1&query2=v2, fragment=>frag, hostname=>example.com, port=>8952
'''
self.scheme, self.netloc, self.path, self.params, self.query = '', '', '', DictString(), DictString()
self.fragment, self.hostname, self.port, self.username, self.password = '', '', '', '', ''
try:
self.scheme, user_pass, self.netloc, self.path = re.search(
r'(.+)://([^\\/]*:[^\\/]*@)?([^\\/]+)(/[^;?#]*)?', url).groups()
if not self.path:
self.path = '/'
if user_pass:
self.username, self.password = re.search(r'([^@:]+):([^@:]+)', user_pass).groups()
self.hostname, self.port = re.search(r'([^:]+):?(\d+)?', self.netloc).groups()
if not self.port:
self.port = self.get_default_port(self.scheme)
except AttributeError:
raise ValueError('Incorrect URL')
r = re.findall(r';([^?#]+?)=([^?#;]+)', url)
if r:
self.params = DictString(r)
else:
self.params = DictString()
r = re.findall(r'[?&]([^;?#]+?)=([^;?#&]+)', url)
if r:
self.query = DictString(r)
else:
self.query = DictString()
r = re.search(r'#([^;?#]+)', url)
if r:
self.fragment = r.group(1)
@classmethod
def get_default_port(cls, scheme):
return __SCHEME_TO_PORT__[scheme]
def url_index(self):
base = f'{self.scheme}://'
if self.username:
base += f'{self.username}:{self.password}@'
base += self.netloc
return base
def url_path(self, encoded=True):
base = self.path
if self.params:
for k in self.params:
base += f';{k}={quote(self.params[k]) if encoded else self.params[k]}'
if self.query:
first = True
for k in self.query:
if first:
base += '?'
first = False
else:
base += f'&'
base += f'{k}={quote(self.query[k]) if encoded else self.query[k]}'
if self.fragment:
base += f'#{self.fragment}'
return base
def url_full(self, encoded=True):
return self.url_index() + self.url_path(encoded)
def __str__(self):
return f"URL(scheme={self.scheme}, netloc={self.netloc}, path={self.path}, params={self.params}, query={self.query}, fragment={self.fragment}, hostname={self.hostname}, port={self.port}, username={self.username}, password={self.password})"
def http(url, method='GET', headers=None, params=None, data=None, timeout=10, verify=True) -> (Tuple[requests.Response, None], Dict):
if not headers:
headers = {}
headers.update({'User-Agent': 'TestUA/1.0'})
nurl = Url(url)
try:
nurl.scheme = 'http'
return requests.request(method, nurl.url_full(), headers=headers, params=params, data=data, timeout=timeout,
verify=verify), {'code': 0, 'message': 'request success'}
except requests.exceptions.RequestException:
try:
nurl.scheme = 'https'
return requests.request(method, nurl.url_full(), headers=headers, params=params, data=data, timeout=timeout,
verify=verify),{'code': 0, 'message': 'request success'}
except requests.exceptions.RequestException as e:
return None, {'code': -10, 'message': e.__str__()}