Skip to content

Commit

Permalink
[NDM] [Cisco ACI] Refactor batched payloads + use interface ID if nam…
Browse files Browse the repository at this point in the history
…e not available (#18360) (#18370)

* Refactor batch payloads for sending NDM metadata

* Interfaces use id if name is not specified

* Add changelog

(cherry picked from commit cad9149)

Co-authored-by: zoe ✨ <[email protected]>
  • Loading branch information
1 parent 54ce108 commit 54ea3e2
Show file tree
Hide file tree
Showing 6 changed files with 1,099 additions and 560 deletions.
1 change: 1 addition & 0 deletions cisco_aci/changelog.d/18360.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[NDM] [Cisco ACI] Refactor batched payloads to fix incorrect status + use interface ID if name not available
88 changes: 12 additions & 76 deletions cisco_aci/datadog_checks/cisco_aci/fabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)


from six import PY3, iteritems

from datadog_checks.base.utils.serialization import json

if PY3:
import time

from datadog_checks.cisco_aci.models import DeviceMetadata, InterfaceMetadata, NetworkDevicesMetadata, Node, PhysIf

else:
DeviceMetadata = None
Eth = None
InterfaceMetadata = None
Node = None

from . import aci_metrics, exceptions, helpers
from . import aci_metrics, exceptions, helpers, ndm

VENDOR_CISCO = 'cisco'
PAYLOAD_METADATA_BATCH_SIZE = 100
Expand Down Expand Up @@ -58,7 +51,7 @@ def collect(self):
devices, interfaces = self.submit_nodes_health_and_metadata(fabric_nodes, pods)
if self.ndm_enabled():
collect_timestamp = int(time.time())
batches = self.batch_payloads(devices, interfaces, collect_timestamp)
batches = ndm.batch_payloads(self.namespace, devices, interfaces, collect_timestamp)
for batch in batches:
self.event_platform_event(json.dumps(batch.model_dump(exclude_none=True)), "network-devices-metadata")

Expand Down Expand Up @@ -94,7 +87,7 @@ def submit_nodes_health_and_metadata(self, nodes, pods):

user_tags = self.instance.get('tags', [])
tags = self.tagger.get_fabric_tags(n, 'fabricNode')
tags.extend(self.ndm_common_tags(node_attrs.get('address', ''), hostname, self.namespace))
tags.extend(ndm.common_tags(node_attrs.get('address', ''), hostname, self.namespace))
self.external_host_tags[hostname] = tags + self.check_tags + user_tags

pod_id = helpers.get_pod_from_dn(node_attrs['dn'])
Expand All @@ -103,7 +96,7 @@ def submit_nodes_health_and_metadata(self, nodes, pods):
self.log.info("processing node %s on pod %s", node_id, pod_id)
try:
if self.ndm_enabled():
device_metadata.append(self.submit_node_metadata(node_attrs, tags))
device_metadata.append(ndm.create_node_metadata(node_attrs, tags, self.namespace))
self.submit_process_metric(n, tags + self.check_tags + user_tags, hostname=hostname)
except (exceptions.APIConnectionException, exceptions.APIParsingException):
pass
Expand All @@ -123,7 +116,7 @@ def process_eth(self, node):
self.log.info("processing ethernet ports for %s", node.get('id'))
hostname = helpers.get_fabric_hostname(node)
pod_id = helpers.get_pod_from_dn(node['dn'])
common_tags = self.ndm_common_tags(node.get('address', ''), hostname, self.namespace)
common_tags = ndm.common_tags(node.get('address', ''), hostname, self.namespace)
try:
eth_list = self.api.get_eth_list(pod_id, node['id'])
except (exceptions.APIConnectionException, exceptions.APIParsingException):
Expand All @@ -135,7 +128,9 @@ def process_eth(self, node):
tags = self.tagger.get_fabric_tags(e, 'l1PhysIf')
tags.extend(common_tags)
if self.ndm_enabled():
interfaces.append(self.create_interface_metadata(e, node.get('address', ''), tags, hostname))
interface_metadata = ndm.create_interface_metadata(e, node.get('address', ''), self.namespace)
interfaces.append(interface_metadata)
self.submit_interface_status_metric(interface_metadata.status, tags, hostname)
try:
stats = self.api.get_eth_stats(pod_id, node['id'], eth_id)
self.submit_fabric_metric(stats, tags, 'l1PhysIf', hostname=hostname)
Expand Down Expand Up @@ -253,67 +248,8 @@ def get_fabric_type(self, obj_type):
if obj_type == 'l1PhysIf':
return 'port'

def batch_payloads(self, devices, interfaces, collect_ts):
    """
    Yield NetworkDevicesMetadata payloads for the collected devices and interfaces.

    Every device is emitted in a payload of its own. Interfaces are grouped
    into payloads holding at most PAYLOAD_METADATA_BATCH_SIZE entries each;
    a trailing, partially filled group is emitted last.
    """
    for device in devices:
        yield NetworkDevicesMetadata(namespace=self.namespace, devices=[device], collect_timestamp=collect_ts)

    pending = []
    for interface in interfaces:
        # Flush the group only once it is full and another interface has arrived.
        if len(pending) == PAYLOAD_METADATA_BATCH_SIZE:
            full_group, pending = pending, []
            yield NetworkDevicesMetadata(
                namespace=self.namespace, interfaces=full_group, collect_timestamp=collect_ts
            )
        pending.append(interface)

    if len(pending) > 0:
        yield NetworkDevicesMetadata(namespace=self.namespace, interfaces=pending, collect_timestamp=collect_ts)

def submit_node_metadata(self, node_attrs, tags):
    """
    Build the device metadata for one fabric node and return it as a dict.

    The node attributes are parsed through the Node model, the hostname is
    derived from the node's dn, and the resulting DeviceMetadata is dumped
    with None-valued fields omitted.
    """
    attrs = Node(attributes=node_attrs).attributes
    hostname = helpers.get_hostname_from_dn(attrs.dn)
    namespace = self.namespace
    metadata = DeviceMetadata(
        id='{}:{}'.format(namespace, attrs.address),
        id_tags=self.ndm_common_tags(attrs.address, hostname, namespace),
        # Vendor/source tags come first, followed by the caller-supplied tags.
        tags=['device_vendor:{}'.format(VENDOR_CISCO), "source:cisco-aci"] + tags,
        name=hostname,
        ip_address=attrs.address,
        model=attrs.model,
        fabric_st=attrs.fabric_st,
        vendor=VENDOR_CISCO,
        version=attrs.version,
        serial_number=attrs.serial,
        device_type=attrs.device_type,
    )
    return metadata.model_dump(exclude_none=True)

def create_interface_metadata(self, phys_if, address, tags, hostname):
eth = PhysIf(**phys_if.get('l1PhysIf', {}))
interface = InterfaceMetadata(
device_id='{}:{}'.format(self.namespace, address),
id_tags=['interface:{}'.format(eth.attributes.name)],
index=eth.attributes.id,
name=eth.attributes.name,
description=eth.attributes.desc,
mac_address=eth.attributes.router_mac,
admin_status=eth.attributes.admin_st,
)
if eth.ethpm_phys_if:
interface.oper_status = eth.ethpm_phys_if.attributes.oper_st
if interface.status:
def submit_interface_status_metric(self, status, tags, hostname):
if status:
new_tags = tags.copy()
new_tags.extend(["port.status:{}".format(interface.status)])
new_tags.extend(["port.status:{}".format(status)])
self.gauge('cisco_aci.fabric.port.status', 1, tags=new_tags, hostname=hostname)
return interface.model_dump(exclude_none=True)

