Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Security policy convert and push (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
radkrawczyk authored Jun 27, 2024
1 parent 43580a3 commit 503517c
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 36 deletions.
17 changes: 16 additions & 1 deletion catalystwan/models/configuration/config_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@
from catalystwan.models.policy.definition.ssl_decryption import NetworkDecryptionRuleSequence, UrlProfile
from catalystwan.models.policy.definition.zone_based_firewall import ZoneBasedFWPolicyEntry
from catalystwan.models.policy.localized import LocalizedPolicyInfo
from catalystwan.models.policy.security import AnySecurityPolicyInfo
from catalystwan.models.policy.security import (
AnySecurityPolicyInfo,
HighSpeedLoggingEntry,
HighSpeedLoggingList,
LoggingEntry,
ZoneToNoZoneInternet,
)
from catalystwan.models.templates import FeatureTemplateInformation, TemplateInformation
from catalystwan.version import parse_api_version

Expand Down Expand Up @@ -552,6 +558,14 @@ class SslDecryptioneResidues:
profiles: List[UrlProfile]


@dataclass
class SecurityPolicyResidues:
high_speed_logging_setting: Optional[Union[HighSpeedLoggingEntry, HighSpeedLoggingList]]
logging_setting: Optional[List[LoggingEntry]] = None
zone_to_no_zone_internet_setting: Optional[ZoneToNoZoneInternet] = None
platform_match_setting: Optional[str] = None


@dataclass
class PolicyConvertContext:
# conversion input
Expand All @@ -569,6 +583,7 @@ class PolicyConvertContext:
ssl_profile_residues: Dict[UUID, SslProfileResidues] = field(default_factory=dict)
url_filtering_target_vpns: Dict[UUID, List[VpnId]] = field(default_factory=dict)
zone_based_firewall_residues: Dict[UUID, List[ZoneBasedFWPolicyEntry]] = field(default_factory=dict)
security_policy_residues: Dict[UUID, SecurityPolicyResidues] = field(default_factory=dict)

def get_vpn_id_to_vpn_name_map(self) -> Dict[Union[str, int], List[str]]:
vpn_map: Dict[Union[str, int], List[str]] = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing_extensions import Annotated

from .ngfirewall import NgfirewallParcel
from .policy import PolicyParcel
from .policy import PolicyParcel, PolicySettings

AnyEmbeddedSecurityParcel = Annotated[
Union[
Expand All @@ -16,7 +16,7 @@
Field(discriminator="type_"),
]

__all__ = ("AnyEmbeddedSecurityParcel", "PolicyParcel", "NgfirewallParcel")
__all__ = ("AnyEmbeddedSecurityParcel", "PolicyParcel", "PolicySettings", "NgfirewallParcel")


def __dir__() -> "List[str]":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates
# Copyright 2024 Cisco Systems, Inc. and its affiliates
from typing import List, Literal, Union
from typing import List, Literal, Optional, Union
from uuid import UUID

from pydantic import AliasPath, BaseModel, ConfigDict, Field
from typing_extensions import Self

from catalystwan.api.configuration_groups.parcel import Global, Variable, _ParcelBase
from catalystwan.api.configuration_groups.parcel import Global, Variable, _ParcelBase, as_global, as_optional_global
from catalystwan.models.configuration.feature_profile.common import RefIdItem

PredefinedZone = Literal["self", "default", "untrusted"]
Expand All @@ -28,17 +29,61 @@ class NetworkSettings(BaseModel):

class PolicySettings(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="forbid")
tcp_syn_flood_limit: Global[str] = Field(default=None, validation_alias="tcpSynFloodLimit")
max_incomplete_tcp_limit: Global[str] = Field(default=None, validation_alias="maxIncompleteTcpLimit")
max_incomplete_udp_limit: Global[str] = Field(default=None, validation_alias="maxIncompleteUdpLimit")
tcp_syn_flood_limit: Optional[Global[str]] = Field(
default=None, validation_alias="tcpSynFloodLimit", serialization_alias="tcpSynFloodLimit"
)
max_incomplete_tcp_limit: Optional[Global[str]] = Field(
default=None, validation_alias="maxIncompleteTcpLimit", serialization_alias="maxIncompleteTcpLimit"
)
max_incomplete_udp_limit: Optional[Global[str]] = Field(
default=None, validation_alias="maxIncompleteUdpLimit", serialization_alias="maxIncompleteUdpLimit"
)
max_incomplete_icmp_limit: Optional[Global[str]] = Field(
default=None, validation_alias="maxIncompleteIcmpLimit", serialization_alias="maxIncompleteIcmpLimit"
)
audit_trail: Optional[Global[SettingOn]] = Field(
default=None, validation_alias="auditTrail", serialization_alias="auditTrail"
)
unified_logging: Optional[Global[SettingOn]] = Field(
default=None, validation_alias="unifiedLogging", serialization_alias="unifiedLogging"
)
session_reclassify_allow: Optional[Global[SettingOn]] = Field(
default=None, validation_alias="sessionReclassifyAllow", serialization_alias="sessionReclassifyAllow"
)
icmp_unreachable_allow: Optional[Global[SettingOn]] = Field(
default=None, validation_alias="icmpUnreachableAllow", serialization_alias="icmpUnreachableAllow"
)
failure_mode: Optional[Global[FailureMode]] = Field(
default=None, validation_alias="failureMode", serialization_alias="failureMode"
)
security_logging: NetworkSettings = Field(
default=None, validation_alias="securityLogging", serialization_alias="securityLogging"
)

max_incomplete_icmp_limit: Global[str] = Field(default=None, validation_alias="maxIncompleteIcmpLimit")
audit_trail: Global[SettingOn] = Field(default=None, validation_alias="auditTrail")
unified_logging: Global[SettingOn] = Field(default=None, validation_alias="unifiedLogging")
session_reclassify_allow: Global[SettingOn] = Field(default=None, validation_alias="sessionReclassifyAllow")
icmp_unreachable_allow: Global[SettingOn] = Field(default=None, validation_alias="icmpUnreachableAllow")
failure_mode: Global[FailureMode] = Field(default=None, validation_alias="failureMode")
security_logging: NetworkSettings = Field(default=None, validation_alias="securityLogging")
@classmethod
def create(
cls,
tcp_syn_flood_limit: Optional[str] = None,
max_incomplete_tcp_limit: Optional[str] = None,
max_incomplete_udp_limit: Optional[str] = None,
max_incomplete_icmp_limit: Optional[str] = None,
unified_logging: Optional[SettingOn] = None,
session_reclassify_allow: Optional[SettingOn] = None,
failure_mode: Optional[FailureMode] = None,
audit_trail: Optional[SettingOn] = None,
icmp_unreachable_allow: Optional[SettingOn] = None,
) -> Self:
return cls(
tcp_syn_flood_limit=as_optional_global(tcp_syn_flood_limit),
max_incomplete_tcp_limit=as_optional_global(max_incomplete_tcp_limit),
max_incomplete_udp_limit=as_optional_global(max_incomplete_udp_limit),
max_incomplete_icmp_limit=as_optional_global(max_incomplete_icmp_limit),
unified_logging=as_optional_global(unified_logging, SettingOn),
session_reclassify_allow=as_optional_global(session_reclassify_allow, SettingOn),
failure_mode=as_optional_global(failure_mode, FailureMode),
audit_trail=as_optional_global(audit_trail, SettingOn),
icmp_unreachable_allow=as_optional_global(icmp_unreachable_allow, SettingOn),
)


class NgFirewallEntry(BaseModel):
Expand All @@ -50,6 +95,23 @@ class NgFirewallEntry(BaseModel):
validation_alias="dstZone", serialization_alias="dstZone"
)

@classmethod
def create(cls, src_zone: Union[UUID, PredefinedZone], dst_zone: Union[UUID, PredefinedZone]) -> Self:
if type(src_zone) is UUID:
_src_zone = RefIdItem.from_uuid(src_zone)
else:
_src_zone = as_global(src_zone, PredefinedZone)

if type(dst_zone) is UUID:
_dst_zone = RefIdItem.from_uuid(dst_zone)
else:
_dst_zone = as_global(dst_zone, PredefinedZone)

return cls(
src_zone=_src_zone,
dst_zone=_dst_zone,
)


class NgFirewall(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="forbid")
Expand All @@ -59,18 +121,26 @@ class NgFirewall(BaseModel):

class NgFirewallContainer(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="forbid")
ngfirewall: NgFirewall = Field(validation_alias="ngfirewall", serialization_alias="ngfirewall")
ng_firewall: NgFirewall = Field(validation_alias="ngfirewall", serialization_alias="ngfirewall")


class SslDecryption(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="forbid")
ssl_decryption: RefIdItem = Field(validation_alias="sslDecryption", serialization_alias="sslDecryption")

@classmethod
def from_uuid(cls, uuid: UUID) -> Self:
return cls(ssl_decryption=RefIdItem.from_uuid(uuid))


class AdvancedInspectionProfile(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="forbid")
advanced_inspection_profile: RefIdItem = Field(validation_alias="advancedInspectionProfile")

@classmethod
def from_uuid(cls, uuid: UUID) -> Self:
return cls(advanced_inspection_profile=RefIdItem.from_uuid(uuid))


class PolicyParcel(_ParcelBase):
type_: Literal["policy"] = Field(default="policy", exclude=True)
Expand All @@ -81,7 +151,19 @@ class PolicyParcel(_ParcelBase):
description="Set the parcel description",
)
assembly: List[Union[NgFirewallContainer, SslDecryption, AdvancedInspectionProfile]] = Field(
validation_alias=AliasPath("data", "assembly"), min_length=1
default=[], validation_alias=AliasPath("data", "assembly")
)
settings: PolicySettings = Field(default=None, validation_alias=AliasPath("data", "settings"))

