From 6355fc9b27919a964c4a8a51bfba61e3f3f5a513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20Vask=C3=B3?= <1771332+vlaci@users.noreply.github.com> Date: Tue, 6 Jun 2023 16:54:08 +0200 Subject: [PATCH] feat: introduce landlock based sandboxing Co-authored-by: Quentin Kaiser --- unblob/processing.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/unblob/processing.py b/unblob/processing.py index 83eb9dc64d..31fb95c5cc 100644 --- a/unblob/processing.py +++ b/unblob/processing.py @@ -1,5 +1,6 @@ import multiprocessing import shutil +import sys from operator import attrgetter from pathlib import Path from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type, Union @@ -9,6 +10,7 @@ import plotext as plt from structlog import get_logger from unblob_native import math_tools as mt +from unblob_native.sandbox import AccessFS, restrict_access from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers @@ -111,6 +113,29 @@ def get_extract_dir_for(self, path: Path) -> Path: return extract_dir.expanduser().resolve() +def sandbox(extract_dir: Path, report_file: Optional[Path]): + restrictions = [ + AccessFS.read("/"), + AccessFS.read_write("/dev/shm"), # noqa: S108 + AccessFS.read_write(extract_dir.as_posix()), + AccessFS.make_dir(extract_dir.parent.as_posix()), + ] + + if report_file: + restrictions += [ + AccessFS.read_write(report_file), + AccessFS.make_reg(report_file.parent), + ] + + if "pytest" in sys.modules: + restrictions += [ + AccessFS.read_write("/tmp"), # noqa: S108 + AccessFS.read_write(Path(__file__).parent.parent.resolve().as_posix()), + ] + + restrict_access(*restrictions) + + @terminate_gracefully def process_file( config: ExtractionConfig, input_path: Path, report_file: Optional[Path] = None @@ -135,6 +160,10 @@ def process_file( ) return ProcessResult() + if not hasattr(process_file, "_sandboxed"): + sandbox(extract_dir, report_file) + process_file._sandboxed = True # noqa: SLF001 + process_result = _process_task(config, task) if not config.skip_extraction: