-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·193 lines (148 loc) · 5.96 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/env python3
import re
from sys import stdin
import grequests
import argparse
# pip install grequests
# check if package.json or package-lock.json already exists in the url
# show the urls that use the affected plugin
# Command-line interface. The parsed options are exposed as the plain dict
# `user_input`, which the rest of the script indexes by option name
# ("th", "to", "a", "v", "version", "s", "link", "p").
parser = argparse.ArgumentParser(
    description="Get subdomains from stdin and search for dependency confusion.",
)
parser.add_argument(
    "-th",
    help="Number of concurrence threads",
    default=10,
    type=int,
)
parser.add_argument(
    "-to",
    help="Timeout (in seconds)",
    default=15,
    type=int,
    required=False,
)
parser.add_argument(
    "-a",
    help="String to append in the end of url. E.g: -a=\"?token=foo\"",
    default="",
    required=False,
)
parser.add_argument(
    "-v",
    help="Verbose mode",
    type=int,
    default=0,
    required=False,
)
parser.add_argument(
    "--version",
    help="Show version and exit",
    action="store_true",
)
parser.add_argument(
    "-s",
    help="Silent, only shows the useful results",
    action="store_true",
)
parser.add_argument(
    "-link",
    help="Show full link to the npm possible vulnerable package",
    action="store_true",
)
parser.add_argument(
    "-p",
    help="Not include the path provided in the url",
    action="store_true",
)

args = parser.parse_args()
user_input = vars(args)
def sprint(message, require_verbose: int = 0):
    """Print *message* unless output is silenced or verbosity is too low.

    Nothing is printed when the ``-s`` option (``user_input["s"]``) is set.
    Otherwise the message is printed only when the ``-v`` level
    (``user_input["v"]``) is at least *require_verbose*.
    """
    silenced = user_input["s"] == True
    if not silenced and user_input["v"] >= require_verbose:
        print(message)
# Dump the parsed options when running at verbosity level 3 or higher.
sprint(user_input, 3)

if user_input["version"]:
    # Printed directly (not through sprint) so that --version still answers
    # when it is combined with the silent flag.
    print("version 0.1")
    exit(0)

# stdin attached to a terminal means nothing was piped in, so there are no
# targets to scan.
if stdin.isatty():
    sprint("Error, you must provide subdomains in stdin")
    # Non-zero status lets shells and pipelines detect the usage error.
    exit(1)

# One url/domain per line; readlines() keeps each line's trailing newline.
lines = stdin.readlines()
def send_async_request(urls: list, threads: int = 10, timeout: int = 15):
    """Send a GET request to every url, at most *threads* at a time.

    The urls are split into consecutive batches of up to *threads* requests.
    Each batch is sent with ``grequests.map`` and fully awaited before the
    next batch is started, so no more than *threads* requests are in flight.

    Args:
        urls: Urls to request.
        threads: Maximum number of requests per batch. Values below 1 are
            treated as 1 so a bad option cannot stall or crash the loop.
        timeout: Per-request timeout in seconds, passed to each request as-is.

    Returns:
        A list with one entry per url, in the same order as *urls*. Entries
        are whatever ``grequests.map`` yields for each request.
        NOTE(review): grequests is expected to yield ``None`` for a request
        that failed outright -- callers should be prepared for that.
    """
    batch_size = max(1, threads)
    all_responses = []
    for start in range(0, len(urls), batch_size):
        # Slicing by batch_size guarantees the final, possibly shorter, batch
        # is sent too instead of being left behind unsent.
        batch = [
            grequests.get(url, timeout=timeout)
            for url in urls[start:start + batch_size]
        ]
        for pending in batch:
            sprint(pending.url, 2)
        all_responses.extend(grequests.map(batch))
    return all_responses
if not lines:
    print("Please, provide the urls/domains in stdin!")
    exit(0)

