-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a tool to convert legacy policy files to new format
- Loading branch information
Showing
4 changed files
with
244 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,240 @@ | ||
# -*- encoding: utf8 -*- | ||
# | ||
# The Qubes OS Project, http://www.qubes-os.org | ||
# | ||
# Copyright (C) 2023 Marta Marczykowska-Górecka | ||
# <[email protected]> | ||
# | ||
# This program is free software; you can redistribute it and/or modify | ||
# it under the terms of the GNU Lesser General Public License as published by | ||
# the Free Software Foundation; either version 2.1 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Lesser General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Lesser General Public License along | ||
# with this program; if not, see <http://www.gnu.org/licenses/>. | ||
# pylint: disable=too-many-locals,too-many-branches,too-many-statements | ||
import pathlib | ||
import shutil | ||
import subprocess | ||
import sys | ||
|
||
from qrexec.policy import parser | ||
|
||
NEW_POLICY_DIR = pathlib.Path('/etc/qubes/policy.d') | ||
OLD_POLICY_DIR = pathlib.Path('/etc/qubes-rpc/policy') | ||
|
||
CONFIG_FILE = '30-user' | ||
|
||
|
||
class NoCompatPolicy(parser.FilePolicy): | ||
""" | ||
This class loads policy without loading 4.0 compatible policy format | ||
""" | ||
def handle_compat40(self, *, filepath, lineno): | ||
return | ||
|
||
|
||
class RuleWrapper: | ||
def __init__(self, rule: parser.Rule): | ||
self.rule: parser.Rule = rule | ||
self.service = rule.service | ||
|
||
def __eq__(self, other): | ||
return self.rule.filepath == other.rule.filepath \ | ||
and self.rule.lineno == other.rule.lineno | ||
|
||
def __str__(self): | ||
return str(self.rule) | ||
|
||
def main(_args=None): | ||
# get initial state | ||
initial_state = set(subprocess.check_output( | ||
'qrexec-policy-graph').decode().split('\n')) | ||
|
||
print("Converting old policy files into new format files....") | ||
current_policy = parser.FilePolicy() | ||
current_policy_no_compat = NoCompatPolicy() | ||
|
||
all_rules = [RuleWrapper(rule) for rule in current_policy.rules] | ||
new_rules = [RuleWrapper(rule) for rule in current_policy_no_compat.rules] | ||
|
||
# all rules that exist only in legacy files | ||
legacy_rules = [rule for rule in all_rules if rule not in new_rules] | ||
|
||
# all services for which a legacy rule exists | ||
all_services = {rule.service for rule in legacy_rules} | ||
|
||
# dict of file_name: rules list | ||
rules_to_save = {CONFIG_FILE: []} | ||
|
||
for service in all_services: | ||
legacy = [rule for rule in legacy_rules if rule.service == service] | ||
non_legacy = [rule for rule in new_rules if rule.service == service] | ||
|
||
legacy_str = [str(rule) for rule in legacy] | ||
non_legacy_str = [str(rule) for rule in non_legacy] | ||
|
||
if legacy_str == non_legacy_str or not legacy: | ||
continue | ||
|
||
missing = [rule for rule in legacy if str(rule) not in non_legacy_str] | ||
if not missing: | ||
continue | ||
|
||
last_working_rule = len(missing) | ||
for i, rule in enumerate(missing): | ||
if str(rule.rule.action) == 'deny' \ | ||
and rule.rule.source == '@anyvm' \ | ||
and rule.rule.target == '@anyvm': | ||
last_working_rule = i | ||
break | ||
missing = missing[:last_working_rule] | ||
if not missing: | ||
continue | ||
|
||
# services that have configtool support | ||
if service == 'qubes.ClipboardPaste': | ||
filename = '50-config-clipboard' | ||
rules_to_save[filename] = [] | ||
for rule in missing: | ||
if rule.rule.action == 'allow': | ||
# tool does not support allow-rules | ||
rules_to_save[CONFIG_FILE].append(rule) | ||
continue | ||
rules_to_save[filename].append(rule) | ||
|
||
elif service == 'qubes.Filecopy': | ||
filename = '50-config-filecopy' | ||
rules_to_save[filename] = missing | ||
continue | ||
|
||
elif service == 'qubes.UpdatesProxy': | ||
filename = '50-config-updates' | ||
rules_to_save[filename] = missing | ||
continue | ||
|
||
elif service in ('qubes.OpenInVM', 'qubes.OpenURL'): | ||
filename = '50-config-openinvm' \ | ||
if service == 'qubes.OpenInVM' else '50-config-openurl' | ||
rules_to_save[filename] = [] | ||
for rule in missing: | ||
if rule.rule.target != '@dispvm': | ||
# tool does not support cases other than dispvm-related | ||
rules_to_save[CONFIG_FILE].append(rule) | ||
continue | ||
if str(rule.rule.action) == 'ask': | ||
target = str(rule.rule.action.default_target) | ||
elif str(rule.rule.action) == 'allow': | ||
target = str(rule.rule.action.target) | ||
else: | ||
target = '@dispvm' | ||
if '@dispvm' not in target: | ||
rules_to_save[CONFIG_FILE].append(rule) | ||
continue | ||
rules_to_save[filename].append(rule) | ||
rules_to_save[filename] = missing | ||
|
||
elif service == 'qubes.Gpg': | ||
filename = '50-config-splitgpg' | ||
rules_to_save[filename] = [] | ||
for rule in missing: | ||
if rule.rule.source == '@anyvm': | ||
if rule.rule.target.type == 'keyword': | ||
# not supported main rule | ||
rules_to_save[CONFIG_FILE].append(rule) | ||
else: | ||
rules_to_save[filename].append(rule) | ||
else: | ||
rules_to_save[filename].append(rule) | ||
elif service in ('qubes.InputKeyboard', | ||
'qubes.InputMouse', | ||
'qubes.InputTablet'): | ||
filename = '50-config-input' | ||
if filename not in rules_to_save: | ||
rules_to_save[filename] = [] | ||
|
||
rule = missing.pop() | ||
|
||
if rule.rule.target != '@adminvm': | ||
rules_to_save[CONFIG_FILE].append(rule) | ||
elif rule.rule.action.user: | ||
rules_to_save[CONFIG_FILE].append(rule) | ||
else: | ||
rules_to_save[filename].append(rule) | ||
if missing: | ||
rules_to_save[CONFIG_FILE].extend(missing) | ||
|
||
elif service in ('u2f.Authenticate', 'u2f.Register', | ||
'policy.RegisterArgument'): | ||
filename = '50-config-u2f' | ||
if filename not in rules_to_save: | ||
rules_to_save[filename] = [] | ||
rules_to_save[filename].extend(missing) | ||
else: | ||
rules_to_save[CONFIG_FILE].extend(missing) | ||
|
||
# do actual rule saving | ||
for filename, rules in rules_to_save.items(): | ||
if not rules: | ||
continue | ||
|
||
file = NEW_POLICY_DIR / (filename + ".policy") | ||
text = '\n'.join([str(rule.rule) for rule in rules]) + '\n' | ||
|
||
if file.exists(): | ||
shutil.copy(file, str(file) + '.bak') | ||
# prepend existing rules | ||
current_text = file.read_text() | ||
text = text + current_text | ||
|
||
print("Writing " + str(file) + '...') | ||
file.write_text(text) | ||
|
||
# remove old | ||
for file in OLD_POLICY_DIR.iterdir(): | ||
if file.is_file() and not file.name.endswith('.rpmsave'): | ||
file.rename(str(file) + '.rpmsave') | ||
|
||
# check if state changed | ||
current_state = set(subprocess.check_output( | ||
'qrexec-policy-graph').decode().split('\n')) | ||
|
||
if initial_state != current_state: | ||
print("ERROR: Found the following differences between " | ||
"previous and converted policy states:") | ||
print("OLD STATE") | ||
for line in initial_state.difference(current_state): | ||
print(line) | ||
print("NEW STATE") | ||
for line in current_state.difference(initial_state): | ||
print(line) | ||
if input("Do you want to restore initial state? [Y/n] ").upper() != "N": | ||
# rename all old | ||
for file in OLD_POLICY_DIR.iterdir(): | ||
if file.is_file() and file.name.endswith('.rpmsave'): | ||
file.rename(file.with_suffix('')) | ||
|
||
# remove all new | ||
for filename in rules_to_save: | ||
file = NEW_POLICY_DIR / (filename + ".policy") | ||
if file.exists(): | ||
file.unlink() | ||
|
||
# revert all new | ||
for file in NEW_POLICY_DIR.iterdir(): | ||
if file.is_file() and file.name.endswith('.bak'): | ||
file.rename(file.with_suffix('')) | ||
print("Conversion reverted.") | ||
|
||
sys.exit(1) | ||
|
||
print("Successfully converted old policy to new format.") | ||
|
||
|
||
if __name__ == "__main__": | ||
sys.exit(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters