Skip to content

Commit

Permalink
Merge pull request #568 from mvt-project/feature/tombstone-parser
Browse files Browse the repository at this point in the history
Add parser for Android tombstone files
  • Loading branch information
DonnchaC authored Feb 6, 2025
2 parents 6da3339 + a2dabb4 commit e5865b1
Show file tree
Hide file tree
Showing 13 changed files with 1,806 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ install:
test-requirements:
python3 -m pip install --upgrade -r test-requirements.txt

# Regenerate the Python protobuf parsers: every *.proto file found under
# src/mvt/android/parsers/proto/ is compiled by protoc, and the generated
# modules are written back into that same directory.
# `$$` is Make's escape for a literal `$`, so the shell (not Make) expands
# PROTO_FILES; the trailing `\` keeps both commands in one shell invocation.
# NOTE(review): --python_betterproto_out presumably requires the betterproto
# protoc plugin to be installed -- confirm before relying on this target.
generate-proto-parsers:
	# Generate python parsers for protobuf files
	PROTO_FILES=$$(find src/mvt/android/parsers/proto/ -iname "*.proto"); \
	protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto_out=src/mvt/android/parsers/proto/ $$PROTO_FILES

clean:
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/src/mvt.egg-info

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"cryptography >=42.0.5",
"pyyaml >=6.0",
"pyahocorasick >= 2.0.0",
"betterproto >=1.2.0",
"pydantic >= 2.10.0",
"pydantic-settings >= 2.7.0",
]
Expand Down
269 changes: 269 additions & 0 deletions src/mvt/android/artifacts/tombstone_crashes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

import datetime
from typing import List, Optional, Union

import pydantic
import betterproto

from mvt.common.utils import convert_datetime_to_iso
from mvt.android.parsers.proto.tombstone import Tombstone
from .artifact import AndroidArtifact


# Separator line found in text tombstones. The text parser skips every line
# that contains it.
TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"

# Map the legacy crash file keys to the new format.
# Left-hand side: label compared against the start of each text tombstone line.
# Right-hand side: key under which the generic key/value loader stores the
# parsed value. Lines matched by "pid", "signal" and "Timestamp" are routed to
# dedicated loaders that write their own fixed keys.
TOMBSTONE_TEXT_KEY_MAPPINGS = {
    "Build fingerprint": "build_fingerprint",
    "Revision": "revision",
    "ABI": "arch",
    "Timestamp": "timestamp",
    "Process uptime": "process_uptime",
    "Cmdline": "command_line",
    "pid": "pid",
    "tid": "tid",
    "name": "process_name",
    "binary_path": "binary_path",
    "uid": "uid",
    "signal": "signal_info",
    "code": "code",
    "Cause": "cause",
}


class SignalInfo(pydantic.BaseModel):
    """Signal details of a tombstone crash (nested in TombstoneCrashResult)."""

    # Numeric signal code; the text parser fills it from "code <n> (...)".
    code: int
    # Symbolic name of the code, taken from the parentheses after the code.
    code_name: str
    # Symbolic name of the signal, taken from the parentheses after the number.
    name: str
    # Numeric signal number; optional, defaults to None when not provided.
    number: Optional[int] = None


class TombstoneCrashResult(pydantic.BaseModel):
    """
    MVT Result model for a tombstone crash result.
    Needed for validation and serialization, and consistency between text and protobuf tombstones.
    """

    # Name of the tombstone file, as handed to the parser by the caller.
    file_name: str
    file_timestamp: str  # We store the timestamp as a string to avoid timezone issues
    # Text tombstones: value of the "Build fingerprint" line.
    build_fingerprint: str
    # Text tombstones: value of the "Revision" line converted with int().
    revision: int
    # Text tombstones: value of the "ABI" line.
    arch: Optional[str] = None
    timestamp: str  # We store the timestamp as a string to avoid timezone issues
    # Text tombstones: "Process uptime" value with the trailing "s" removed
    # (presumably seconds -- TODO confirm).
    process_uptime: Optional[int] = None
    # Text tombstones: filled from the "Cmdline" line.
    command_line: Optional[List[str]] = None
    # pid / tid come from the "pid: ..., tid: ..., name: ..." header line.
    pid: int
    tid: int
    # Text tombstones: the part of the "name:" field before ">>>".
    # Protobuf tombstones: name of the thread whose id equals `tid`, else "Unknown".
    process_name: Optional[str] = None
    # Text tombstones: derived from the part of the "name:" field after ">>>".
    binary_path: Optional[str] = None
    # Not produced by the text parser (no matching text key is mapped).
    selinux_label: Optional[str] = None
    # Text tombstones: value of the "uid" line converted with int().
    uid: Optional[int] = None
    signal_info: SignalInfo
    # Text tombstones: value of the "Cause" line.
    cause: Optional[str] = None
    # Not produced by the text parser (no matching text key is mapped).
    extra: Optional[str] = None


