Skip to content

Commit

Permalink
addr2line: reformat with black
Browse files Browse the repository at this point in the history
Reformat addr2line python code with black. The changes are mostly minor
and seem good.
  • Loading branch information
travisdowns committed Sep 18, 2024
1 parent 8e91537 commit 3132d3b
Show file tree
Hide file tree
Showing 2 changed files with 374 additions and 184 deletions.
102 changes: 69 additions & 33 deletions scripts/addr2line.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# special binary path/module indicating that the address is from the kernel
KERNEL_MODULE = '<kernel>'


class Addr2Line:

# Matcher for a line that appears at the end a single decoded
Expand All @@ -42,10 +43,10 @@ class Addr2Line:
# 0x0: ?? at /v/llvm/llvm/src/compiler-rt/lib/asan/asan_fake_stack.h:133
# so that's why we liberally accept .* as the part after "at" below
dummy_pattern = re.compile(
r"(.*0x0000000000000000: \?\? \?\?:0\n)" # addr2line pattern
r"(.*0x0000000000000000: \?\? \?\?:0\n)" # addr2line pattern
r"|"
r"(.*0x0: \?\? at .*\n)" # llvm-addr2line pattern
)
)

def __init__(self, binary, concise=False, cmd_path="addr2line"):
self._binary = binary
Expand All @@ -59,9 +60,19 @@ def __init__(self, binary, concise=False, cmd_path="addr2line"):
print('{}'.format(s))

