diff --git a/dracut/parse-kickstart b/dracut/parse-kickstart index 0ae24bf8339..91bc1b9cebc 100755 --- a/dracut/parse-kickstart +++ b/dracut/parse-kickstart @@ -66,6 +66,8 @@ TMPDIR = "/tmp" ARPHRD_ETHER = "1" ARPHRD_INFINIBAND = "32" +CERT_TRANSPORT_DIR = "/run/install/certificates" + # Helper function for reading simple files in /sys def readsysfile(f): '''Return the contents of f, or "" if missing.''' @@ -403,6 +405,44 @@ def ksnet_to_dracut(args, lineno, net, bootdev=False): return " ".join(line) + +def _dump_certificate(cert, root="/", dump_dir=None): + """Dump the certificate into specified file.""" + dump_dir = dump_dir or cert.dir + if not dump_dir: + log.error("Certificate destination is missing for %s", cert.filename) + return + + dst_dir = os.path.join(root, dump_dir.lstrip('/')) + log.debug("Dumping certificate %s into %s.", cert.filename, dst_dir) + if not os.path.exists(dst_dir): + log.debug("Path %s for certificate does not exist, creating.", dst_dir) + os.makedirs(dst_dir) + + dst = os.path.join(dst_dir, cert.filename) + + if os.path.exists(dst): + log.warning("Certificate file %s already exists, replacing.", dst) + + with open(dst, 'w') as f: + f.write(cert.cert) + f.write('\n') + + +def process_certificates(handler): + """Import certificates defined in %certificate sections.""" + for cert in handler.certificates: + log.info("Processing kickstart certificate %s", cert.filename) + + if not cert.filename: + log.error("Missing certificate file name, skipping.") + continue + + _dump_certificate(cert) + # Dump for transport to switchroot + _dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/") + + def process_kickstart(ksfile): handler = DracutHandler() try: @@ -422,6 +462,7 @@ def process_kickstart(ksfile): with open(TMPDIR+"/ks.info", "a") as f: f.write('parsed_kickstart="%s"\n' % processed_file) log.info("finished parsing kickstart") + process_certificates(handler) return processed_file, handler.output if __name__ == '__main__': diff --git a/tests/unit_tests/dracut_tests/test_parse_kickstart.py b/tests/unit_tests/dracut_tests/test_parse_kickstart.py index d444af0fcef..e02639a69a6 100644 --- a/tests/unit_tests/dracut_tests/test_parse_kickstart.py +++ b/tests/unit_tests/dracut_tests/test_parse_kickstart.py @@ -22,6 +22,19 @@ import subprocess import re +CERT_CONTENT = """-----BEGIN CERTIFICATE----- +MIIBjTCCATOgAwIBAgIUWR5HO3v/0I80Ne0jQWVZFODuWLEwCgYIKoZIzj0EAwIw +FDESMBAGA1UEAwwJUlZURVNUIENBMB4XDTI0MTEyMDEzNTk1N1oXDTM0MTExODEz +NTk1N1owFDESMBAGA1UEAwwJUlZURVNUIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0D +AQcDQgAELghFKGEgS8+5/2nx50W0xOqTrKc2Jz/rD/jfL0m4z4fkeAslCOkIKv74 +0wfBXMngxi+OF/b3Vh8FmokuNBQO5qNjMGEwHQYDVR0OBBYEFOJarl9Xkd13sLzI +mHqv6aESlvuCMB8GA1UdIwQYMBaAFOJarl9Xkd13sLzImHqv6aESlvuCMA8GA1Ud +EwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMAoGCCqGSM49BAMCA0gAMEUCIAet +7nyre42ReoRKoyHWLDsQmQDzoyU3FQdC0cViqOtrAiEAxYIL+XTTp7Xy9RNE4Xg7 +yNWXfdraC/AfMM8fqsxlVJM= +-----END CERTIFICATE-----""" + + class BaseTestCase(unittest.TestCase): def setUp(self): # create the directory used for file/folder tests @@ -242,3 +255,60 @@ def test_bootloader(self): lines = self.execParseKickstart(ks_file.name) assert lines[0] == "inst.extlinux", lines + + def _check_cert_file(self, cert_file, content): + with open(cert_file) as f: + # Anaconda adds `\n` to the value when dumping it + assert f.read() == content+'\n' + + def test_certificate(self): + filename = "rvtest.pem" + cdir = os.path.join(self.tmpdir, "cert_dir/subdir") + content = CERT_CONTENT + ks_cert = f""" +%certificate --filename={filename} --dir={cdir} +{content} +%end +""" + cert_file = os.path.join(cdir, filename) + + with tempfile.NamedTemporaryFile(mode="w+t") as ks_file: + ks_file.write(ks_cert) + ks_file.flush() + lines = self.execParseKickstart(ks_file.name) + assert lines == [] + + self._check_cert_file(cert_file, content) + + # Check existence for file for transport to root + CERT_TRANSPORT_DIR = "/run/install/certificates" + transport_file = os.path.join(CERT_TRANSPORT_DIR, cert_file) + self._check_cert_file(transport_file, content) + + def test_certificate_existing(self): + filename = "rvtest.pem" + cdir = os.path.join(self.tmpdir, "cert_dir/subdir") + content = CERT_CONTENT + ks_cert = f""" +%certificate --filename={filename} --dir={cdir} +{content} +%end +""" + cert_file = os.path.join(cdir, filename) + + # Existing file should be overwritten + os.makedirs(cdir) + open(cert_file, 'w') + + with tempfile.NamedTemporaryFile(mode="w+t") as ks_file: + ks_file.write(ks_cert) + ks_file.flush() + lines = self.execParseKickstart(ks_file.name) + assert lines == [] + + self._check_cert_file(cert_file, content) + + # Check existence for file for transport to root + CERT_TRANSPORT_DIR = "/run/install/certificates" + transport_file = os.path.join(CERT_TRANSPORT_DIR, cert_file) + self._check_cert_file(transport_file, content)