Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
progress. todo: figureout workaround for optional parcel name
Browse files Browse the repository at this point in the history
  • Loading branch information
jpkrajewski committed Jul 17, 2024
1 parent 2b753a2 commit c7fd896
Show file tree
Hide file tree
Showing 13 changed files with 427 additions and 21 deletions.
2 changes: 1 addition & 1 deletion catalystwan/api/configuration_groups/parcel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from enum import Enum
from typing import Any, Dict, Generic, List, Literal, Optional, Tuple, TypeVar, get_origin
Expand Down
23 changes: 21 additions & 2 deletions catalystwan/endpoints/configuration/network_hierarchy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

# mypy: disable-error-code="empty-body"
from catalystwan.endpoints import APIEndpoints, get, versions
from catalystwan.models.configuration.network_hierarchy import NodeInfo
from uuid import UUID

from catalystwan.endpoints import APIEndpoints, delete, get, post, versions
from catalystwan.models.configuration.feature_profile.parcel import Parcel, ParcelCreationResponse
from catalystwan.models.configuration.network_hierarchy.cflowd import CflowdParcel
from catalystwan.models.configuration.network_hierarchy.node import NodeInfo
from catalystwan.typed_list import DataSequence


Expand All @@ -11,3 +15,18 @@ class NetworkHierarchy(APIEndpoints):
@versions(">=20.10")
def list_nodes(self) -> DataSequence[NodeInfo]:
...

@post("/v1/network-hierarchy/{node_id}/network-settings/cflowd")
@versions(">20.12")
def create_cflowd(self, node_id: UUID, payload: CflowdParcel) -> ParcelCreationResponse:
...

@get("/v1/network-hierarchy/{node_id}/network-settings/cflowd", resp_json_key="data")
@versions(">20.12")
def get_cflowd(self, node_id: UUID) -> DataSequence[Parcel[CflowdParcel]]:
...

@delete("/v1/network-hierarchy/{node_id}/network-settings/cflowd/{parcel_id}")
@versions(">20.12")
def delete_cflowd(self, node_id: UUID, parcel_id: UUID) -> None:
...
23 changes: 14 additions & 9 deletions catalystwan/integration_tests/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
import logging
import os
import unittest
from typing import Union
from uuid import UUID, uuid4
Expand All @@ -25,21 +24,27 @@
RUN_ID: str = str(uuid4())[:4]


def create_session() -> ManagerSession:
"""Try to create a session with the environment variables, if it fails, raise an exception"""
def load_config() -> dict:
"""Load the configuration from the environment variables"""
url = os.environ.get("VMANAGE_URL")
port = os.environ.get("VMANAGE_PORT")
username = os.environ.get("VMANAGE_USERNAME")
password = os.environ.get("VMANAGE_PASSWORD")
if url is None or port is None or username is None or password is None:
raise CatalystwanException("Missing environment variables")
return dict(
url=url,
port=port,
username=username,
password=password,
)



def create_session() -> ManagerSession:
"""Try to create a session with the environment variables, if it fails, raise an exception"""
try:
session = create_manager_session(
url=url,
port=int(port),
username=username,
password=password,
)
session = create_manager_session(**load_config())
session.is_for_testing = True
return session
except Exception:
Expand Down
82 changes: 82 additions & 0 deletions catalystwan/integration_tests/test_network_hierarchy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
import unittest
from typing import Optional
from uuid import UUID

from catalystwan.integration_tests.base import IS_API_20_12, TestCaseBase
from catalystwan.models.configuration.network_hierarchy.cflowd import CflowdParcel


@unittest.skipIf(IS_API_20_12, "cflowd is not supported in 20.12")
class TestCflowd(TestCaseBase):
parcel_id: Optional[UUID] = None
global_node_id: UUID

@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
nodes = cls.session.endpoints.network_hierarchy.list_nodes()
for node in nodes:
if node.data.label == "GLOBAL":
cls.global_node_id = UUID(node.id)
return
raise ValueError("Global node not found")