class TombstoneCrashArtifact(AndroidArtifact):
"""
Parser for Android tombstone crash files.
This parser can parse both text and protobuf tombstone crash files.
"""

def serialize(self, record: dict) -> Union[dict, list]:
    """
    Turn one tombstone result into a timeline entry.

    The entry carries the crash timestamp, the name of the class producing
    it, a fixed "Tombstone" event label and a human readable description
    built from the process name, UID, timestamp and signal details.
    """
    entry = {
        "timestamp": record["timestamp"],
        "module": self.__class__.__name__,
        "event": "Tombstone",
    }
    entry["data"] = (
        f"Crash in '{record['process_name']}' process running as UID '{record['uid']}' at "
        f"{record['timestamp']}. Crash type '{record['signal_info']['name']}' with code '{record['signal_info']['code_name']}'"
    )
    return entry

def check_indicators(self) -> None:
    """
    Flag tombstone results that look suspicious.

    A result is appended to ``self.detected`` when, in this order:
    1. its process name matches an indicator,
    2. the basename of the first command line entry matches an indicator,
    3. the crashed process ran under one of the privileged UIDs below
       (this case is also logged as a warning).
    Nothing happens when no indicators are loaded.
    """
    if not self.indicators:
        return

    suspicious_uids = (
        0,  # root
        1000,  # system
        2000,  # shell
    )

    for result in self.results:
        process_match = self.indicators.check_process(result["process_name"])
        if process_match:
            result["matched_indicator"] = process_match
            self.detected.append(result)
            continue

        command_line = result.get("command_line", [])
        if command_line:
            # Only the executable's basename is compared against indicators.
            executable = command_line[0].split("/")[-1]
            command_match = self.indicators.check_process(executable)
            if command_match:
                result["matched_indicator"] = command_match
                self.detected.append(result)
                continue

        if result["uid"] not in suspicious_uids:
            continue

        self.log.warning(
            f"Potentially suspicious crash in process '{result['process_name']}' "
            f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
        )
        self.detected.append(result)

def parse_protobuf(
    self, file_name: str, file_timestamp: datetime.datetime, data: bytes
) -> None:
    """
    Parse an Android tombstone crash file stored as a protobuf message.

    The decoded message is converted to a snake_case dict, enriched with
    file metadata, validated against TombstoneCrashResult and appended to
    ``self.results``.

    :param file_name: name of the tombstone file, stored in the result
    :param file_timestamp: modification time of the tombstone file
    :param data: raw protobuf bytes
    :raises ValueError: when the timestamp cannot be parsed or the
        resulting dict does not validate against the result model
    """
    tombstone_pb = Tombstone().parse(data)
    # to_dict() leaves out every field that holds its default value unless
    # asked otherwise. Zero is a meaningful value here: without this flag a
    # crash of a root process (uid 0) would lose its UID and a signal code of
    # 0 would make the required `signal_info.code` field go missing.
    tombstone_dict = tombstone_pb.to_dict(
        betterproto.Casing.SNAKE, include_default_values=True
    )

    # Add some extra metadata
    tombstone_dict["timestamp"] = self._parse_timestamp_string(
        tombstone_pb.timestamp
    )
    tombstone_dict["file_name"] = file_name
    tombstone_dict["file_timestamp"] = convert_datetime_to_iso(file_timestamp)
    tombstone_dict["process_name"] = self._proccess_name_from_thread(tombstone_dict)

    # Confirm the tombstone is valid, and matches the output model
    tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
    self.results.append(tombstone.model_dump())

