-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add generic dataflow framework + example + stack value analysis #118
base: dev
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import List, Any, TypeVar, Generic, TYPE_CHECKING | ||
|
||
from tealer.teal.basic_blocks import BasicBlock | ||
|
||
AbstractValues = TypeVar("AbstractValues") | ||
|
||
if TYPE_CHECKING: | ||
from tealer.teal.teal import Teal | ||
|
||
|
||
class AbstractDataflow(ABC, Generic[AbstractValues]): | ||
def __init__(self, teal: "Teal"): | ||
self.teal = teal | ||
|
||
@abstractmethod | ||
def _merge_predecessor(self, bb: BasicBlock) -> AbstractValues: | ||
pass | ||
|
||
@abstractmethod | ||
def _is_fix_point(self, bb: BasicBlock, values: AbstractValues) -> bool: | ||
pass | ||
|
||
@abstractmethod | ||
def _transfer_function(self, bb: BasicBlock) -> AbstractValues: | ||
pass | ||
|
||
@abstractmethod | ||
def _store_values_in(self, bb: BasicBlock, values: AbstractValues) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def _store_values_out(self, bb: BasicBlock, values: AbstractValues) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def _filter_successors(self, bb: BasicBlock) -> List[BasicBlock]: | ||
pass | ||
|
||
@abstractmethod | ||
def result(self) -> Any: | ||
pass | ||
|
||
def explore(self, bb: BasicBlock, is_entry_node: bool = False) -> None: | ||
|
||
values = self._merge_predecessor(bb) | ||
|
||
if not is_entry_node and self._is_fix_point(bb, values): | ||
return | ||
|
||
self._store_values_in(bb, values) | ||
values = self._transfer_function(bb) | ||
self._store_values_out(bb, values) | ||
|
||
successors = self._filter_successors(bb) | ||
for successor in successors: | ||
self.explore(successor) | ||
|
||
def run_analysis(self) -> None: | ||
self.explore(self.teal.bbs[0], is_entry_node=True) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from collections import defaultdict | ||
from typing import Union, Set, Any, List, Dict, TYPE_CHECKING | ||
|
||
from tealer.analyses.dataflow.abstract import AbstractDataflow | ||
from tealer.teal.basic_blocks import BasicBlock | ||
from tealer.teal.instructions.instructions import Instruction | ||
|
||
if TYPE_CHECKING: | ||
from tealer.teal.teal import Teal | ||
|
||
MAX_ELEMS = 35 | ||
|
||
|
||
class InstructionSet: | ||
def __init__(self, values: Union[str, Instruction, Set[Instruction]]) -> None: | ||
|
||
if isinstance(values, str): | ||
assert values in ["TOP", "BOTTOM"] | ||
|
||
if isinstance(values, set) and len(values) > MAX_ELEMS: | ||
values = "TOP" | ||
if isinstance(values, Instruction): | ||
values = {values} | ||
|
||
self.values: Union[str, Set[Instruction]] = values | ||
|
||
@property | ||
def is_top(self) -> bool: | ||
return isinstance(self.values, str) and self.values == "TOP" | ||
|
||
@property | ||
def is_bottom(self) -> bool: | ||
return isinstance(self.values, str) and self.values == "BOTTOM" | ||
|
||
def union(self, instructions: "InstructionSet") -> "InstructionSet": | ||
v0 = self.values | ||
v1 = instructions.values | ||
if v0 == "TOP" or v1 == "TOP": | ||
return InstructionSet("TOP") | ||
|
||
if v0 == "BOTTOM": | ||
return InstructionSet(v1) | ||
|
||
if v1 == "BOTTOM": | ||
return InstructionSet(v0) | ||
|
||
assert isinstance(v0, set) | ||
assert isinstance(v1, set) | ||
|
||
return InstructionSet(v0.union(v1)) | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
if isinstance(other, InstructionSet): | ||
if isinstance(self.values, str) and isinstance(other.values, str): | ||
return self.values == other.values | ||
if isinstance(self.values, set) and isinstance(other.values, set): | ||
return self.values == other.values | ||
return False | ||
|
||
def __str__(self) -> str: | ||
if isinstance(self.values, str): | ||
return self.values | ||
return "[" + ",".join([str(x) for x in self.values]) + "]" | ||
|
||
|
||
class CollectInstructions(AbstractDataflow[InstructionSet]): | ||
def __init__(self, teal: "Teal") -> None: | ||
super().__init__(teal) | ||
self.bb_in: Dict[BasicBlock, InstructionSet] = defaultdict(lambda: InstructionSet("BOTTOM")) | ||
self.bb_out: Dict[BasicBlock, InstructionSet] = defaultdict( | ||
lambda: InstructionSet("BOTTOM") | ||
) | ||
|
||
def _merge_predecessor(self, bb: BasicBlock) -> InstructionSet: | ||
s = InstructionSet("BOTTOM") | ||
for bb_prev in bb.prev: | ||
s = s.union(self.bb_out[bb_prev]) | ||
|
||
return s | ||
|
||
def _is_fix_point(self, bb: BasicBlock, values: InstructionSet) -> bool: | ||
return self.bb_in[bb] == values | ||
|
||
def _transfer_function(self, bb: BasicBlock) -> InstructionSet: | ||
bb_out = self.bb_in[bb] | ||
|
||
for ins in bb.instructions: | ||
bb_out = bb_out.union(InstructionSet(ins)) | ||
|
||
return bb_out | ||
|
||
def _store_values_in(self, bb: BasicBlock, values: InstructionSet) -> None: | ||
self.bb_in[bb] = values | ||
|
||
def _store_values_out(self, bb: BasicBlock, values: InstructionSet) -> None: | ||
self.bb_out[bb] = values | ||
|
||
def _filter_successors(self, bb: BasicBlock) -> List[BasicBlock]: | ||
return bb.next | ||
|
||
def result(self) -> Dict[BasicBlock, InstructionSet]: | ||
return self.bb_out |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
from collections import defaultdict | ||
from typing import Union, Set, Any, List, Dict, TYPE_CHECKING, Type, Callable, Tuple | ||
|
||
from tealer.analyses.dataflow.abstract import AbstractDataflow | ||
from tealer.teal.basic_blocks import BasicBlock | ||
from tealer.teal.global_field import GlobalField | ||
from tealer.teal.instructions import instructions | ||
from tealer.teal.instructions.instructions import Instruction | ||
from tealer.teal.instructions.transaction_field import TransactionField | ||
|
||
if TYPE_CHECKING: | ||
from tealer.teal.teal import Teal | ||
|
||
MAX_ELEMS_PER_STACK_VALUE = 35 | ||
MAX_STACK_DEPTH = 100 | ||
|
||
|
||
# pylint: disable=too-few-public-methods | ||
class TOP: | ||
def __eq__(self, other: Any) -> bool: | ||
return isinstance(other, TOP) | ||
|
||
def __str__(self) -> str: | ||
return "TOP" | ||
|
||
|
||
# pylint: disable=too-few-public-methods | ||
class BOTTOM: | ||
def __eq__(self, other: Any) -> bool: | ||
return isinstance(other, BOTTOM) | ||
|
||
def __str__(self) -> str: | ||
return "BOTTOM" | ||
|
||
|
||
VALUES_TRACKED = Union[Set[Union[GlobalField, TransactionField, int, str]], TOP, BOTTOM] | ||
|
||
|
||
class StackValue: | ||
""" | ||
StackValue represent an abstract value on the stack | ||
It can be either a set of int/str/fields, or TOP/BOTTOM | ||
The set's size is limited to MAX_ELEMS_PER_STACK_VALUE, if above, it becomes TOP | ||
|
||
""" | ||
|
||
def __init__(self, values: VALUES_TRACKED): | ||
if isinstance(values, set) and len(values) > MAX_ELEMS_PER_STACK_VALUE: | ||
values = TOP() | ||
self.values = values | ||
|
||
def union(self, other_stack_value: "StackValue") -> "StackValue": | ||
self_values = self.values | ||
other_values = other_stack_value.values | ||
if isinstance(self_values, TOP) or isinstance(other_values, TOP): | ||
return StackValue(TOP()) | ||
if isinstance(self_values, BOTTOM): | ||
return StackValue(other_values) | ||
if isinstance(other_values, BOTTOM): | ||
return StackValue(self_values) | ||
assert isinstance(self_values, set) | ||
assert isinstance(other_values, set) | ||
return StackValue(self_values | other_values) | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
if isinstance(other, StackValue): | ||
return self.values == other.values | ||
return False | ||
|
||
def __str__(self) -> str: | ||
values = self.values | ||
if isinstance(values, (TOP, BOTTOM)): | ||
return str(values) | ||
assert isinstance(values, set) | ||
return str({str(x) for x in values}) | ||
|
||
|
||
class Stack: | ||
""" | ||
Represent an abstract stack | ||
The length is limited by MAX_STACK_DEPTH | ||
self.values contains the abstract element | ||
|
||
If there is two paths merged, where one path push 1 (concrete stack [1]) | ||
and the other push 3; (concrete stack [3]) | ||
then the abstract stack is | ||
[ [1;3] ]x | ||
Ie: the top can either be 1 or 3 | ||
|
||
If we pop beyond the known values, we return TOP(). | ||
As a result, if the most left elements are TOP in the stack, we can stop tracking them | ||
|
||
If there is two paths merged, and the stack size has a different size, then | ||
we use TOP for the elements in the difference. Ex: | ||
- one path push 1; push 2; (concrete stack [2;1]) | ||
- and the other push 3; (concrete stack [3]) | ||
- then the abstract stack is | ||
- [ [TOP] ; [1;3] ] | ||
Ie: the top can either be 1 or 3, and the second one is TOP | ||
Because the most left element of the stack can be removed, this can be simplied as | ||
[ [1;3] ] | ||
|
||
|
||
|
||
""" | ||
|
||
def __init__(self, values: List[StackValue]) -> None: | ||
if len(values) > MAX_STACK_DEPTH: | ||
values = values[:-MAX_STACK_DEPTH] | ||
|
||
while values and values[0] == StackValue(TOP()): | ||
values.pop() | ||
|
||
self.values = values | ||
|
||
def union(self, stack: "Stack") -> "Stack": | ||
|
||
v1 = self.values | ||
v2 = stack.values | ||
|
||
min_length = min(len(v1), len(v2)) | ||
v1 = v1[:-min_length] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this lines be v1 = v1[-min_length:]
v2 = v2[-min_length:] |
||
v2 = v2[:-min_length] | ||
|
||
v3 = [] | ||
for i in range(min_length): | ||
v3.append(v1[i].union(v2[i])) | ||
|
||
return Stack(v3) | ||
|
||
def pop(self, number: int) -> Tuple["Stack", List[StackValue]]: | ||
if number == 0: | ||
return Stack(self.values), [] | ||
if len(self.values) < number: | ||
diff = number - len(self.values) | ||
poped_values = list(self.values) | ||
return Stack([]), [StackValue(TOP()) for _ in range(diff)] + poped_values | ||
|
||
poped_values = self.values[:-number] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this line be |
||
return Stack(self.values[: len(self.values) - number]), poped_values | ||
|
||
def push(self, pushed_values: List[StackValue]) -> "Stack": | ||
return Stack(self.values + pushed_values) | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
if isinstance(other, Stack): | ||
return self.values == other.values | ||
return False | ||
|
||
def __str__(self) -> str: | ||
return str([str(x) for x in self.values]) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def handle_int(ins: Instruction, stack: Stack) -> List[StackValue]: | ||
assert isinstance(ins, instructions.Int) | ||
return [StackValue({ins.value})] | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def handle_pushint(ins: Instruction, stack: Stack) -> List[StackValue]: | ||
assert isinstance(ins, instructions.PushInt) | ||
return [StackValue({ins.value})] | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def handle_txn(ins: Instruction, stack: Stack) -> List[StackValue]: | ||
assert isinstance(ins, instructions.Txn) | ||
return [StackValue({ins.field})] | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def handle_global(ins: Instruction, stack: Stack) -> List[StackValue]: | ||
assert isinstance(ins, instructions.Global) | ||
return [StackValue({ins.field})] | ||
|
||
|
||
special_handling: Dict[Type[Instruction], Callable[[Instruction, Stack], List[StackValue]]] = { | ||
instructions.Int: handle_int, | ||
instructions.PushInt: handle_pushint, | ||
instructions.Txn: handle_txn, | ||
instructions.Global: handle_global, | ||
} | ||
|
||
|
||
class StackValueAnalysis(AbstractDataflow[Stack]): | ||
def __init__(self, teal: "Teal") -> None: | ||
super().__init__(teal) | ||
self.bb_in: Dict[BasicBlock, Stack] = defaultdict(lambda: Stack([StackValue(BOTTOM())])) | ||
self.bb_out: Dict[BasicBlock, Stack] = defaultdict(lambda: Stack([])) | ||
|
||
self.ins_in: Dict[Instruction, Stack] = defaultdict(lambda: Stack([])) | ||
self.ins_out: Dict[Instruction, Stack] = defaultdict(lambda: Stack([])) | ||
|
||
def _merge_predecessor(self, bb: BasicBlock) -> Stack: | ||
s = Stack([]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stack union depends on the length of inputs. def union(self, stack: "Stack") -> "Stack":
v1 = self.values
v2 = stack.values
min_length = min(len(v1), len(v2))
v1 = v1[:-min_length]
v2 = v2[:-min_length]
v3 = []
for i in range(min_length):
v3.append(v1[i].union(v2[i]))
return Stack(v3) Because initial if not bb.prev: # zero predecessors
return Stack([])
s = self.bb_out[bb.prev[0]]
for bb_prev in bb.prev[1:]:
s = s.union(self.bb_out[bb_prev])
return s |
||
for bb_prev in bb.prev: | ||
s = s.union(self.bb_out[bb_prev]) | ||
return s | ||
|
||
def _is_fix_point(self, bb: BasicBlock, values: Stack) -> bool: | ||
return self.bb_in[bb] == values | ||
|
||
def _transfer_function(self, bb: BasicBlock) -> Stack: | ||
bb_out = self.bb_in[bb] | ||
|
||
for ins in bb.instructions: | ||
self.ins_in[ins] = Stack(bb_out.values) | ||
if type(ins) in special_handling: | ||
pushed_elems = special_handling[type(ins)](ins, bb_out) | ||
else: | ||
pushed_elems = [StackValue(TOP()) for _ in range(ins.stack_push_size)] | ||
bb_out, _ = bb_out.pop(ins.stack_pop_size) | ||
bb_out = bb_out.push(pushed_elems) | ||
self.ins_out[ins] = Stack(bb_out.values) | ||
|
||
return bb_out | ||
|
||
def _store_values_in(self, bb: BasicBlock, values: Stack) -> None: | ||
self.bb_in[bb] = values | ||
|
||
def _store_values_out(self, bb: BasicBlock, values: Stack) -> None: | ||
self.bb_out[bb] = values | ||
|
||
def _filter_successors(self, bb: BasicBlock) -> List[BasicBlock]: | ||
return bb.next | ||
|
||
def result(self) -> Dict[BasicBlock, Stack]: | ||
return self.bb_out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this supposed to be
Or
I'm guessing it's the first one (?)