def ndm_common_tags(self, address, hostname, namespace):
    """
    Return the device_ip, device_namespace, device_hostname and device_id tags for a device.
    """
    tag_specs = (
        ('device_ip:{}', (address,)),
        ('device_namespace:{}', (namespace,)),
        ('device_hostname:{}', (hostname,)),
        ('device_id:{}:{}', (namespace, address)),
    )
    return [template.format(*values) for template, values in tag_specs]
30 changes: 29 additions & 1 deletion cisco_aci/datadog_checks/cisco_aci/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from enum import IntEnum, StrEnum
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator
from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator, model_validator

"""
Cisco ACI Response Models
"""

class NodeAttributes(BaseModel):
address: Optional[str] = None
Expand Down Expand Up @@ -45,6 +49,16 @@ class L1PhysIfAttributes(BaseModel):
desc: Optional[str] = None
router_mac: Optional[str] = Field(default=None, alias="routerMac")

@model_validator(mode='before')
@classmethod
def validate_name(cls, data: dict) -> dict:
    """
    Fall back to the interface ID when the interface name is missing or empty.

    Runs on the raw input before field validation. Input that is not a dict
    is returned untouched so regular validation can deal with it.
    """
    if not isinstance(data, dict):
        return data
    if data.get('name'):
        return data
    # Return a patched copy instead of mutating the caller's raw payload in place.
    return {**data, 'name': data.get('id')}

class PhysIf(BaseModel):
attributes: L1PhysIfAttributes
children: Optional[list] = Field(default_factory=list)
Expand All @@ -57,6 +71,10 @@ def ethpm_phys_if(self) -> Optional[EthpmPhysIf]:
return EthpmPhysIf(**child['ethpmPhysIf'])
return None

"""
NDM Models
"""

class DeviceMetadata(BaseModel):
id: Optional[str] = Field(default=None)
id_tags: list = Field(default_factory=list)
Expand Down Expand Up @@ -158,3 +176,13 @@ class NetworkDevicesMetadata(BaseModel):
devices: Optional[list[DeviceMetadata]] = Field(default_factory=list)
interfaces: Optional[list[InterfaceMetadata]] = Field(default_factory=list)
collect_timestamp: Optional[int] = None
size: Optional[int] = Field(default=0, exclude=True)

model_config = ConfigDict(validate_assignment=True, use_enum_values=True)

def append_metadata(self, metadata: DeviceMetadata | InterfaceMetadata):
    """
    Add one metadata entry to this payload and bump the entry counter.

    DeviceMetadata instances are stored in ``devices`` and InterfaceMetadata
    instances in ``interfaces``; ``size`` is incremented by one per call.
    """
    is_device = isinstance(metadata, DeviceMetadata)
    is_interface = isinstance(metadata, InterfaceMetadata)
    if is_device:
        self.devices.append(metadata)
    if is_interface:
        self.interfaces.append(metadata)
    self.size = self.size + 1
123 changes: 123 additions & 0 deletions cisco_aci/datadog_checks/cisco_aci/ndm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# (C) Datadog, Inc. 2024-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)


from six import PY3

if PY3:
from datadog_checks.cisco_aci.models import (
DeviceMetadata,
InterfaceMetadata,
NetworkDevicesMetadata,
Node,
PhysIf,
)

from . import helpers

VENDOR_CISCO = 'cisco'
PAYLOAD_METADATA_BATCH_SIZE = 100


def create_node_metadata(node_attrs, tags, namespace):
    """
    Build the DeviceMetadata entry describing a single fabric node.

    The raw attributes are parsed through the Node model, the hostname is
    derived from the node's dn, and the supplied tags are appended after the
    vendor/source tags.
    """
    attrs = Node(attributes=node_attrs).attributes
    hostname = helpers.get_hostname_from_dn(attrs.dn)
    vendor_and_source_tags = ['device_vendor:{}'.format(VENDOR_CISCO), "source:cisco-aci"]
    return DeviceMetadata(
        id='{}:{}'.format(namespace, attrs.address),
        id_tags=common_tags(attrs.address, hostname, namespace),
        tags=vendor_and_source_tags + tags,
        name=hostname,
        ip_address=attrs.address,
        model=attrs.model,
        fabric_st=attrs.fabric_st,
        vendor=VENDOR_CISCO,
        version=attrs.version,
        serial_number=attrs.serial,
        device_type=attrs.device_type,
    )