def parse(
    self, file_name: str, file_timestamp: datetime.datetime, content: bytes
) -> None:
    """
    Parse a text Android tombstone crash file.

    Every non-empty line is offered to each known text key; recognised
    lines populate a dict that is validated against TombstoneCrashResult
    and appended to ``self.results``.

    :param file_name: name of the tombstone file, stored in the result
    :param file_timestamp: modification time of the tombstone file
    :param content: raw bytes of the text tombstone
    :raises ValueError: when a recognised line is malformed or the parsed
        fields do not validate against the result model
    """

    # Split the tombstone file into a dictionary
    tombstone_dict = {
        "file_name": file_name,
        "file_timestamp": convert_datetime_to_iso(file_timestamp),
    }
    # The header fields are plain ASCII; a stray undecodable byte elsewhere in
    # the dump should not make the whole tombstone unreadable.
    lines = content.decode("utf-8", errors="replace").splitlines()
    for line in lines:
        if not line.strip() or TOMBSTONE_DELIMITER in line:
            continue

        line_fields: dict = {}
        for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
            self._parse_tombstone_line(line, key, destination_key, line_fields)

        # Text tombstones repeat the "pid: ..., tid: ..., name: ..." and
        # "uid: ..." header for every additional thread after the crashing
        # one. The first occurrence describes the crash, so never let a later
        # line overwrite a field that is already known.
        for field_name, field_value in line_fields.items():
            tombstone_dict.setdefault(field_name, field_value)

    # Validate the tombstone and add it to the results
    tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
    self.results.append(tombstone.model_dump())

def _parse_tombstone_line(
    self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool:
    """
    Load ``line`` into ``tombstone`` if it is the line introduced by ``key``.

    :param line: one line of a text tombstone
    :param key: text label to look for at the start of the line
    :param destination_key: key used to store a generic key/value line
    :param tombstone: dict receiving the parsed fields
    :return: True when the line was recognised and loaded, False otherwise
    :raises ValueError: propagated from the loaders on malformed lines
    """
    # Match the label together with its delimiter. A bare prefix match would
    # treat unrelated lines such as "code around pc:" as the "code" field and
    # make the key/value loader reject the whole file.
    expected_prefix = "signal " if key == "signal" else f"{key}:"
    if not line.startswith(expected_prefix):
        return False

    if key == "pid":
        return self._load_pid_line(line, tombstone)
    if key == "signal":
        return self._load_signal_line(line, tombstone)
    if key == "Timestamp":
        return self._load_timestamp_line(line, tombstone)
    return self._load_key_value_line(line, key, destination_key, tombstone)

def _load_key_value_line(
    self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool:
    """
    Load a generic ``<key>: <value>`` line into ``tombstone``.

    Surrounding whitespace and single quotes are removed from the value.
    ``uid`` and ``revision`` are stored as integers, ``process_uptime`` as an
    integer without its trailing "s", and ``command_line`` as a list of
    arguments; everything else is stored as a string.

    :return: always True
    :raises ValueError: when the line has no ":" separator, when the text
        before it is not exactly ``key``, or when a numeric value is invalid
    """
    line_key, value = line.split(":", 1)
    if line_key != key:
        raise ValueError(f"Expected key {key}, got {line_key}")

    value_clean = value.strip().strip("'")
    if destination_key in ("uid", "revision"):
        tombstone[destination_key] = int(value_clean)
    elif destination_key == "process_uptime":
        # eg. "Process uptime: 40s"
        tombstone[destination_key] = int(value_clean.rstrip("s"))
    elif destination_key == "command_line":
        # The text format prints the arguments joined by spaces. Split them
        # again so that element 0 is the executable alone, matching the list
        # shape of protobuf tombstones and letting indicator checks look at
        # the executable name rather than "<binary> <args...>".
        tombstone[destination_key] = value_clean.split()
    else:
        tombstone[destination_key] = value_clean
    return True

def _load_pid_line(self, line: str, tombstone: dict) -> bool:
    """
    Load a ``pid: <n>, tid: <n>, name: <thread>  >>> <process> <<<`` line.

    Stores ``pid``, ``tid``, ``process_name`` and ``binary_path`` in
    ``tombstone``.

    :return: always True
    :raises ValueError: when the line does not have the three expected
        fields, a field has the wrong label, or pid/tid are not integers
    """
    # Split on the first two commas only: the name field is free-form text
    # and may itself contain commas.
    pid_part, tid_part, name_part = [part.strip() for part in line.split(",", 2)]

    def field_value(part: str, expected_key: str) -> str:
        """Return the stripped value of a "<key>: <value>" fragment."""
        found_key, separator, value = part.partition(":")
        if not separator or found_key != expected_key:
            raise ValueError(f"Expected key {expected_key}, got {found_key}")
        return value.strip()

    pid_value = int(field_value(pid_part, "pid"))
    tid_value = int(field_value(tid_part, "tid"))
    process_name, binary_path = self._parse_process_name(
        field_value(name_part, "name"), tombstone
    )

    tombstone["pid"] = pid_value
    tombstone["tid"] = tid_value
    tombstone["process_name"] = process_name
    tombstone["binary_path"] = binary_path
    return True

def _parse_process_name(self, process_name_part, tombstone: dict) -> tuple:
    """
    Split ``<thread name>  >>> <process> <<<`` into its two components.

    :param process_name_part: value of the "name:" field of the pid line
    :param tombstone: unused; kept so existing callers keep working
    :return: ``(process_name, binary_path)`` as a tuple of two strings
    :raises ValueError: when the ">>>" marker is missing or appears more
        than once
    """
    pieces = process_name_part.split(">>>")
    if len(pieces) != 2:
        raise ValueError(
            f"Expected '<name> >>> <process> <<<', got {process_name_part!r}"
        )

    process_name = pieces[0].strip()
    binary_path = pieces[1].strip()
    # Drop the closing marker instead of cutting at the first space, so a
    # process name that contains spaces is kept whole.
    if binary_path.endswith("<<<"):
        binary_path = binary_path[: -len("<<<")].rstrip()
    return process_name, binary_path

def _load_signal_line(self, line: str, tombstone: dict) -> bool:
    """
    Load a ``signal <n> (<NAME>), code <n> (<CODE>...), fault addr ...`` line.

    Stores a ``signal_info`` dict with ``code``, ``code_name``, ``name`` and
    ``number`` in ``tombstone``.

    :return: always True
    :raises ValueError: when the line does not follow the expected layout
    """
    import re

    # The code parentheses may carry a sender suffix, e.g.
    # "code -1 (SI_QUEUE from pid 1234, uid 0)", and a signal name may contain
    # spaces, e.g. "(<debuggerd signal>)". Splitting on commas and spaces
    # cannot handle either, so match the layout explicitly and take only the
    # first token inside the code parentheses as the code name.
    match = re.match(
        r"signal\s+(-?\d+)\s+\((.*?)\),\s*code\s+(-?\d+)\s+\(([^\s)]*)",
        line,
    )
    if match is None:
        raise ValueError(f"Unrecognised signal line: {line!r}")

    signal_number, signal_name, code_number, code_name = match.groups()
    tombstone["signal_info"] = {
        "code": int(code_number),
        "code_name": code_name,
        "name": signal_name,
        "number": int(signal_number),
    }
    return True

def _load_timestamp_line(self, line: str, tombstone: dict) -> bool:
    """
    Load a ``Timestamp: <value>`` line.

    Everything after the first ":" is stripped, converted by
    ``_parse_timestamp_string`` and stored under ``timestamp``.

    :return: always True
    """
    raw_value = line.split(":", 1)[1]
    converted = self._parse_timestamp_string(raw_value.strip())
    tombstone["timestamp"] = converted
    return True

@staticmethod
def _parse_timestamp_string(timestamp: str) -> str:
    """
    Convert a tombstone timestamp into MVT's ISO string form.

    Accepts ``YYYY-MM-DD HH:MM:SS[.fraction]<+|-><offset>``, for example
    ``2023-04-12 12:32:40.518290770+0200``. The fractional part is dropped
    before parsing.

    :raises ValueError: when the string does not follow that layout
    """
    import re

    # The UTC offset can be negative as well as positive, so the sign cannot
    # be located by splitting on "+": devices west of UTC would never parse.
    match = re.fullmatch(
        r"\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(?:\.\d+)?\s*([+-]\d{2}:?\d{2})\s*",
        timestamp,
    )
    if match is None:
        raise ValueError(f"Unrecognised tombstone timestamp: {timestamp!r}")

    # Truncate sub-second digits before parsing
    date_and_time, utc_offset = match.groups()
    timestamp_parsed = datetime.datetime.strptime(
        date_and_time + utc_offset, "%Y-%m-%d %H:%M:%S%z"
    )
    return convert_datetime_to_iso(timestamp_parsed)

@staticmethod
def _proccess_name_from_thread(tombstone_dict: dict) -> str:
    """
    Return the name of the thread whose ``id`` equals the tombstone ``tid``.

    :param tombstone_dict: tombstone dict with optional ``tid`` and
        ``threads`` (a mapping whose values are thread dicts) entries
    :return: the matching thread's name, or "Unknown" when the tid or the
        threads are missing, or no named thread matches
    """
    # A missing tid must not raise: callers only handle ValueError, so a
    # KeyError here would abort processing of all remaining tombstones.
    crashing_tid = tombstone_dict.get("tid")
    threads = tombstone_dict.get("threads")
    if crashing_tid is None or not threads:
        return "Unknown"

    for thread in threads.values():
        if thread.get("id") == crashing_tid and thread.get("name"):
            return thread["name"]
    return "Unknown"
2 changes: 2 additions & 0 deletions src/mvt/android/modules/bugreport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .platform_compat import PlatformCompat
from .receivers import Receivers
from .adb_state import DumpsysADBState
from .tombstones import Tombstones

BUGREPORT_MODULES = [
Accessibility,
Expand All @@ -27,4 +28,5 @@
PlatformCompat,
Receivers,
DumpsysADBState,
Tombstones,
]
10 changes: 9 additions & 1 deletion src/mvt/android/modules/bugreport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2021-2023 The MVT Authors.
# See the file 'LICENSE' for usage and copying permissions, or find a copy at
# https://github.com/mvt-project/mvt/blob/main/LICENSE

import datetime
import fnmatch
import logging
import os
Expand Down Expand Up @@ -91,3 +91,11 @@ def _get_dumpstate_file(self) -> bytes:
return None

return self._get_file_content(dumpstate_logs[0])

def _get_file_modification_time(self, file_path: str) -> datetime.datetime:
    """
    Return the modification time of a file inside the bugreport.

    When the bugreport is a zip archive the time recorded in the archive
    entry is used; otherwise the file is looked up below
    ``self.extract_path`` and its filesystem mtime is used. In both cases
    the returned datetime carries no timezone information.

    :param file_path: path of the file relative to the bugreport root
    :return: modification time as a naive ``datetime.datetime``
    """
    if self.zip_archive:
        file_timetuple = self.zip_archive.getinfo(file_path).date_time
        return datetime.datetime(*file_timetuple)

    file_stat = os.stat(os.path.join(self.extract_path, file_path))
    return datetime.datetime.fromtimestamp(file_stat.st_mtime)
64 changes: 64 additions & 0 deletions src/mvt/android/modules/bugreport/tombstones.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

import logging
from typing import Optional

from mvt.android.artifacts.tombstone_crashes import TombstoneCrashArtifact
from .base import BugReportModule


class Tombstones(TombstoneCrashArtifact, BugReportModule):
    """This module extracts crash records from the tombstone files of a bugreport."""

    slug = "tombstones"

    def __init__(
        self,
        file_path: Optional[str] = None,
        target_path: Optional[str] = None,
        results_path: Optional[str] = None,
        module_options: Optional[dict] = None,
        log: logging.Logger = logging.getLogger(__name__),
        results: Optional[list] = None,
    ) -> None:
        super().__init__(
            file_path=file_path,
            target_path=target_path,
            results_path=results_path,
            module_options=module_options,
            log=log,
            results=results,
        )

    def run(self) -> None:
        """
        Parse every ``*/tombstone_*`` file found in the bugreport.

        Files ending in ``.pb`` are parsed as protobuf tombstones, all others
        as text tombstones. A file that cannot be parsed is logged and
        skipped so the remaining tombstones are still processed.
        """
        tombstone_files = self._get_files_by_pattern("*/tombstone_*")
        if not tombstone_files:
            self.log.error(
                "Unable to find any tombstone files. "
                "Did you provide a valid bugreport archive?"
            )
            return

        for tombstone_file in sorted(tombstone_files):
            tombstone_filename = tombstone_file.split("/")[-1]
            modification_time = self._get_file_modification_time(tombstone_file)
            tombstone_data = self._get_file_content(tombstone_file)

            try:
                if tombstone_file.endswith(".pb"):
                    self.parse_protobuf(
                        tombstone_filename, modification_time, tombstone_data
                    )
                else:
                    self.parse(tombstone_filename, modification_time, tombstone_data)
            except (ValueError, KeyError, IndexError) as e:
                # Catch any exceptions raised during parsing or validation.
                # Malformed lines can surface as IndexError or KeyError from
                # the parsers' indexing, not only as ValueError; one broken
                # file must not abort the whole module.
                self.log.error(f"Error parsing tombstone file {tombstone_file}: {e}")

        self.log.info(
            "Extracted a total of %d tombstone files",
            len(self.results),
        )
Empty file.
Loading

0 comments on commit e5865b1

Please sign in to comment.