Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sbasan committed Jul 3, 2024
1 parent b265541 commit c8daa91
Showing 1 changed file with 49 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import Field
from typing_extensions import Annotated

from catalystwan.api.builders.feature_profiles.report import FeatureProfileBuildReport
from catalystwan.endpoints.configuration_group import ConfigGroup
from catalystwan.exceptions import ManagerHTTPError
from catalystwan.models.configuration.config_migration import (
Expand All @@ -15,6 +16,7 @@
)
from catalystwan.models.configuration.feature_profile.sdwan.acl.ipv4acl import Ipv4AclParcel
from catalystwan.models.configuration.feature_profile.sdwan.acl.ipv6acl import Ipv6AclParcel
from catalystwan.models.configuration.feature_profile.sdwan.service.route_policy import RoutePolicyParcel
from catalystwan.models.configuration.feature_profile.sdwan.system.device_access import DeviceAccessIPv4Parcel
from catalystwan.models.configuration.feature_profile.sdwan.system.device_access_ipv6 import DeviceAccessIPv6Parcel
from catalystwan.models.configuration.profile_type import ProfileType
Expand All @@ -37,6 +39,8 @@
Field(discriminator="type_"),
]

ProfileInfo = Tuple[ProfileType, UUID, str, List[TransformedParcel]]

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -75,47 +79,63 @@ def _create_parcel_by_id_lookup(self) -> Dict[UUID, TransformedParcel]:
lookup[transformed_parcel.header.origin] = transformed_parcel
return lookup

def get_parcels_to_push(self, parcel_ids: List[UUID]) -> List[TransformedParcel]:
def _create_profile_report_by_id_lookup(self, profile_ids: List[UUID]) -> Dict[UUID, FeatureProfileBuildReport]:
lookup: Dict[UUID, FeatureProfileBuildReport] = dict()
for cg_report in self._push_result.report.config_groups:
for fp_report in cg_report.feature_profiles:
if fp_report.profile_uuid in profile_ids:
lookup[fp_report.profile_uuid] = fp_report
return lookup

def _get_parcels_to_push(self, parcel_ids: List[UUID]) -> List[TransformedParcel]:
result: List[TransformedParcel] = list()
for parcel_id in parcel_ids:
if parcel := self._parcel_by_id.get(parcel_id):
result.append(parcel)
return result

def find_config_groups_to_update(self) -> List[UUID]:
def _find_config_groups_to_update(self) -> List[UUID]:
result: List[UUID] = list()
for transformed_cg in self._ux2_config.config_groups:
if transformed_cg.header.localized_policy_subelements is not None:
updated_id = self.push_context.id_lookup[transformed_cg.header.origin]
result.append(updated_id)
return result

def get_config_group_contents(self, cg_ids: List[UUID]) -> Dict[UUID, ConfigGroup]:
def _get_config_group_contents(self, cg_ids: List[UUID]) -> Dict[UUID, ConfigGroup]:
result: Dict[UUID, ConfigGroup] = dict()
for cg_id in cg_ids:
cg = self._cg_api.get(cg_id)
result[cg_id] = cg
return result

def find_profiles_to_update(self) -> List[Tuple[ProfileType, UUID, List[TransformedParcel]]]:
profiles: List[Tuple[ProfileType, UUID, List[TransformedParcel]]] = list()
def _find_profiles_to_update(self) -> List[ProfileInfo]:
profiles: List[ProfileInfo] = list()
for transformed_profile in self._ux2_config.feature_profiles:
if transformed_profile.header.localized_policy_subelements is not None:
profile_type = cast(ProfileType, transformed_profile.header.type)
name = transformed_profile.feature_profile.name
updated_id = self.push_context.id_lookup[transformed_profile.header.origin]
parcels = self.get_parcels_to_push(list(transformed_profile.header.localized_policy_subelements))
profiles.append((cast(ProfileType, transformed_profile.header.type), updated_id, parcels))
parcels = self._get_parcels_to_push(list(transformed_profile.header.localized_policy_subelements))
profiles.append((profile_type, updated_id, name, parcels))
return profiles

def update_system_profile(self, profile_id: UUID, device_access: AnyDeviceAccessParcel):
def update_system_profile(
self, profile_id: UUID, device_access: AnyDeviceAccessParcel, report: FeatureProfileBuildReport
):
try:
self._system_api.create_parcel(
parcel_id = self._system_api.create_parcel(
profile_id=profile_id, payload=update_parcel_references(device_access, self.push_context.id_lookup)
)
).id
report.add_created_parcel(parcel_name=device_access.parcel_name, parcel_uuid=parcel_id)
except ManagerHTTPError as e:
logger.error(f"Error occured during creation of {device_access.type_} {device_access.parcel_name}: {e}")
report.add_failed_parcel(
parcel_name=device_access.parcel_name, parcel_type=device_access.type_, error_info=e.info
)

def associate_config_groups_with_default_policy_object_profile(self):
for cg_id, cg in self.get_config_group_contents(self.find_config_groups_to_update()).items():
for cg_id, cg in self._get_config_group_contents(self._find_config_groups_to_update()).items():
profile_ids = [p.id for p in cg.profiles]
profile_ids.append(self.push_context.default_policy_object_profile_id)
try:
Expand All @@ -130,10 +150,24 @@ def associate_config_groups_with_default_policy_object_profile(self):
logger.error(f"Error occured during config group edit: {e}")

def push(self):
self._progress("Associating Config Groups with Default Policy Object Profile", 0, 1)
self.associate_config_groups_with_default_policy_object_profile()
for profile_tuple in self.find_profiles_to_update():
type_, uuid_, transformed_parcels = profile_tuple
self._progress("Associating Config Groups with Default Policy Object Profile", 1, 1)
profile_infos = self._find_profiles_to_update()
profile_ids = [t[1] for t in profile_infos]
profile_reports = self._create_profile_report_by_id_lookup(profile_ids)
for i, profile_info in enumerate(profile_infos):
profile_type, profile_id, profile_name, transformed_parcels = profile_info
self._progress(f"Updating {profile_name} profile with policy parcels", i + 1, len(profile_ids))
for transformed_parcel in transformed_parcels:
parcel = transformed_parcel.parcel
if type_ == "system" and isinstance(parcel, (DeviceAccessIPv4Parcel, DeviceAccessIPv6Parcel)):
self.update_system_profile(profile_id=uuid_, device_access=parcel)
if profile_type == "system" and isinstance(parcel, (DeviceAccessIPv4Parcel, DeviceAccessIPv6Parcel)):
self.update_system_profile(
profile_id=profile_id, device_access=parcel, report=profile_reports[profile_id]
)
elif profile_type == "service" and isinstance(parcel, RoutePolicyParcel):
logger.warning(f"not implemented: adding {parcel.parcel_name} parcel to profile: {profile_id}")
elif profile_type == "transport" and isinstance(parcel, (Ipv4AclParcel, Ipv6AclParcel)):
logger.warning(f"not implemented: adding {parcel.parcel_name} parcel to profile: {profile_id}")
else:
logger.warning(f"Unexpected profile type {profile_type} to add Localized Policy items, skipping")

0 comments on commit c8daa91

Please sign in to comment.