Skip to content

Commit

Permalink
Version 2.0.1 (#97)
Browse files Browse the repository at this point in the history
* Fix image upload v2 (#91)

* Add upload_image functionality to new beta script

* Updated for new location of files with Boolean flag for front/rear image

* fixing the requests verify missing issue #95 (#96)

---------

Co-authored-by: J vanBemmel <[email protected]>
  • Loading branch information
danner26 and jbemmel authored Jul 31, 2023
1 parent 362056e commit 8ad3ed7
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 45 deletions.
1 change: 1 addition & 0 deletions nb-dt-import.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def main():
settings.handle.verbose_log(
f'Script took {(datetime.now() - startTime)} to run')
settings.handle.log(f'{netbox.counter["added"]} devices created')
settings.handle.log(f'{netbox.counter["images"]} images uploaded')
settings.handle.log(
f'{netbox.counter["updated"]} interfaces/ports updated')
settings.handle.log(
Expand Down
137 changes: 92 additions & 45 deletions netbox_api.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from collections import Counter
import pynetbox
import requests
import os
import glob
# from pynetbox import RequestError as APIRequestError

class NetBox:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)

def __init__(self, settings):
self.counter = Counter(
added=0,
updated=0,
manufacturer=0,
module_added=0,
module_port_added=0,
images=0,
)
self.url = settings.NETBOX_URL
self.token = settings.NETBOX_TOKEN
Expand All @@ -25,7 +28,7 @@ def __init__(self, settings):
self.verify_compatibility()
self.existing_manufacturers = self.get_manufacturers()
self.device_types = DeviceTypes(self.netbox, self.handle, self.counter)

def connect_api(self):
try:
self.netbox = pynetbox.api(self.url, token=self.token)
Expand All @@ -35,13 +38,13 @@ def connect_api(self):
self.netbox.http_session.verify = False
except Exception as e:
self.handle.exception("Exception", 'NetBox API Error', e)

def get_api(self):
return self.netbox

def get_counter(self):
return self.counter

def verify_compatibility(self):
# nb.version should be the version in the form '3.2'
version_split = [int(x) for x in self.netbox.version.split('.')]
Expand All @@ -50,10 +53,10 @@ def verify_compatibility(self):
# Might want to check for the module-types entry as well?
if version_split[0] > 3 or (version_split[0] == 3 and version_split[1] >= 2):
self.modules = True

def get_manufacturers(self):
return {str(item): item for item in self.netbox.dcim.manufacturers.all()}

def create_manufacturers(self, vendors):
to_create = []
self.existing_manufacturers = self.get_manufacturers()
Expand All @@ -64,7 +67,7 @@ def create_manufacturers(self, vendors):
except KeyError:
to_create.append(vendor)
self.handle.verbose_log(f"Manufacturer queued for addition: {vendor['name']}")

if to_create:
try:
created_manufacturers = self.netbox.dcim.manufacturers.create(to_create)
Expand All @@ -75,9 +78,28 @@ def create_manufacturers(self, vendors):
except pynetbox.RequestError as request_error:
self.handle.log("Error creating manufacturers")
self.handle.verbose_log(f"Error during manufacturer creation. - {request_error.error}")

def create_device_types(self, device_types_to_add):
for device_type in device_types_to_add:

# Remove file base path
src_file = device_type["src"]
del device_type["src"]

# Pre-process front/rear_image flag, remove it if present
saved_images = {}
image_base = os.path.dirname(src_file).replace("device-types","elevation-images")
for i in ["front_image","rear_image"]:
if i in device_type:
if device_type[i]:
image_glob = f"{image_base}/{device_type['slug']}.{i.split('_')[0]}.*"
images = glob.glob(image_glob, recursive=False)
if images:
saved_images[i] = images[0]
else:
self.handle.log(f"Error locating image file using '{image_glob}'")
del device_type[i]

try:
dt = self.device_types.existing_device_types[device_type["model"]]
self.handle.verbose_log(f'Device Type Exists: {dt.manufacturer.name} - '
Expand Down Expand Up @@ -114,6 +136,10 @@ def create_device_types(self, device_types_to_add):
if self.modules and 'module-bays' in device_type:
self.device_types.create_module_bays(device_type['module-bays'], dt.id)

# Finally, update images if any
if saved_images:
self.device_types.upload_images(self.url, self.token, saved_images, dt.id)

def create_module_types(self, module_types):
all_module_types = {}
for curr_nb_mt in self.netbox.dcim.module_types.all():
Expand Down Expand Up @@ -152,46 +178,46 @@ def create_module_types(self, module_types):
self.device_types.create_module_rear_ports(curr_mt["rear-ports"], module_type_res.id)
if "front-ports" in curr_mt:
self.device_types.create_module_front_ports(curr_mt["front-ports"], module_type_res.id)

class DeviceTypes:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)

def __init__(self, netbox, handle, counter):
self.netbox = netbox
self.handle = handle
self.counter = counter
self.existing_device_types = self.get_device_types()

def get_device_types(self):
return {str(item): item for item in self.netbox.dcim.device_types.all()}

def get_power_ports(self, device_type):
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(devicetype_id=device_type)}

def get_rear_ports(self, device_type):
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(devicetype_id=device_type)}

def get_module_power_ports(self, module_type):
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(moduletype_id=module_type)}

def get_module_rear_ports(self, module_type):
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(moduletype_id=module_type)}

def get_device_type_ports_to_create(self, dcim_ports, device_type, existing_ports):
to_create = [port for port in dcim_ports if port['name'] not in existing_ports]
for port in to_create:
port['device_type'] = device_type

