diff --git a/doc/api.rst b/doc/api.rst index d1dc6c8e..53b4da7b 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -105,3 +105,10 @@ drgn_tools.rds .. automodule:: drgn_tools.rds :members: + + +drgn_tools.cpuinfo +----------------------- + +.. automodule:: drgn_tools.cpuinfo + :members: diff --git a/drgn_tools/corelens.py b/drgn_tools/corelens.py index b6030522..94bbd760 100644 --- a/drgn_tools/corelens.py +++ b/drgn_tools/corelens.py @@ -184,6 +184,7 @@ def all_corelens_modules() -> Dict[str, CorelensModule]: "drgn_tools.block", "drgn_tools.md", "drgn_tools.rds", + "drgn_tools.cpuinfo", ] for python_module in python_mods: importlib.import_module(python_module) diff --git a/drgn_tools/cpuinfo.py b/drgn_tools/cpuinfo.py new file mode 100644 index 00000000..96972908 --- /dev/null +++ b/drgn_tools/cpuinfo.py @@ -0,0 +1,610 @@ +# Copyright (c) 2023, Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +""" +Helper to view cpuinfo data +""" +import argparse +from typing import Any +from typing import Dict + +from drgn import Object +from drgn import Program +from drgn.helpers.linux.percpu import per_cpu + +from drgn_tools.corelens import CorelensModule +from drgn_tools.table import print_dictionary + + +def get_cpu_info(prog: Program) -> Dict[str, Any]: + """ + Helper to get cpuinfo data + + :returns: a dictionary of the cpuinfo data + """ + # TODO: Add aarch64 support. + # `struct cpuinfo_arm64` being fetched using 'boot_cpu_data' only has + # register values, while the struct fetched using 'cpu_data' has all values + # zero/null + + cpus = int(prog["nr_cpu_ids"]) + + if "cpu_data" in prog: + cpuinfo_struct = prog["cpu_data"] + elif "boot_cpu_data" in prog: + cpuinfo_struct = prog["boot_cpu_data"] + else: + raise Exception( + "Failed to load CPU info: no cpuinfo struct found (tried 'cpu_data' and 'boot_cpu_data')" + ) + + cpu_vendor = cpuinfo_struct.x86_vendor_id.string_().decode("utf-8") + model_name = cpuinfo_struct.x86_model_id.string_().decode("utf-8") + cpu_family = int(cpuinfo_struct.x86) + cpus_numa0 = "0-" + str(cpus - 1) + microcode = hex(cpuinfo_struct.microcode) + cstates = int(prog["max_cstate"]) + + cpu_caps_bugs = cpuinfo_struct.x86_capability # Contains cpu and bug flags + + nbugints = 1 + ncapints = len(cpu_caps_bugs) - nbugints + + cpu_flags = prog["x86_cap_flags"] + bug_flags = prog["x86_bug_flags"] + + cpu_flags_list = [] + bug_flags_list = [] + + for i in range(ncapints): + flags = int(cpu_caps_bugs[i]) + for j in range(32): + if (flags & 1) and cpu_flags[i * 32 + j]: + cpu_flags_list.append( + cpu_flags[i * 32 + j].string_().decode("utf-8") + ) + flags >>= 1 + + for i in range(nbugints): + flags = int(cpu_caps_bugs[i + ncapints]) + for j in range(32): + if (flags & 1) and bug_flags[i * 32 + j]: + bug_flags_list.append( + bug_flags[i * 32 + j].string_().decode("utf-8") + ) + flags >>= 1 + + titles = [ + "CPU VENDOR", + "MODEL NAME", + "CPU FAMILY", + "CPUS", + "CPUS NUMA0", + "MICROCODE", + "CSTATES", + "CPU FLAGS", + "BUG FLAGS", + ] + + values = [ + cpu_vendor, + model_name, + cpu_family, + cpus, + cpus_numa0, + microcode, + cstates, + " ".join(cpu_flags_list), + " ".join(bug_flags_list), + ] + + cpuinfo_data = dict() + + for i in range(len(titles)): + cpuinfo_data[titles[i]] = values[i] + + return cpuinfo_data + + +def check_smt_enabled(prog: Program) -> bool: + """ + Checks if SMT (Simultaneous Multithreading) is enabled + """ + if "sched_smt_present" not in prog: + return prog["cpu_smt_control"] == prog["CPU_SMT_ENABLED"] + else: + return prog["sched_smt_present"].key.enabled.counter > 0 + + +def test_taint(prog: Program, flag: int) -> bool: + """ + Checks if the flag is set in `tainted_mask` + """ + return ((1 << flag) & prog["tainted_mask"]) == 1 + + +def get_meltdown_mitigation(prog: Program, cpu_caps_bugs: Object) -> str: + """ + Extracts Mitigation for Meltdown + """ + mitigation = "" + if (cpu_caps_bugs[7] >> 31) & 1: + mitigation = "Mitigation: PTI" + else: + if "x86_hyper_type" not in prog: + if ( + prog["xen_domain_type"] != prog["XEN_NATIVE"] + and prog["xen_domain_type"] == prog["XEN_PV_DOMAIN"] + ): # This is how it is defined in the kernel + mitigation = ( + "Unknown (XEN PV detected, hypervisor mitigation required)" + ) + elif prog["x86_hyper_type"] == prog["X86_HYPER_XEN_PV"]: + mitigation = ( + "Unknown (XEN PV detected, hypervisor mitigation required)" + ) + + return mitigation + + +def get_spectre_v1_mitigation(prog: Program) -> str: + """ + Extracts Mitigation for Spectre_V1 + """ + return ( + prog["spectre_v1_strings"][prog["spectre_v1_mitigation"]] + .string_() + .decode("utf-8") + ) + + +def get_spectre_v2_mitigation(prog: Program, cpu_caps_bugs: Object) -> str: + """ + Extracts Mitigation for Spectre_V2 + """ + if "SPECTRE_V2_LFENCE" not in prog: + mitigation = ( + prog["spectre_v2_strings"][prog["spectre_v2_enabled"]] + .string_() + .decode("utf-8") + ) + if test_taint(prog, 16) and ( + prog["spectre_v2_enabled"] == prog["SPECTRE_V2_RETPOLINE_GENERIC"] + or prog["spectre_v2_enabled"] == prog["SPECTRE_V2_RETPOLINE_AMD"] + ): + mitigation += " (non-retpoline module(s) has been loaded)" + if prog["use_ibrs"] & (1 << 3): + mitigation += ", IBRS_FW" + if prog["use_ibpb"] & 1: + mitigation += ", IBPB" + else: + if prog["spectre_v2_enabled"] == prog["SPECTRE_V2_LFENCE"]: + mitigation = "Vulnerable: LFENCE" + elif ( + prog["spectre_v2_enabled"] == prog["SPECTRE_V2_EIBRS"] + and not prog["sysctl_unprivileged_bpf_disabled"] + ): + mitigation = "Vulnerable: eIBRS with unprivileged eBPF" + elif ( + prog["sched_smt_present"].key.enabled.counter > 0 + and not prog["sysctl_unprivileged_bpf_disabled"] + and prog["spectre_v2_enabled"] == prog["SPECTRE_V2_EIBRS_LFENCE"] + ): + mitigation = ( + "Vulnerable: eIBRS+LFENCE with unprivileged eBPF and SMT" + ) + else: + mitigation = ( + prog["spectre_v2_strings"][prog["spectre_v2_enabled"]] + .string_() + .decode("utf-8") + ) + if (cpu_caps_bugs[7] >> 26) & 1: + if prog["switch_mm_always_ibpb"].key.enabled.counter > 0: + mitigation += ", IBPB: always-on" + elif prog["switch_mm_cond_ibpb"].key.enabled.counter > 0: + mitigation += ", IBPB: conditional" + else: + mitigation += ", IBPB: disabled" + if ( + "ibrs_firmware_enabled_key" in prog + and prog["ibrs_firmware_enabled_key"].key.enabled.counter > 0 + ): + mitigation += ", IBRS_FW" + elif (cpu_caps_bugs[7] >> 22) & 1: + mitigation += ", IBRS_FW" + + if not (3 < prog["spectre_v2_enabled"] < 7): # Not in EIBRS Mode + if ( + prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_NONE"] + ): + mitigation += ", STIBP: disabled" + elif ( + prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_STRICT"] + ): + mitigation += ", STIBP: forced" + elif ( + prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_STRICT_PREFERRED"] + ): + mitigation += ", STIBP: always-on" + elif ( + prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_PRCTL"] + or prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_SECCOMP"] + ): + if prog["switch_to_cond_stibp"].key.enabled.counter > 0: + mitigation += ", STIBP: conditional" + + if (cpu_caps_bugs[7] >> 19) & 1: + mitigation += ", RSB filling" + + if prog["spectre_v2_bad_module"]: + mitigation += " - vulnerable module loaded" + + return mitigation + + +def get_ssb_mitigation(prog: Program) -> str: + """ + Extracts Mitigation for spec_store_bypass + """ + return prog["ssb_strings"][prog["ssb_mode"]].string_().decode("utf-8") + + +def get_l1tf_mitigation(prog: Program, cpu_caps_bugs: Object) -> str: + """ + Extracts Mitigation for L1TF + """ + mitigation = "Mitigation: PTE Inversion" + if "l1tf_vmx_states" in prog: + if prog["l1tf_vmx_mitigation"] == prog["VMENTER_L1D_FLUSH_AUTO"]: + pass + elif ( + prog["l1tf_vmx_mitigation"] + == prog["VMENTER_L1D_FLUSH_EPT_DISABLED"] + or prog["l1tf_vmx_mitigation"] == prog["VMENTER_L1D_FLUSH_NEVER"] + ) and (check_smt_enabled(prog)): + mitigation += "; VMX: " + mitigation += ( + prog["l1tf_vmx_states"][prog["l1tf_vmx_mitigation"]] + .string_() + .decode("utf-8") + ) + else: + mitigation += "; VMX: " + mitigation += ( + prog["l1tf_vmx_states"][prog["l1tf_vmx_mitigation"]] + .string_() + .decode("utf-8") + ) + mitigation += ", SMT " + if check_smt_enabled(prog): + mitigation += "vulnerable" + else: + mitigation += "disabled" + + return mitigation + + +def get_mds_mitigation(prog: Program, cpu_caps_bugs: Object) -> str: + """ + Extracts Mitigation for MDS + """ + mitigation = ( + prog["mds_strings"][prog["mds_mitigation"]].string_().decode("utf-8") + ) + mitigation += "; SMT " + nbugints = 1 + ncapints = len(cpu_caps_bugs) - nbugints + if (cpu_caps_bugs[4] >> 31) & 1: + mitigation += "Host state unknown" + elif (cpu_caps_bugs[ncapints] >> 20) & 1: + if prog["mds_mitigation"] == prog["MDS_MITIGATION_OFF"]: + mitigation += "vulnerable" + elif check_smt_enabled(prog): + mitigation += "mitigated" + else: + mitigation += "disabled" + else: + if check_smt_enabled(prog): + mitigation += "vulnerable" + else: + mitigation += "disabled" + + return mitigation + + +def get_taa_mitigation(prog: Program, cpu_caps_bugs: Object) -> str: + """ + Extracts Mitigation for tsx_async_abort + """ + mitigation = ( + prog["taa_strings"][prog["taa_mitigation"]].string_().decode("utf-8") + ) + + if ( + prog["taa_mitigation"] == prog["TAA_MITIGATION_TSX_DISABLED"] + or prog["taa_mitigation"] == prog["TAA_MITIGATION_OFF"] + ): + pass + elif (cpu_caps_bugs[4] >> 31) & 1: + mitigation += "; SMT Host state unknown" + else: + mitigation += "; SMT " + if check_smt_enabled(prog): + mitigation += "vulnerable" + else: + mitigation += "disabled" + + return mitigation + + +def get_itlb_multihit_mitigation(prog: Program, cpu_caps_bugs: Object) -> str: + """ + Extracts Mitigation for itlb_multihit + """ + if "l1tf_vmx_states" not in prog: + mitigation = "Processor vulnerable" + elif "cr4_read_shadow" in prog: + if (not ((cpu_caps_bugs[7] >> 31) & 1)) or ( + not ((cpu_caps_bugs[4] >> 5) & 1) + ): + mitigation = "KVM: Mitigation: VMX unsupported" + elif not per_cpu(prog["cpu_tlbstate"], 1).cr4 & (1 << 13): + mitigation = "KVM: Mitigation: VMX disabled" + elif prog["itlb_multihit_kvm_mitigation"]: + mitigation = "KVM: Mitigation: Split huge pages" + else: + mitigation = "KVM: Vulnerable" + + return mitigation + + +def get_srbds_mitigation(prog: Program) -> str: + """ + Extracts Mitigation for SRBDS + """ + return ( + prog["srbds_strings"][prog["srbds_mitigation"]] + .string_() + .decode("utf-8") + ) + + +def get_mmio_stale_data_mitigation( + prog: Program, cpu_caps_bugs: Object +) -> str: + """ + Extracts Mitigation for mmio_stale_data and mmio_unknown + """ + nbugints = 1 + ncapints = len(cpu_caps_bugs) - nbugints + if (cpu_caps_bugs[ncapints] >> 26) & 1: + mitigation = "Unknown: No mitigations" + else: + mitigation = ( + prog["mmio_strings"][prog["mmio_mitigation"]] + .string_() + .decode("utf-8") + ) + if prog["mmio_mitigation"] == prog["MMIO_MITIGATION_OFF"]: + pass + elif (cpu_caps_bugs[4] >> 31) & 1: + mitigation += "; SMT Host state unknown" + else: + mitigation += "; SMT " + if check_smt_enabled(prog): + mitigation += "vulnerable" + else: + mitigation += "disabled" + + return mitigation + + +def get_retbleed_mitigation(prog: Program, cpuinfo: Object) -> str: + """ + Extracts Mitigation for Retbleed + """ + if "retbleed_state" in prog: + if prog["retbleed_state"] == prog["RETBLEED_MITIGATION_UNRET"]: + if cpuinfo.x86_vendor != 2 and cpuinfo.x86_vendor != 3: + mitigation = ( + "Vulnerable: untrained return thunk on non-Zen uarch" + ) + else: + mitigation = ( + prog["retbleed_strings"][prog["retbleed_state"]] + .string_() + .decode("utf-8") + + "; SMT " + ) + if not check_smt_enabled(prog): + mitigation += "disabled" + elif ( + prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_STRICT"] + or prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_STRICT_PREFERRED"] + ): + mitigation += "enabled with STIBP protection" + else: + mitigation += "vulnerable" + else: + mitigation = ( + prog["retbleed_strings"][prog["retbleed_state"]] + .string_() + .decode("utf-8") + ) + else: + if ( + prog["retbleed_mitigation"] == prog["RETBLEED_MITIGATION_UNRET"] + or prog["retbleed_mitigation"] == prog["RETBLEED_MITIGATION_IBPB"] + ): + if cpuinfo.x86_vendor != 2 and cpuinfo.x86_vendor != 3: + mitigation = "Vulnerable: untrained return thunk / IBPB on non-AMD based uarch" + else: + mitigation = ( + prog["retbleed_strings"][prog["retbleed_mitigation"]] + .string_() + .decode("utf-8") + + "; SMT " + ) + if not check_smt_enabled(prog): + mitigation += "disabled" + elif ( + prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_STRICT"] + or prog["spectre_v2_user_stibp"] + == prog["SPECTRE_V2_USER_STRICT_PREFERRED"] + ): + mitigation += "enabled with STIBP protection" + else: + mitigation += "vulnerable" + else: + mitigation = ( + prog["retbleed_strings"][prog["retbleed_mitigation"]] + .string_() + .decode("utf-8") + ) + + return mitigation + + +def get_gds_mitigation(prog: Program) -> str: + """ + Extracts Mitigation for GDS + """ + return ( + prog["gds_strings"][prog["gds_mitigation"]].string_().decode("utf-8") + ) + + +def get_cpu_mitigations(prog: Program) -> Dict[str, str]: + """ + Helper to get mitigations for vulnerabilities + + :returns: a dictionary of vulnerabilities with their mitigations + """ + vulns = [ + "Meltdown", + "Spectre_V1", + "Spectre_V2", + "SSBD", + "L1TF", + "MDS", + "tsx_async_abort", + "itlb_multihit", + "SRBDS", + "mmio_stale_data", + "mmio_unknown", + "Retbleed", + "spec_store_bypass", + "GDS", + "SRSO", + ] + + if "cpu_data" in prog: + cpuinfo_struct = prog["cpu_data"] + elif "boot_cpu_data" in prog: + cpuinfo_struct = prog["boot_cpu_data"] + else: + raise Exception( + "Failed to load CPU info: no cpuinfo struct found (tried 'cpu_data' and 'boot_cpu_data')" + ) + + cpu_caps_bugs = cpuinfo_struct.x86_capability + + mitigations = dict.fromkeys(vulns, "Not Affected") + + cpuinfo_data = get_cpu_info(prog) + bugs = cpuinfo_data["BUG FLAGS"].split() + + for bug in bugs: + if bug == "cpu_meltdown": + mitigations["Meltdown"] = get_meltdown_mitigation( + prog, cpu_caps_bugs + ) + + elif bug == "spectre_v1": + mitigations["Spectre_V1"] = get_spectre_v1_mitigation(prog) + + elif bug == "spectre_v2": + mitigations["Spectre_V2"] = get_spectre_v2_mitigation( + prog, cpu_caps_bugs + ) + + elif bug == "spec_store_bypass": + mitigations["spec_store_bypass"] = get_ssb_mitigation(prog) + + elif bug == "l1tf": + mitigations["L1TF"] = get_l1tf_mitigation(prog, cpu_caps_bugs) + + elif bug == "mds": + mitigations["MDS"] = get_mds_mitigation(prog, cpu_caps_bugs) + + elif bug == "taa": + mitigations["tsx_async_abort"] = get_taa_mitigation( + prog, cpu_caps_bugs + ) + + elif bug == "itlb_multihit": + mitigations["itlb_multihit"] = get_itlb_multihit_mitigation( + prog, cpu_caps_bugs + ) + + elif bug == "srbds": + mitigations["SRBDS"] = get_srbds_mitigation(prog) + + elif bug == "mmio_stale_data": + mitigations["mmio_stale_data"] = get_mmio_stale_data_mitigation( + prog, cpu_caps_bugs + ) + + elif bug == "mmio_unknown": + mitigations["mmio_unknown"] = get_mmio_stale_data_mitigation( + prog, cpu_caps_bugs + ) + + elif bug == "retbleed": + mitigations["Retbleed"] = get_retbleed_mitigation( + prog, cpuinfo_struct + ) + + elif bug == "gds": + mitigations["GDS"] = get_gds_mitigation(prog) + + elif bug == "srso": + mitigations["SRSO"] = "Vulnerable (Status Unknown)" + # Currently cannot handle this mitigation data because the kernel + # uses assembly code to fetch it + + else: + mitigations[bug] = "Vulnerable" + + return mitigations + + +def print_cpu_info(prog: Program) -> None: + """ + Prints the cpuinfo data + """ + cpuinfo_data = get_cpu_info(prog) + print_dictionary(cpuinfo_data) + + mitigation_data = get_cpu_mitigations(prog) + print("\nVULNERABILITIES:") + print_dictionary(mitigation_data) + + +class Cpu(CorelensModule): + """ + Corelens Module for cpuinfo + """ + + name = "cpuinfo" + + def run(self, prog: Program, args: argparse.Namespace) -> None: + print_cpu_info(prog) diff --git a/tests/test_cpuinfo.py b/tests/test_cpuinfo.py new file mode 100644 index 00000000..55fd2f00 --- /dev/null +++ b/tests/test_cpuinfo.py @@ -0,0 +1,44 @@ +# Copyright (c) 2023, Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +from drgn import ProgramFlags + +from drgn_tools import cpuinfo + + +def test_cpuinfo(prog): + cpuinfo.print_cpu_info(prog) + + if not (ProgramFlags.IS_LIVE & prog.flags): + return + + file = open("/proc/cpuinfo", "r") + lines = file.readlines() + cpu_data_from_proc = dict() + for line in lines: + try: + title, value = line.split(":") + title, value = title.strip(), value.strip() + cpu_data_from_proc[title] = value + except Exception: + continue + + cpu_data_from_corelens = cpuinfo.get_cpu_info(prog) + + assert ( + cpu_data_from_corelens["CPU VENDOR"] == cpu_data_from_proc["vendor_id"] + ) + assert ( + cpu_data_from_corelens["MODEL NAME"] + == cpu_data_from_proc["model name"] + ) + assert ( + str(cpu_data_from_corelens["CPU FAMILY"]) + == cpu_data_from_proc["cpu family"] + ) + assert ( + str(cpu_data_from_corelens["MICROCODE"]) + == cpu_data_from_proc["microcode"] + ) + assert cpu_data_from_corelens["CSTATES"] == prog["max_cstate"] + assert cpu_data_from_corelens["CPU FLAGS"] == cpu_data_from_proc["flags"] + assert cpu_data_from_corelens["BUG FLAGS"] == cpu_data_from_proc["bugs"]