From fa263a0ae4d8df03fc3e60a2136f26b4783922c7 Mon Sep 17 00:00:00 2001
From: Robin David
Date: Fri, 28 Jul 2023 23:51:20 +0200
Subject: [PATCH] add option for replay timeout +fix proxy issues

---
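Note: this patch threads a new --replay-timeout option from the pastis-benchmark
CLI down to the QBDI replay workers (replacing the previously hard-coded 60s),
records the replay wall-clock time even when a replay fails or times out, makes
the proxy start signal actually start pending clients, and defers input
filtering and proxy seed forwarding until the broker is running. A minimal
sketch of the new flag in use; every other option is elided and depends on the
target setup:

    # hypothetical invocation: only --replay-threads/--replay-timeout shown
    pastis-benchmark run --replay-threads 8 --replay-timeout 30 [...]
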
{self._seed_queue.qsize()})") def _process_seed_received(self, typ: SeedType, seed: Seed): @@ -515,7 +527,7 @@ def _process_seed_received(self, typ: SeedType, seed: Seed): else: # Try running the seed to know whether to keep it # NOTE: re-run the seed regardless of its status coverage = None - logging.info(f"process seed received {seed.hash} (pool: {self._seed_queue.qsize()})") + logging.info(f"process seed received {to_h(seed)} (pool: {self._seed_queue.qsize()})") data = seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.bytes() self.replay_seed_file.write_bytes(data) @@ -555,7 +567,7 @@ def _process_seed_received(self, typ: SeedType, seed: Seed): logging.warning('There was an error while trying to re-run the seed') if not coverage: - logging.warning(f"coverage not found after replaying: {seed.hash} [{typ.name}] (add it anyway)") + logging.warning(f"coverage not found after replaying: {to_h(seed)} [{typ.name}] (add it anyway)") # Add the seed anyway, if it was not possible to re-run the seed. # TODO Set seed.coverage_objectives as "empty" (use ellipsis # object). Modify WorklistAddressToSet to support it. @@ -564,7 +576,7 @@ def _process_seed_received(self, typ: SeedType, seed: Seed): else: # Check whether the seed improves the current coverage. if self.dse.coverage.improve_coverage(coverage): - logging.info(f"seed added {seed.hash} [{typ.name}] (coverage merged)") + logging.info(f"seed added {to_h(seed)} [{typ.name}] (coverage merged)") self.seeds_merged += 1 self.dse.coverage.merge(coverage) self.dse.seeds_manager.worklist.update_worklist(coverage) @@ -572,7 +584,7 @@ def _process_seed_received(self, typ: SeedType, seed: Seed): seed.coverage_objectives = self.dse.coverage.new_items_to_cover(coverage) self.dse.add_input_seed(seed) else: - logging.info(f"seed archived {seed.hash} [{typ.name}] (NOT merging coverage)") + logging.info(f"seed archived {to_h(seed)} [{typ.name}] (NOT merging coverage)") self.seeds_rejected += 1 #self.dse.seeds_manager.archive_seed(seed) # logging.info(f"seed archived {seed.hash} [{typ.name}]") @@ -596,7 +608,7 @@ def stop_received(self): if self.dse: self.dse.stop_exploration() - self.save_stats() # Save stats + self.save_stats() # Save stats self._stop = True # self.agent.stop() # Can't call it here as this function executed from within agent thread @@ -635,7 +647,7 @@ def dual_log(self, level: LogLevel, message: str) -> None: def send_seed_to_broker(self, se: SymbolicExecutor, state: ProcessState, seed: Seed): if seed not in self._seed_received: # Do not send back a seed that already came from broker self._sending_count += 1 - logging.info(f"Sending new: {seed.hash} [{self._sending_count}]") + logging.info(f"Sending new: {to_h(seed)} [{self._sending_count}]") bytes = seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.content self.agent.send_seed(SeedType.INPUT, bytes) diff --git a/pastisbroker/broker.py b/pastisbroker/broker.py index 418d5cd..2678806 100644 --- a/pastisbroker/broker.py +++ b/pastisbroker/broker.py @@ -51,7 +51,8 @@ def __init__(self, workspace: PathLike, start_quorum: int = 0, filter_inputs: bool = False, stream: bool = False, - replay_threads: int = 4): + replay_threads: int = 4, + replay_timeout: int = 60): super(PastisBroker, self).__init__() # Initialize workspace @@ -126,7 +127,7 @@ def __init__(self, workspace: PathLike, if (path := self.find_vanilla_binary()) is not None: # Find an executable suitable for coverage logging.info(f"Coverage binary: {path}") stream_file = 
diff --git a/pastisbroker/broker.py b/pastisbroker/broker.py
index 418d5cd..2678806 100644
--- a/pastisbroker/broker.py
+++ b/pastisbroker/broker.py
@@ -51,7 +51,8 @@ def __init__(self, workspace: PathLike,
                  start_quorum: int = 0,
                  filter_inputs: bool = False,
                  stream: bool = False,
-                 replay_threads: int = 4):
+                 replay_threads: int = 4,
+                 replay_timeout: int = 60):
         super(PastisBroker, self).__init__()
 
         # Initialize workspace
@@ -126,7 +127,7 @@ def __init__(self, workspace: PathLike,
         if (path := self.find_vanilla_binary()) is not None:  # Find an executable suitable for coverage
             logging.info(f"Coverage binary: {path}")
             stream_file = self.workspace.coverage_history if stream else ""
-            self._coverage_manager = CoverageManager(replay_threads, filter_inputs, path, self.argv, self.inject, stream_file)
+            self._coverage_manager = CoverageManager(replay_threads, replay_timeout, filter_inputs, path, self.argv, self.inject, stream_file)
         else:
             logging.warning("filtering or stream enabled but cannot find vanilla binary")
 
@@ -585,7 +586,21 @@ def start(self, running: bool = True):
         for seed in self._init_seed_pool.keys():  # Push initial corpus to set baseline coverage
             fname = self.mk_input_name("INITIAL", seed)
             sp = fname.split("_")
-            covi = ClientInput(seed, "", f"{sp[0]}_{sp[1]}", sp[2], sp[4], fname, SeedType.INPUT, b"INITIAL", "INITIAL", "GRANTED", "", -1, [])
+            seed_hash = sp[4].split(".")[0]
+            covi = ClientInput(
+                content=seed,
+                log_time="",
+                recv_time=f"{sp[0]}_{sp[1]}",
+                elapsed=sp[2],
+                hash=seed_hash,
+                path=fname,
+                seed_status=SeedType.INPUT,
+                fuzzer_id=b"INITIAL",
+                fuzzer_name="INITIAL",
+                broker_status="GRANTED",  # Unless rejected (later)
+                replay_status="",
+                replay_time=-1,
+                new_coverage=[])
             self._coverage_manager.push_input(covi)
 
         if self.is_proxied and self._proxy_cli:
@@ -621,24 +636,26 @@ def run(self, timeout: int = None):
                     if cli.engine.SHORT_NAME == "TT":  # is triton
                         self.kick_client(cli.netid)
 
-            # if inputs are filtered. Get granted inputs and forward them to appropriate clients
-            if self.filter_inputs:
-                for item in self._coverage_manager.iter_granted_inputs():
-                    self.seed_granted(item.fuzzer_id, item.seed_status, item.content)
-
             # Check if we received the start signal from the proxy-master
             if self._proxy_start_signal:
+                logging.info("start signal received, starting pending clients")
                 self._proxy_start_signal = False
                 self.start_pending_clients()
 
-            # Check if there are seed coming from the proxy-master to forward to clients
-            if not self._proxy_seed_queue.empty():
-                try:
-                    while True:
-                        origin, typ, seed = self._proxy_seed_queue.get_nowait()
-                        self.send_seed_to_all_others(origin, typ, seed)
-                except queue.Empty:
-                    pass
+            if self._running:  # Perform following checks only if running
+                # If inputs are filtered, get granted inputs and forward them to appropriate clients
+                if self.filter_inputs:
+                    for item in self._coverage_manager.iter_granted_inputs():
+                        self.seed_granted(item.fuzzer_id, item.seed_status, item.content)
+
+                # Check if there are seeds coming from the proxy-master to forward to clients
+                if not self._proxy_seed_queue.empty():
+                    try:
+                        while True:
+                            origin, typ, seed = self._proxy_seed_queue.get_nowait()
+                            self.send_seed_to_all_others(origin, typ, seed)
+                    except queue.Empty:
+                        pass
 
             if self._stop:
                 logging.info("broker terminate")
@@ -776,12 +793,13 @@ def _proxy_start_received(self, fname: str, binary: bytes, engine: FuzzingEngine
         # FIXME: Use parameters received
         logging.info("[PROXY] start received !")
         self._running = True
+        self._proxy_start_signal = True
         # if self._running:
         #     self.start_pending_clients()
 
     def _proxy_seed_received(self, typ: SeedType, seed: bytes):
         # Forward the seed to underlying clients
-        logging.info(f"[PROXY] seed {typ.name} received forward to agents")
+        logging.info(f"[PROXY] received {md5(seed).hexdigest()} [{typ.name}] (forwarding it)")
 
         # Save the seed locally
         self.write_seed(typ, "PROXY", seed)
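
The proxy fix above has _proxy_start_received() only raise _proxy_start_signal;
the run() loop then picks the flag up on the broker thread and starts pending
clients there, instead of doing that work from the agent callback. A minimal
sketch of this callback-to-loop handshake, using threading.Event rather than
the broker's plain bool; all names are illustrative:

    import threading
    import time

    start_signal = threading.Event()

    def on_proxy_start() -> None:
        # runs on the network/agent thread: just flip the flag
        start_signal.set()

    def broker_loop() -> None:
        # runs on the broker thread: consume the flag, do the real work here
        for _ in range(5):
            if start_signal.is_set():
                start_signal.clear()
                print("starting pending clients")
            time.sleep(0.1)

    threading.Thread(target=on_proxy_start).start()
    broker_loop()
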
diff --git a/pastisbroker/coverage.py b/pastisbroker/coverage.py
index 8ccbc56..b14c103 100755
--- a/pastisbroker/coverage.py
+++ b/pastisbroker/coverage.py
@@ -44,9 +44,10 @@ class CoverageManager(object):
     ARGV_PLACEHOLDER = "@@"
     STRATEGY = CoverageStrategy.EDGE
 
-    def __init__(self, pool_size: int, filter: bool, program: str, args: list[str], inj_loc: SeedInjectLoc, stream_file: str = ""):
+    def __init__(self, pool_size: int, replay_timeout: int, filter: bool, program: str, args: list[str], inj_loc: SeedInjectLoc, stream_file: str = ""):
         # Base info for replay
         self.pool_size = pool_size
+        self.replay_timeout = replay_timeout
         self.filter_enabled = filter
         self.program = str(program)
         self.args = args
@@ -86,7 +87,7 @@ def start(self) -> None:
         logging.info("Starting coverage manager")
 
         for work_id in range(self.pool_size):
-            self.pool.apply_async(self.worker, (self.input_queue, self.cov_queue, self.program, self.args, self.inj_loc))
+            self.pool.apply_async(self.worker, (self.input_queue, self.cov_queue, self.program, self.args, self.inj_loc, self.replay_timeout))
 
     def stop(self) -> None:
         self._running = False
@@ -174,14 +175,17 @@ def coverage_worker(self):
                         os.unlink(cov_file)
                     except FileNotFoundError:
                         if item.seed_status == SeedType.INPUT:
-                            logging.warning(f"seed {item.hash}({item.seed_status}) can't load coverage file (maybe had crashed?)")
+                            pass
+                            # logging.warning(f"seed {item.hash}({item.seed_status}) can't load coverage file (maybe had crashed?)")
                         else:
-                            logging.info(f"seed {item.hash}({item.seed_status}) cannot get coverage (normal..)")
+                            pass
+                            # logging.info(f"seed {item.hash}({item.seed_status}) cannot get coverage (normal..)")
 
                     # Grant input
                     self.seeds_accepted += 1
-                    self.granted_queue.put(item)
+                    if item.fuzzer_name != "INITIAL":  # only forward inputs that are not from the initial corpus
+                        self.granted_queue.put(item)
 
-                    logging.info(f"seed {item.hash} ({item.fuzzer_name}) [replay:{self.mk_rpl_status(item.replay_status)}][status:{self.mk_broker_status(item.broker_status, bool(new_items))}] ({len(new_items)} new edges)")
+                    logging.info(f"seed {item.hash} ({item.fuzzer_name}) [replay:{self.mk_rpl_status(item.replay_status)}][{self.mk_broker_status(item.broker_status, bool(new_items))}][{int(item.replay_time)}s] ({len(new_items)} new edges) (pool:{self.input_queue.qsize()})")
 
                     # Regardless if it was a success or not log it
                     self.add_item_coverage_stream(item)
             except queue.Empty:
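
CoverageManager now stores replay_timeout and hands it to every replay worker
it spawns via apply_async (see the start() hunk above). A standalone sketch of
that fan-out pattern; the ThreadPool, queues, and worker body are illustrative
and not the real CoverageManager internals:

    from multiprocessing.pool import ThreadPool
    from queue import Empty, Queue

    def worker(q_in: Queue, q_out: Queue, timeout: int) -> None:
        # drain pending inputs, tagging each with the replay timeout to apply
        try:
            while True:
                q_out.put((q_in.get_nowait(), timeout))
        except Empty:
            pass

    q_in, q_out = Queue(), Queue()
    q_in.put(b"seed-bytes")
    pool = ThreadPool(2)
    for _ in range(2):
        pool.apply_async(worker, (q_in, q_out, 60))
    pool.close()
    pool.join()
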
@@ -209,7 +213,7 @@ def mk_broker_status(status: str, new_items: bool) -> str:
             return mk_color(status, Bcolors.FAIL)
 
     @staticmethod
-    def worker(input_queue: Queue, cov_queue: Queue, program: str, argv: list[str], seed_inj: SeedInjectLoc) -> None:
+    def worker(input_queue: Queue, cov_queue: Queue, program: str, argv: list[str], seed_inj: SeedInjectLoc, timeout: int) -> None:
         """
         worker thread that unstack inputs and replay them.
         """
@@ -236,26 +240,25 @@ def worker(input_queue: Queue, cov_queue: Queue, program: str, argv: list[str],
                         logging.error(f"seed injection {seed_inj.name} but can't find '@@' on program argv: {argv}: {e}")
                         continue
 
+                t0 = time.time()
                 try:
                     # Run the seed
-                    t0 = time.time()
                     if QBDITrace.run(CoverageManager.STRATEGY,
                                      program,
                                      cur_argv,  # argv[1:] if len(argv) > 1 else [],
                                      output_path=str(cov_file),
                                      stdin_file=str(tmpfile) if seed_inj == SeedInjectLoc.STDIN else None,
                                      cwd=Path(program).parent,
-                                     timeout=60):
-                        item.replay_time = time.time() - t0
+                                     timeout=timeout):
                         item.replay_status = "SUCCESS"
                         # logging.info(f"[worker-{pid}] replaying {item.hash} sucessful")
                     else:
                         item.replay_status = "FAIL_NO_COV"
-                        logging.warning("Cannot load the coverage file generated (maybe had crashed?)")
+                        # logging.warning("Cannot load the coverage file generated (maybe had crashed?)")
                 except TraceException:
                     item.replay_status = "FAIL_TIMEOUT"
-                    logging.warning('Timeout hit, while trying to re-run the seed')
-
+                    # logging.warning('Timeout hit, while trying to re-run the seed')
+                item.replay_time = time.time() - t0
                 # Add it to the coverage queue (even if it failed
                 cov_queue.put((item, cov_file))
             except KeyboardInterrupt:
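
The last hunk moves t0 ahead of the try block and assigns item.replay_time
after it, so a replay that fails or times out still gets a wall-clock duration
instead of only the success path recording one. The pattern in isolation;
run_replay is a stand-in for QBDITrace.run:

    import time

    def run_replay() -> bool:
        raise TimeoutError  # stand-in for a replay hitting its timeout

    t0 = time.time()
    status = "SUCCESS"
    try:
        if not run_replay():
            status = "FAIL_NO_COV"
    except TimeoutError:
        status = "FAIL_TIMEOUT"
    replay_time = time.time() - t0  # now recorded on every path
    print(status, f"{replay_time:.3f}s")
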