Skip to content

Commit

Permalink
comments on exploration techniques
Browse files Browse the repository at this point in the history
Signed-off-by: degrigis <[email protected]>
  • Loading branch information
degrigis committed Dec 18, 2023
1 parent 71a2df4 commit c7d800f
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 3 deletions.
20 changes: 20 additions & 0 deletions greed/exploration_techniques/dfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,30 @@
class DFS(ExplorationTechnique):
"""
This Exploration technique implements a Classic Depth-First Search exploration
Args:
deferred_stash: the name of the stash where deferred states are put
"""
def __init__(self, deferred_stash='deferred'):
super(DFS, self).__init__()
self.deferred_stash = deferred_stash

def setup(self, simgr):
"""
Setup the technique.
Args:
simgr: the simulation manager
"""
if self.deferred_stash not in simgr.stashes:
simgr.stashes[self.deferred_stash] = []

def check_stashes(self, simgr, stashes, stash='active'):
"""
If multiple states are in the active stash, move all but the oldest to the deferred stash.
Args:
simgr: the simulation manager
stashes: the stashes
stash: the name of the stash to check
"""
if len(stashes[stash]) > 1:
# Pick the oldest state
keep = sorted(stashes[stash], key=lambda s: s.uuid)[0]
Expand All @@ -28,6 +42,12 @@ def check_stashes(self, simgr, stashes, stash='active'):
return stashes

def is_complete(self, simgr, stash='active'):
"""
Check if the exploration is complete: there are no active states, and no deferred states.
Args:
simgr: the simulation manager
stash: the name of the stash to check
"""
# We are done if there are no active, or, no deferred.
if len(simgr.stashes[stash]) == 0 and len(simgr.stashes[self.deferred_stash]) == 0:
return True
Expand Down
37 changes: 35 additions & 2 deletions greed/exploration_techniques/directed_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class DirectedSearch(ExplorationTechnique):
simgr.use_technique(dfs)
simgr.run(find=lambda s: s.curr_stmt.id == target_stmt_id)
Args:
target_stmt: the target Statement that we want to reach
pruned_stash: the name of the stash where pruned states are put
"""
def __init__(self, target_stmt, pruned_stash='pruned'):
super(DirectedSearch, self).__init__()
Expand All @@ -26,12 +30,23 @@ def __init__(self, target_stmt, pruned_stash='pruned'):
self.pruned_stash = pruned_stash

def setup(self, simgr):
"""
Setup the technique.
Args:
simgr: the simulation manager
"""
self._target_block = simgr.project.factory.block(self._target_stmt.block_id)

for s in simgr.states:
s.globals['directed_search_distance'] = None

def check_successors(self, simgr, successors):
"""
Calculate the successors that can reach the target block, otherwise prune them.
Args:
simgr: the simulation manager
successors: the successors to check
"""
new_successors = []
pruned_cnt = 0
for succ in successors:
Expand All @@ -48,9 +63,18 @@ def check_successors(self, simgr, successors):
log.debug(f"Pruned {pruned_cnt}/{len(successors)} successors")
return new_successors

# Check if the current state can reach 'block_b'
@staticmethod
def _is_reachable(state_a, block_b, factory, callgraph):
"""
Check if 'state_a' can reach 'block_b'
Args:
state_a: the state
block_b: the target block
factory: the factory
callgraph: the callgraph
Returns:
True and the distance if reachable, False and None otherwise
"""
block_a = factory.block(state_a.curr_stmt.block_id)
reachable, dist = DirectedSearch._is_reachable_without_returns(block_a, block_b, factory, callgraph)
if reachable:
Expand Down Expand Up @@ -89,9 +113,18 @@ def _is_reachable(state_a, block_b, factory, callgraph):
log.debug(f"[PRUNED] {state_a} -> {block_b} not reachable with returns")
return False, None

# Check if 'block_a' can reach 'block_b'
@staticmethod
def _is_reachable_without_returns(block_a, block_b, factory, callgraph):
"""
Check if 'block_a' can reach 'block_b' without using returns
Args:
block_a: the block
block_b: the target block
factory: the factory
callgraph: the callgraph
Returns:
True and the distance if reachable, False and None otherwise
"""
function_a = block_a.function
function_b = block_b.function
if block_a == block_b:
Expand Down
14 changes: 14 additions & 0 deletions greed/exploration_techniques/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ class HeartBeat(ExplorationTechnique):
This Exploration technique implements a Classic heartbeat.
The heartbeat file will be logged during __init__.
Delete such file to stop the heartbeat and get an ipdb shell.
Args:
beat_interval: the number of steps between heartbeats
show_op: show the current operation during the heartbeat
"""
def __init__(self, beat_interval=100, show_op=False):
super(HeartBeat, self).__init__()
Expand All @@ -23,6 +26,12 @@ def __init__(self, beat_interval=100, show_op=False):
self.show_op = show_op