def test_create_cflowd(self):
# Arrange
protocol = "ipv4"
active_timeout = 1000
inactive_timeout = 3000
refresh_time = 5000
sampling_interval = 6000
collect_tos = False
collect_dscp_output = True
vpn_id = 50
address = "10.0.2.3"
port = 9900
export_spread = True
bfd_metrics_export = True
export_interval = 1000
cflowd = CflowdParcel()
cflowd.add_collector(
address=address,
bfd_metrics_export=bfd_metrics_export,
export_interval=export_interval,
export_spread=export_spread,
udp_port=port,
vpn_id=vpn_id,
)
cflowd.set_customized_ipv4_record_fields(collect_tos=collect_tos, collect_dscp_output=collect_dscp_output)
cflowd.set_flow(
active_timeout=active_timeout,
inactive_timeout=inactive_timeout,
refresh_time=refresh_time,
sampling_interval=sampling_interval,
)
cflowd.set_protocol(protocol)
# Act
self.parcel_id = self.session.endpoints.network_hierarchy.create_cflowd(self.global_node_id, cflowd).id
parcel = (
self.session.endpoints.network_hierarchy.get_cflowd(self.global_node_id)
.find(parcel_id=self.parcel_id)
.payload
)
# Assert
assert parcel.protocol.value == protocol
assert parcel.flow_active_timeout.value == active_timeout
assert parcel.flow_inactive_timeout.value == inactive_timeout
assert parcel.flow_refresh_time.value == refresh_time
assert parcel.flow_sampling_interval.value == sampling_interval
assert parcel.customized_ipv4_record_fields.collect_dscp_output.value == collect_dscp_output
assert parcel.customized_ipv4_record_fields.collect_tos.value == collect_tos
assert parcel.collectors[0].vpn_id.value == vpn_id
assert parcel.collectors[0].address.value == address
assert parcel.collectors[0].udp_port.value == port
assert parcel.collectors[0].bfd_metrics_export.value == bfd_metrics_export
assert parcel.collectors[0].export_interval.value == export_interval
assert parcel.collectors[0].export_spread.value == export_spread

def tearDown(self) -> None:
if self.parcel_id:
self.session.endpoints.network_hierarchy.delete_cflowd(self.global_node_id, self.parcel_id)
super().tearDown()
2 changes: 1 addition & 1 deletion catalystwan/models/configuration/config_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from catalystwan.models.configuration.feature_profile.parcel import AnyParcel, list_types
from catalystwan.models.configuration.feature_profile.sdwan.policy_object import AnyPolicyObjectParcel
from catalystwan.models.configuration.feature_profile.sdwan.service.lan.vpn import LanVpnParcel
from catalystwan.models.configuration.network_hierarchy import NodeInfo
from catalystwan.models.configuration.network_hierarchy.node import NodeInfo
from catalystwan.models.configuration.topology_group import TopologyGroup
from catalystwan.models.policy import AnyPolicyDefinitionInfo, AnyPolicyListInfo, URLAllowListInfo, URLBlockListInfo
from catalystwan.models.policy.centralized import CentralizedPolicyInfo
Expand Down
21 changes: 20 additions & 1 deletion catalystwan/models/configuration/feature_profile/common.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates

from datetime import datetime
from functools import wraps
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
from typing import List, Literal, Optional, Union
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import Self

from catalystwan.api.configuration_groups.parcel import Default, Global, Variable, as_default, as_global
from catalystwan.api.configuration_groups.parcel import (
Default,
Global,
Variable,
as_default,
as_global,
as_optional_global,
)
from catalystwan.models.common import (
CableLengthLongValue,
CableLengthShortValue,
Expand Down Expand Up @@ -817,3 +825,14 @@ class MultilinkNimList(BaseModel):
"time-exceeded",
"unreachable",
]


def arguments_as_optional_global(func):
wraps(func)

