diff --git a/cve_scan/README.md b/cve_scan/README.md index 3b3151a..83c163d 100644 --- a/cve_scan/README.md +++ b/cve_scan/README.md @@ -39,7 +39,6 @@ $ python3 cve_source_linux.py scan --repo ~/kernel 通过补丁对比的方式检测 Linux 内核仓库中已合并及未合并的所有 CVE 补丁。 ```sh -$ cve_searchsploit -u # 建议先更新cve-edbid $ python3 cve_patch_linux.py update # 更新CVE补丁库 $ python3 cve_patch_linux.py scan --repo ~/kernel --version 5.10 ``` @@ -50,9 +49,8 @@ $ python3 cve_patch_linux.py scan --repo ~/kernel --version 5.10 ```sh $ python3 cve_patch_android.py update --version 11 # 更新CVE补丁库 -$ python3 cve_patch_android.py format --repo ~/hmi --manifest 2024-01-01 # 生成仓库补丁 -$ python3 cve_patch_android.py format --repo ~/hmi --date 2024-01-01 # 生成仓库补丁 -$ python3 cve_patch_android.py scan --repo ~/hmi --version 11 +$ python3 cve_patch_android.py format --repo ~/hmi --date 2022-01-01 --version 11 # 生成仓库补丁 +$ python3 cve_patch_android.py scan --repo ~/hmi --version 11 --strict # 严格模式扫描 ``` ## cve_patch_qualcomm.py diff --git a/cve_scan/cve_patch_android.py b/cve_scan/cve_patch_android.py index 3ac5ba7..1a5be93 100755 --- a/cve_scan/cve_patch_android.py +++ b/cve_scan/cve_patch_android.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 +import re import sys import json import base64 @@ -7,7 +8,6 @@ import pyfiglet import argparse import requests -import contextlib from tqdm import tqdm from lxml import etree @@ -28,24 +28,36 @@ NVD_KEY = '' # 在扫描时排除的漏洞 -CVE_EXCLUDE = [] +CVE_EXCLUDE = { + '11': [ -# 转移了位置和需要排除的仓库 -REPOSITORY = { - 'migrate': { + ], + '12': [ + + ] +} + +# 排除部分仓库,提高速度 +REPO_EXCLUDE = [ + 'platform/cts', +] + +# 移动过位置的仓库 +REPO_MIGRATE = { + '12': { + 'platform/packages/modules/Bluetooth': 'platform/system/bt', + }, + '11': { 'platform/packages/modules/Bluetooth': 'platform/system/bt', 'platform/packages/modules/NeuralNetworks': 'platform/frameworks/ml', 'platform/packages/modules/StatsD': 'platform/frameworks/base', 'platform/packages/modules/Connectivity': 'platform/frameworks/base', 'platform/packages/modules/Wifi': 'platform/frameworks/opt/net/wifi', }, - 'exclude': [ - 'platform/cts', - ] } # 最早和最晚时间 -ANDROID_VERSION = { +BULLETIN_TIME = { '14': ('2023-11-01', ''), '13': ('2022-09-01', ''), '12L': ('2022-04-01', ''), @@ -58,7 +70,7 @@ } # HTML元素ID -IDS = { +BULLETIN_IDS = { 'aosp': [ 'android-runtime', '01android-runtime', @@ -118,20 +130,55 @@ } -def get_repo(url: str, flag: bool): +def all_version(): + """针对单个版本还是所有版本""" + return ANDROID_VERSION == 'all' + + +def in_version(date_str: str): + """找到符合时间的版本""" + vers = [] + url_date = datetime.strptime(date_str, '%Y-%m-%d') + for ver, (start_date, end_date) in BULLETIN_TIME.items(): + start_date = datetime.strptime(start_date, '%Y-%m-%d') + end_date = datetime.strptime(end_date, '%Y-%m-%d') if end_date else datetime.now() + if url_date >= start_date and url_date <= end_date: + vers.append(ver) + return vers + + +def get_repo(url: str, ver: str): """从fix url获取仓库名""" repo = url.split("googlesource.com/")[1].split('/+/')[0] - if flag: - if repo in REPOSITORY['exclude']: - return '' - if repo in REPOSITORY['migrate']: - repo = REPOSITORY['migrate'][repo] + if repo in REPO_EXCLUDE: + return '' + if repo in REPO_MIGRATE.get(ver, {}): + old_repo = repo + repo = REPO_MIGRATE[ver][old_repo] return repo -def get_patch(url: str): - """获取patch""" +def get_fix_repos(ver): + """获取某个版本所有补丁涉及的所有仓库""" + repos = set() + for cve_data in {**cves_data[ver].get('aaos', {}), **cves_data[ver]['aosp']}.values(): + for url in cve_data['fixes']: + if repo := get_repo(url, ver): + repos.add(repo) + print_focus(f'Version: {ver}, Repos: {len(repos)}') + print(repos) + return repos + +def get_patch_meta(url: str, text: bool=False): + """获取补丁的元数据""" + url = f'{url}&&format=JSON' if '/?s=' in url else f'{url}?format=JSON' + data = requests.get(url).text[5:] + return data if text else json.loads(data) + + +def get_patch(url: str): + """获取补丁""" class shadow: def __init__(self, browser): self.browser = browser @@ -182,29 +229,6 @@ def get_shadow(self, shadow, by: str, value: str): return patch -def get_poc(cve_id: str): - """检查是否有POC/EXP""" - pocs = [] - - poc_url = 'https://raw.githubusercontent.com/nomi-sec/PoC-in-GitHub/master' - with contextlib.suppress(Exception): - r = requests.get(f'{poc_url}/{cve_id.split("-")[1]}/{cve_id.upper()}.json', timeout=15) - if r.status_code == 200: - pocs.extend([i['html_url'] for i in r.json()]) - - edb_url = 'https://www.exploit-db.com/search?cve=' - with contextlib.suppress(Exception): - headers = { - 'X-Requested-With': 'XMLHttpRequest', - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36' - } - r = requests.get(f'{edb_url}{cve_id.upper()}', headers=headers, timeout=15) - if r.status_code == 200: - pocs.extend([f'https://www.exploit-db.com/exploits/{i["id"]}' for i in r.json()['data']]) - - return pocs - - def get_cve_detail(cve_id: str): """获取CVE详情""" result = { @@ -233,34 +257,9 @@ def get_cve_detail(cve_id: str): return result -def extract_section(soup, tag1: str, tag2: str, date_str: str): +def extract_section(soup, tag2: str, date_str: str): """解析表格""" - - found = soup.find('h3', id=tag2) - if not found: - return - - # 解析表头 - title = found.find_next('table').find('tr') - ver_idx = -1 - component_idx = -1 - type_idx = -1 - for idx, col in enumerate(title.find_all('th')): - if col.text == 'CVE': - cve_idx = idx - elif col.text in {'Component', 'Subcomponent'}: - component_idx = idx - elif col.text == 'References': - ref_idx = idx - elif col.text == 'Severity': - severity_idx = idx - elif col.text == 'Type': - type_idx = idx - elif col.text == 'Updated AOSP versions': - ver_idx = idx - - # 解析表内容 - for row in title.find_next_siblings('tr'): + def parse_table(row): flag = False type_text = '' vers_text = '' @@ -281,6 +280,7 @@ def extract_section(soup, tag1: str, tag2: str, date_str: str): flag = True idx += 2 # 列标校准 if idx == ref_idx and not flag: + bug_id = col.text.strip() urls = [] for url in col.find_all('a'): href = url.get('href') @@ -298,65 +298,115 @@ def extract_section(soup, tag1: str, tag2: str, date_str: str): item = {cve_id: { 'date': date_str, + 'bug_id': bug_id, 'fixes': urls, 'affected_versions': vers_text, 'affected_component': component_text, 'type': type_text, 'severity': severity_text }} - print_success(f'{tag2}\t{item}') item[cve_id].update(get_cve_detail(cve_id)) + return item - # 下载patch - tag_path = patch_sec_path.joinpath(tag1) - tag_path.mkdir(parents=True, exist_ok=True) - if tag1 in {'aosp', 'aaos'}: - vers = vers_text.replace(" ", "").split(',') - for ver in vers: - cves_data[tag1][ver].update(item) + items = [] + found = soup.find('h3', id=tag2) + if not found: + return items + + # 解析表头 + title = found.find_next('table').find('tr') + ver_idx = -1 + component_idx = -1 + type_idx = -1 + for idx, col in enumerate(title.find_all('th')): + if col.text == 'CVE': + cve_idx = idx + elif col.text in {'Component', 'Subcomponent'}: + component_idx = idx + elif col.text == 'References': + ref_idx = idx + elif col.text == 'Severity': + severity_idx = idx + elif col.text == 'Type': + type_idx = idx + elif col.text == 'Updated AOSP versions': + ver_idx = idx - if not urls: - print_failed(f'{tag1} {cve_id} no fixes') - continue + # 解析表内容 + for row in title.find_next_siblings('tr'): + item = parse_table(row) + items.append(item) + print_success(f'{tag2}\t{item}') - for idx, url in enumerate(urls): - try: - patch = get_patch(url) - except Exception as e: - print_failed(f'{url} download failed') - print(e, item) - continue + return items - for ver in vers: - ver_path = tag_path.joinpath(ver) - ver_path.mkdir(parents=True, exist_ok=True) - if len(urls) == 1: - patch_path = ver_path.joinpath(f'{cve_id}.patch') - else: - patch_path = ver_path.joinpath(f'{cve_id}-{idx+1}.patch') - with open(patch_path, 'w+') as f: - f.write(patch) + +def download_patches(item: dict, tag1: str, date_str: str): + """下载补丁和元数据""" + def update_cves_data(vers): + if all_version(): + for ver in vers: + cves_data[ver].setdefault(tag1, {}).update(item) + elif ANDROID_VERSION in vers: + cves_data[ANDROID_VERSION].setdefault(tag1, {}).update(item) else: - cves_data[tag1].update(item) + print_failed(f'{tag1} {cve_id} not in Android {ANDROID_VERSION}') + return False + return True - if not urls: - print_failed(f'{tag1} {cve_id} no fixes') - continue + def write_aosp_files(ver): + tag_path = patch_sec_path.joinpath(ver, tag1) + tag_path.mkdir(parents=True, exist_ok=True) + if len(urls) == 1: + patch_path = tag_path.joinpath(f'{cve_id}.patch') + patch_meta_path = tag_path.joinpath(f'{cve_id}.json') + else: + patch_path = tag_path.joinpath(f'{cve_id}-{idx+1}.patch') + patch_meta_path = tag_path.joinpath(f'{cve_id}-{idx+1}.json') + with open(patch_path, 'w+') as f1, open(patch_meta_path, 'w+') as f2: + f1.write(patch) + f2.write(patch_meta) + + def write_other_files(ver): + tag_path = patch_sec_path.joinpath(ver, tag1) + tag_path.mkdir(parents=True, exist_ok=True) + if len(urls) == 1: + patch_path = tag_path.joinpath(f'{cve_id}.patch') + else: + patch_path = tag_path.joinpath(f'{cve_id}-{idx+1}.patch') + with open(patch_path, 'w+') as f: + f.write(patch) + + tag_aosp = tag1 in {'aosp', 'aaos'} + cve_id = list(item.keys())[0] + cve_data = item[cve_id] + urls = cve_data['fixes'] + vers = cve_data['affected_versions'].replace(" ", "").split(',') if tag_aosp else in_version(date_str) + + # 更新cves_data + if not update_cves_data(vers): + return - for idx, url in enumerate(urls): - try: - patch = get_patch(url) - except Exception as e: - print_failed(f'{url} download failed') - print(e, item) - continue + # 下载补丁 + if not urls: + print_failed(f'{tag1} {cve_id} no fixes') + return - if len(urls) == 1: - patch_path = tag_path.joinpath(f'{cve_id}.patch') - else: - patch_path = tag_path.joinpath(f'{cve_id}-{idx+1}.patch') - with open(patch_path, 'w+') as f: - f.write(patch) + for idx, url in enumerate(urls): + try: + patch = get_patch(url) + if tag_aosp: + patch_meta = get_patch_meta(url, text=True) + except Exception as e: + print_failed(f'{url} download failed') + print(e, item) + continue + + if all_version(): + for ver in vers: + write_aosp_files(ver) if tag_aosp else write_other_files(ver) + else: + write_aosp_files(ANDROID_VERSION) if tag_aosp else write_other_files(ANDROID_VERSION) def updateAospThread(url: str): @@ -367,9 +417,13 @@ def updateAospThread(url: str): r = requests.get(url) soup = BeautifulSoup(r.content, 'html.parser') - for tag1, v in IDS.items(): + for tag1, v in BULLETIN_IDS.items(): + if tag1 == 'aaos': + continue for tag2 in v: - extract_section(soup, tag1, tag2, date_str) + items = extract_section(soup, tag2, date_str) + for item in items: + download_patches(item, tag1, date_str) def updateAaosThread(url: str): @@ -381,15 +435,52 @@ def updateAaosThread(url: str): soup = BeautifulSoup(r.content, 'html.parser') tag1 = 'aaos' - for tag2 in IDS[tag1]: - extract_section(soup, tag1, tag2, date_str) + for tag2 in BULLETIN_IDS[tag1]: + items = extract_section(soup, tag2, date_str) + for item in items: + download_patches(item, tag1, date_str) + + +def updatePatch(ver, repos): + """更新各版本的补丁""" + ver_meta = {} + + for repo in repos: + base_url = f'https://android.googlesource.com/{repo}/+log/refs/heads/android{ver}-security-release' + print_focus(base_url) + + # 获取多页数据 + repo_meta = [] + next_commit = '' + for _ in range(5): + try: + url = f'{base_url}/?s={next_commit}' if next_commit else base_url + meta = get_patch_meta(url) + repo_meta.extend(meta['log']) + next_commit = meta.get('next') + if not next_commit: + break + except Exception as e: + print_failed(f'{base_url} no security branch') + print(e) + break + ver_meta[repo] = repo_meta + + with open(android_meta, 'w+') as f: + json.dump(ver_meta, f, indent=4) + print_success(f'Meta saved in {android_meta}') def update(args): """更新CVE补丁库""" - start_date, end_date = ANDROID_VERSION[args.version] + if all_version(): + start_date, _ = BULLETIN_TIME['11']#['8.0'] + end_date = datetime.now() + else: + start_date, end_date = BULLETIN_TIME[ANDROID_VERSION] + end_date = datetime.strptime(end_date, '%Y-%m-%d') if end_date else datetime.now() start_date = datetime.strptime(start_date, '%Y-%m-%d') - end_date = datetime.strptime(end_date, '%Y-%m-%d') if end_date else datetime.now().strftime('%Y-%m-%d') + print_focus(f'Update from {start_date} to {end_date}') def get_urls(overview_url: str, bulletin_url: str): urls = [] @@ -425,34 +516,96 @@ def get_urls(overview_url: str, bulletin_url: str): json.dump(cves_data, f, indent=4) print_success(f'Results saved in {android_cves}') + if all_version(): + for ver in BULLETIN_TIME.keys(): + fix_repos = get_fix_repos(ver) + updatePatch(ver, fix_repos) + else: + fix_repos = get_fix_repos(ANDROID_VERSION) + updatePatch(ANDROID_VERSION, fix_repos) + def compareThread(key: str, repo: str, cve_id: str, cve_path: Path): """对比某个CVE补丁与所有补丁""" + def get_meta_str(meta): + pattern = r'[0-9a-f]{40}' # 删掉commit id,提高准确性 + message = re.sub(pattern, '', meta.get('message', '')) + author = meta.get('author', {}) + return json.dumps({ + 'author': { + 'name': author.get('name', ''), + 'email': author.get('email', '')}, + 'message': message}, + sort_keys=True) + + def filter_patches(f1): + # 有相同路径或文件名的补丁 + f1_filenames = get_modified_files(f1, mode='name') + fi_paths = get_modified_files(f1, mode='path') + + patches1 = [ + patch_all_path.joinpath(repo, f2) + for f2, f2_files in patches_data[repo].items() + if any(i.split('/')[-1] in f1_filenames for i in f2_files) + ] + patches2 = [ + patch_all_path.joinpath(repo, f2) + for f2, f2_files in patches_data[repo].items() + if any('/'.join(i.split('/')[:-1]) in fi_paths for i in f2_files) + ] + patches = set(patches1 + patches2) + if not patches: + print_failed(f'[{repo}] {cve_name} not found patches!') + return patches + + def find_patch(patches, f1): + result = [] + for patch in patches: + f2 = open(patch).read() + ratio = fuzz.ratio(f1, f2) + if ratio >= 60: + print_focus(f'[{repo}] {cve_name} found ({ratio}%): {patch.stem}') + if ratio >= 80: + result.append(f'{ratio}% {patch.stem}') + # 非strict模式下找到一个就返回 + if not args.strict: + break + return result - result = [] + # 比较通用补丁 + cve_name = cve_path.stem f1 = cve_path.read_text(errors='ignore') - f1_filenames = get_modified_files(f1, name_only=True) - - patches = [ - patch_all_path.joinpath(repo, f2) - for f2, f2_files in patches_data[repo].items() - if any(i.split('/')[-1] in f1_filenames for i in f2_files) - ] - if not patches: - print_failed(f'[{repo}] {cve_path.stem} not found patches!') - - for patch in patches: - f2 = open(patch).read() - ratio = fuzz.ratio(f1, f2) - if ratio >= 60: - print_focus(f'[{repo}] {cve_path.stem} found ({ratio}%): {patch.stem}') + patches = filter_patches(f1) + result = find_patch(patches, f1) + + # 如果没有找到,则比较特定补丁 + if not result: + specific = [] + cve_meta = json.load(open(cve_path.with_suffix('.json'))) + f1_meta = get_meta_str(cve_meta) + + for meta in meta_data[repo]: + f3_meta = get_meta_str(meta) + ratio = fuzz.ratio(f1_meta, f3_meta) if ratio >= 80: - result.append(f'{ratio}%{patch.stem}') + url = f'https://android.googlesource.com/{repo}/+/{meta["commit"]}' + specific.append((f'{ratio}%', url)) + try: + f1 = get_patch(url) + except Exception as e: + continue + patches = filter_patches(f1) + result.extend(find_patch(patches, f1)) + # 非strict模式下找到一个就返回 + if result and not args.strict: + break + print_focus(f'[{repo}] {cve_name} specific patch: {len(specific)}') + print(specific) - if patches and not result: - print_failed(f'[{repo}] {cve_path.stem} not found!') + if not result: + print_failed(f'[{repo}] {cve_name} not found!') - return key, repo, cve_id, {cve_path.stem: result} + return key, repo, cve_id, {cve_name: result} def formatThread(repo_name, repo_path): @@ -484,7 +637,7 @@ def formatThread(repo_name, repo_path): return repo_name, int(number) -def get_modified_files(patch: str, name_only: bool = False): +def get_modified_files(patch: str, mode: str='all'): """获取补丁中修改的文件""" modified_files = [] for line in patch.splitlines(): @@ -492,8 +645,10 @@ def get_modified_files(patch: str, name_only: bool = False): start = line.find(' a/') + len(' a/') end = line.find(' b/') file = line[start:end] - if name_only: + if mode == 'name': modified_files.append(file.split('/')[-1]) + elif mode == 'path': + modified_files.append('/'.join(file.split('/')[:-1])) else: modified_files.append(file) return modified_files @@ -502,16 +657,18 @@ def get_modified_files(patch: str, name_only: bool = False): def patchThread(repo: Path, patch: Path): try: # 提取diff部分 - lines = patch.read_text(errors='ignore').splitlines() + patch_data = patch.read_text(errors='ignore') + lines = patch_data.splitlines() diff_index = next((i for i, line in enumerate(lines) if line.startswith('diff --git')), -1) + meta_part = lines[:diff_index] diff_part = lines[diff_index:-3] - new_patch = '\n'.join(diff_part) - with open(patch, 'w+') as f: - f.write(new_patch) + with open(patch, 'w+') as f1, open(patch.with_suffix('.txt'), 'w+') as f2: + f1.write('\n'.join(diff_part)) + f2.write('\n'.join(meta_part)) # 找出修改的文件 - modified_files = get_modified_files(new_patch) + modified_files = get_modified_files(patch_data) return str(repo), patch.name, modified_files except Exception as e: print_failed(f'patchThread failed: {patch.name}\n{e}') @@ -567,26 +724,13 @@ def format(args): all_hmi[repo] = hmi_path.joinpath(path) # 安全补丁涉及的AOSP仓库 - fixes = set() - for ver_data in {**cves_data['aaos'], **cves_data['aosp']}.values(): - for cve_data in ver_data.values(): - for url in cve_data['fixes']: - fixes.add(get_repo(url, flag=False)) + fix_repos = get_fix_repos(ANDROID_VERSION) # 安全补丁对应的本地仓库 - sec_hmi = {key: all_hmi[key] for key in fixes if key in all_hmi} - not_hmi = fixes - sec_hmi.keys() + sec_hmi = {key: all_hmi[key] for key in fix_repos if key in all_hmi} + not_hmi = fix_repos - sec_hmi.keys() print_focus(f'Repo found: {len(sec_hmi)}, Repo not found: {len(not_hmi)}') - print(not_hmi) - # 排除无效仓库 - for key in REPOSITORY['exclude']: - sec_hmi.pop(key, None) - # 处理迁移仓库 - for key, value in REPOSITORY['migrate'].items(): - if key not in sec_hmi and value in all_hmi: - print_focus(f'Migrated: {key} -> {value}') - if value not in sec_hmi: - sec_hmi[value] = all_hmi[value] + print(not_hmi) if not_hmi else None # 生成补丁 if args.manifest: @@ -632,7 +776,7 @@ def make_data(key, data): for cve, cve_data in data.items(): if fixes := cve_data['fixes']: for fix in fixes: - if repo := get_repo(fix, flag=True): + if repo := get_repo(fix, ANDROID_VERSION): cve_fixes[key][repo][cve] = cve_data else: # 没有修复链接 @@ -644,7 +788,7 @@ def in_repo(repo1, patch): idx = int(temp[-1]) if len(temp) == 4 else 0 if idx != 0: fix = cve_fixes[key][repo][cve_id]['fixes'][idx-1] - repo2 = get_repo(fix, flag=True) + repo2 = get_repo(fix, ANDROID_VERSION) return repo1 == repo2 return True @@ -658,8 +802,8 @@ def in_repo(repo1, patch): results['aosp'] = defaultdict(dict) results['aaos'] = defaultdict(dict) - aosp = cves_data['aosp'][args.version] - aaos = cves_data['aaos'][args.version] + aosp = cves_data[ANDROID_VERSION]['aosp'] + aaos = cves_data[ANDROID_VERSION]['aaos'] aaos = {key: aaos[key] for key in aaos.keys() - aosp.keys()} print_focus(f'AOSP CVE: {len(aosp)}, AAOS CVE: {len(aaos)}') @@ -667,13 +811,13 @@ def in_repo(repo1, patch): make_data('aosp', aosp) make_data('aaos', aaos) print_focus(f'CVE fixes: AOSP {len(cve_fixes["aosp"])}, AAOS {len(cve_fixes["aaos"])}') - with open('cve_fixes.json', 'w+') as f: + with open(patch_sec_path.joinpath('cve_fixes.json'), 'w+') as f: json.dump(cve_fixes, f, indent=4) with ProcessPoolExecutor(os.cpu_count()-1) as executor: tasks = [] for key, value in cve_fixes.items(): - patches_path = patch_sec_path.joinpath(key).joinpath(args.version) + patches_path = patch_sec_path.joinpath(ANDROID_VERSION, key) patches = list(patches_path.glob('*.patch')) for repo, cve_dict in value.items(): @@ -687,7 +831,7 @@ def in_repo(repo1, patch): for cve_id, cve_data in cve_dict.items(): # 排除部分漏洞 - if cve_id in CVE_EXCLUDE: + if cve_id in CVE_EXCLUDE[ANDROID_VERSION]: results[key]['exclude'][cve_id] = cve_data continue @@ -709,9 +853,16 @@ def in_repo(repo1, patch): for key, value in results.items(): pop_list = [] for cve_id, cve_data in value['patched'].items(): - if any(i == [] for i in cve_data['scan'].values()): - results[key]['unpatched'][cve_id] = cve_data - pop_list.append(cve_id) + # strict模式下,只要有一个补丁未修复,就认为该漏洞未修复 + if args.strict: + if any(i == [] for i in cve_data['scan'].values()): + results[key]['unpatched'][cve_id] = cve_data + pop_list.append(cve_id) + # 非strict模式下,只要有一个补丁修复,就认为该漏洞修复 + else: + if all(i == [] for i in cve_data['scan'].values()): + results[key]['unpatched'][cve_id] = cve_data + pop_list.append(cve_id) for i in pop_list: value['patched'].pop(i) @@ -725,16 +876,18 @@ def argument(): subparsers = parser.add_subparsers() parser_update = subparsers.add_parser('update', help='update CVE patch data') - parser_update.add_argument('--version', help='Android version number', type=str, required=True) + parser_update.add_argument('--version', help='Android version number', type=str, default='all') parser_update.set_defaults(func=update) parser_format = subparsers.add_parser('format', help='format CVE patch data for Android repository') parser_format.add_argument('--repo', help='Android git repository path', type=str, required=True) parser_format.add_argument('--manifest', help='Manifest time "YYYY-MM-DD"', type=str, required=False) parser_format.add_argument('--date', help='Date time "YYYY-MM-DD"', type=str, required=False) + parser_format.add_argument('--version', help='Android version number', type=str, required=True) parser_format.set_defaults(func=format) parser_scan = subparsers.add_parser('scan', help='scan CVE patch in Android repository') + parser_scan.add_argument('--strict', help='Strict mode', action='store_true', default=False) parser_scan.add_argument('--repo', help='Android git repository path', type=str, required=True) parser_scan.add_argument('--version', help='Android version number', type=str, required=True) parser_scan.set_defaults(func=scan) @@ -753,12 +906,12 @@ def argument(): android_cves = patch_sec_path.joinpath('android_cves.json') args = argument() + ANDROID_VERSION = args.version + android_meta = patch_sec_path.joinpath(ANDROID_VERSION, 'meta.json') # 第一步:更新CVE补丁库 if args.func.__name__ == 'update': cves_data = defaultdict(dict) - cves_data['aosp'] = defaultdict(dict) - cves_data['aaos'] = defaultdict(dict) # 第二步:为所有仓库生成补丁 elif args.func.__name__ == 'format': @@ -778,5 +931,6 @@ def argument(): cves_data = json.load(open(android_cves)) patches_data = json.load(open(android_patches)) + meta_data = json.load(open(android_meta)) args.func(args) diff --git a/cve_scan/cve_patch_linux.py b/cve_scan/cve_patch_linux.py index 8908a18..02238dd 100755 --- a/cve_scan/cve_patch_linux.py +++ b/cve_scan/cve_patch_linux.py @@ -8,9 +8,7 @@ import mmap import pyfiglet import argparse -import requests import asyncio -import cve_searchsploit from pathlib import Path from thefuzz import fuzz @@ -97,12 +95,9 @@ def compareThread2(cve: Path): cve_name = '-'.join(cve.stem.split('-')[:3]) result = { 'url': f'https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/?h=linux-{cve.parent.name}.y&id={cve.stem.split("-")[-1]}', - 'poc': [f'https://www.exploit-db.com/exploits/{edbid}' for edbid in cve_searchsploit.edbid_from_cve(cve_name)], + 'poc': get_poc(cve_name), 'scan': {'modify': [], 'unmodify':[], 'ratio':0} } - poc_url = f'https://github.com/nomi-sec/PoC-in-GitHub/blob/master/{cve_name.split("-")[1]}/{cve_name}.json' - if requests.get(poc_url): - result['poc'].append(poc_url) file_modify = extract_patch_info(cve) modify_patch = 0 @@ -212,12 +207,9 @@ def compareThread(cve: Path, patch_path: Path): cve_name = '-'.join(cve.stem.split('-')[:3]) result = { 'url': f'https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/?h=linux-{cve.parent.name}.y&id={cve.stem.split("-")[-1]}', - 'poc': [f'https://www.exploit-db.com/exploits/{edbid}' for edbid in cve_searchsploit.edbid_from_cve(cve_name)], + 'poc': get_poc(cve_name), 'scan': {} } - poc_url = f'https://github.com/nomi-sec/PoC-in-GitHub/blob/master/{cve_name.split("-")[1]}/{cve_name}.json' - if requests.get(poc_url): - result['poc'].append(poc_url) try: f1 = open(cve).read() diff --git a/init_local.sh b/init_local.sh index b768ee8..96b9536 100755 --- a/init_local.sh +++ b/init_local.sh @@ -23,7 +23,7 @@ EOF sudo apt-get update && sudo apt-get -y upgrade sudo apt-get -y install python3-dev python3-pip python3-venv zip unzip npm graphviz simg2img meld maven scrcpy -python3 -m pip install virtualenv wheel pyaxmlparser requests_toolbelt future paramiko pyfiglet rich colorama openpyxl beautifulsoup4 lxml nvdlib +python3 -m pip install virtualenv wheel pyaxmlparser requests_toolbelt future paramiko pyfiglet rich colorama openpyxl beautifulsoup4 lxml xhtml2pdf nvdlib echo "[+] Installing zsh ..." sudo apt-get -y install git zsh expect @@ -154,6 +154,10 @@ echo "[+] Installing APKHunt ..." # wget -q https://github.com/Cyber-Buddy/APKHunt/archive/refs/heads/main.zip go build -o ./tools/apkhunt ./tools/apkhunt.go +echo "[+] Installing APKDeepLens ..." +wget -q https://github.com/d78ui98/APKDeepLens/archive/refs/heads/main.zip +unzip -q main.zip -d ./tools/ && rm main.zip + echo "[+] Installing androbugs ..." sudo docker pull danmx/docker-androbugs @@ -327,7 +331,6 @@ chmod +x ./tools/linux-exploit-suggester.sh echo "[+] Installing linux_kernel_cves ..." python3 -m pip install "thefuzz[speedup]" -python3 -m pip install cve_searchsploit && cve_searchsploit -u git clone --depth=1 https://github.com/nluedtke/linux_kernel_cves ~/github/linux_kernel_cves echo "[+] Installing CVEhound ..." diff --git a/utils.py b/utils.py index 299af52..cd611c4 100644 --- a/utils.py +++ b/utils.py @@ -1,6 +1,8 @@ import os import socket import hashlib +import requests +import contextlib from lxml import etree from pathlib import Path from subprocess import Popen, PIPE, STDOUT, TimeoutExpired @@ -73,6 +75,29 @@ def get_md5(file_path: str) -> str: return md5.hexdigest() +def get_poc(cve_id: str): + """检查是否有POC/EXP""" + pocs = [] + + poc_url = 'https://raw.githubusercontent.com/nomi-sec/PoC-in-GitHub/master' + with contextlib.suppress(Exception): + r = requests.get(f'{poc_url}/{cve_id.split("-")[1]}/{cve_id.upper()}.json', timeout=15) + if r.status_code == 200: + pocs.extend([i['html_url'] for i in r.json()]) + + edb_url = 'https://www.exploit-db.com/search?cve=' + with contextlib.suppress(Exception): + headers = { + 'X-Requested-With': 'XMLHttpRequest', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36' + } + r = requests.get(f'{edb_url}{cve_id.upper()}', headers=headers, timeout=15) + if r.status_code == 200: + pocs.extend([f'https://www.exploit-db.com/exploits/{i["id"]}' for i in r.json()['data']]) + + return pocs + + class ManifestUtil: def __init__(self, file_path: Path): self.path = file_path