forked from Netflix/lemur
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
41e946f
commit 349fdd5
Showing
4 changed files
with
126 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,16 +5,17 @@ | |
:license: Apache, see LICENSE for more details. | ||
.. moduleauthor:: Kevin Glisson <[email protected]> | ||
""" | ||
import arrow | ||
import sys | ||
import click | ||
from time import sleep | ||
|
||
import arrow | ||
import click | ||
from flask import current_app | ||
from flask_principal import Identity, identity_changed | ||
from sentry_sdk import capture_exception | ||
from sqlalchemy import or_ | ||
from sqlalchemy.orm.exc import MultipleResultsFound | ||
from tabulate import tabulate | ||
from time import sleep | ||
from sentry_sdk import capture_exception | ||
|
||
from lemur import database | ||
from lemur.authorities.models import Authority | ||
|
@@ -35,7 +36,7 @@ | |
list_recent_valid_certs_issued_by_authority, | ||
get_certificates_with_same_cn_with_rotate_on, | ||
identify_and_persist_expiring_deployed_certificates, | ||
send_certificate_expiration_metrics | ||
send_certificate_expiration_metrics, get_by_serial | ||
) | ||
from lemur.certificates.verify import verify_string | ||
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS, CRLReason | ||
|
@@ -46,7 +47,6 @@ | |
from lemur.notifications.messaging import send_rotation_notification, send_reissue_no_endpoints_notification, \ | ||
send_reissue_failed_notification | ||
from lemur.plugins.base import plugins | ||
from sqlalchemy.orm.exc import MultipleResultsFound | ||
|
||
|
||
@click.group(name="certificates", help="Handles all certificate related tasks.") | ||
|
@@ -96,6 +96,25 @@ def validate_certificate(certificate_name): | |
return cert | ||
|
||
|
||
def validate_certificates_by_serial_numbers(serial_numbers): | ||
""" | ||
Ensuring that the specified certificate(s) exist. | ||
:param serial_numbers: | ||
:return: | ||
""" | ||
if serial_numbers: | ||
all_certs = [] | ||
for serial_number in serial_numbers: | ||
certs = get_by_serial(serial_number) | ||
|
||
if not certs or len(certs) == 0: | ||
click.echo(f"[-] No certificate found with serial number: {serial_number}") | ||
sys.exit(1) | ||
all_certs.extend(certs) | ||
|
||
return all_certs | ||
|
||
|
||
def validate_endpoint(endpoint_name): | ||
""" | ||
Ensuring that the specified endpoint exists. | ||
|
@@ -534,6 +553,13 @@ def rotate_region(endpoint_name, new_certificate_name, old_certificate_name, mes | |
"old_certificate_name", | ||
help="Name of the certificate you wish to reissue.", | ||
) | ||
@click.option( | ||
"-s", | ||
"--serial-numbers", | ||
"serial_numbers", | ||
multiple=True, | ||
help="Serial number(s) of the certificate(s) you wish to reissue.", | ||
) | ||
@click.option( | ||
"-a", | ||
"--notify", | ||
|
@@ -550,15 +576,17 @@ def rotate_region(endpoint_name, new_certificate_name, old_certificate_name, mes | |
default=False, | ||
help="Persist changes.", | ||
) | ||
def reissue_command(old_certificate_name, notify, commit): | ||
reissue(old_certificate_name, notify, commit) | ||
def reissue_command(old_certificate_name, serial_numbers, notify, commit): | ||
reissue(old_certificate_name, serial_numbers, notify, commit) | ||
|
||
|
||
def reissue(old_certificate_name, notify, commit): | ||
def reissue(old_certificate_name, serial_numbers, notify, commit): | ||
""" | ||
Reissues certificate with the same parameters as it was originally issued with. | ||
If not time period is provided, reissues certificate as valid from today to | ||
If no time period is provided, reissues certificate as valid from today to | ||
today + length of original. | ||
Accepts both name and a list of serial numbers; if both are provided, both will be used. | ||
If neither name nor serial numbers are provided, all certs pending reissue will be reissued. | ||
""" | ||
if commit: | ||
click.echo("[!] Running in COMMIT mode.") | ||
|
@@ -568,12 +596,24 @@ def reissue(old_certificate_name, notify, commit): | |
status = FAILURE_METRIC_STATUS | ||
|
||
try: | ||
old_cert = validate_certificate(old_certificate_name) | ||
certs_to_reissue = [] | ||
|
||
if not old_cert: | ||
for certificate in get_all_pending_reissue(): | ||
request_reissue(certificate, notify, commit) | ||
else: | ||
old_cert_by_name = validate_certificate(old_certificate_name) | ||
if old_cert_by_name: | ||
certs_to_reissue.append(old_cert_by_name) | ||
click.echo("[+] Reissuing 1 certificate by name") | ||
|
||
old_certs_by_serial_numbers = validate_certificates_by_serial_numbers(serial_numbers) | ||
if old_certs_by_serial_numbers: | ||
certs_to_reissue.extend(old_certs_by_serial_numbers) | ||
click.echo(f"[+] Reissuing {len(old_certs_by_serial_numbers)} certificates by serial number") | ||
|
||
# if neither name nor serial numbers were specified, reissue all pending reissues | ||
if not certs_to_reissue: | ||
certs_to_reissue = get_all_pending_reissue() | ||
click.echo(f"[+] Reissuing all {len(certs_to_reissue)} pending certificates.") | ||
|
||
for old_cert in certs_to_reissue: | ||
request_reissue(old_cert, notify, commit) | ||
|
||
status = SUCCESS_METRIC_STATUS | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -979,6 +979,73 @@ def test_reissue_certificate( | |
assert new_cert.organization == certificate.organization | ||
|
||
|
||
def test_reissue_command_by_name( | ||
issuer_plugin, crypto_authority, logged_in_user | ||
): | ||
from lemur.certificates.cli import reissue | ||
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION | ||
from lemur.tests.factories import CertificateFactory | ||
|
||
certificate = CertificateFactory(name="to_be_reissued_cert", authority=crypto_authority) | ||
|
||
reissue(certificate.name, None, False, True) | ||
|
||
new_cert = certificate.replaced[0] | ||
assert new_cert | ||
assert new_cert.key_type == "RSA2048" | ||
assert new_cert.organization != certificate.organization | ||
# Check for default value since authority does not have cab_compliant option set | ||
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION | ||
assert new_cert.description.startswith(f"Reissued by Lemur for cert ID {certificate.id}") | ||
|
||
|
||
def test_reissue_command_by_serial_numbers( | ||
issuer_plugin, crypto_authority, logged_in_user | ||
): | ||
from lemur.certificates.cli import reissue | ||
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION | ||
from lemur.tests.factories import CertificateFactory | ||
|
||
cert1 = CertificateFactory(name="to_be_reissued_cert_1", authority=crypto_authority) | ||
cert2 = CertificateFactory(name="to_be_reissued_cert_2", authority=crypto_authority) | ||
cert3 = CertificateFactory(name="to_be_reissued_cert_3", authority=crypto_authority) | ||
|
||
reissue(None, [cert1.serial, cert2.serial, cert3.serial], False, True) | ||
|
||
for cert in [cert1, cert2, cert3]: | ||
new_cert = cert.replaced[0] | ||
assert new_cert | ||
assert new_cert.key_type == "RSA2048" | ||
assert new_cert.organization != cert.organization | ||
# Check for default value since authority does not have cab_compliant option set | ||
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION | ||
assert new_cert.description.startswith(f"Reissued by Lemur for cert ID {cert.id}") | ||
|
||
|
||
def test_reissue_command_by_name_and_serial_numbers( | ||
issuer_plugin, crypto_authority, logged_in_user | ||
): | ||
from lemur.certificates.cli import reissue | ||
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION | ||
from lemur.tests.factories import CertificateFactory | ||
|
||
cert1 = CertificateFactory(name="to_be_reissued_cert_1", authority=crypto_authority) | ||
cert2 = CertificateFactory(name="to_be_reissued_cert_2", authority=crypto_authority) | ||
cert3 = CertificateFactory(name="to_be_reissued_cert_3", authority=crypto_authority) | ||
cert4 = CertificateFactory(name="to_be_reissued_cert_4", authority=crypto_authority) | ||
|
||
reissue(cert1.name, [cert2.serial, cert3.serial, cert4.serial], False, True) | ||
|
||
for cert in [cert1, cert2, cert3, cert4]: | ||
new_cert = cert.replaced[0] | ||
assert new_cert | ||
assert new_cert.key_type == "RSA2048" | ||
assert new_cert.organization != cert.organization | ||
# Check for default value since authority does not have cab_compliant option set | ||
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION | ||
assert new_cert.description.startswith(f"Reissued by Lemur for cert ID {cert.id}") | ||
|
||
|
||
def test_create_csr(): | ||
csr, private_key = create_csr( | ||
owner="[email protected]", | ||
|