def wrapper(self, *args, **kwargs):
new_args = [as_optional_global(a) for a in args]
new_kwargs = {k: as_optional_global(v) for k, v in kwargs.items()}
return func(self, *new_args, **new_kwargs)

return wrapper
25 changes: 19 additions & 6 deletions catalystwan/models/configuration/feature_profile/parcel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates
# Copyright 2024 Cisco Systems, Inc. and its affiliates
from functools import lru_cache
from typing import Generic, List, Literal, Sequence, TypeVar, Union, cast
from typing import Generic, List, Literal, Optional, Sequence, TypeVar, Union, cast
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field, model_validator
Expand All @@ -18,6 +18,7 @@
from catalystwan.models.configuration.feature_profile.sdwan.system import AnySystemParcel
from catalystwan.models.configuration.feature_profile.sdwan.topology import AnyTopologyParcel
from catalystwan.models.configuration.feature_profile.sdwan.transport import AnyTransportParcel
from catalystwan.models.configuration.network_hierarchy import AnyNetworkHierarchy
from catalystwan.utils.model import resolve_nested_base_model_unions

ParcelType = Literal[
Expand All @@ -31,6 +32,7 @@
"bfd",
"bgp",
"cellular-controller",
"cflowd",
"class",
"color",
"config",
Expand Down Expand Up @@ -128,6 +130,7 @@
AnyApplicationPriorityParcel,
AnyTopologyParcel,
AnyRoutingParcel,
AnyNetworkHierarchy,
],
Field(discriminator="type_"),
]
Expand All @@ -136,21 +139,31 @@


class Parcel(BaseModel, Generic[T]):
parcel_id: Union[str, UUID] = Field(alias="parcelId")
parcel_id: UUID = Field(alias="parcelId")
parcel_type: ParcelType = Field(alias="parcelType")
created_by: str = Field(alias="createdBy")
last_updated_by: str = Field(alias="lastUpdatedBy")
last_updated_by: Optional[str] = Field(default=None, alias="lastUpdatedBy")
created_on: int = Field(alias="createdOn")
last_updated_on: int = Field(alias="lastUpdatedOn")
last_updated_on: Optional[int] = Field(default=None, alias="lastUpdatedOn")
payload: T

@model_validator(mode="before")
def validate_payload(cls, data):
if not isinstance(data, dict):
return data
data["payload"]["type_"] = data["parcelType"]
type_ = data.get("parcelType") or data.get("type")
id_ = data.get("parcelId") or data.get("id")
assert type_ is not None
assert id_ is not None
data["parcelType"] = type_
data["payload"]["type_"] = type_
data["parcelId"] = id_
return data

def model_post_init(self, __context):
"""We do not want to store the JSON as dict data in the 'data' field, as it is already deserialized"""
self.payload.data = None


class Header(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
from ipaddress import IPv4Address, IPv4Interface
from typing import List, Literal, Optional, Tuple, Union
from uuid import UUID
Expand Down Expand Up @@ -313,7 +314,7 @@ class Ipv4AclParcel(_ParcelBase):
default=Default[Literal["drop"]](value="drop"), validation_alias=AliasPath("data", "defaultAction")
)
sequences: List[Sequence] = Field(
default=[], validation_alias=AliasPath("data", "sequences"), description="Access Control List"
default_factory=list, validation_alias=AliasPath("data", "sequences"), description="Access Control List"
)

def set_default_action(self, action: AcceptDropActionType):
Expand Down
22 changes: 22 additions & 0 deletions catalystwan/models/configuration/network_hierarchy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2024 Cisco Systems, Inc. and its affiliates
from typing import List, Union

from pydantic import Field
from typing_extensions import Annotated

from .cflowd import CflowdParcel
from .node import NodeInfo

AnyNetworkHierarchy = Annotated[
Union[CflowdParcel],
Field(discriminator="type_"),
]

__all__ = [
"CflowdParcel",
"NodeInfo",
]


def __dir__() -> "List[str]":
return list(__all__)
Loading

0 comments on commit c7fd896

Please sign in to comment.