def check_successors(self, simgr, successors):
"""
Check if the heartbeat should be printed.
Args:
simgr: the simulation manager
successors: the successors to check
"""
self.beat_cnt += 1
self.steps_cnt += 1
if self.beat_cnt == self.beat_interval:
Expand All @@ -39,5 +48,10 @@ def check_successors(self, simgr, successors):
return successors

def change_beat(self, new_beat_interval):
"""
Change the beat interval.
Args:
new_beat_interval: the new beat interval.
"""
self.beat_interval = new_beat_interval
self.beat_cnt = 0
37 changes: 37 additions & 0 deletions greed/exploration_techniques/other.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@


class Whitelist(ExplorationTechnique):
"""
This technique skips all statements that are not in the whitelist until one of them is reached.
The result variables of the skipped statements are set to a fresh symbolic variable.
Args:
whitelist: the list of statements' names in the whitelist (e.g., ["MSTORE", "MLOAD"])
"""
def __init__(self, whitelist):
super().__init__()
self.whitelist = whitelist

def check_state(self, simgr, state):
"""
Check if the current statement is in the whitelist.
If not, skip to the next statement.
"""
while state.curr_stmt.__internal_name__ not in self.whitelist:
# stub res vars
for res_var in state.curr_stmt.res_vars:
Expand All @@ -25,15 +35,33 @@ def check_state(self, simgr, state):


class LoopLimiter(ExplorationTechnique):
"""
This technique limits the number of times a loop can be executed.
When the limit is reached, the state is halted.
Args:
n: the maximum number of times a loop can be executed
"""
def __init__(self, n):
super().__init__()
self.n = n

def setup(self, simgr):
"""
Setup the technique.
Args:
simgr: the simulation manager
"""
for state in simgr.states:
state.globals["loop_counter"] = defaultdict(int)

def check_state(self, simgr, state):
"""
Check if the loop has been executed more than n times.
If so, halt the state.
Args:
simgr: the simulation manager
state: the state to check
"""
state.globals["loop_counter"][state.pc] += 1
if state.globals["loop_counter"][state.pc] > self.n:
state.halt = True
Expand All @@ -42,6 +70,9 @@ def check_state(self, simgr, state):


class MstoreConcretizer(ExplorationTechnique):
"""
This technique concretizes the offset of MSTOREs.
"""
def __init__(self):
super().__init__()

Expand All @@ -60,6 +91,12 @@ def __init__(self):
self.relevant_mstores = None

def setup(self, simgr, _debug=False):
"""
Setup the technique.
Args:
simgr: the simulation manager
_debug: whether to print debug info
"""
self.debug = _debug

for s in simgr.states:
Expand Down
7 changes: 6 additions & 1 deletion greed/exploration_techniques/prioritizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

class Prioritizer(ExplorationTechnique):
"""
This Exploration technique implements a Classic Depth-First Search exploration
This Exploration technique implements a DFS with prioritization of states.
The prioritization is done by a scoring function that is applied to each state.
For instance, the scoring function can be the distance (in basic blocks) from a target statement.
Args:
scoring_function: the scoring function
deferred_stash: the name of the stash where deferred states are put
"""
def __init__(self, scoring_function, deferred_stash='deferred'):
super(Prioritizer, self).__init__()
Expand Down
13 changes: 13 additions & 0 deletions greed/exploration_techniques/simgrviz.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def check_successors(self, simgr, successors):

@staticmethod
def _get_state_hash(state):
"""
Calculate a unique hash for the state.
"""
h = hashlib.sha256()
h.update(str(state.pc).encode("utf-8"))
h.update(str(state.callstack).encode("utf-8"))
Expand All @@ -51,6 +54,9 @@ def _get_state_hash(state):
return str(h_hexdigest)

def _add_node(self, state):
"""
Add a node to the graph.
"""
state_id = self._get_state_hash(state)
if state_id in self._simgGraph:
return state_id
Expand All @@ -64,9 +70,16 @@ def _add_node(self, state):
return state_id

def _add_edge(self, new_state_id, parent_state_id):
"""
Add an edge to the graph.
"""
self._simgGraph.add_edge(new_state_id, parent_state_id)

def _dump_graph(self):
"""
Dump the graph to a .dot file.
The dumped graph is located in the /tmp/ directory and its name is printed in the log.
"""
output_filename = tempfile.NamedTemporaryFile(prefix="greed_simgrviz_", suffix=".dot", delete=False).name
log.info(f"Dumping simgrviz .dot output to file {output_filename}")

Expand Down

0 comments on commit c7d800f

Please sign in to comment.