def create_interface_metadata(phys_if, address, namespace):
    """
    Build the InterfaceMetadata entry for one physical interface.

    The ``l1PhysIf`` entry of ``phys_if`` is parsed through the PhysIf model.
    The operational status is filled in only when the interface carries an
    ethpmPhysIf child.
    """
    eth = PhysIf(**phys_if.get('l1PhysIf', {}))
    attrs = eth.attributes
    interface = InterfaceMetadata(
        device_id='{}:{}'.format(namespace, address),
        id_tags=['interface:{}'.format(attrs.name)],
        index=attrs.id,
        name=attrs.name,
        description=attrs.desc,
        mac_address=attrs.router_mac,
        admin_status=attrs.admin_st,
    )

    ethpm = eth.ethpm_phys_if
    if ethpm:
        interface.oper_status = ethpm.attributes.oper_st

    return interface


def get_device_info(device):
    """
    Get device ID and node ID from a device object.

    The node ID is read from the first ``node_id:<value>`` tag on the device.

    Returns a ``(device_id, node_id)`` tuple; ``node_id`` is None when the
    device carries no ``node_id:`` tag.
    """
    node_id = None
    for tag in device.tags or ():
        # Match the full "node_id:" key so unrelated tags that merely share the
        # prefix (or a bare "node_id" without a value) are not picked up.
        if tag.startswith('node_id:'):
            node_id = tag.split(':')[1]
            break
    return device.id, node_id


def batch_payloads(namespace, devices, interfaces, collect_ts):
    """
    Yield NetworkDevicesMetadata payloads packed with the given devices and interfaces.

    Devices are packed first, then interfaces, into one rolling payload. Each
    time append_to_payload hands back a replacement payload, the filled one is
    yielded. The last payload in progress is always yielded at the end.
    """
    open_payload = NetworkDevicesMetadata(namespace=namespace, collect_timestamp=collect_ts)
    for group in (devices, interfaces):
        for entry in group:
            filled_payload, next_payload = append_to_payload(entry, open_payload, namespace, collect_ts)
            if next_payload:
                yield filled_payload
                open_payload = next_payload

    yield open_payload


def append_to_payload(item, current_payload, namespace, collect_ts):
    """
    Add ``item`` to ``current_payload``, rolling over to a new payload when it is full.

    Returns ``(current_payload, None)`` when the item fit into the current
    payload, or ``(current_payload, new_payload)`` when the current payload had
    already reached PAYLOAD_METADATA_BATCH_SIZE and the item was placed into a
    freshly created payload instead.
    """
    if current_payload.size >= PAYLOAD_METADATA_BATCH_SIZE:
        # Current payload is full: seed a fresh one with this item.
        overflow_payload = NetworkDevicesMetadata(namespace=namespace, collect_timestamp=collect_ts)
        overflow_payload.append_metadata(item)
        return current_payload, overflow_payload

    current_payload.append_metadata(item)
    return current_payload, None


def common_tags(address, hostname, namespace):
    """
    Build the common NDM tags for a device: device_ip, device_namespace, device_hostname and device_id.
    """
    tag_specs = (
        ('device_ip:{}', (address,)),
        ('device_namespace:{}', (namespace,)),
        ('device_hostname:{}', (hostname,)),
        ('device_id:{}:{}', (namespace, address)),
    )
    return [template.format(*values) for template, values in tag_specs]
Loading

0 comments on commit 54ea3e2

Please sign in to comment.