Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Generic references updater (#742)
Browse files Browse the repository at this point in the history
  • Loading branch information
radkrawczyk authored Jun 28, 2024
1 parent 503517c commit 5594193
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 206 deletions.
1 change: 1 addition & 0 deletions catalystwan/api/configuration_groups/parcel.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ParcelAttribute(BaseModel):
# https://github.com/pydantic/pydantic/discussions/6090
# Usage: Global[str](value="test")
class Global(ParcelAttribute, Generic[T]):
model_config = ConfigDict(populate_by_name=True)
option_type: OptionType = Field(
default=OptionType.GLOBAL, serialization_alias="optionType", validation_alias="optionType"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class PolicySettings(BaseModel):
failure_mode: Optional[Global[FailureMode]] = Field(
default=None, validation_alias="failureMode", serialization_alias="failureMode"
)
security_logging: NetworkSettings = Field(
security_logging: Optional[NetworkSettings] = Field(
default=None, validation_alias="securityLogging", serialization_alias="securityLogging"
)

Expand Down Expand Up @@ -155,7 +155,7 @@ class PolicyParcel(_ParcelBase):
)

settings: Optional[PolicySettings] = Field(default=None, validation_alias=AliasPath("data", "settings"))
app_hosting: AppHosting = Field(default=None, validation_alias=AliasPath("data", "appHosting"))
app_hosting: Optional[AppHosting] = Field(default=None, validation_alias=AliasPath("data", "appHosting"))

def add_ng_firewall_assembly(self, ng_firewall_id: UUID, entries: List[NgFirewallEntry] = []) -> None:
self.assembly.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ class UrlFilteringParcel(_ParcelBase):
url_allowed_list: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlAllowedList"))
url_blocked_list: Optional[RefIdItem] = Field(default=None, validation_alias=AliasPath("data", "urlBlockedList"))
block_page_action: Global[BlockPageAction] = Field(validation_alias=AliasPath("data", "blockPageAction"))
block_page_contents: Global[str] = Field(default=None, validation_alias=AliasPath("data", "blockPageContents"))
redirect_url: Global[str] = Field(default=None, validation_alias=AliasPath("data", "redirectUrl"))
block_page_contents: Optional[Global[str]] = Field(
default=None, validation_alias=AliasPath("data", "blockPageContents")
)
redirect_url: Optional[Global[str]] = Field(default=None, validation_alias=AliasPath("data", "redirectUrl"))
enable_alerts: Global[bool] = Field(validation_alias=AliasPath("data", "enableAlerts"))
alerts: Optional[Global[List[Alerts]]] = Field(default=None, validation_alias=AliasPath("data", "alerts"))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
import unittest
from typing import Dict
from uuid import UUID, uuid4

from catalystwan.models.configuration.feature_profile.sdwan.policy_object.security.aip import (
AdvancedInspectionProfileParcel,
)
from catalystwan.utils.config_migration.creators.references_updater import update_parcel_references


class TestReferencesUpdater(unittest.TestCase):
def test_update_parcel_references(self):
intrusion_prevention_origin_ref = uuid4()
amp_origin_ref = uuid4()
url_filtering_origin_ref = uuid4()

pushed_objects: Dict[UUID, UUID] = {
intrusion_prevention_origin_ref: uuid4(),
amp_origin_ref: uuid4(),
url_filtering_origin_ref: uuid4(),
}

aip_parcel = AdvancedInspectionProfileParcel.create(
"aip1",
"description1",
intrusion_prevention=intrusion_prevention_origin_ref,
advanced_malware_protection=amp_origin_ref,
url_filtering=url_filtering_origin_ref,
)

updated_parcel = update_parcel_references(aip_parcel, pushed_objects)

assert updated_parcel is not aip_parcel
assert updated_parcel.intrusion_prevention.ref_id.value == str(pushed_objects[intrusion_prevention_origin_ref])
assert updated_parcel.advanced_malware_protection.ref_id.value == str(pushed_objects[amp_origin_ref])
assert updated_parcel.url_filtering.ref_id.value == str(pushed_objects[url_filtering_origin_ref])
assert updated_parcel.ssl_decryption_profile is None
assert updated_parcel.tls_decryption_action == aip_parcel.tls_decryption_action
assert updated_parcel.parcel_name == aip_parcel.parcel_name
assert updated_parcel.parcel_description == aip_parcel.parcel_description

def test_not_update_parcel_references(self):
intrusion_prevention_origin_ref = uuid4()
amp_origin_ref = uuid4()
url_filtering_origin_ref = uuid4()

pushed_objects: Dict[UUID, UUID] = {}

aip_parcel = AdvancedInspectionProfileParcel.create(
"aip1",
"description1",
intrusion_prevention=intrusion_prevention_origin_ref,
advanced_malware_protection=amp_origin_ref,
url_filtering=url_filtering_origin_ref,
)

updated_parcel = update_parcel_references(aip_parcel, pushed_objects)

assert updated_parcel is aip_parcel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from catalystwan.api.feature_profile_api import PolicyObjectFeatureProfileAPI
from catalystwan.exceptions import ManagerHTTPError
from catalystwan.models.configuration.config_migration import PushContext, UX2Config, UX2ConfigPushResult
from catalystwan.models.configuration.feature_profile.common import FeatureProfileCreationPayload, RefIdItem
from catalystwan.models.configuration.feature_profile.common import FeatureProfileCreationPayload
from catalystwan.models.configuration.feature_profile.parcel import AnyParcel, Parcel, list_types
from catalystwan.models.configuration.feature_profile.sdwan.policy_object import (
AdvancedInspectionProfileParcel,
Expand All @@ -17,66 +17,10 @@
from catalystwan.session import ManagerSession
from catalystwan.typed_list import DataSequence

from .references_updater import ReferencesUpdater, update_parcels_references
from .references_updater import update_parcel_references

logger = logging.getLogger(__name__)


class UrlFilteringReferencesUpdater(ReferencesUpdater):
def update_references(self):
if allowed_list := self.parcel.url_allowed_list:
v2_uuid = self.get_target_uuid(UUID(allowed_list.ref_id.value))
self.parcel.url_allowed_list = RefIdItem.from_uuid(v2_uuid)

if blocked_list := self.parcel.url_blocked_list:
v2_uuid = self.get_target_uuid(UUID(blocked_list.ref_id.value))
self.parcel.url_blocked_list = RefIdItem.from_uuid(v2_uuid)


class SslProfileReferencesUpdater(ReferencesUpdater):
def update_references(self):
if allowed_list := self.parcel.url_allowed_list:
v2_uuid = self.get_target_uuid(UUID(allowed_list.ref_id.value))
self.parcel.url_allowed_list = RefIdItem.from_uuid(v2_uuid)

if blocked_list := self.parcel.url_blocked_list:
v2_uuid = self.get_target_uuid(UUID(blocked_list.ref_id.value))
self.parcel.url_blocked_list = RefIdItem.from_uuid(v2_uuid)


class IntrusionPreventionReferencesUpdater(ReferencesUpdater):
def update_references(self):
if allowed_list := self.parcel.signature_allowed_list:
v2_uuid = self.get_target_uuid(UUID(allowed_list.ref_id.value))
self.parcel.signature_allowed_list = RefIdItem.from_uuid(v2_uuid)


class AdvancedInspectionProfileReferencesUpdater(ReferencesUpdater):
def update_references(self):
if advanced_malware_protection := self.parcel.advanced_malware_protection:
v2_uuid = self.get_target_uuid(UUID(advanced_malware_protection.ref_id.value))
self.parcel.advanced_malware_protection = RefIdItem.from_uuid(v2_uuid)

if intrusion_prevention := self.parcel.intrusion_prevention:
v2_uuid = self.get_target_uuid(UUID(intrusion_prevention.ref_id.value))
self.parcel.intrusion_prevention = RefIdItem.from_uuid(v2_uuid)

if ssl_decryption_profile := self.parcel.ssl_decryption_profile:
v2_uuid = self.get_target_uuid(UUID(ssl_decryption_profile.ref_id.value))
self.parcel.ssl_decryption_profile = RefIdItem.from_uuid(v2_uuid)

if url_filtering := self.parcel.url_filtering:
v2_uuid = self.get_target_uuid(UUID(url_filtering.ref_id.value))
self.parcel.url_filtering = RefIdItem.from_uuid(v2_uuid)


REFERENCES_UPDATER_MAPPING: Mapping[type, Type[ReferencesUpdater]] = {
UrlFilteringParcel: UrlFilteringReferencesUpdater,
SslDecryptionProfileParcel: SslProfileReferencesUpdater,
IntrusionPreventionParcel: IntrusionPreventionReferencesUpdater,
AdvancedInspectionProfileParcel: AdvancedInspectionProfileReferencesUpdater,
}

POLICY_OBJECTS_PUSH_ORDER: Mapping[Type[AnyParcel], int] = {
UrlFilteringParcel: 1,
SslDecryptionProfileParcel: 1,
Expand Down Expand Up @@ -153,8 +97,7 @@ def push(self) -> None:
continue

try:
update_parcels_references(parcel, self.push_context.id_lookup, REFERENCES_UPDATER_MAPPING)

parcel = update_parcel_references(parcel, self.push_context.id_lookup)
parcel_id = self._policy_object_api.create(profile_id=profile_id, payload=parcel).id
profile_rollback.add_parcel(parcel.type_, parcel_id)
self._push_result.report.groups_of_interest.add_created(parcel.parcel_name, parcel_id)
Expand Down
47 changes: 21 additions & 26 deletions catalystwan/utils/config_migration/creators/references_updater.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Mapping, Type, Union, cast
import logging
from typing import Dict, TypeVar
from uuid import UUID

from catalystwan.models.configuration.feature_profile.parcel import AnyParcel
from catalystwan.utils.config_migration.converters.exceptions import CatalystwanConverterCantConvertException
from pydantic import BaseModel, ValidationError
from pydantic_core import from_json

logger = logging.getLogger(__name__)

@dataclass
class ReferencesUpdater(ABC):
parcel: AnyParcel
pushed_objects_map: Dict[UUID, UUID]
T = TypeVar("T", bound=BaseModel)

@abstractmethod
def update_references(self):
pass

def get_target_uuid(self, origin_uuid: Union[str, UUID]) -> UUID:
_origin_uuid: UUID = cast(UUID, UUID(origin_uuid) if type(origin_uuid) is str else origin_uuid)
if target_uuid := self.pushed_objects_map.get(_origin_uuid):
return target_uuid
def update_parcel_references(parcel: T, uuid_map: Dict[UUID, UUID]) -> T:
t = type(parcel)
origin_dump = target_dump = parcel.model_dump_json(by_alias=True)
pattern = '"{}"'

raise CatalystwanConverterCantConvertException(
f"Cannot find transferred policy object based on v1 API id: {_origin_uuid}"
)
for origin_uuid, target_uuid in uuid_map.items():
origin_uuid_str = pattern.format(str(origin_uuid))
target_uuid_str = pattern.format(str(target_uuid))
target_dump = target_dump.replace(origin_uuid_str, target_uuid_str)

if origin_dump == target_dump:
return parcel

def update_parcels_references(
parcel: AnyParcel,
pushed_objects_map: Dict[UUID, UUID],
updaters_map: Mapping[Type[AnyParcel], Type[ReferencesUpdater]],
) -> None:
if ref_updater := updaters_map.get(type(parcel)):
ref_updater(parcel, pushed_objects_map=pushed_objects_map).update_references()
try:
return t.model_validate(from_json(target_dump))
except ValidationError as e:
logging.error(f"Cannot validate model after references update: {e}")
raise e
Loading

0 comments on commit 5594193

Please sign in to comment.