diff --git a/nb-dt-import.py b/nb-dt-import.py index 34b27280..82555283 100755 --- a/nb-dt-import.py +++ b/nb-dt-import.py @@ -38,6 +38,7 @@ def main(): settings.handle.verbose_log( f'Script took {(datetime.now() - startTime)} to run') settings.handle.log(f'{netbox.counter["added"]} devices created') + settings.handle.log(f'{netbox.counter["images"]} images uploaded') settings.handle.log( f'{netbox.counter["updated"]} interfaces/ports updated') settings.handle.log( diff --git a/netbox_api.py b/netbox_api.py index 09d048c6..539ef8d9 100644 --- a/netbox_api.py +++ b/netbox_api.py @@ -1,12 +1,14 @@ from collections import Counter import pynetbox import requests +import os +import glob # from pynetbox import RequestError as APIRequestError class NetBox: def __new__(cls, *args, **kwargs): return super().__new__(cls) - + def __init__(self, settings): self.counter = Counter( added=0, @@ -14,6 +16,7 @@ def __init__(self, settings): manufacturer=0, module_added=0, module_port_added=0, + images=0, ) self.url = settings.NETBOX_URL self.token = settings.NETBOX_TOKEN @@ -25,7 +28,7 @@ def __init__(self, settings): self.verify_compatibility() self.existing_manufacturers = self.get_manufacturers() self.device_types = DeviceTypes(self.netbox, self.handle, self.counter) - + def connect_api(self): try: self.netbox = pynetbox.api(self.url, token=self.token) @@ -35,13 +38,13 @@ def connect_api(self): self.netbox.http_session.verify = False except Exception as e: self.handle.exception("Exception", 'NetBox API Error', e) - + def get_api(self): return self.netbox - + def get_counter(self): return self.counter - + def verify_compatibility(self): # nb.version should be the version in the form '3.2' version_split = [int(x) for x in self.netbox.version.split('.')] @@ -50,10 +53,10 @@ def verify_compatibility(self): # Might want to check for the module-types entry as well? if version_split[0] > 3 or (version_split[0] == 3 and version_split[1] >= 2): self.modules = True - + def get_manufacturers(self): return {str(item): item for item in self.netbox.dcim.manufacturers.all()} - + def create_manufacturers(self, vendors): to_create = [] self.existing_manufacturers = self.get_manufacturers() @@ -64,7 +67,7 @@ def create_manufacturers(self, vendors): except KeyError: to_create.append(vendor) self.handle.verbose_log(f"Manufacturer queued for addition: {vendor['name']}") - + if to_create: try: created_manufacturers = self.netbox.dcim.manufacturers.create(to_create) @@ -75,9 +78,28 @@ def create_manufacturers(self, vendors): except pynetbox.RequestError as request_error: self.handle.log("Error creating manufacturers") self.handle.verbose_log(f"Error during manufacturer creation. - {request_error.error}") - + def create_device_types(self, device_types_to_add): for device_type in device_types_to_add: + + # Remove file base path + src_file = device_type["src"] + del device_type["src"] + + # Pre-process front/rear_image flag, remove it if present + saved_images = {} + image_base = os.path.dirname(src_file).replace("device-types","elevation-images") + for i in ["front_image","rear_image"]: + if i in device_type: + if device_type[i]: + image_glob = f"{image_base}/{device_type['slug']}.{i.split('_')[0]}.*" + images = glob.glob(image_glob, recursive=False) + if images: + saved_images[i] = images[0] + else: + self.handle.log(f"Error locating image file using '{image_glob}'") + del device_type[i] + try: dt = self.device_types.existing_device_types[device_type["model"]] self.handle.verbose_log(f'Device Type Exists: {dt.manufacturer.name} - ' @@ -114,6 +136,10 @@ def create_device_types(self, device_types_to_add): if self.modules and 'module-bays' in device_type: self.device_types.create_module_bays(device_type['module-bays'], dt.id) + # Finally, update images if any + if saved_images: + self.device_types.upload_images(self.url, self.token, saved_images, dt.id) + def create_module_types(self, module_types): all_module_types = {} for curr_nb_mt in self.netbox.dcim.module_types.all(): @@ -152,46 +178,46 @@ def create_module_types(self, module_types): self.device_types.create_module_rear_ports(curr_mt["rear-ports"], module_type_res.id) if "front-ports" in curr_mt: self.device_types.create_module_front_ports(curr_mt["front-ports"], module_type_res.id) - + class DeviceTypes: def __new__(cls, *args, **kwargs): return super().__new__(cls) - + def __init__(self, netbox, handle, counter): self.netbox = netbox self.handle = handle self.counter = counter self.existing_device_types = self.get_device_types() - + def get_device_types(self): return {str(item): item for item in self.netbox.dcim.device_types.all()} def get_power_ports(self, device_type): return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(devicetype_id=device_type)} - + def get_rear_ports(self, device_type): return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(devicetype_id=device_type)} - + def get_module_power_ports(self, module_type): return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(moduletype_id=module_type)} - + def get_module_rear_ports(self, module_type): return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(moduletype_id=module_type)} - + def get_device_type_ports_to_create(self, dcim_ports, device_type, existing_ports): to_create = [port for port in dcim_ports if port['name'] not in existing_ports] for port in to_create: port['device_type'] = device_type - + return to_create - + def get_module_type_ports_to_create(self, module_ports, module_type, existing_ports): to_create = [port for port in module_ports if port['name'] not in existing_ports] for port in to_create: port['module_type'] = module_type - + return to_create - + def create_interfaces(self, interfaces, device_type): existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter( devicetype_id=device_type)} @@ -206,11 +232,11 @@ def create_interfaces(self, interfaces, device_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Interface") - + def create_power_ports(self, power_ports, device_type): existing_power_ports = self.get_power_ports(device_type) to_create = self.get_device_type_ports_to_create(power_ports, device_type, existing_power_ports) - + if to_create: try: self.counter.update({'updated': @@ -219,11 +245,11 @@ def create_power_ports(self, power_ports, device_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Power Port") - + def create_console_ports(self, console_ports, device_type): existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(devicetype_id=device_type)} to_create = self.get_device_type_ports_to_create(console_ports, device_type, existing_console_ports) - + if to_create: try: self.counter.update({'updated': @@ -245,7 +271,7 @@ def create_power_outlets(self, power_outlets, device_type): outlet['power_port'] = power_port.id except KeyError: pass - + try: self.counter.update({'updated': self.handle.log_device_ports_created( @@ -283,7 +309,7 @@ def create_rear_ports(self, rear_ports, device_type): def create_front_ports(self, front_ports, device_type): existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(devicetype_id=device_type)} to_create = self.get_device_type_ports_to_create(front_ports, device_type, existing_front_ports) - + if to_create: all_rearports = self.get_rear_ports(device_type) for port in to_create: @@ -293,7 +319,7 @@ def create_front_ports(self, front_ports, device_type): except KeyError: self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - ' + f'{port["type"]} - {device_type}') - + try: self.counter.update({'updated': self.handle.log_device_ports_created( @@ -305,7 +331,7 @@ def create_front_ports(self, front_ports, device_type): def create_device_bays(self, device_bays, device_type): existing_device_bays = {str(item): item for item in self.netbox.dcim.device_bay_templates.filter(devicetype_id=device_type)} to_create = self.get_device_type_ports_to_create(device_bays, device_type, existing_device_bays) - + if to_create: try: self.counter.update({'updated': @@ -314,11 +340,11 @@ def create_device_bays(self, device_bays, device_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Device Bay") - + def create_module_bays(self, module_bays, device_type): existing_module_bays = {str(item): item for item in self.netbox.dcim.module_bay_templates.filter(devicetype_id=device_type)} to_create = self.get_device_type_ports_to_create(module_bays, device_type, existing_module_bays) - + if to_create: try: self.counter.update({'updated': @@ -331,7 +357,7 @@ def create_module_bays(self, module_bays, device_type): def create_module_interfaces(self, module_interfaces, module_type): existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(moduletype_id=module_type)} to_create = self.get_module_type_ports_to_create(module_interfaces, module_type, existing_interfaces) - + if to_create: try: self.counter.update({'updated': @@ -340,11 +366,11 @@ def create_module_interfaces(self, module_interfaces, module_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Module Interface") - + def create_module_power_ports(self, power_ports, module_type): existing_power_ports = self.get_module_power_ports(module_type) to_create = self.get_module_type_ports_to_create(power_ports, module_type, existing_power_ports) - + if to_create: try: self.counter.update({'updated': @@ -353,11 +379,11 @@ def create_module_power_ports(self, power_ports, module_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Module Power Port") - + def create_module_console_ports(self, console_ports, module_type): existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(moduletype_id=module_type)} to_create = self.get_module_type_ports_to_create(console_ports, module_type, existing_console_ports) - + if to_create: try: self.counter.update({'updated': @@ -366,11 +392,11 @@ def create_module_console_ports(self, console_ports, module_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Module Console Port") - + def create_module_power_outlets(self, power_outlets, module_type): existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(moduletype_id=module_type)} to_create = self.get_module_type_ports_to_create(power_outlets, module_type, existing_power_outlets) - + if to_create: existing_power_ports = self.get_module_power_ports(module_type) for outlet in to_create: @@ -379,7 +405,7 @@ def create_module_power_outlets(self, power_outlets, module_type): outlet['power_port'] = power_port.id except KeyError: pass - + try: self.counter.update({'updated': self.handle.log_module_ports_created( @@ -387,11 +413,11 @@ def create_module_power_outlets(self, power_outlets, module_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Module Power Outlet") - + def create_module_console_server_ports(self, console_server_ports, module_type): existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(moduletype_id=module_type)} to_create = self.get_module_type_ports_to_create(console_server_ports, module_type, existing_console_server_ports) - + if to_create: try: self.counter.update({'updated': @@ -400,11 +426,11 @@ def create_module_console_server_ports(self, console_server_ports, module_type): }) except pynetbox.RequestError as excep: self.handle.log(f"Error '{excep.error}' creating Module Console Server Port") - + def create_module_rear_ports(self, rear_ports, module_type): existing_rear_ports = self.get_module_rear_ports(module_type) to_create = self.get_module_type_ports_to_create(rear_ports, module_type, existing_rear_ports) - + if to_create: try: self.counter.update({'updated': @@ -417,7 +443,7 @@ def create_module_rear_ports(self, rear_ports, module_type): def create_module_front_ports(self, front_ports, module_type): existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(moduletype_id=module_type)} to_create = self.get_module_type_ports_to_create(front_ports, module_type, existing_front_ports) - + if to_create: existing_rear_ports = self.get_module_rear_ports(module_type) for port in to_create: @@ -427,11 +453,32 @@ def create_module_front_ports(self, front_ports, module_type): except KeyError: self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - ' + f'{port["type"]} - {module_type}') - + try: self.counter.update({'updated': self.handle.log_module_ports_created( self.netbox.dcim.front_port_templates.create(to_create), "Module Front Port") }) except pynetbox.RequestError as excep: - self.handle.log(f"Error '{excep.error}' creating Module Front Port") \ No newline at end of file + self.handle.log(f"Error '{excep.error}' creating Module Front Port") + + def upload_images(self,baseurl,token,images,device_type): + '''Upload front_image and/or rear_image for the given device type + + Args: + baseurl: URL for Netbox instance + token: Token to access Netbox instance + images: map of front_image and/or rear_image filename + device_type: id for the device-type to update + + Returns: + None + ''' + url = f"{baseurl}/api/dcim/device-types/{device_type}/" + headers = { "Authorization": f"Token {token}" } + + files = { i: (os.path.basename(f), open(f,"rb") ) for i,f in images.items() } + response = requests.patch(url, headers=headers, files=files, verify=False) + + self.handle.log( f'Images {images} updated at {url}: {response}' ) + self.counter["images"] += len(images) diff --git a/repo.py b/repo.py index 65cda964..f52e0b00 100644 --- a/repo.py +++ b/repo.py @@ -95,6 +95,9 @@ def parse_files(self, files: list, slugs: list = None): data['manufacturer'] = { 'name': manufacturer, 'slug': self.slug_format(manufacturer)} + # Save file location to resolve any relative paths for images + data['src'] = file + if slugs and True not in [True if s.casefold() in data['slug'].casefold() else False for s in slugs]: self.handle.verbose_log(f"Skipping {data['model']}") continue