options = f"-{'C' if not concise else ''}fpia"
self._input = subprocess.Popen([cmd_path, options, "-e", self._binary], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
self._input = subprocess.Popen(
[cmd_path, options, "-e", self._binary],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
if concise:
self._output = subprocess.Popen(["c++filt", "-p"], stdin=self._input.stdout, stdout=subprocess.PIPE, universal_newlines=True)
self._output = subprocess.Popen(
["c++filt", "-p"],
stdin=self._input.stdout,
stdout=subprocess.PIPE,
universal_newlines=True,
)
else:
self._output = self._input

Expand Down Expand Up @@ -93,13 +104,14 @@ def __call__(self, address):
self._input.stdin.flush()
return self._read_resolved_address()


class KernelResolver:
"""A resolver for kernel addresses which tries to read from /proc/kallsyms."""

LAST_SYMBOL_MAX_SIZE = 1024

def __init__(self, kallsyms='/proc/kallsyms'):
syms : list[tuple[int, str]] = []
syms: list[tuple[int, str]] = []
ksym_re = re.compile(r'(?P<addr>[0-9a-f]+) (?P<type>.+) (?P<name>\S+)')
warnings_left = 10

Expand All @@ -116,8 +128,11 @@ def __init__(self, kallsyms='/proc/kallsyms'):
for line in f:
m = ksym_re.match(line)
if not m:
if warnings_left > 0: # don't spam too much
print(f'WARNING: {kallsyms} regex match failure: {line.strip()}', file=sys.stdout)
if warnings_left > 0: # don't spam too much
print(
f'WARNING: {kallsyms} regex match failure: {line.strip()}',
file=sys.stdout,
)
warnings_left -= 1
else:
syms.append((int(m.group('addr'), 16), m.group('name')))
Expand All @@ -140,10 +155,9 @@ def __init__(self, kallsyms='/proc/kallsyms'):
return

# split because bisect can't take a key func before 3.10
self.sym_addrs : tuple[int]
self.sym_names : tuple[str]
self.sym_addrs, self.sym_names = zip(*syms) # type: ignore

self.sym_addrs: tuple[int]
self.sym_names: tuple[str]
self.sym_addrs, self.sym_names = zip(*syms) # type: ignore

def __call__(self, addrstr):
if self.error:
Expand Down Expand Up @@ -187,22 +201,30 @@ def __init__(self):
token = fr"(?:{path}\+)?{addr}"
full_addr_match = fr"(?:(?P<path>{path})\s*\+\s*)?(?P<addr>{addr})"
ignore_addr_match = fr"(?:(?P<path>{path})\s*\+\s*)?(?:{addr})"
self.oneline_re = re.compile(fr"^((?:.*(?:(?:at|backtrace):?|:))?(?:\s+))?({token}(?:\s+{token})*)(?:\).*|\s*)$", flags=re.IGNORECASE)
self.oneline_re = re.compile(
fr"^((?:.*(?:(?:at|backtrace):?|:))?(?:\s+))?({token}(?:\s+{token})*)(?:\).*|\s*)$",
flags=re.IGNORECASE,
)
self.address_re = re.compile(full_addr_match, flags=re.IGNORECASE)
self.syslog_re = re.compile(fr"^(?:#\d+\s+)(?P<addr>{addr})(?:.*\s+)\({ignore_addr_match}\)\s*$", flags=re.IGNORECASE)
self.syslog_re = re.compile(
fr"^(?:#\d+\s+)(?P<addr>{addr})(?:.*\s+)\({ignore_addr_match}\)\s*$",
flags=re.IGNORECASE,
)
self.kernel_re = re.compile(fr'^.*kernel callstack: (?P<addrs>(?:{addr}\s*)+)$')
self.asan_re = re.compile(fr"^(?:.*\s+)\({full_addr_match}\)(\s+\(BuildId: [0-9a-fA-F]+\))?$", flags=re.IGNORECASE)
self.asan_re = re.compile(
fr"^(?:.*\s+)\({full_addr_match}\)(\s+\(BuildId: [0-9a-fA-F]+\))?$",
flags=re.IGNORECASE,
)
self.asan_ignore_re = re.compile(f"^=.*$", flags=re.IGNORECASE)
self.generic_re = re.compile(fr"^(?:.*\s+){full_addr_match}\s*$", flags=re.IGNORECASE)
self.separator_re = re.compile(r'^\W*-+\W*$')


def split_addresses(self, addrstring: str, default_path=None):
addresses : list[dict[str, Any]] = []
addresses: list[dict[str, Any]] = []
for obj in addrstring.split():
m = re.match(self.address_re, obj)
assert m, f'addr did not match address regex: {obj}'
#print(f" >>> '{obj}': address {m.groups()}")
# print(f" >>> '{obj}': address {m.groups()}")
addresses.append({'path': m.group(1) or default_path, 'addr': m.group(2)})
return addresses

Expand All @@ -220,42 +242,42 @@ def get_prefix(s):
return {
'type': self.Type.ADDRESS,
'prefix': 'kernel callstack: ',
'addresses' : self.split_addresses(m.group('addrs'), KERNEL_MODULE)
'addresses': self.split_addresses(m.group('addrs'), KERNEL_MODULE),
}

m = re.match(self.oneline_re, line)
if m:
#print(f">>> '{line}': oneline {m.groups()}")
# print(f">>> '{line}': oneline {m.groups()}")
return {
'type': self.Type.ADDRESS,
'prefix': get_prefix(m.group(1)),
'addresses': self.split_addresses(m.group(2))
'addresses': self.split_addresses(m.group(2)),
}

m = re.match(self.syslog_re, line)
if m:
#print(f">>> '{line}': syslog {m.groups()}")
# print(f">>> '{line}': syslog {m.groups()}")
ret = {'type': self.Type.ADDRESS}
ret['prefix'] = None
ret['addresses'] = [{'path': m.group('path'), 'addr': m.group('addr')}]
return ret

m = re.match(self.asan_ignore_re, line)
if m:
#print(f">>> '{line}': asan ignore")
# print(f">>> '{line}': asan ignore")
return None

m = re.match(self.asan_re, line)
if m:
#print(f">>> '{line}': asan {m.groups()}")
# print(f">>> '{line}': asan {m.groups()}")
ret = {'type': self.Type.ADDRESS}
ret['prefix'] = None
ret['addresses'] = [{'path': m.group('path'), 'addr': m.group('addr')}]
return ret

m = re.match(self.generic_re, line)
if m:
#print(f">>> '{line}': generic {m.groups()}")
# print(f">>> '{line}': generic {m.groups()}")
ret = {'type': self.Type.ADDRESS}
ret['prefix'] = None
ret['addresses'] = [{'path': m.group('path'), 'addr': m.group('addr')}]
Expand All @@ -265,10 +287,19 @@ def get_prefix(s):
if match:
return {'type': self.Type.SEPARATOR}

#print(f">>> '{line}': None")
# print(f">>> '{line}': None")
return None

def __init__(self, executable, kallsyms='/proc/kallsyms', before_lines=1, context_re='', verbose=False, concise=False, cmd_path='addr2line'):
def __init__(
self,
executable,
kallsyms='/proc/kallsyms',
before_lines=1,
context_re='',
verbose=False,
concise=False,
cmd_path='addr2line',
):
self._executable = executable
self._kallsyms = kallsyms
self._current_backtrace = []
Expand All @@ -285,7 +316,9 @@ def __init__(self, executable, kallsyms='/proc/kallsyms', before_lines=1, contex
self._concise = concise
self._cmd_path = cmd_path
self._known_modules = {}
self._get_resolver_for_module(self._executable) # fail fast if there is something wrong with the exe resolver
self._get_resolver_for_module(
self._executable
) # fail fast if there is something wrong with the exe resolver
self.parser = self.BacktraceParser()

def _get_resolver_for_module(self, module):
Expand Down Expand Up @@ -346,8 +379,12 @@ def _print_current_backtrace(self):

backtrace = "".join(map(str, self._current_backtrace))
if backtrace in self._known_backtraces:
print("[Backtrace #{}] Already seen, not resolving again.".format(self._known_backtraces[backtrace]))
print("") # To separate traces with an empty line
print(
"[Backtrace #{}] Already seen, not resolving again.".format(
self._known_backtraces[backtrace]
)
)
print("") # To separate traces with an empty line
self._current_backtrace = []
return

Expand All @@ -358,7 +395,7 @@ def _print_current_backtrace(self):
for module, addr in self._current_backtrace:
self._print_resolved_address(module, addr)

print("") # To separate traces with an empty line
print("") # To separate traces with an empty line

self._current_backtrace = []
self._i += 1
Expand All @@ -371,9 +408,9 @@ def __call__(self, line):
if self._before_lines > 0:
self._before_lines_queue.append(line)
elif self._before_lines < 0:
sys.stdout.write(line) # line already has a trailing newline
sys.stdout.write(line) # line already has a trailing newline
else:
pass # when == 0 no non-backtrace lines are printed
pass # when == 0 no non-backtrace lines are printed
elif res['type'] == self.BacktraceParser.Type.SEPARATOR:
pass
elif res['type'] == self.BacktraceParser.Type.ADDRESS:
Expand All @@ -392,4 +429,3 @@ def __call__(self, line):
else:
print(f"Unknown '{line}': {res}")
raise RuntimeError("Unknown result type {res}")

Loading

0 comments on commit 3132d3b

Please sign in to comment.