settings: Optional[PolicySettings] = Field(default=None, validation_alias=AliasPath("data", "settings"))
app_hosting: AppHosting = Field(default=None, validation_alias=AliasPath("data", "appHosting"))

def add_ng_firewall_assembly(self, ng_firewall_id: UUID, entries: List[NgFirewallEntry] = []) -> None:
self.assembly.append(
NgFirewallContainer(ng_firewall=NgFirewall(ref_id=as_global(ng_firewall_id), entries=entries))
)

def add_ssl_decryption_assembly(self, ssl_decryption_profile_id: UUID) -> None:
self.assembly.append(SslDecryption.from_uuid(ssl_decryption_profile_id))

def add_advanced_inspection_profile_assembly(self, aip_id: UUID) -> None:
self.assembly.append(AdvancedInspectionProfile.from_uuid(aip_id))
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
from typing import List, Union
from uuid import UUID

from catalystwan.models.configuration.config_migration import (
ConvertResult,
PolicyConvertContext,
SecurityPolicyResidues,
)
from catalystwan.models.configuration.feature_profile.sdwan.embedded_security import PolicyParcel, PolicySettings
from catalystwan.models.configuration.feature_profile.sdwan.embedded_security.policy import NgFirewallEntry
from catalystwan.models.policy.security import (
AnySecurityPolicyInfo,
NGFirewallAssemblyItem,
SecurityPolicySettings,
ZoneBasedFWAssemblyItem,
)


