From 5adecce14ba2f5c0cc96941a58136373313498cd Mon Sep 17 00:00:00 2001 From: Jordan Blasenhauer Date: Mon, 29 Jul 2024 11:40:21 +0200 Subject: [PATCH] refactor main.py * better form params handling * refactor bans endpoint * refactor services endpoint --- src/ui/client/vite.config.dashboard.js | 2 +- src/ui/main.py | 502 +++++++++++++------------ 2 files changed, 257 insertions(+), 247 deletions(-) diff --git a/src/ui/client/vite.config.dashboard.js b/src/ui/client/vite.config.dashboard.js index 8e4ea75fdd..9954cfe31a 100644 --- a/src/ui/client/vite.config.dashboard.js +++ b/src/ui/client/vite.config.dashboard.js @@ -34,7 +34,7 @@ export default defineConfig({ }, }, esbuild: { - drop: ["console.log"], + drop: ["console.log", "console.info", "console.warn"], }, build: { minify: "esbuild", diff --git a/src/ui/main.py b/src/ui/main.py index 703f942c76..b1e54bc3ca 100644 --- a/src/ui/main.py +++ b/src/ui/main.py @@ -319,35 +319,43 @@ def run_action(plugin: str, function_name: str = ""): return {"status": "ok", "code": 200, "data": res} -def is_request_form(url_name: str, next: bool = False): - if not request.form: - flash("Missing form data.", "error") - return redirect(url_for(url_name)) +def get_user_info(): + return current_user.get_id(), current_user.password_hash, current_user.is_two_factor_enabled, current_user.secret_token -def is_request_params(params: list, url_name: str, next: bool = False): - for param in params: - if param not in request.form: - flash(f"Missing {param} parameter.", "error") - if next: - return redirect(url_for("loading", next=url_for(url_name))) +def verify_data_in_form(data: dict[str, Union[tuple, any]] = {}, err_message: str = "", redirect_url: str = "", next: bool = False) -> Union[bool, Response]: + # Loop on each key in data + for key, values in data.items(): + if key not in request.form: + return handle_error(f"Missing {key} in form", redirect_url, next, "error") - return redirect(url_for(url_name)) + # Case we want to only check if key is in form, we can skip the values check by setting values to falsy value + if not values: + continue + + if request.form[key] not in values: + return handle_error(err_message, redirect_url, next, "error") + + return True -def redirect_flash_error(message: str, url_name: str, next: bool = False, log: Union[bool, str] = False): - flash(message, "error") +def handle_error(err_message: str = "", redirect_url: str = "", next: bool = False, log: Union[bool, str] = False) -> Union[bool, Response]: + """Handle error message, flash it, log it if needed and redirect to redirect_url if provided or return False.""" + flash(err_message, "error") if log == "error": - app.logger.error(message) + app.logger.error(err_message) if log == "exception": - app.logger.exception(message) + app.logger.exception(err_message) + + if not redirect_url: + return False if next: - return redirect(url_for("loading", next=url_for(url_name))) + return redirect(url_for("loading", next=url_for(redirect_url))) - return redirect(url_for(url_name)) + return redirect(url_for(redirect_url)) def error_message(msg: str): @@ -575,9 +583,7 @@ def setup(): if request.method == "POST": if app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "setup") - - is_request_form("setup") + return handle_error("Database is in read-only mode", "setup") required_keys = [] if not ui_reverse_proxy: @@ -586,17 +592,17 @@ def setup(): required_keys.extend(["admin_username", "admin_password", "admin_password_check"]) if not any(key in request.form for key in required_keys): - return redirect_flash_error(f"Missing either one of the following parameters: {', '.join(required_keys)}.", "setup") + return handle_error(f"Missing either one of the following parameters: {', '.join(required_keys)}.", "setup") if not admin_user: if len(request.form["admin_username"]) > 256: - return redirect_flash_error("The admin username is too long. It must be less than 256 characters.", "setup") + return handle_error("The admin username is too long. It must be less than 256 characters.", "setup") if request.form["admin_password"] != request.form["admin_password_check"]: - return redirect_flash_error("The passwords do not match.", "setup") + return handle_error("The passwords do not match.", "setup") if not USER_PASSWORD_RX.match(request.form["admin_password"]): - return redirect_flash_error( + return handle_error( "The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-).", "setup", ) @@ -605,21 +611,21 @@ def setup(): ret = app.config["DB"].create_ui_user(request.form["admin_username"], admin_user.password_hash, method="ui") if ret: - return redirect_flash_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error") + return handle_error(f"Couldn't create the admin user in the database: {ret}", "setup", False, "error") flash("The admin user was created successfully", "success") if not ui_reverse_proxy: server_names = db_config["SERVER_NAME"].split(" ") if request.form["server_name"] in server_names: - return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup") + return handle_error(f"The hostname {request.form['server_name']} is already in use.", "setup") else: for server_name in server_names: if request.form["server_name"] in db_config.get(f"{server_name}_SERVER_NAME", "").split(" "): - return redirect_flash_error(f"The hostname {request.form['server_name']} is already in use.", "setup") + return handle_error(f"The hostname {request.form['server_name']} is already in use.", "setup") if not REVERSE_PROXY_PATH.match(request.form["ui_host"]): - return redirect_flash_error("The hostname is not valid.", "setup") + return handle_error("The hostname is not valid.", "setup") ui_data = get_ui_data() ui_data["RELOADING"] = True @@ -679,12 +685,11 @@ def setup_loading(): @login_required def totp(): if request.method == "POST": - is_request_form("totp") - is_request_params(["totp_token"], "totp") + verify_data_in_form(data={"totp_token": None}, err_message="No token provided on /totp.", redirect_url="totp") if not current_user.check_otp(request.form["totp_token"]): - return redirect_flash_error("The token is invalid.", "totp") + return handle_error("The token is invalid.", "totp") session["totp_validated"] = True redirect(url_for("loading", next=request.form.get("next") or url_for("home"))) @@ -773,19 +778,20 @@ def home(): def account(): if request.method == "POST": if app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "account") + return handle_error("Database is in read-only mode", "account") - # Check form data validity - is_request_form("account") + verify_data_in_form( + data={"operation": ("username", "password", "totp", "activate-key")}, err_message="Invalid operation parameter.", redirect_url="account" + ) if request.form["operation"] not in ("username", "password", "totp", "activate-key"): - return redirect_flash_error("Invalid operation parameter.", "account") + return handle_error("Invalid operation parameter.", "account") if request.form["operation"] == "activate-key": - is_request_params(["license"], "account") + verify_data_in_form(data={"license": None}, err_message="Missing license for operation activate key on /account.", redirect_url="account") if len(request.form["license"]) == 0: - return redirect_flash_error("The license key is empty", "account") + return handle_error("The license key is empty", "account") variable = {} variable["PRO_LICENSE_KEY"] = request.form["license"] @@ -793,15 +799,15 @@ def account(): variable = app.config["CONFIG"].check_variables(variable, {"PRO_LICENSE_KEY": request.form["license"]}) if not variable: - return redirect_flash_error("The license key variable checks returned error", "account", True) + return handle_error("The license key variable checks returned error", "account", True) # Force job to contact PRO API # by setting the last check to None metadata = app.config["DB"].get_metadata() metadata["last_pro_check"] = None - app.config["DB"].set_metadata(metadata) + app.config["DB"].set_pro_metadata(metadata) - db_metadata = app.config["DB"].get_metadata() + curr_changes = app.config["DB"].check_changes() # Reload instances def update_global_config(threaded: bool = False): @@ -818,11 +824,7 @@ def update_global_config(threaded: bool = False): ui_data["PRO_LOADING"] = True ui_data["CONFIG_CHANGED"] = True - if any( - v - for k, v in db_metadata.items() - if k in ("custom_configs_changed", "external_plugins_changed", "pro_plugins_changed", "plugins_config_changed", "instances_changed") - ): + if any(curr_changes.values()): ui_data["RELOADING"] = True ui_data["LAST_RELOAD"] = time() Thread(target=update_global_config, args=(True,)).start() @@ -834,10 +836,10 @@ def update_global_config(threaded: bool = False): return redirect(url_for("account")) - is_request_params(["operation", "curr_password"], "account") + verify_data_in_form(data={"curr_password": None}, err_message="Missing current password parameter on /account.", redirect_url="account") if not current_user.check_password(request.form["curr_password"]): - return redirect_flash_error(f"The current password is incorrect. ({request.form['operation']})", "account") + return handle_error(f"The current password is incorrect. ({request.form['operation']})", "account") username = current_user.get_id() password = request.form["curr_password"] @@ -845,24 +847,27 @@ def update_global_config(threaded: bool = False): secret_token = current_user.secret_token if request.form["operation"] == "username": - is_request_params(["admin_username"], "account") + verify_data_in_form(data={"admin_username": None}, err_message="Missing admin username parameter on /account.", redirect_url="account") if len(request.form["admin_username"]) > 256: - return redirect_flash_error("The admin username is too long. It must be less than 256 characters. (username)", "account") + return handle_error("The admin username is too long. It must be less than 256 characters. (username)", "account") username = request.form["admin_username"] logout() if request.form["operation"] == "password": - - is_request_params(["admin_password", "admin_password_check"], "account") + verify_data_in_form( + data={"admin_password": None, "admin_password_check": None}, + err_message="Missing admin password or confirm password parameter on /account.", + redirect_url="account", + ) if request.form["admin_password"] != request.form["admin_password_check"]: - return redirect_flash_error("The passwords do not match. (password)", "account") + return handle_error("The passwords do not match. (password)", "account") if not USER_PASSWORD_RX.match(request.form["admin_password"]): - return redirect_flash_error( + return handle_error( "The admin password is not strong enough. It must contain at least 8 characters, including at least 1 uppercase letter, 1 lowercase letter, 1 number and 1 special character (#@?!$%^&*-). (password)", "account", ) @@ -872,13 +877,12 @@ def update_global_config(threaded: bool = False): logout() if request.form["operation"] == "totp": - - is_request_params(["totp_token"], "account") + verify_data_in_form(data={"totp_token": None}, err_message="Missing totp token parameter on /account.", redirect_url="account") ui_data = get_ui_data() if not current_user.check_otp(request.form["totp_token"], secret=ui_data.get("CURRENT_TOTP_TOKEN", None)): - return redirect_flash_error("The totp token is invalid. (totp)", "account") + return handle_error("The totp token is invalid. (totp)", "account") session["totp_validated"] = not current_user.is_two_factor_enabled is_two_factor_enabled = session["totp_validated"] @@ -893,7 +897,7 @@ def update_global_config(threaded: bool = False): username, user.password_hash, is_two_factor_enabled, secret_token, current_user.method if request.form["operation"] == "totp" else "ui" ) if ret: - return redirect_flash_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error") + return handle_error(f"Couldn't update the admin user in the database: {ret}", "account", False, "error") flash( ( @@ -920,6 +924,7 @@ def update_global_config(threaded: bool = False): return render_template( "account.html", + username=current_user.get_id(), is_totp=current_user.is_two_factor_enabled, secret_token=secret_token, totp_qr_image=totp_qr_image, @@ -932,16 +937,20 @@ def instances(): # Manage instances if request.method == "POST": - is_request_params(["operation", "INSTANCE_ID"], "instances", True) - - # Check operation - if request.form["operation"] not in ( - "reload", - "start", - "stop", - "restart", - ): - return redirect_flash_error("Missing operation parameter on /instances.", "instances") + verify_data_in_form(data={"INSTANCE_ID": None}, err_message="Missing instance id parameter on /instances.", redirect_url="instances", next=True) + verify_data_in_form( + data={ + "operation": ( + "reload", + "start", + "stop", + "restart", + ) + }, + err_message="Missing operation parameter on /instances.", + redirect_url="instances", + next=True, + ) ui_data = get_ui_data() ui_data["RELOADING"] = True @@ -971,123 +980,117 @@ def instances(): return render_template("instances.html", title="Instances", data_server_builder=json.dumps(data_server_builder)) -@app.route("/services", methods=["GET", "POST"]) -@login_required -def services(): - if request.method == "POST": - if app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "services") - - is_request_params(["operation", "is_draft"], "services", True) - - # Check operation - if request.form["operation"] not in ("new", "edit", "delete"): - return redirect_flash_error("Missing operation parameter on /services.", "services") - - if request.form["is_draft"] not in ("yes", "no"): - return redirect_flash_error("Missing is_draft parameter on /services.", "services") +def get_service_data(): + config = app.config["DB"].get_config(methods=True, with_drafts=True) + # Check variables + variables = deepcopy(request.form.to_dict()) + del variables["csrf_token"] + operation = variables.pop("operation") + + # Delete custom client variables + variables.pop("SECURITY_LEVEL", None) + variables.pop("mode", None) + + # Get server name and old one + old_server_name = "" + if variables.get("OLD_SERVER_NAME"): + old_server_name = variables.get("OLD_SERVER_NAME", "") + del variables["OLD_SERVER_NAME"] + + server_name = variables["SERVER_NAME"].split(" ")[0] if "SERVER_NAME" in variables else old_server_name + + # Get draft if exists + was_draft = config.get(f"{server_name}_IS_DRAFT", {"value": "no"})["value"] == "yes" + is_draft = was_draft if not variables.get("is_draft") else variables.get("is_draft") == "yes" + if variables.get("is_draft"): + del variables["is_draft"] + + is_draft_unchanged = is_draft == was_draft + + # Get all variables starting with custom_config and delete them from variables + custom_configs = [] + config_types = ( + "http", + "stream", + "server-http", + "server-stream", + "default-server-http", + "default-server-stream", + "modsec", + "modsec-crs", + "crs-plugins-before", + "crs-plugins-after", + ) - # Check variables - variables = deepcopy(request.form.to_dict()) - del variables["csrf_token"] + for variable in variables: + if variable.startswith("custom_config_"): + custom_configs.append(variable) + del variables[variable] + + # custom_config variable format is custom_config__ + # we want a list of dict with each dict containing type, filename, action and server name + # after getting all configs, we want to save them after the end of current service action + # to avoid create config for none existing service or in case editing server name + format_configs = [] + for custom_config in custom_configs: + # first remove custom_config_ prefix + custom_config = custom_config.split("custom_config_")[1] + # then split the config into type, filename, action + custom_config = custom_config.split("_") + # check if the config is valid + if len(custom_config) == 2 and custom_config[0] in config_types: + format_configs.append({"type": custom_config[0], "filename": custom_config[1], "action": operation, "server_name": server_name}) + else: + return handle_error(err_message=f"Invalid custom config {custom_config}", redirect_url="services", next=True) - # Delete custom client variables - variables.pop("SECURITY_LEVEL", None) - variables.pop("mode", None) + # Edit check fields and remove already existing ones + for variable, value in variables.copy().items(): + if ( + variable in variables + and variable != "SERVER_NAME" + and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, {"value": None})["value"] + ): + del variables[variable] - is_draft = variables.pop("is_draft", "no") == "yes" + variables = app.config["CONFIG"].check_variables(variables, config) + return config, variables, format_configs, server_name, old_server_name, operation, is_draft, was_draft, is_draft_unchanged - if "OLD_SERVER_NAME" not in request.form and request.form["operation"] == "edit": - return redirect_flash_error("Missing OLD_SERVER_NAME parameter.", "services", True) - if "SERVER_NAME" not in variables: - variables["SERVER_NAME"] = variables["OLD_SERVER_NAME"] +@app.route("/services", methods=["GET", "POST"]) +@login_required +def services(): + if request.method == "POST": + if app.config["DB"].readonly: + return handle_error("Database is in read-only mode", "services") - config = app.config["DB"].get_config(methods=True, with_drafts=True) - server_name = variables["SERVER_NAME"].split(" ")[0] - was_draft = config.get(f"{server_name}_IS_DRAFT", {"value": "no"})["value"] == "yes" - operation = request.form["operation"] - # Get all variables starting with custom_config and delete them from variables - custom_configs = [] - config_types = ( - "http", - "stream", - "server-http", - "server-stream", - "default-server-http", - "default-server-stream", - "modsec", - "modsec-crs", - "crs-plugins-before", - "crs-plugins-after", + verify_data_in_form( + data={"operation": ("edit", "new", "delete")}, + err_message="Invalid operation parameter on /services.", + redirect_url="services", ) - for variable in variables: - if variable.startswith("custom_config_"): - custom_configs.append(variable) - del variables[variable] + config, variables, format_configs, server_name, old_server_name, operation, is_draft, was_draft, is_draft_unchanged = get_service_data() - # custom_config variable format is custom_config__ - # we want a list of dict with each dict containing type, filename, action and server name - # after getting all configs, we want to save them after the end of current service action - # to avoid create config for none existing service or in case editing server name - format_configs = [] - for custom_config in custom_configs: - # first remove custom_config_ prefix - custom_config = custom_config.split("custom_config_")[1] - # then split the config into type, filename, action - custom_config = custom_config.split("_") - # check if the config is valid - if len(custom_config) == 2 and custom_config[0] in config_types: - format_configs.append({"type": custom_config[0], "filename": custom_config[1], "action": operation, "server_name": server_name}) - else: - return redirect_flash_error(f"Invalid custom config {custom_config}", "services", True) + if request.form["operation"] == "edit": + if is_draft_unchanged and len(variables) == 1 and "SERVER_NAME" in variables and server_name == old_server_name: + return handle_error("The service was not edited because no values were changed.", "services", True) - if request.form["operation"] in ("new", "edit"): - del variables["operation"] - del variables["OLD_SERVER_NAME"] - - # Edit check fields and remove already existing ones - for variable, value in variables.copy().items(): - if ( - variable in variables - and variable != "SERVER_NAME" - and value == config.get(f"{server_name}_{variable}" if request.form["operation"] == "edit" else variable, {"value": None})["value"] - ): - del variables[variable] - - variables = app.config["CONFIG"].check_variables(variables, config) - - if ( - was_draft == is_draft - and request.form["operation"] == "edit" - and len(variables) == 1 - and "SERVER_NAME" in variables - and variables["SERVER_NAME"] == request.form.get("OLD_SERVER_NAME", "") - ): - return redirect_flash_error("The service was not edited because no values were changed.", "services", True) - - elif request.form["operation"] == "new" and not variables: - return redirect_flash_error("The service was not created because all values had the default value.", "services", True) + if request.form["operation"] == "new" and not variables: + return handle_error("The service was not created because all values had the default value.", "services", True) # Delete if request.form["operation"] == "delete": - is_request_params(["SERVER_NAME"], "services", True) - - variables = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]}, config) + is_service = app.config["CONFIG"].check_variables({"SERVER_NAME": request.form["SERVER_NAME"]}, config) - if not variables: + if not is_service: error_message(f"Error while deleting the service {request.form['SERVER_NAME']}") if config.get(f"{request.form['SERVER_NAME'].split(' ')[0]}_SERVER_NAME", {"method": "scheduler"})["method"] != "ui": - return redirect_flash_error("The service cannot be deleted because it has not been created with the UI.", "services", True) + return handle_error("The service cannot be deleted because it has not been created with the UI.", "services", True) db_metadata = app.config["DB"].get_metadata() - old_server_name = request.form.get("OLD_SERVER_NAME", "") - operation = request.form["operation"] - def update_services(threaded: bool = False): wait_applying() @@ -1185,7 +1188,7 @@ def update_services(threaded: bool = False): def global_config(): if request.method == "POST": if app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "global_config") + return handle_error("Database is in read-only mode", "global_config") # Check variables variables = request.form.to_dict().copy() @@ -1203,7 +1206,7 @@ def global_config(): variables = app.config["CONFIG"].check_variables(variables, config) if not variables: - return redirect_flash_error("The global configuration was not edited because no values were changed.", "global_config", True) + return handle_error("The global configuration was not edited because no values were changed.", "global_config", True) for variable, value in variables.copy().items(): for service in services: @@ -1264,31 +1267,28 @@ def configs(): if request.method == "POST": if app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "configs") + return handle_error("Database is in read-only mode", "configs") operation = "" - is_request_params(["operation"], "configs", True) - - # Check operation - if request.form["operation"] not in ( - "new", - "edit", - "delete", - ): - return redirect_flash_error("Operation parameter is invalid on /configs.", "configs", True) + verify_data_in_form( + data={"operation": ("new", "edit", "delete"), "type": "file", "path": None}, + err_message="Invalid operation parameter on /configs.", + redirect_url="configs", + next=True, + ) # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] if variables["type"] != "file": - return redirect_flash_error("Invalid type parameter on /configs.", "configs", True) + return handle_error("Invalid type parameter on /configs.", "configs", True) operation = app.config["CONFIGFILES"].check_path(variables["path"]) if operation: - return redirect_flash_error(operation, "configs", True) + return handle_error(operation, "configs", True) old_name = variables.get("old_name", "").replace(".conf", "") name = variables.get("name", old_name).replace(".conf", "") @@ -1297,15 +1297,15 @@ def configs(): root_dir = path_exploded[4].replace("-", "_").lower() if not old_name and not name: - return redirect_flash_error("Missing name parameter on /configs.", "configs", True) + return handle_error("Missing name parameter on /configs.", "configs", True) index = -1 for i, db_config in enumerate(db_configs): if db_config["type"] == root_dir and db_config["name"] == name and db_config["service_id"] == service_id: if request.form["operation"] == "new": - return redirect_flash_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True) + return handle_error(f"Config {name} already exists{f' for service {service_id}' if service_id else ''}", "configs", True) elif db_config["method"] not in ("ui", "manual"): - return redirect_flash_error( + return handle_error( f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it was not created by the UI or manually", "configs", True, @@ -1316,7 +1316,7 @@ def configs(): # New or edit a config if request.form["operation"] in ("new", "edit"): if not app.config["CONFIGFILES"].check_name(name): - return redirect_flash_error( + return handle_error( f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores, dots and hyphens (min 4 characters and max 64))", "configs", True, @@ -1329,14 +1329,14 @@ def configs(): operation = f"Created config {name}{f' for service {service_id}' if service_id else ''}" elif request.form["operation"] == "edit": if index == -1: - return redirect_flash_error( + return handle_error( f"Can't edit config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True ) if old_name != name: db_configs[index]["name"] = name elif db_configs[index]["data"] == content: - return redirect_flash_error( + return handle_error( f"Config {name} was not edited because no values were changed{f' for service {service_id}' if service_id else ''}", "configs", True, @@ -1348,9 +1348,7 @@ def configs(): # Delete a config elif request.form["operation"] == "delete": if index == -1: - return redirect_flash_error( - f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True - ) + return handle_error(f"Can't delete config {name}{f' for service {service_id}' if service_id else ''} because it doesn't exist", "configs", True) del db_configs[index] operation = f"Deleted config {name}{f' for service {service_id}' if service_id else ''}" @@ -1358,7 +1356,7 @@ def configs(): error = app.config["DB"].save_custom_configs([config for config in db_configs if config["method"] == "ui"], "ui") if error: app.logger.error(f"Could not save custom configs: {error}") - return redirect_flash_error("Couldn't save custom configs", "configs", True) + return handle_error("Couldn't save custom configs", "configs", True) ui_data = get_ui_data() ui_data["CONFIG_CHANGED"] = True @@ -1388,20 +1386,25 @@ def plugins(): if request.method == "POST": if app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "plugins") + return handle_error("Database is in read-only mode", "plugins") + + verify_data_in_form( + data={"operation": ("delete"), "type": None}, + err_message="Missing type parameter for operation delete on /plugins.", + redirect_url="plugins", + next=True, + ) error = 0 # Delete plugin - if "operation" in request.form and request.form["operation"] == "delete": - - is_request_params(["type"], "plugins", True) + if request.form["operation"] == "delete": # Check variables variables = deepcopy(request.form.to_dict()) del variables["csrf_token"] if variables["type"] in ("core", "pro"): - return redirect_flash_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True) + return handle_error(f"Can't delete {variables['type']} plugin {variables['name']}", "plugins", True) db_metadata = app.config["DB"].get_metadata() @@ -1450,7 +1453,7 @@ def update_plugins(threaded: bool = False): # type: ignore else: # Upload plugins if not tmp_ui_path.exists() or not listdir(str(tmp_ui_path)): - return redirect_flash_error("Please upload new plugins to reload plugins", "plugins", True) + return handle_error("Please upload new plugins to reload plugins", "plugins", True) errors = 0 files_count = 0 @@ -2189,8 +2192,14 @@ def reports(): @app.route("/bans", methods=["GET", "POST"]) @login_required def bans(): - if request.method == "POST" and app.config["DB"].readonly: - return redirect_flash_error("Database is in read-only mode", "bans") + if request.method == "POST": + + if app.config["DB"].readonly: + return handle_error("Database is in read-only mode", "bans") + + # Check variables + verify_data_in_form(data={"operation": ("ban", "unban")}, err_message="Invalid operation parameter on /bans.", redirect_url="bans") + verify_data_in_form(data={"data": None}, err_message="Missing data parameter on /bans.", redirect_url="bans") redis_client = None db_config = app.config["CONFIG"].get_config( @@ -2280,70 +2289,71 @@ def bans(): redis_client = None flash("Couldn't connect to redis, ban list might be incomplete", "error") - if request.method == "POST": - # Check variables - is_request_form("bans") - - is_request_params(["operation", "data"], "bans") - + def get_load_data(): try: data = json_loads(request.form["data"]) assert isinstance(data, list) + return data except BaseException: - return redirect_flash_error("Data must be a list of dict", "bans", False, "exception") + return handle_error("Data must be a list of dict", "bans", False, "exception") - if request.form["operation"] not in ("ban", "unban"): - return redirect_flash_error("Operation unknown", "bans") + if request.method == "POST" and request.form["operation"] == "unban": - if request.form["operation"] == "unban": - for unban in data: - try: - unban = json_loads(unban.replace('"', '"').replace("'", '"')) - except BaseException: - flash(f"Invalid unban: {unban}, skipping it ...", "error") - app.logger.exception(f"Couldn't unban {unban['ip']}") - continue + data = get_load_data() - if "ip" not in unban: - flash(f"Invalid unban: {unban}, skipping it ...", "error") - continue + for unban in data: + try: + unban = json_loads(unban.replace('"', '"').replace("'", '"')) + except BaseException: + flash(f"Invalid unban: {unban}, skipping it ...", "error") + app.logger.exception(f"Couldn't unban {unban['ip']}") + continue - if redis_client: - if not redis_client.delete(f"bans_ip_{unban['ip']}"): - flash(f"Couldn't unban {unban['ip']} on redis", "error") + if "ip" not in unban: + flash(f"Invalid unban: {unban}, skipping it ...", "error") + continue - resp = app.config["INSTANCES"].unban(unban["ip"]) - if resp: - flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error") - else: - flash(f"Successfully unbanned {unban['ip']}") + if redis_client: + if not redis_client.delete(f"bans_ip_{unban['ip']}"): + flash(f"Couldn't unban {unban['ip']} on redis", "error") - if request.form["operation"] == "ban": - for ban in data: - if not isinstance(ban, dict) or "ip" not in ban: - flash(f"Invalid ban: {ban}, skipping it ...", "error") - continue + resp = app.config["INSTANCES"].unban(unban["ip"]) + if resp: + flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error") + else: + flash(f"Successfully unbanned {unban['ip']}") - reason = ban.get("reason", "ui") - ban_end = 86400.0 - if "ban_end" in ban: - try: - ban_end = float(ban["ban_end"]) - except ValueError: - continue - ban_end = (datetime.fromtimestamp(ban_end) - datetime.now()).total_seconds() + return redirect(url_for("loading", next=url_for("bans"), message="Update bans")) - if redis_client: - ok = redis_client.set(f"bans_ip_{ban['ip']}", dumps({"reason": reason, "date": time()})) - if not ok: - flash(f"Couldn't ban {ban['ip']} on redis", "error") - redis_client.expire(f"bans_ip_{ban['ip']}", int(ban_end)) + if request.method == "POST" and request.form["operation"] == "ban": - resp = app.config["INSTANCES"].ban(ban["ip"], ban_end, reason) - if resp: - flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error") - else: - flash(f"Successfully banned {ban['ip']}") + data = get_load_data() + + for ban in data: + if not isinstance(ban, dict) or "ip" not in ban: + flash(f"Invalid ban: {ban}, skipping it ...", "error") + continue + + reason = ban.get("reason", "ui") + ban_end = 86400.0 + if "ban_end" in ban: + try: + ban_end = float(ban["ban_end"]) + except ValueError: + continue + ban_end = (datetime.fromtimestamp(ban_end) - datetime.now()).total_seconds() + + if redis_client: + ok = redis_client.set(f"bans_ip_{ban['ip']}", dumps({"reason": reason, "date": time()})) + if not ok: + flash(f"Couldn't ban {ban['ip']} on redis", "error") + redis_client.expire(f"bans_ip_{ban['ip']}", int(ban_end)) + + resp = app.config["INSTANCES"].ban(ban["ip"], ban_end, reason) + if resp: + flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error") + else: + flash(f"Successfully banned {ban['ip']}") return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))