From 70284bfddc5930ff29f9fca2e87a9b24421a0368 Mon Sep 17 00:00:00 2001 From: Yan Date: Mon, 2 Sep 2024 02:52:13 -0700 Subject: [PATCH] remove web pwn, and last vestiges of run --- web-security/level-15/.config | 1 - web-security/level-15/run | 1 - web-security/module.yml | 3 - web-security/run | 721 ---------------------------------- 4 files changed, 726 deletions(-) delete mode 100644 web-security/level-15/.config delete mode 120000 web-security/level-15/run delete mode 100755 web-security/run diff --git a/web-security/level-15/.config b/web-security/level-15/.config deleted file mode 100644 index 60d3b2f..0000000 --- a/web-security/level-15/.config +++ /dev/null @@ -1 +0,0 @@ -15 diff --git a/web-security/level-15/run b/web-security/level-15/run deleted file mode 120000 index 84ba55b..0000000 --- a/web-security/level-15/run +++ /dev/null @@ -1 +0,0 @@ -../run \ No newline at end of file diff --git a/web-security/module.yml b/web-security/module.yml index 507b8be..4fa273b 100644 --- a/web-security/module.yml +++ b/web-security/module.yml @@ -54,9 +54,6 @@ challenges: name: CSRF 4 - id: level-14 name: CSRF 5 -- id: level-15 - name: Web Pwn - description: Exploit a (memory corruption) stack injection vulnerability resources: - name: "Web Security: Introduction" type: lecture diff --git a/web-security/run b/web-security/run deleted file mode 100755 index fca5cca..0000000 --- a/web-security/run +++ /dev/null @@ -1,721 +0,0 @@ -#!/opt/pwn.college/python - -import contextlib -import ctypes -import inspect -import os -import pathlib -import sqlite3 -import subprocess -import tempfile -import time -import urllib - -from flask import Flask, request, session, redirect -from selenium import webdriver -from selenium.webdriver.firefox.options import Options as FirefoxOptions -from selenium.webdriver.firefox.service import Service as FirefoxService -from selenium.webdriver.common.by import By -from selenium.webdriver.support.wait import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC -from selenium.common.exceptions import TimeoutException - - -flag = open("/flag").read().strip() -config = (pathlib.Path(__file__).parent / ".config").read_text() -level = int(config) -challenge_host = "challenge.localhost" -hacker_host = "hacker.localhost" -app = Flask(__name__) - - -class TemporaryDB: - def __init__(self): - self.db_file = tempfile.NamedTemporaryFile("x", suffix=".db") - - def execute(self, sql, parameters=()): - connection = sqlite3.connect(self.db_file.name) - connection.row_factory = sqlite3.Row - cursor = connection.cursor() - result = cursor.execute(sql, parameters) - connection.commit() - return result - - -db = TemporaryDB() - - -def html(code): - return f"{code}\n" - - -def form(inputs): - def form_input(**fields): - return f"" - inputs_html = "".join(form_input(name=name, type="text", placeholder=name) for name in inputs) - submit_html = form_input(id="submit", type="submit", value="Submit") - return html(f'
{inputs_html}{submit_html}
') - - -@contextlib.contextmanager -def run_browser(): - options = FirefoxOptions() - options.add_argument("--headless") - - service = FirefoxService(log_path="/dev/null") - driver = webdriver.Firefox(service=service, options=options) - - try: - yield driver - finally: - driver.quit() - - -def level1(): - path = request.args.get("path") - assert path, "Missing `path` argument" - return (pathlib.Path(app.root_path) / path).read_text() - - -def level2(): - timezone = request.args.get("timezone", "UTC") - return subprocess.check_output(f"TZ={timezone} date", shell=True, encoding="latin") - - -def level3(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password'), - (flag,)) - - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f"SELECT rowid, * FROM users WHERE username = ? AND password = ?", (username, password)).fetchone() - assert user, "Invalid `username` or `password`" - - return redirect(request.path, user=int(user["rowid"])) - - if "user" in request.args: - user_id = int(request.args["user"]) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - if user: - username = user["username"] - if username == "flag": - return f"{flag}\n" - return f"Hello, {username}!\n" - - return form(["username", "password"]) - - -def level4(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password'), - (flag,)) - - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f'SELECT rowid, * FROM users WHERE username = "{username}" AND password = "{password}"').fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - if session.get("user"): - user_id = int(session.get("user", -1)) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - if user: - username = user["username"] - if username == "flag": - return f"{flag}\n" - return f"Hello, {username}!\n" - - return form(["username", "password"]) - - -def level5(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? AS password'), - (flag,)) - - query = request.args.get("query", "%") - users = db.execute(f'SELECT username FROM users WHERE username LIKE "{query}"').fetchall() - return "".join(f'{user["username"]}\n' for user in users) - - -def level6(): - table_name = f"table{hash(flag) & 0xFFFFFFFFFFFFFFFF}" - db.execute((f"CREATE TABLE IF NOT EXISTS {table_name} AS " - 'SELECT "flag" AS username, ? AS password'), - (flag,)) - - query = request.args.get("query", "%") - users = db.execute(f'SELECT username FROM {table_name} WHERE username LIKE "{query}"').fetchall() - return "".join(f'{user["username"]}\n' for user in users) - - -def level7(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password'), - (flag,)) - - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f'SELECT rowid, * FROM users WHERE username = "{username}" AND password = "{password}"').fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - if session.get("user"): - user_id = int(session.get("user", -1)) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - if user: - username = user["username"] - return f"Hello, {username}!\n" - - return form(["username", "password"]) - - -def level8(): - if request.path == "/echo": - echo = request.args.get("echo") - assert echo, "Missing `echo` argument" - return html(echo) - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == challenge_host, f"Invalid `url`, hostname should be `{challenge_host}`" - - with run_browser() as browser: - browser.get(url) - try: - WebDriverWait(browser, 1).until(EC.alert_is_present()) - except TimeoutException: - return "Failed to alert\n" - else: - return f"{flag}\n" - - return "Not Found\n", 404 - - -def level9(): - if request.path == "/echo": - echo = request.args.get("echo") - assert echo, "Missing `echo` argument" - return html(f"") - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == challenge_host, f"Invalid `url`, hostname should be `{challenge_host}`" - - with run_browser() as browser: - browser.get(url) - try: - WebDriverWait(browser, 1).until(EC.alert_is_present()) - except TimeoutException: - return "Failed to alert\n" - else: - return f"{flag}\n" - - return "Not Found\n", 404 - - -def level10(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password, ? as leak'), - (flag, False)) - - if request.path == "/login": - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f"SELECT rowid, * FROM users WHERE username = ? AND password = ?", (username, password)).fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - return form(["username", "password"]) - - if request.path == "/leak": - user_id = int(session.get("user", -1)) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - assert user, "Not logged in" - db.execute(f"UPDATE users SET leak = TRUE WHERE rowid = ?", (user_id,)) - return "Leaked\n" - - if request.path == "/info": - assert "user" in request.args, "Missing `user` argument" - user_id = int(request.args["user"]) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - assert user, "Invalid `user`" - info = [user["username"]] - if user["leak"]: - info.append(user["password"]) - return " ".join(info) + "\n" - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == challenge_host, f"Invalid `url`, hostname should be `{challenge_host}`" - - with run_browser() as browser: - browser.get(f"http://{challenge_host}/login") - - user_form = { - "username": "flag", - "password": flag, - } - for name, value in user_form.items(): - field = browser.find_element(By.NAME, name) - field.send_keys(value) - - submit_field = browser.find_element(By.ID, "submit") - submit_field.submit() - WebDriverWait(browser, 10).until(EC.staleness_of(submit_field)) - - browser.get(url) - time.sleep(1) - - return "Visited\n" - - if request.path == "/echo": - echo = request.args.get("echo") - assert echo, "Missing `echo` argument" - return html(echo) - - return "Not Found\n", 404 - - -def level11(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password, ? as leak'), - (flag, False)) - - if request.path == "/login": - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f"SELECT rowid, * FROM users WHERE username = ? AND password = ?", (username, password)).fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - return form(["username", "password"]) - - if request.path == "/leak": - user_id = int(session.get("user", -1)) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - assert user, "Not logged in" - db.execute(f"UPDATE users SET leak = TRUE WHERE rowid = ?", (user_id,)) - return "Leaked\n" - - if request.path == "/info": - assert "user" in request.args, "Missing `user` argument" - user_id = int(request.args["user"]) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - assert user, "Invalid `user`" - info = [user["username"]] - if user["leak"]: - info.append(user["password"]) - return " ".join(info) + "\n" - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == hacker_host, f"Invalid `url`, hostname should be `{hacker_host}`" - - with run_browser() as browser: - browser.get(f"http://{challenge_host}/login") - - user_form = { - "username": "flag", - "password": flag, - } - for name, value in user_form.items(): - field = browser.find_element(By.NAME, name) - field.send_keys(value) - - submit_field = browser.find_element(By.ID, "submit") - submit_field.submit() - WebDriverWait(browser, 10).until(EC.staleness_of(submit_field)) - - browser.get(url) - time.sleep(1) - - return "Visited\n" - - return "Not Found\n", 404 - - -def level12(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password, ? as leak'), - (flag, False)) - - if request.path == "/login": - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f"SELECT rowid, * FROM users WHERE username = ? AND password = ?", (username, password)).fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - return form(["username", "password"]) - - if request.path == "/leak" and request.method == "POST": - user_id = int(session.get("user", -1)) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - assert user, "Not logged in" - db.execute(f"UPDATE users SET leak = TRUE WHERE rowid = ?", (user_id,)) - return "Leaked\n" - - if request.path == "/info": - assert "user" in request.args, "Missing `user` argument" - user_id = int(request.args["user"]) - user = db.execute("SELECT * FROM users WHERE rowid = ?", (user_id,)).fetchone() - assert user, "Invalid `user`" - info = [user["username"]] - if user["leak"]: - info.append(user["password"]) - return " ".join(info) + "\n" - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == hacker_host, f"Invalid `url`, hostname should be `{hacker_host}`" - - with run_browser() as browser: - browser.get(f"http://{challenge_host}/login") - - user_form = { - "username": "flag", - "password": flag, - } - for name, value in user_form.items(): - field = browser.find_element(By.NAME, name) - field.send_keys(value) - - submit_field = browser.find_element(By.ID, "submit") - submit_field.submit() - WebDriverWait(browser, 10).until(EC.staleness_of(submit_field)) - - browser.get(url) - time.sleep(1) - - return "Visited\n" - - return "Not Found\n", 404 - - -def level13(): - app.config['SESSION_COOKIE_HTTPONLY'] = False - - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password'), - (flag,)) - - if request.path == "/login": - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f"SELECT rowid, * FROM users WHERE username = ? AND password = ?", (username, password)).fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - return form(["username", "password"]) - - if request.path == "/info": - session_user_id = int(session.get("user", -1)) - session_user = db.execute("SELECT * FROM users WHERE rowid = ?", (session_user_id,)).fetchone() - - request_user_id = int(request.args.get("user", session_user_id)) - request_user = db.execute("SELECT * FROM users WHERE rowid = ?", (request_user_id,)).fetchone() - assert request_user, "Invalid `user`" - - info = [request_user["username"]] - if request_user_id == session_user_id: - info.append(request_user["password"]) - return " ".join(info) + "\n" - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == challenge_host, f"Invalid `url`, hostname should be `{challenge_host}`" - - with run_browser() as browser: - browser.get(f"http://{challenge_host}/login") - - user_form = { - "username": "flag", - "password": flag, - } - for name, value in user_form.items(): - field = browser.find_element(By.NAME, name) - field.send_keys(value) - - submit_field = browser.find_element(By.ID, "submit") - submit_field.submit() - WebDriverWait(browser, 10).until(EC.staleness_of(submit_field)) - - browser.get(url) - time.sleep(1) - - return "Visited\n" - - if request.path == "/echo": - echo = request.args.get("echo") - assert echo, "Missing `echo` argument" - return html(echo) - - return "Not Found\n", 404 - - -def level14(): - db.execute(("CREATE TABLE IF NOT EXISTS users AS " - 'SELECT "flag" AS username, ? as password'), - (flag,)) - - if request.path == "/login": - if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") - assert username, "Missing `username` form" - assert password, "Missing `password` form" - - user = db.execute(f"SELECT rowid, * FROM users WHERE username = ? AND password = ?", (username, password)).fetchone() - assert user, "Invalid `username` or `password`" - - session["user"] = int(user["rowid"]) - return redirect(request.path) - - return form(["username", "password"]) - - if request.path == "/info": - session_user_id = int(session.get("user", -1)) - session_user = db.execute("SELECT * FROM users WHERE rowid = ?", (session_user_id,)).fetchone() - - request_user_id = int(request.args.get("user", session_user_id)) - request_user = db.execute("SELECT * FROM users WHERE rowid = ?", (request_user_id,)).fetchone() - assert request_user, "Invalid `user`" - - info = [request_user["username"]] - if request_user_id == session_user_id: - info.append(request_user["password"]) - return " ".join(info) + "\n" - - if request.path == "/visit": - url = request.args.get("url") - assert url, "Missing `url` argument" - - url_arg_parsed = urllib.parse.urlparse(url) - assert url_arg_parsed.hostname == challenge_host, f"Invalid `url`, hostname should be `{challenge_host}`" - - with run_browser() as browser: - browser.get(f"http://{challenge_host}/login") - - user_form = { - "username": "flag", - "password": flag, - } - for name, value in user_form.items(): - field = browser.find_element(By.NAME, name) - field.send_keys(value) - - submit_field = browser.find_element(By.ID, "submit") - submit_field.submit() - WebDriverWait(browser, 10).until(EC.staleness_of(submit_field)) - - browser.get(url) - time.sleep(1) - - return "Visited\n" - - if request.path == "/echo": - echo = request.args.get("echo") - assert echo, "Missing `echo` argument" - return html(echo) - - return "Not Found\n", 404 - - -def level15(): - if "libgreet" not in globals(): - global libgreet - shared_library_file = tempfile.NamedTemporaryFile("x", suffix=".so") - gcc_args = ["/usr/bin/gcc", "-x", "c", "-shared", "-fPIC", "-fno-stack-protector", "-o", shared_library_file.name, "-"] - subprocess.run(gcc_args, input=r""" - #include - #include - #include - #include - - void __attribute__ ((constructor)) disable_buffering(void) { - setvbuf(stdout, NULL, _IONBF, 0); - } - - void win(void) - { - char flag[256] = { 0 }; - int flag_fd; - - puts("You win! Here is your flag:"); - - flag_fd = open("/flag", 0); - read(flag_fd, flag, sizeof(flag)); - puts(flag); - } - - void * win_address(void) - { - return win; - } - - void greet(char *name, size_t length) - { - char buffer[256] = { 0 }; - - memcpy(buffer, "Hello, ", 7); - memcpy(buffer + 7, name, length); - memcpy(buffer + 7 + length, "!", 1); - - puts(buffer); - } - """.encode()) - libgreet = ctypes.CDLL(shared_library_file.name) - libgreet.win_address.restype = ctypes.c_void_p - - if request.path == "/win_address": - return f"{hex(libgreet.win_address())}\n" - - if request.path == "/greet": - name = request.args.get("name") - assert name, "Missing `name` argument" - - def stream_greet(): - r, w = os.pipe() - pid = os.fork() - - if pid == 0: - os.close(r) - os.dup2(w, 1) - name_buffer = ctypes.create_string_buffer(name.encode("latin")) - libgreet.greet(name_buffer, len(name)) - os._exit(0) - - os.close(w) - while True: - data = os.read(r, 256) - if not data: - break - yield data - os.wait() - - return stream_greet() - - return "Not Found\n", 404 - - -@app.before_request -def before_request(): - if request.host != challenge_host: - return "Service Unavailable\n", 503 - - -@app.route("/", methods=["GET", "POST"]) -@app.route("/", methods=["GET", "POST"]) -def catch_all(path=""): - challenge = globals()[f"level{level}"] - return challenge() - - -@app.after_request -def after_request(response): - response.headers["Server"] = "pwn.college" - del response.headers["Content-Type"] - return response - - -@app.errorhandler(AssertionError) -def assertion_handler(error): - print(f"{error}\n") - return f"{error}\n", 400 - - -@app.errorhandler(Exception) -def exception_handler(error): - import traceback - print(traceback.format_exc()) - return traceback.format_exc(), 500 - - -def challenge(): - os.environ.clear() - os.environ["PATH"] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" - os.setuid(os.geteuid()) - - app.secret_key = flag - - challenge = globals()[f"level{level}"] - challenge_code = inspect.getsource(challenge) - - print("===== Welcome to Web Security! =====") - print("In this series of challenges, you will be working to break web applications!") - print("Read the code for the level, find the security vulnerability, and develop an exploit to get the flag.") - print("Note that the web application is running Python and using the Flask library, so see the Flask documentation as necessary.") - print("") - print("Here's the source code of this level:") - print(challenge_code) - - - print("") - print("Note: you will need to use another terminal or browser to interact with this web server.") - print("") - print("Now running the web server:") - print("") - app.run(challenge_host, 80) - - -challenge()