Skip to content

Commit

Permalink
Merge PKIDeployer.retrieve_cert_chain()
Browse files Browse the repository at this point in the history
The PKIDeployer.retrieve_cert_chain() has been merged into
import_cert_chain() such that the cert chain will only be
imported once, either from file, issuing CA, or master CA.
  • Loading branch information
edewata committed May 17, 2024
1 parent bbe6472 commit 7a4103c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 49 deletions.
79 changes: 31 additions & 48 deletions base/server/python/pki/server/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2272,78 +2272,61 @@ def import_certs_and_keys(self, nssdb):
pkcs12_password = self.mdict['pki_external_pkcs12_password']
nssdb.import_pkcs12(pkcs12_file, pkcs12_password)

def import_cert_chain(self, nssdb):
def import_cert_chain(self, nssdb, subsystem):

logger.debug('PKIDeployer.import_cert_chain()')

param = 'pki_cert_chain_path'
cert_chain_path = self.mdict.get(param)

if not cert_chain_path:
# no cert chain to import
return

if not os.path.exists(cert_chain_path):
raise Exception('Certificate chain not found: %s' % cert_chain_path)

logger.info('Importing cert chain from %s', cert_chain_path)

nickname = self.mdict['pki_cert_chain_nickname']

nssdb.import_cert_chain(
nickname=nickname,
cert_chain_file=cert_chain_path,
trust_attributes='CT,C,C')

def retrieve_cert_chain(self, nssdb, subsystem):

external = self.configuration_file.external
standalone = self.configuration_file.standalone
subordinate = self.configuration_file.subordinate
clone = self.configuration_file.clone

system_certs_imported = \
self.mdict['pki_server_pkcs12_path'] != '' or \
self.mdict['pki_clone_pkcs12_path'] != ''
cert_chain = None

if subsystem.type == 'CA' and external or \
subsystem.type in ['KRA', 'OCSP'] and standalone:
cert_chain_path = self.mdict.get('pki_cert_chain_path')
if cert_chain_path:

# For external sub-CA and standalone KRA/OCSP, no need to retrieve
# the cert chain since it's already imported from a local file by
# import_cert_chain().
# Load the cert chain from file specified in pki_cert_chain_path.

return
if not os.path.exists(cert_chain_path):
raise Exception('Certificate chain not found: %s' % cert_chain_path)

logger.info('Loading cert chain from %s', cert_chain_path)
with open(cert_chain_path, 'r', encoding='utf-8') as f:
cert_chain = f.read()

elif (subsystem.type == 'CA' and subordinate or subsystem.type != 'CA') and \
not clone and not system_certs_imported:
elif (subsystem.type == 'CA' and subordinate or subsystem.type != 'CA') \
and not clone \
and not self.mdict['pki_server_pkcs12_path']:

# For primary (not clone) sub-CA and KRA, OCSP, TKS, and TPS,
# retrieve the cert chain from the issuing CA unless it's already
# imported from PKCS #12 file by import_server_pkcs12().
# imported from PKCS #12 file specified in pki_server_pkcs12_path.

url = self.mdict['pki_issuing_ca']
logger.info('Retrieving cert chain from %s', url)
cert_chain = self.get_ca_signing_cert(url)

elif subsystem.type == 'CA' and clone and not system_certs_imported:
elif subsystem.type == 'CA' \
and clone \
and not self.mdict['pki_clone_pkcs12_path']:

# For root CA and sub-CA clone, retrieve the cert chain from the
# primary server unless it's already imported from a PKCS #12 file
# by import_clone_pkcs12().
# specified in pki_clone_pkcs12_path.

url = self.mdict['pki_clone_uri']
logger.info('Retrieving cert chain from %s', url)
cert_chain = self.get_ca_signing_cert(url)

else:
if not cert_chain:
return

logger.info('Retrieving cert chain from %s', url)
cert_chain = self.get_ca_signing_cert(url)

base64_chain = pki.nssdb.convert_pkcs7(cert_chain, 'pem', 'base64')
subsystem.set_config('preop.ca.pkcs7', base64_chain)
nickname = self.mdict['pki_cert_chain_nickname']
logger.info('Importing cert chain as %s', nickname)
logger.info('- cert chain:\n%s', cert_chain)

logger.info('Importing cert chain from %s', url)
nssdb.import_pkcs7(
pkcs7_data=cert_chain,
nssdb.import_cert_chain(
nickname=nickname,
cert_chain_data=cert_chain,
trust_attributes='CT,C,C')

def import_system_certs(self, nssdb, subsystem):
Expand Down Expand Up @@ -2387,7 +2370,7 @@ def import_system_certs(self, nssdb, subsystem):
# to ensure that the system certs are imported with
# the correct nicknames.

self.import_cert_chain(nssdb)
self.import_cert_chain(nssdb, subsystem)

def update_system_certs(self, subsystem):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def spawn(self, deployer):
nssdb = instance.open_nssdb()
try:
deployer.import_system_certs(nssdb, subsystem)
deployer.retrieve_cert_chain(nssdb, subsystem)

finally:
nssdb.close()
Expand Down

0 comments on commit 7a4103c

Please sign in to comment.