def convert_security_policy(
in_: AnySecurityPolicyInfo, uuid: UUID, context: PolicyConvertContext
) -> ConvertResult[PolicyParcel]:
convert_result = ConvertResult[PolicyParcel]()

settings = in_.policy_definition.settings
if type(settings) is SecurityPolicySettings:
residues = SecurityPolicyResidues(
high_speed_logging_setting=settings.high_speed_logging,
logging_setting=settings.logging,
zone_to_no_zone_internet_setting=settings.zone_to_no_zone_internet,
platform_match_setting=settings.platform_match,
)
else:
residues = SecurityPolicyResidues(
high_speed_logging_setting=settings.high_speed_logging,
)

context.security_policy_residues[uuid] = residues

try:
convert_result.output = PolicyParcel(
parcel_name=in_.policy_name,
parcel_description=in_.policy_description,
settings=PolicySettings.create(
**in_.policy_definition.settings.model_dump(
exclude={"high_speed_logging", "logging", "zone_to_no_zone_internet", "platform_match"}
)
),
)

for assembly in in_.policy_definition.assembly:
if assembly.type == "zoneBasedFW":
entries = _convert_security_policy_entries(assembly, context)
convert_result.output.add_ng_firewall_assembly(ng_firewall_id=assembly.definition_id, entries=entries)
elif assembly.type == "advancedInspectionProfile":
convert_result.output.add_advanced_inspection_profile_assembly(assembly.definition_id)
elif assembly.type == "sslDecryption":
convert_result.output.add_ssl_decryption_assembly(assembly.definition_id)
else:
convert_result.update_status(
"partial", f"Unknown conversion of security policy assembly type: {assembly.type}"
)
except Exception as e:
convert_result.update_status("failed", f"Cannot convert SecurityPolicy due to an error {e}")

return convert_result


def _convert_security_policy_entries(
assembly: Union[ZoneBasedFWAssemblyItem, NGFirewallAssemblyItem], context: PolicyConvertContext
) -> List[NgFirewallEntry]:
target_entries = []

if assembly.entries:
for entry in assembly.entries:
target_entries.append(NgFirewallEntry.create(entry.src_zone_list_id, entry.dst_zone_list_id))

if firewall_entries_residues := context.zone_based_firewall_residues.get(assembly.definition_id): # change
for entry_2 in firewall_entries_residues:
target_entries.append(NgFirewallEntry.create(entry_2.source_zone_id, entry_2.destination_zone_id))

return target_entries
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Mapping, Type
from typing import Dict, Mapping, Type, Union, cast
from uuid import UUID

from catalystwan.models.configuration.feature_profile.parcel import AnyParcel
Expand All @@ -16,12 +16,13 @@ class ReferencesUpdater(ABC):
def update_references(self):
pass

def get_target_uuid(self, origin_uuid: UUID) -> UUID:
if v2_uuid := self.pushed_objects_map.get(origin_uuid):
return v2_uuid
def get_target_uuid(self, origin_uuid: Union[str, UUID]) -> UUID:
_origin_uuid: UUID = cast(UUID, UUID(origin_uuid) if type(origin_uuid) is str else origin_uuid)
if target_uuid := self.pushed_objects_map.get(_origin_uuid):
return target_uuid

raise CatalystwanConverterCantConvertException(
f"Cannot find transferred policy object based on v1 API id: {origin_uuid}"
f"Cannot find transferred policy object based on v1 API id: {_origin_uuid}"
)


Expand Down
Loading

0 comments on commit 503517c

Please sign in to comment.