return to_create

def get_module_type_ports_to_create(self, module_ports, module_type, existing_ports):
to_create = [port for port in module_ports if port['name'] not in existing_ports]
for port in to_create:
port['module_type'] = module_type

return to_create

def create_interfaces(self, interfaces, device_type):
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(
devicetype_id=device_type)}
Expand All @@ -206,11 +232,11 @@ def create_interfaces(self, interfaces, device_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Interface")

def create_power_ports(self, power_ports, device_type):
existing_power_ports = self.get_power_ports(device_type)
to_create = self.get_device_type_ports_to_create(power_ports, device_type, existing_power_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -219,11 +245,11 @@ def create_power_ports(self, power_ports, device_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Power Port")

def create_console_ports(self, console_ports, device_type):
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(console_ports, device_type, existing_console_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -245,7 +271,7 @@ def create_power_outlets(self, power_outlets, device_type):
outlet['power_port'] = power_port.id
except KeyError:
pass

try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
Expand Down Expand Up @@ -283,7 +309,7 @@ def create_rear_ports(self, rear_ports, device_type):
def create_front_ports(self, front_ports, device_type):
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(front_ports, device_type, existing_front_ports)

if to_create:
all_rearports = self.get_rear_ports(device_type)
for port in to_create:
Expand All @@ -293,7 +319,7 @@ def create_front_ports(self, front_ports, device_type):
except KeyError:
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {device_type}')

try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
Expand All @@ -305,7 +331,7 @@ def create_front_ports(self, front_ports, device_type):
def create_device_bays(self, device_bays, device_type):
existing_device_bays = {str(item): item for item in self.netbox.dcim.device_bay_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(device_bays, device_type, existing_device_bays)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -314,11 +340,11 @@ def create_device_bays(self, device_bays, device_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Device Bay")

def create_module_bays(self, module_bays, device_type):
existing_module_bays = {str(item): item for item in self.netbox.dcim.module_bay_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(module_bays, device_type, existing_module_bays)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -331,7 +357,7 @@ def create_module_bays(self, module_bays, device_type):
def create_module_interfaces(self, module_interfaces, module_type):
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(module_interfaces, module_type, existing_interfaces)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -340,11 +366,11 @@ def create_module_interfaces(self, module_interfaces, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Interface")

def create_module_power_ports(self, power_ports, module_type):
existing_power_ports = self.get_module_power_ports(module_type)
to_create = self.get_module_type_ports_to_create(power_ports, module_type, existing_power_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -353,11 +379,11 @@ def create_module_power_ports(self, power_ports, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Power Port")

def create_module_console_ports(self, console_ports, module_type):
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(console_ports, module_type, existing_console_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -366,11 +392,11 @@ def create_module_console_ports(self, console_ports, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Console Port")

def create_module_power_outlets(self, power_outlets, module_type):
existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(power_outlets, module_type, existing_power_outlets)

if to_create:
existing_power_ports = self.get_module_power_ports(module_type)
for outlet in to_create:
Expand All @@ -379,19 +405,19 @@ def create_module_power_outlets(self, power_outlets, module_type):
outlet['power_port'] = power_port.id
except KeyError:
pass

try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.power_outlet_templates.create(to_create), "Module Power Outlet")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Power Outlet")

def create_module_console_server_ports(self, console_server_ports, module_type):
existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(console_server_ports, module_type, existing_console_server_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -400,11 +426,11 @@ def create_module_console_server_ports(self, console_server_ports, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Console Server Port")

def create_module_rear_ports(self, rear_ports, module_type):
existing_rear_ports = self.get_module_rear_ports(module_type)
to_create = self.get_module_type_ports_to_create(rear_ports, module_type, existing_rear_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -417,7 +443,7 @@ def create_module_rear_ports(self, rear_ports, module_type):
def create_module_front_ports(self, front_ports, module_type):
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(front_ports, module_type, existing_front_ports)

if to_create:
existing_rear_ports = self.get_module_rear_ports(module_type)
for port in to_create:
Expand All @@ -427,11 +453,32 @@ def create_module_front_ports(self, front_ports, module_type):
except KeyError:
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {module_type}')

try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.front_port_templates.create(to_create), "Module Front Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Front Port")
self.handle.log(f"Error '{excep.error}' creating Module Front Port")

def upload_images(self,baseurl,token,images,device_type):
'''Upload front_image and/or rear_image for the given device type
Args:
baseurl: URL for Netbox instance
token: Token to access Netbox instance
images: map of front_image and/or rear_image filename
device_type: id for the device-type to update
Returns:
None
'''
url = f"{baseurl}/api/dcim/device-types/{device_type}/"
headers = { "Authorization": f"Token {token}" }

files = { i: (os.path.basename(f), open(f,"rb") ) for i,f in images.items() }
response = requests.patch(url, headers=headers, files=files, verify=False)

self.handle.log( f'Images {images} updated at {url}: {response}' )
self.counter["images"] += len(images)
3 changes: 3 additions & 0 deletions repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ def parse_files(self, files: list, slugs: list = None):
data['manufacturer'] = {
'name': manufacturer, 'slug': self.slug_format(manufacturer)}

# Save file location to resolve any relative paths for images
data['src'] = file

if slugs and True not in [True if s.casefold() in data['slug'].casefold() else False for s in slugs]:
self.handle.verbose_log(f"Skipping {data['model']}")
continue
Expand Down

0 comments on commit 8ad3ed7

Please sign in to comment.