# obj_lines maps the input line index to the pieces parsed out of that line;
# target_url_list collects every manifest url that will be requested.
obj_lines = {}
target_url_list = []

# Groups: (1) optional protocol, (2) domain, (3) optional path.
url_pattern = re.compile(r"^(https?://)?(.+?)(/.*)?$")
url_suffix = str(user_input["a"])

for i, line in enumerate(lines):
    entry = obj_lines[i] = {}
    # Strip surrounding whitespace so a "\r" from CRLF input (or stray
    # spaces) does not end up inside the domain.
    match = url_pattern.search(line.strip())
    if not match:
        sprint("Invalid url provided! " + line, 3)
        entry["valid"] = False
        continue
    entry["valid"] = True
    # Unless the line says otherwise, the target is assumed to be https.
    entry["protocol"] = match.group(1) or "https://"
    entry["domain"] = match.group(2)
    entry["path"] = match.group(3) or "/"

    # With -p the provided path is ignored and the site root is probed.
    base_path = "/" if user_input["p"] else entry["path"]
    # The file name is appended directly, so the base must end with a slash.
    if not base_path.endswith("/"):
        base_path += "/"
    url = entry["protocol"] + entry["domain"] + base_path

    target_url_list.append(url + "package.json" + url_suffix)
    target_url_list.append(url + "package-lock.json" + url_suffix)
sprint("Sending requests to the target")
target_res_list = send_async_request(
    target_url_list, user_input["th"], user_input["to"])

# Walk the responses and keep the ones that look like package.json or
# package-lock.json files. Every package they name is queued for a lookup on
# the npm registry; a package missing there is a possible dependency
# confusion.
all_json_res = []
checked_pkgs = []  # packages already queued, so each is requested only once
checked_pkgs_links = []
async_npm_req_list = []
for res in target_res_list:
    # A request that failed outright leaves no response object to inspect.
    if res is None:
        continue
    try:
        json_res = res.json()
    except ValueError:
        # The body is not JSON, so it cannot be a package manifest.
        continue
    # Only a non-empty JSON object can be a manifest; arrays, strings and
    # numbers are valid JSON but cannot carry the keys read below.
    if not isinstance(json_res, dict) or not json_res:
        continue

    # Remember where the manifest came from so it can be reported later.
    # This overwrites any "url" key the manifest itself may have had.
    json_res["url"] = res.url
    all_json_res.append(json_res)

    # Iterate in the manifest's own key order so packages are queued in the
    # order they appear in the file.
    for section in json_res:
        if section not in ("devDependencies", "dependencies"):
            continue
        packages = json_res[section]
        # A dependency section is expected to be an object keyed by package
        # name; anything else (null, string, ...) is skipped.
        if not isinstance(packages, dict):
            continue
        for pkg in packages:
            if pkg in checked_pkgs:
                sprint("PACKAGE ALREDY INCLUDED " + pkg, 3)
                continue
            checked_pkgs.append(pkg)
            checked_pkgs_links.append("https://registry.npmjs.org/" + pkg)
#
if not checked_pkgs:
    sprint("No package file was found.")

sprint("Sending requests to NPM api")
npm_res_list = send_async_request(
    checked_pkgs_links, user_input["th"], user_input["to"])

for r in npm_res_list:
    # A lookup that failed outright says nothing about whether the package
    # exists, and a 200 means the package is registered; skip both.
    if r is None or r.status_code == 200:
        continue

    # Recover the package name from the registry url that was requested.
    u = str(r.url).replace("https://registry.npmjs.org/", "")
    print(str(r.url) if user_input["link"] else u)

    # Report every collected manifest that depends on the affected package.
    for json_res in all_json_res:
        for x in json_res:
            if x not in ("devDependencies", "dependencies"):
                continue
            packages = json_res[x]
            # Skip malformed sections that are not name -> version objects.
            if not isinstance(packages, dict):
                continue
            for pkg in packages:
                if pkg == u:
                    sprint("=>" + json_res["url"])