From bfb1b91a6f42706625e7f0378e40a41738eec705 Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Fri, 10 May 2024 14:20:01 +0200 Subject: [PATCH 1/6] ci/style: Enforce styling and linting Signed-off-by: Nicolai Buchwitz --- .github/workflows/lint.yml | 29 ++ README.md | 2 + check_pve.py | 767 +++++++++++++++++++++++-------------- pyproject.toml | 20 + 4 files changed, 538 insertions(+), 280 deletions(-) create mode 100644 .github/workflows/lint.yml create mode 100644 pyproject.toml diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..c33ab80 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,29 @@ +on: [push, pull_request] +name: Linter + +jobs: + build: + if: + github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "*" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install black ruff + + - name: Check styling with black + run: | + black --check *.py + + - name: Run ruff linter + run: | + ruff check *.py diff --git a/README.md b/README.md index bc47e19..c49817a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # check_pve Icinga check command for Proxmox VE via API +![Linter](https://github.com/nbuchwitz/check_pve/actions/workflows/lint.yml/badge.svg) + ## Setup ### Requirements diff --git a/check_pve.py b/check_pve.py index 8377e46..98bf5e5 100755 --- a/check_pve.py +++ b/check_pve.py @@ -77,30 +77,29 @@ def threshold_type(arg: str): try: thresholds[None] = CheckThreshold(float(arg)) except: - for t in arg.split(','): + for t in arg.split(","): m = re.match("([a-z_0-9]+):([0-9.]+)", t) if m: thresholds[m.group(1)] = CheckThreshold(float(m.group(2))) else: - raise argparse.ArgumentTypeError( - "invalid threshold format: {}".format(t)) + 
raise argparse.ArgumentTypeError("invalid threshold format: {}".format(t)) return thresholds class CheckPVE: - VERSION = '1.2.2' - API_URL = 'https://{hostname}:{port}/api2/json/{command}' + VERSION = "1.2.2" + API_URL = "https://{hostname}:{port}/api2/json/{command}" UNIT_SCALE = { - "GB": 10**9, - "MB": 10**6, - "KB": 10**3, - "GiB": 2**30, - "MiB": 2**20, - "KiB": 2**10, - "B": 1 - } + "GB": 10**9, + "MB": 10**6, + "KB": 10**3, + "GiB": 2**30, + "MiB": 2**20, + "KiB": 2**10, + "B": 1, + } def check_output(self): message = self.check_message @@ -112,93 +111,105 @@ def check_output(self): @staticmethod def output(rc, message): prefix = rc.name - message = '{} - {}'.format(prefix, message) + message = "{} - {}".format(prefix, message) print(message) sys.exit(rc.value) def get_url(self, command): - return self.API_URL.format(hostname=self.options.api_endpoint, command=command, port=self.options.api_port) + return self.API_URL.format( + hostname=self.options.api_endpoint, command=command, port=self.options.api_port + ) - def request(self, url, method='get', **kwargs): + def request(self, url, method="get", **kwargs): response = None try: - if method == 'post': + if method == "post": response = requests.post( url, verify=not self.options.api_insecure, - data=kwargs.get('data', None), - timeout=5 + data=kwargs.get("data", None), + timeout=5, ) - elif method == 'get': + elif method == "get": response = requests.get( url, verify=not self.options.api_insecure, cookies=self.__cookies, headers=self.__headers, - params=kwargs.get('params', None), + params=kwargs.get("params", None), ) else: self.output(CheckState.CRITICAL, "Unsupport request method: {}".format(method)) except requests.exceptions.ConnectTimeout: self.output(CheckState.UNKNOWN, "Could not connect to PVE API: Connection timeout") except requests.exceptions.SSLError: - self.output(CheckState.UNKNOWN, "Could not connect to PVE API: Certificate validation failed") + self.output( + CheckState.UNKNOWN, "Could not 
connect to PVE API: Certificate validation failed" + ) except requests.exceptions.ConnectionError: - self.output(CheckState.UNKNOWN, "Could not connect to PVE API: Failed to resolve hostname") + self.output( + CheckState.UNKNOWN, "Could not connect to PVE API: Failed to resolve hostname" + ) if response.ok: - return response.json()['data'] + return response.json()["data"] else: message = "Could not fetch data from API: " if response.status_code == 401: message += "Could not connection to PVE API: invalid username or password" elif response.status_code == 403: - message += "Access denied. Please check if API user has sufficient permissions / the role has been " \ - "assigned." + message += ( + "Access denied. Please check if API user has sufficient permissions / the role has been " + "assigned." + ) else: message += "HTTP error code was {}".format(response.status_code) self.output(CheckState.UNKNOWN, message) def get_ticket(self): - url = self.get_url('access/ticket') + url = self.get_url("access/ticket") data = {"username": self.options.api_user, "password": self.options.api_password} result = self.request(url, "post", data=data) - return result['ticket'] + return result["ticket"] def check_api_value(self, url, message, **kwargs): result = self.request(url) used = None - if 'key' in kwargs: - result = result[kwargs.get('key')] + if "key" in kwargs: + result = result[kwargs.get("key")] if isinstance(result, (dict,)): - used_percent = self.get_value(result['used'], result['total']) - used = self.get_value(result['used']) - total = self.get_value(result['total']) - - self.add_perfdata(kwargs.get('perfkey', 'usage'), used_percent) - self.add_perfdata(kwargs.get('perfkey', 'used'), used, max=total, unit=self.options.unit) + used_percent = self.get_value(result["used"], result["total"]) + used = self.get_value(result["used"]) + total = self.get_value(result["total"]) + + self.add_perfdata(kwargs.get("perfkey", "usage"), used_percent) + self.add_perfdata( + 
kwargs.get("perfkey", "used"), used, max=total, unit=self.options.unit + ) else: used_percent = round(float(result) * 100, 2) - self.add_perfdata(kwargs.get('perfkey', 'usage'), used_percent) + self.add_perfdata(kwargs.get("perfkey", "usage"), used_percent) if self.options.values_mb: - message += ' {} {}'.format(used, self.options.unit) + message += " {} {}".format(used, self.options.unit) value = used else: - message += ' {} {}'.format(used_percent, '%') + message += " {} {}".format(used_percent, "%") value = used_percent self.check_thresholds(value, message) def check_vm_status(self, idx, **kwargs): - url = self.get_url('cluster/resources', ) - data = self.request(url, params={'type': 'vm'}) + url = self.get_url( + "cluster/resources", + ) + data = self.request(url, params={"type": "vm"}) expected_state = kwargs.get("expected_state", "running") only_status = kwargs.get("only_status", False) @@ -208,40 +219,57 @@ def check_vm_status(self, idx, **kwargs): if idx == vm.get("name", vm.get("vmid", None)): # Check if VM (default) or LXC vm_type = "VM" - if vm['type'] == 'lxc': + if vm["type"] == "lxc": vm_type = "LXC" - if vm['status'] != expected_state: - self.check_message = "{} '{}' is {} (expected: {})".format(vm_type, vm['name'], vm['status'], - expected_state) + if vm["status"] != expected_state: + self.check_message = "{} '{}' is {} (expected: {})".format( + vm_type, vm["name"], vm["status"], expected_state + ) if not self.options.ignore_vm_status: self.check_result = CheckState.CRITICAL else: - if self.options.node and self.options.node != vm['node']: - self.check_message = "{} '{}' is {}, but located on node '{}' instead of '{}'" \ - .format(vm_type, vm['name'], expected_state, vm['node'], self.options.node) + if self.options.node and self.options.node != vm["node"]: + self.check_message = ( + "{} '{}' is {}, but located on node '{}' instead of '{}'".format( + vm_type, vm["name"], expected_state, vm["node"], self.options.node + ) + ) self.check_result = 
CheckState.WARNING else: - self.check_message = "{} '{}' is {} on node '{}'" \ - .format(vm_type, vm['name'], expected_state, vm['node']) + self.check_message = "{} '{}' is {} on node '{}'".format( + vm_type, vm["name"], expected_state, vm["node"] + ) - if vm['status'] == 'running' and not only_status: - cpu = round(vm['cpu'] * 100, 2) + if vm["status"] == "running" and not only_status: + cpu = round(vm["cpu"] * 100, 2) self.add_perfdata("cpu", cpu) if self.options.values_mb: - memory = self.scale_value(vm['mem']) - self.add_perfdata("memory", memory, unit=self.options.unit, max=self.scale_value(vm['maxmem'])) - disk = self.scale_value(vm['disk']) - self.add_perfdata("disk", disk, unit=self.options.unit, max=self.scale_value(vm['maxdisk'])) + memory = self.scale_value(vm["mem"]) + self.add_perfdata( + "memory", + memory, + unit=self.options.unit, + max=self.scale_value(vm["maxmem"]), + ) + disk = self.scale_value(vm["disk"]) + self.add_perfdata( + "disk", + disk, + unit=self.options.unit, + max=self.scale_value(vm["maxdisk"]), + ) else: - memory = self.get_value(vm['mem'], vm['maxmem']) + memory = self.get_value(vm["mem"], vm["maxmem"]) self.add_perfdata("memory", memory) - disk = self.get_value(vm['disk'], vm['maxdisk']) + disk = self.get_value(vm["disk"], vm["maxdisk"]) self.add_perfdata("disk", disk) - self.check_thresholds({"cpu": cpu, "memory": memory, "disk": disk}, message=self.check_message) + self.check_thresholds( + {"cpu": cpu, "memory": memory, "disk": disk}, message=self.check_message + ) found = True break @@ -251,46 +279,54 @@ def check_vm_status(self, idx, **kwargs): self.check_result = CheckState.WARNING def check_disks(self): - url = self.get_url('nodes/{}/disks'.format(self.options.node)) + url = self.get_url("nodes/{}/disks".format(self.options.node)) failed = [] unknown = [] - disks = self.request(url + '/list') + disks = self.request(url + "/list") for disk in disks: - name = disk['devpath'].replace('/dev/', '') + name = 
disk["devpath"].replace("/dev/", "") if name in self.options.ignore_disks: continue - if disk['health'] == 'UNKNOWN': + if disk["health"] == "UNKNOWN": self.check_result = CheckState.WARNING - unknown.append({"serial": disk["serial"], "device": disk['devpath']}) + unknown.append({"serial": disk["serial"], "device": disk["devpath"]}) - elif disk['health'] not in ('PASSED', 'OK'): + elif disk["health"] not in ("PASSED", "OK"): self.check_result = CheckState.WARNING - failed.append({"serial": disk["serial"], "device": disk['devpath']}) + failed.append({"serial": disk["serial"], "device": disk["devpath"]}) - if disk['wearout'] != 'N/A': - self.add_perfdata('wearout_{}'.format(name), disk['wearout']) + if disk["wearout"] != "N/A": + self.add_perfdata("wearout_{}".format(name), disk["wearout"]) if failed: - self.check_message = "{} of {} disks failed the health test:\n".format(len(failed), len(disks)) + self.check_message = "{} of {} disks failed the health test:\n".format( + len(failed), len(disks) + ) for disk in failed: - self.check_message += "- {} with serial '{}'\n".format(disk['device'], disk['serial']) + self.check_message += "- {} with serial '{}'\n".format( + disk["device"], disk["serial"] + ) if unknown: - self.check_message += "{} of {} disks have unknown health status:\n".format(len(unknown), len(disks)) + self.check_message += "{} of {} disks have unknown health status:\n".format( + len(unknown), len(disks) + ) for disk in unknown: - self.check_message += "- {} with serial '{}'\n".format(disk['device'], disk['serial']) + self.check_message += "- {} with serial '{}'\n".format( + disk["device"], disk["serial"] + ) if not failed and not unknown: self.check_message = "All disks are healthy" def check_replication(self): - url = self.get_url('nodes/{}/replication'.format(self.options.node)) + url = self.get_url("nodes/{}/replication".format(self.options.node)) if self.options.vmid: - data = self.request(url, params={'guest': self.options.vmid}) + data = 
self.request(url, params={"guest": self.options.vmid}) else: data = self.request(url) @@ -298,15 +334,22 @@ def check_replication(self): performance_data = [] for job in data: - if job['fail_count'] > 0: - failed_jobs.append({'guest': job['guest'], 'fail_count': job['fail_count'], 'error': job['error']}) + if job["fail_count"] > 0: + failed_jobs.append( + {"guest": job["guest"], "fail_count": job["fail_count"], "error": job["error"]} + ) else: - performance_data.append({'id': job['id'], 'duration': job['duration']}) + performance_data.append({"id": job["id"], "duration": job["duration"]}) if len(failed_jobs) > 0: message = "Failed replication jobs on {}: ".format(self.options.node) for job in failed_jobs: - message = message + "GUEST: {j[guest]}, FAIL_COUNT: {j[fail_count]}, ERROR: {j[error]} ; ".format(j=job) + message = ( + message + + "GUEST: {j[guest]}, FAIL_COUNT: {j[fail_count]}, ERROR: {j[error]} ; ".format( + j=job + ) + ) self.check_message = message self.check_result = CheckState.WARNING else: @@ -315,58 +358,64 @@ def check_replication(self): if len(performance_data) > 0: for metric in performance_data: - self.add_perfdata('duration_' + metric['id'], metric['duration'], unit='s') + self.add_perfdata("duration_" + metric["id"], metric["duration"], unit="s") def check_services(self): - url = self.get_url('nodes/{}/services'.format(self.options.node)) + url = self.get_url("nodes/{}/services".format(self.options.node)) data = self.request(url) failed = {} for service in data: - if service['state'] != 'running' \ - and service.get('active-state', 'active') == 'active' \ - and service['name'] not in self.options.ignore_services: - failed[service['name']] = service['desc'] + if ( + service["state"] != "running" + and service.get("active-state", "active") == "active" + and service["name"] not in self.options.ignore_services + ): + failed[service["name"]] = service["desc"] if failed: self.check_result = CheckState.CRITICAL message = "{} services are not 
running:\n\n".format(len(failed)) - message += "\n".join(['- {} ({}) is not running'.format(failed[i], i) for i in failed]) + message += "\n".join(["- {} ({}) is not running".format(failed[i], i) for i in failed]) self.check_message = message else: self.check_message = "All services are running" def check_subscription(self): - url = self.get_url('nodes/{}/subscription'.format(self.options.node)) + url = self.get_url("nodes/{}/subscription".format(self.options.node)) data = self.request(url) - if data['status'] == 'NotFound': + if data["status"] == "NotFound": self.check_result = CheckState.WARNING self.check_message = "No valid subscription found" - if data['status'] == 'Inactive': + if data["status"] == "Inactive": self.check_result = CheckState.CRITICAL self.check_message = "Subscription expired" - elif data['status'] == 'Active': - subscription_due_date = data['nextduedate'] - subscription_product_name = data['productname'] + elif data["status"] == "Active": + subscription_due_date = data["nextduedate"] + subscription_product_name = data["productname"] - date_expire = datetime.strptime(subscription_due_date, '%Y-%m-%d') + date_expire = datetime.strptime(subscription_due_date, "%Y-%m-%d") date_today = datetime.today() delta = (date_expire - date_today).days - message = '{} is valid until {}'.format( - subscription_product_name, - subscription_due_date) - message_warning_critical = '{} will expire in {} days ({})'.format( - subscription_product_name, - delta, - subscription_due_date) + message = "{} is valid until {}".format( + subscription_product_name, subscription_due_date + ) + message_warning_critical = "{} will expire in {} days ({})".format( + subscription_product_name, delta, subscription_due_date + ) - self.check_thresholds(delta, message, messageWarning=message_warning_critical, - messageCritical=message_warning_critical, lowerValue=True) + self.check_thresholds( + delta, + message, + messageWarning=message_warning_critical, + 
messageCritical=message_warning_critical, + lowerValue=True, + ) def check_updates(self): - url = self.get_url('nodes/{}/apt/update'.format(self.options.node)) + url = self.get_url("nodes/{}/apt/update".format(self.options.node)) count = len(self.request(url)) if count: @@ -379,21 +428,21 @@ def check_updates(self): self.check_message = "System up to date" def check_cluster_status(self): - url = self.get_url('cluster/status') + url = self.get_url("cluster/status") data = self.request(url) nodes = {} quorate = None - cluster = '' + cluster = "" for elem in data: - if elem['type'] == 'cluster': - quorate = elem['quorate'] - cluster = elem['name'] - elif elem['type'] == 'node': - nodes[elem['name']] = elem['online'] + if elem["type"] == "cluster": + quorate = elem["quorate"] + cluster = elem["name"] + elif elem["type"] == "node": + nodes[elem["name"]] = elem["online"] if quorate is None: - self.check_message = 'No cluster configuration found' + self.check_message = "No cluster configuration found" elif quorate: node_count = len(nodes) nodes_online_count = len({k: v for k, v in nodes.items() if v}) @@ -401,40 +450,44 @@ def check_cluster_status(self): if node_count > nodes_online_count: diff = node_count - nodes_online_count self.check_result = CheckState.WARNING - self.check_message = "Cluster '{}' is healthy, but {} node(s) offline'".format(cluster, diff) + self.check_message = "Cluster '{}' is healthy, but {} node(s) offline'".format( + cluster, diff + ) else: self.check_message = "Cluster '{}' is healthy'".format(cluster) - self.add_perfdata('nodes_total', node_count, unit='') - self.add_perfdata('nodes_online', nodes_online_count, unit='') + self.add_perfdata("nodes_total", node_count, unit="") + self.add_perfdata("nodes_online", nodes_online_count, unit="") else: self.check_result = CheckState.CRITICAL - self.check_message = 'Cluster is unhealthy - no quorum' + self.check_message = "Cluster is unhealthy - no quorum" def check_zfs_fragmentation(self, name=None): - 
url = self.get_url('nodes/{}/disks/zfs'.format(self.options.node)) + url = self.get_url("nodes/{}/disks/zfs".format(self.options.node)) data = self.request(url) warnings = [] critical = [] found = name is None for pool in data: - found = found or name == pool['name'] - if (name is not None and name == pool['name']) or name is None: + found = found or name == pool["name"] + if (name is not None and name == pool["name"]) or name is None: key = "fragmentation" if name is None: - key += '_{}'.format(pool['name']) - self.add_perfdata(key, pool['frag']) + key += "_{}".format(pool["name"]) + self.add_perfdata(key, pool["frag"]) - threshold_name = "fragmentation_{}".format(pool['name']) + threshold_name = "fragmentation_{}".format(pool["name"]) threshold_warning = self.threshold_warning(threshold_name) threshold_critical = self.threshold_critical(threshold_name) - if threshold_critical is not None and pool['frag'] > float( - threshold_critical.value): + if threshold_critical is not None and pool["frag"] > float( + threshold_critical.value + ): critical.append(pool) - elif threshold_warning is not None and pool['frag'] > float( - threshold_warning.value): + elif threshold_warning is not None and pool["frag"] > float( + threshold_warning.value + ): warnings.append(pool) if not found: @@ -446,21 +499,34 @@ def check_zfs_fragmentation(self, name=None): if critical: self.check_result = CheckState.CRITICAL if name is not None: - value = critical[0]['frag'] + value = critical[0]["frag"] else: self.check_result = CheckState.WARNING if name is not None: - value = warnings[0]['frag'] + value = warnings[0]["frag"] if name is not None: - self.check_message = "Fragmentation of ZFS pool '{}' is above thresholds: {} %".format(name, value) + self.check_message = ( + "Fragmentation of ZFS pool '{}' is above thresholds: {} %".format( + name, value + ) + ) else: message = "{} of {} ZFS pools are above fragmentation thresholds:\n\n".format( - len(warnings) + len(critical), len(data)) + 
len(warnings) + len(critical), len(data) + ) message += "\n".join( - ['- {} ({} %) is CRITICAL\n'.format(pool['name'], pool['frag']) for pool in critical]) + [ + "- {} ({} %) is CRITICAL\n".format(pool["name"], pool["frag"]) + for pool in critical + ] + ) message += "\n".join( - ['- {} ({} %) is WARNING\n'.format(pool['name'], pool['frag']) for pool in warnings]) + [ + "- {} ({} %) is WARNING\n".format(pool["name"], pool["frag"]) + for pool in warnings + ] + ) self.check_message = message else: self.check_result = CheckState.OK @@ -470,16 +536,16 @@ def check_zfs_fragmentation(self, name=None): self.check_message = "Fragmentation of all ZFS pools is OK" def check_zfs_health(self, name=None): - url = self.get_url('nodes/{}/disks/zfs'.format(self.options.node)) + url = self.get_url("nodes/{}/disks/zfs".format(self.options.node)) data = self.request(url) unhealthy = [] found = name is None - healthy_conditions = ['online'] + healthy_conditions = ["online"] for pool in data: - found = found or name == pool['name'] - if (name is not None and name == pool['name']) or name is None: - if pool['health'].lower() not in healthy_conditions: + found = found or name == pool["name"] + if (name is not None and name == pool["name"]) or name is None: + if pool["health"].lower() not in healthy_conditions: unhealthy.append(pool) if not found: @@ -490,7 +556,11 @@ def check_zfs_health(self, name=None): self.check_result = CheckState.CRITICAL message = "{} ZFS pools are not healthy:\n\n".format(len(unhealthy)) message += "\n".join( - ['- {} ({}) is not healthy'.format(pool['name'], pool['health']) for pool in unhealthy]) + [ + "- {} ({}) is not healthy".format(pool["name"], pool["health"]) + for pool in unhealthy + ] + ) self.check_message = message else: self.check_result = CheckState.OK @@ -500,23 +570,25 @@ def check_zfs_health(self, name=None): self.check_message = "All ZFS pools are healthy" def check_ceph_health(self): - url = self.get_url('cluster/ceph/status') + url = 
self.get_url("cluster/ceph/status") data = self.request(url) - ceph_health = data.get('health', {}) + ceph_health = data.get("health", {}) - if 'status' not in ceph_health: + if "status" not in ceph_health: self.check_result = CheckState.UNKNOWN - self.check_message = "Could not fetch Ceph status from API. " \ - "Check the output of 'pvesh get cluster/ceph' on your node" + self.check_message = ( + "Could not fetch Ceph status from API. " + "Check the output of 'pvesh get cluster/ceph' on your node" + ) return - if ceph_health['status'] == 'HEALTH_OK': + if ceph_health["status"] == "HEALTH_OK": self.check_result = CheckState.OK self.check_message = "Ceph Cluster is healthy" - elif ceph_health['status'] == 'HEALTH_WARN': + elif ceph_health["status"] == "HEALTH_WARN": self.check_result = CheckState.WARNING self.check_message = "Ceph Cluster is in warning state" - elif ceph_health['status'] == 'HEALTH_CRIT': + elif ceph_health["status"] == "HEALTH_CRIT": self.check_result = CheckState.CRITICAL self.check_message = "Ceph Cluster is in critical state" else: @@ -525,100 +597,117 @@ def check_ceph_health(self): def check_storage(self, name): # check if storage exists - url = self.get_url('nodes/{}/storage'.format(self.options.node)) + url = self.get_url("nodes/{}/storage".format(self.options.node)) data = self.request(url) - if not any(s['storage'] == name for s in data): + if not any(s["storage"] == name for s in data): self.check_result = CheckState.CRITICAL - self.check_message = "Storage '{}' doesn't exist on node '{}'".format(name, self.options.node) + self.check_message = "Storage '{}' doesn't exist on node '{}'".format( + name, self.options.node + ) return - url = self.get_url('nodes/{}/storage/{}/status'.format(self.options.node, name)) + url = self.get_url("nodes/{}/storage/{}/status".format(self.options.node, name)) self.check_api_value(url, "Usage of storage '{}' is".format(name)) def check_version(self): - url = self.get_url('version') + url = 
self.get_url("version") data = self.request(url) - if not data['version']: + if not data["version"]: self.check_result = CheckState.UNKNOWN self.check_message = "Unable to determine pve version" - elif self.options.min_version and version.parse(self.options.min_version) > version.parse(data['version']): + elif self.options.min_version and version.parse(self.options.min_version) > version.parse( + data["version"] + ): self.check_result = CheckState.CRITICAL - self.check_message = "Current pve version '{}' ({}) is lower than the min. required version '{}'".format( - data['version'], data['repoid'], self.options.min_version) + self.check_message = ( + "Current pve version '{}' ({}) is lower than the min. required version '{}'".format( + data["version"], data["repoid"], self.options.min_version + ) + ) else: - self.check_message = "Your pve instance version '{}' ({}) is up to date".format(data['version'], - data['repoid']) + self.check_message = "Your pve instance version '{}' ({}) is up to date".format( + data["version"], data["repoid"] + ) + def check_vzdump_backup(self, name=None): - tasks_url = self.get_url('cluster/tasks') + tasks_url = self.get_url("cluster/tasks") tasks = self.request(tasks_url) - tasks = [t for t in tasks if t['type'] == 'vzdump'] + tasks = [t for t in tasks if t["type"] == "vzdump"] # Filter by node id, if one is provided if self.options.node is not None: - tasks = [t for t in tasks if t['node'] == self.options.node] + tasks = [t for t in tasks if t["node"] == self.options.node] # Filter by timestamp, if provided - delta = self.threshold_critical('delta') + delta = self.threshold_critical("delta") if delta is not None: now = datetime.utcnow().timestamp() - tasks = [t for t in tasks if not delta.check(now - t['starttime'])] + tasks = [t for t in tasks if not delta.check(now - t["starttime"])] # absent status = job still running - tasks = [t for t in tasks if 'status' in t] - failed = len([t for t in tasks if t['status'] != 'OK']) + tasks = [t 
for t in tasks if "status" in t] + failed = len([t for t in tasks if t["status"] != "OK"]) success = len(tasks) - failed - self.check_message = '{} backup tasks successful, {} backup tasks failed'.format(success, failed) + self.check_message = "{} backup tasks successful, {} backup tasks failed".format( + success, failed + ) if failed > 0: self.check_result = CheckState.CRITICAL else: self.check_result = CheckState.OK if delta is not None: - self.check_message += ' within the last {}s'.format(delta.value) + self.check_message += " within the last {}s".format(delta.value) - nbu_url = self.get_url('cluster/backup-info/not-backed-up') + nbu_url = self.get_url("cluster/backup-info/not-backed-up") not_backed_up = self.request(nbu_url) if len(not_backed_up) > 0: - guest_ids = ' '.join([str(guest['vmid']) for guest in not_backed_up]) + guest_ids = " ".join([str(guest["vmid"]) for guest in not_backed_up]) if self.check_result not in [CheckState.CRITICAL, CheckState.UNKNOWN]: self.check_result = CheckState.WARNING - self.check_message += "\nThere are guests not covered by any backup schedule: {}".format(guest_ids) + self.check_message += ( + "\nThere are guests not covered by any backup schedule: {}".format(guest_ids) + ) def check_memory(self): - url = self.get_url('nodes/{}/status'.format(self.options.node)) - self.check_api_value(url, 'Memory usage is', key='memory') + url = self.get_url("nodes/{}/status".format(self.options.node)) + self.check_api_value(url, "Memory usage is", key="memory") def check_swap(self): - url = self.get_url('nodes/{}/status'.format(self.options.node)) - self.check_api_value(url, 'Swap usage is', key='swap') + url = self.get_url("nodes/{}/status".format(self.options.node)) + self.check_api_value(url, "Swap usage is", key="swap") def check_cpu(self): - url = self.get_url('nodes/{}/status'.format(self.options.node)) - self.check_api_value(url, 'CPU usage is', key='cpu') + url = self.get_url("nodes/{}/status".format(self.options.node)) + 
self.check_api_value(url, "CPU usage is", key="cpu") def check_io_wait(self): - url = self.get_url('nodes/{}/status'.format(self.options.node)) - self.check_api_value(url, 'IO wait is', key='wait', perfkey='wait') + url = self.get_url("nodes/{}/status".format(self.options.node)) + self.check_api_value(url, "IO wait is", key="wait", perfkey="wait") def check_thresholds(self, value, message, **kwargs): is_warning = False is_critical = False if not isinstance(value, dict): - value = { None: value } + value = {None: value} for metric, value in value.items(): value_warning = self.threshold_warning(metric) if value_warning is not None: - is_warning = is_warning or value_warning.check(value, kwargs.get('lowerValue', False)) + is_warning = is_warning or value_warning.check( + value, kwargs.get("lowerValue", False) + ) value_critical = self.threshold_critical(metric) if value_critical is not None: - is_critical = is_critical or value_critical.check(value, kwargs.get('lowerValue', False)) + is_critical = is_critical or value_critical.check( + value, kwargs.get("lowerValue", False) + ) if is_critical: self.check_result = CheckState.CRITICAL - self.check_message = kwargs.get('messageCritical', message) + self.check_message = kwargs.get("messageCritical", message) elif is_warning: self.check_result = CheckState.WARNING - self.check_message = kwargs.get('messageWarning', message) + self.check_message = kwargs.get("messageWarning", message) else: self.check_message = message @@ -626,13 +715,17 @@ def scale_value(self, value): if self.options.unit in self.UNIT_SCALE: return value / self.UNIT_SCALE[self.options.unit] else: - assert('wrong unit') + assert "wrong unit" def threshold_warning(self, name: str): - return self.options.threshold_warning.get(name, self.options.threshold_warning.get(None, None)) + return self.options.threshold_warning.get( + name, self.options.threshold_warning.get(None, None) + ) def threshold_critical(self, name: str): - return 
self.options.threshold_critical.get(name, self.options.threshold_critical.get(None, None)) + return self.options.threshold_critical.get( + name, self.options.threshold_critical.get(None, None) + ) def get_value(self, value, total=None): value = float(value) @@ -645,62 +738,62 @@ def get_value(self, value, total=None): return round(value, 2) def add_perfdata(self, name, value, **kwargs): - unit = kwargs.get('unit', '%') + unit = kwargs.get("unit", "%") - perfdata = '{}={}{}'.format(name, value, unit) + perfdata = "{}={}{}".format(name, value, unit) threshold_warning = self.threshold_warning(name) threshold_critical = self.threshold_critical(name) - perfdata += ';' + perfdata += ";" if threshold_warning: perfdata += str(threshold_warning.value) - perfdata += ';' + perfdata += ";" if threshold_critical: perfdata += str(threshold_critical.value) - perfdata += ';{}'.format(kwargs.get('min', 0)) - perfdata += ';{}'.format(kwargs.get('max', '')) + perfdata += ";{}".format(kwargs.get("min", 0)) + perfdata += ";{}".format(kwargs.get("max", "")) self.perfdata.append(perfdata) def get_perfdata(self): - perfdata = '' + perfdata = "" if len(self.perfdata): - perfdata = '|' - perfdata += ' '.join(self.perfdata) + perfdata = "|" + perfdata += " ".join(self.perfdata) return perfdata def check(self): self.check_result = CheckState.OK - if self.options.mode == 'cluster': + if self.options.mode == "cluster": self.check_cluster_status() - elif self.options.mode == 'version': + elif self.options.mode == "version": self.check_version() - elif self.options.mode == 'memory': + elif self.options.mode == "memory": self.check_memory() - elif self.options.mode == 'swap': + elif self.options.mode == "swap": self.check_swap() - elif self.options.mode == 'io_wait': + elif self.options.mode == "io_wait": self.check_io_wait() - elif self.options.mode == 'disk-health': + elif self.options.mode == "disk-health": self.check_disks() - elif self.options.mode == 'cpu': + elif self.options.mode == "cpu": 
self.check_cpu() - elif self.options.mode == 'services': + elif self.options.mode == "services": self.check_services() - elif self.options.mode == 'updates': + elif self.options.mode == "updates": self.check_updates() - elif self.options.mode == 'subscription': + elif self.options.mode == "subscription": self.check_subscription() - elif self.options.mode == 'storage': + elif self.options.mode == "storage": self.check_storage(self.options.name) - elif self.options.mode in ['vm', 'vm_status']: - only_status = self.options.mode == 'vm_status' + elif self.options.mode in ["vm", "vm_status"]: + only_status = self.options.mode == "vm_status" if self.options.name: idx = self.options.name @@ -708,18 +801,20 @@ def check(self): idx = self.options.vmid if self.options.expected_vm_status: - self.check_vm_status(idx, expected_state=self.options.expected_vm_status, only_status=only_status) + self.check_vm_status( + idx, expected_state=self.options.expected_vm_status, only_status=only_status + ) else: self.check_vm_status(idx, only_status=only_status) - elif self.options.mode == 'replication': + elif self.options.mode == "replication": self.check_replication() - elif self.options.mode == 'ceph-health': + elif self.options.mode == "ceph-health": self.check_ceph_health() - elif self.options.mode == 'zfs-health': + elif self.options.mode == "zfs-health": self.check_zfs_health(self.options.name) - elif self.options.mode == 'zfs-fragmentation': + elif self.options.mode == "zfs-fragmentation": self.check_zfs_fragmentation(self.options.name) - elif self.options.mode == 'backup': + elif self.options.mode == "backup": self.check_vzdump_backup(self.options.name) else: message = "Check mode '{}' not known".format(self.options.mode) @@ -728,95 +823,198 @@ def check(self): self.check_output() def parse_args(self): - p = argparse.ArgumentParser(description='Check command for PVE hosts via API') + p = argparse.ArgumentParser(description="Check command for PVE hosts via API") - api_opts = 
p.add_argument_group('API Options') + api_opts = p.add_argument_group("API Options") - api_opts.add_argument("-e", "-H", "--api-endpoint", - required=True, - help="PVE api endpoint hostname or ip address (no additional data like paths)") + api_opts.add_argument( + "-e", + "-H", + "--api-endpoint", + required=True, + help="PVE api endpoint hostname or ip address (no additional data like paths)", + ) api_opts.add_argument("--api-port", required=False, help="PVE api endpoint port") - api_opts.add_argument("-u", "--username", dest='api_user', required=True, - help="PVE api user (e.g. icinga2@pve or icinga2@pam, depending on which backend you " - "have chosen in proxmox)") + api_opts.add_argument( + "-u", + "--username", + dest="api_user", + required=True, + help="PVE api user (e.g. icinga2@pve or icinga2@pam, depending on which backend you " + "have chosen in proxmox)", + ) group = api_opts.add_mutually_exclusive_group(required=True) - group.add_argument("-p", "--password", dest='api_password', help="PVE API user password") - group.add_argument("-t", "--api-token", dest="api_token", help="PVE API token (format: TOKEN_ID=TOKEN_SECRET") - - api_opts.add_argument("-k", "--insecure", dest='api_insecure', action='store_true', default=False, - help="Don't verify HTTPS certificate") + group.add_argument("-p", "--password", dest="api_password", help="PVE API user password") + group.add_argument( + "-t", + "--api-token", + dest="api_token", + help="PVE API token (format: TOKEN_ID=TOKEN_SECRET", + ) + + api_opts.add_argument( + "-k", + "--insecure", + dest="api_insecure", + action="store_true", + default=False, + help="Don't verify HTTPS certificate", + ) api_opts.set_defaults(api_port=8006) - check_opts = p.add_argument_group('Check Options') - - check_opts.add_argument("-m", "--mode", - choices=( - 'cluster', 'version', 'cpu', 'memory', 'swap', 'storage', 'io_wait', 'updates', 'services', - 'subscription', 'vm', 'vm_status', 'replication', 'disk-health', 'ceph-health', - 
'zfs-health', 'zfs-fragmentation', 'backup'), - required=True, - help="Mode to use.") - - check_opts.add_argument('-n', '--node', dest='node', - help='Node to check (necessary for all modes except cluster, version and backup)') - - check_opts.add_argument('--name', dest='name', - help='Name of storage, vm, or container') - - check_opts.add_argument('--vmid', dest='vmid', type=int, - help='ID of virtual machine or container') - - check_opts.add_argument('--expected-vm-status', choices=('running', 'stopped', 'paused'), - help='Expected VM status') - - check_opts.add_argument('--ignore-vm-status', dest='ignore_vm_status', action='store_true', - help='Ignore VM status in checks', - default=False) - - check_opts.add_argument('--ignore-service', dest='ignore_services', action='append', metavar='NAME', - help='Ignore service NAME in checks', default=[]) - - check_opts.add_argument('--ignore-disk', dest='ignore_disks', action='append', metavar='NAME', - help='Ignore disk NAME in health check', default=[]) - - check_opts.add_argument('-w', '--warning', dest='threshold_warning', type=CheckThreshold.threshold_type, - default={}, help='Warning threshold for check value. Mutiple thresholds with name:value,name:value') - check_opts.add_argument('-c', '--critical', dest='threshold_critical', type=CheckThreshold.threshold_type, - default={}, help='Critical threshold for check value. Mutiple thresholds with name:value,name:value') - check_opts.add_argument('-M', dest='values_mb', action='store_true', default=False, - help='Values are shown in the unit which is set with --unit (if available). Thresholds are also treated in this unit') - check_opts.add_argument('-V', '--min-version', dest='min_version', type=str, - help='The minimal pve version to check for. 
Any version lower than this will return ' - 'CRITICAL.') - - check_opts.add_argument('--unit', choices=self.UNIT_SCALE.keys(), default='MiB', help='Unit which is used for performance data and other values') + check_opts = p.add_argument_group("Check Options") + + check_opts.add_argument( + "-m", + "--mode", + choices=( + "cluster", + "version", + "cpu", + "memory", + "swap", + "storage", + "io_wait", + "updates", + "services", + "subscription", + "vm", + "vm_status", + "replication", + "disk-health", + "ceph-health", + "zfs-health", + "zfs-fragmentation", + "backup", + ), + required=True, + help="Mode to use.", + ) + + check_opts.add_argument( + "-n", + "--node", + dest="node", + help="Node to check (necessary for all modes except cluster, version and backup)", + ) + + check_opts.add_argument("--name", dest="name", help="Name of storage, vm, or container") + + check_opts.add_argument( + "--vmid", dest="vmid", type=int, help="ID of virtual machine or container" + ) + + check_opts.add_argument( + "--expected-vm-status", + choices=("running", "stopped", "paused"), + help="Expected VM status", + ) + + check_opts.add_argument( + "--ignore-vm-status", + dest="ignore_vm_status", + action="store_true", + help="Ignore VM status in checks", + default=False, + ) + + check_opts.add_argument( + "--ignore-service", + dest="ignore_services", + action="append", + metavar="NAME", + help="Ignore service NAME in checks", + default=[], + ) + + check_opts.add_argument( + "--ignore-disk", + dest="ignore_disks", + action="append", + metavar="NAME", + help="Ignore disk NAME in health check", + default=[], + ) + + check_opts.add_argument( + "-w", + "--warning", + dest="threshold_warning", + type=CheckThreshold.threshold_type, + default={}, + help="Warning threshold for check value. 
Mutiple thresholds with name:value,name:value", + ) + check_opts.add_argument( + "-c", + "--critical", + dest="threshold_critical", + type=CheckThreshold.threshold_type, + default={}, + help="Critical threshold for check value. Mutiple thresholds with name:value,name:value", + ) + check_opts.add_argument( + "-M", + dest="values_mb", + action="store_true", + default=False, + help="Values are shown in the unit which is set with --unit (if available). Thresholds are also treated in this unit", + ) + check_opts.add_argument( + "-V", + "--min-version", + dest="min_version", + type=str, + help="The minimal pve version to check for. Any version lower than this will return " + "CRITICAL.", + ) + + check_opts.add_argument( + "--unit", + choices=self.UNIT_SCALE.keys(), + default="MiB", + help="Unit which is used for performance data and other values", + ) options = p.parse_args() - if not options.node and options.mode not in ['cluster', 'vm', 'vm_status', 'version', 'ceph-health', 'backup']: + if not options.node and options.mode not in [ + "cluster", + "vm", + "vm_status", + "version", + "ceph-health", + "backup", + ]: p.print_usage() - message = "{}: error: --mode {} requires node name (--node)".format(p.prog, options.mode) + message = "{}: error: --mode {} requires node name (--node)".format( + p.prog, options.mode + ) self.output(CheckState.UNKNOWN, message) - if not options.vmid and not options.name and options.mode in ('vm', 'vm_status'): + if not options.vmid and not options.name and options.mode in ("vm", "vm_status"): p.print_usage() - message = "{}: error: --mode {} requires either vm name (--name) or id (--vmid)".format(p.prog, - options.mode) + message = "{}: error: --mode {} requires either vm name (--name) or id (--vmid)".format( + p.prog, options.mode + ) self.output(CheckState.UNKNOWN, message) - if not options.name and options.mode == 'storage': + if not options.name and options.mode == "storage": p.print_usage() - message = "{}: error: --mode {} requires 
storage name (--name)".format(p.prog, options.mode) + message = "{}: error: --mode {} requires storage name (--name)".format( + p.prog, options.mode + ) self.output(CheckState.UNKNOWN, message) def compare_thresholds(threshold_warning, threshold_critical, comparator): ok = True keys = set(list(threshold_warning.keys()) + list(threshold_critical.keys())) for key in keys: - if (key in threshold_warning and key in threshold_critical) or (None in threshold_warning and None in threshold_critical): + if (key in threshold_warning and key in threshold_critical) or ( + None in threshold_warning and None in threshold_critical + ): ok = ok and comparator(threshold_warning[key], threshold_critical[key]) elif key in threshold_warning and None in threshold_critical: ok = ok and comparator(threshold_warning[key], threshold_critical[None]) @@ -826,9 +1024,13 @@ def compare_thresholds(threshold_warning, threshold_critical, comparator): return ok if options.threshold_warning and options.threshold_critical: - if options.mode != 'subscription' and not compare_thresholds(options.threshold_warning, options.threshold_critical, lambda w,c: w<=c): + if options.mode != "subscription" and not compare_thresholds( + options.threshold_warning, options.threshold_critical, lambda w, c: w <= c + ): p.error("Critical value must be greater than warning value") - elif options.mode == 'subscription' and not compare_thresholds(options.threshold_warning, options.threshold_critical, lambda w,c: w>=c): + elif options.mode == "subscription" and not compare_thresholds( + options.threshold_warning, options.threshold_critical, lambda w, c: w >= c + ): p.error("Critical value must be lower than warning value") self.options = options @@ -847,12 +1049,17 @@ def __init__(self): if self.options.api_insecure: # disable urllib3 warning about insecure requests - requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) + requests.packages.urllib3.disable_warnings( + 
requests.packages.urllib3.exceptions.InsecureRequestWarning + ) if self.options.api_password is not None: - self.__cookies['PVEAuthCookie'] = self.get_ticket() + self.__cookies["PVEAuthCookie"] = self.get_ticket() elif self.options.api_token is not None: - self.__headers["Authorization"] = "PVEAPIToken={}!{}".format(self.options.api_user, self.options.api_token) + self.__headers["Authorization"] = "PVEAPIToken={}!{}".format( + self.options.api_user, self.options.api_token + ) + pve = CheckPVE() pve.check() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..26d8314 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[tool.black] +line-length = 100 + +[tool.ruff] +line-length = 100 +lint.select = [ + "ANN", # flake8-annotations + "B", # flake8-bugbear + "D", # pydocstyle + "E", # pycodestyle + "F", # Pyflakes + "Q", # flake8-quotes +] +lint.ignore = [ + "ANN101", # missing-type-self + "D107", # undocumented-public-init +] + +[tool.ruff.lint.mccabe] +max-complexity = 10 From 11d9775d1fc0d37a7389634365fdb3c51077a26d Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Sun, 12 May 2024 12:51:17 +0200 Subject: [PATCH 2/6] style: Make linter happy Rework some code to make linter happy (esp. use f strings). Signed-off-by: Nicolai Buchwitz --- check_pve.py | 433 ++++++++++++++++++++++++++++----------------------- 1 file changed, 238 insertions(+), 195 deletions(-) diff --git a/check_pve.py b/check_pve.py index 98bf5e5..bdbb598 100755 --- a/check_pve.py +++ b/check_pve.py @@ -23,22 +23,51 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
# ------------------------------------------------------------------------------ -import sys +"""Proxmox VE monitoring check command for various monitoring systems like Icinga and others.""" + import re +import sys +from typing import Callable, Dict, Optional, Union try: - from enum import Enum - from datetime import datetime - from packaging import version import argparse + from datetime import datetime + from enum import Enum + import requests + from packaging import version + from requests.packages.urllib3.exceptions import InsecureRequestWarning except ImportError as e: - print("Missing python module: {}".format(str(e))) + print(f"Missing python module: {str(e)}") sys.exit(255) +# Timeout for API requests in seconds +CHECK_API_TIMEOUT = 30 + + +def compare_thresholds( + threshold_warning: Dict, threshold_critical: Dict, comparator: Callable +) -> bool: + """Perform sanity checks on thresholds parameters (used for argparse validation).""" + ok = True + keys = set(list(threshold_warning.keys()) + list(threshold_critical.keys())) + for key in keys: + if (key in threshold_warning and key in threshold_critical) or ( + None in threshold_warning and None in threshold_critical + ): + ok = ok and comparator(threshold_warning[key], threshold_critical[key]) + elif key in threshold_warning and None in threshold_critical: + ok = ok and comparator(threshold_warning[key], threshold_critical[None]) + elif key in threshold_critical and None in threshold_warning: + ok = ok and comparator(threshold_warning[None], threshold_critical[key]) + + return ok + class CheckState(Enum): + """Check return values.""" + OK = 0 WARNING = 1 CRITICAL = 2 @@ -46,49 +75,60 @@ class CheckState(Enum): class CheckThreshold: - def __init__(self, value: float): + """Threshold representation used by the check command.""" + + def __init__(self, value: float) -> None: self.value = value - def __eq__(self, other): + def __eq__(self, other: "CheckThreshold") -> bool: + """Threshold is equal to given one.""" 
return self.value == other.value - def __lt__(self, other): + def __lt__(self, other: "CheckThreshold") -> bool: + """Threshold is lower to given one.""" return self.value < other.value - def __le__(self, other): + def __le__(self, other: "CheckThreshold") -> bool: + """Threshold is lower or equal to given one.""" return self.value <= other.value - def __gt__(self, other): + def __gt__(self, other: "CheckThreshold") -> bool: + """Threshold is greater than given one.""" return self.value > other.value - def __ge__(self, other): + def __ge__(self, other: "CheckThreshold") -> bool: + """Threshold is greater or equal than given one.""" return self.value >= other.value - def check(self, value: float, lower: bool = False): + def check(self, value: float, lower: bool = False) -> bool: + """Check threshold value as upper or lower boundary for given value.""" if lower: return value < self.value - else: - return value > self.value + + return value > self.value @staticmethod - def threshold_type(arg: str): + def threshold_type(arg: str) -> Dict[str, "CheckThreshold"]: + """Convert string argument(s) to threshold dict.""" thresholds = {} try: thresholds[None] = CheckThreshold(float(arg)) - except: + except ValueError: for t in arg.split(","): m = re.match("([a-z_0-9]+):([0-9.]+)", t) if m: thresholds[m.group(1)] = CheckThreshold(float(m.group(2))) else: - raise argparse.ArgumentTypeError("invalid threshold format: {}".format(t)) + raise argparse.ArgumentTypeError(f"Invalid threshold format: {t}") # noqa: B904 return thresholds class CheckPVE: + """Check command for Proxmox VE.""" + VERSION = "1.2.2" API_URL = "https://{hostname}:{port}/api2/json/{command}" UNIT_SCALE = { @@ -101,7 +141,8 @@ class CheckPVE: "B": 1, } - def check_output(self): + def check_output(self) -> None: + """Print check command output with perfdata and return code.""" message = self.check_message if self.perfdata: message += self.get_perfdata() @@ -109,19 +150,20 @@ def check_output(self): 
self.output(self.check_result, message) @staticmethod - def output(rc, message): + def output(rc: CheckState, message: str) -> None: + """Print message to stdout and exit with given return code.""" prefix = rc.name - message = "{} - {}".format(prefix, message) - - print(message) + print(f"{prefix} - {message}") sys.exit(rc.value) - def get_url(self, command): + def get_url(self, command: str) -> str: + """Get API url for specific command.""" return self.API_URL.format( hostname=self.options.api_endpoint, command=command, port=self.options.api_port ) - def request(self, url, method="get", **kwargs): + def request(self, url: str, method: str = "get", **kwargs: Dict) -> Union[Dict, None]: + """Execute request against Proxmox VE API and return json data.""" response = None try: if method == "post": @@ -138,9 +180,10 @@ def request(self, url, method="get", **kwargs): cookies=self.__cookies, headers=self.__headers, params=kwargs.get("params", None), + timeout=CHECK_API_TIMEOUT, ) else: - self.output(CheckState.CRITICAL, "Unsupport request method: {}".format(method)) + self.output(CheckState.CRITICAL, f"Unsupport request method: {method}") except requests.exceptions.ConnectTimeout: self.output(CheckState.UNKNOWN, "Could not connect to PVE API: Connection timeout") except requests.exceptions.SSLError: @@ -154,29 +197,30 @@ def request(self, url, method="get", **kwargs): if response.ok: return response.json()["data"] + + message = "Could not fetch data from API: " + if response.status_code == 401: + message += "Could not connection to PVE API: invalid username or password" + elif response.status_code == 403: + message += ( + "Access denied. Please check if API user has sufficient permissions / " + "the correct role has been assigned." + ) else: - message = "Could not fetch data from API: " - - if response.status_code == 401: - message += "Could not connection to PVE API: invalid username or password" - elif response.status_code == 403: - message += ( - "Access denied. 
Please check if API user has sufficient permissions / the role has been " - "assigned." - ) - else: - message += "HTTP error code was {}".format(response.status_code) + message += f"HTTP error code was {response.status_code}" - self.output(CheckState.UNKNOWN, message) + self.output(CheckState.UNKNOWN, message) - def get_ticket(self): + def get_ticket(self) -> str: + """Perform login and fetch ticket for further API calls.""" url = self.get_url("access/ticket") data = {"username": self.options.api_user, "password": self.options.api_password} result = self.request(url, "post", data=data) return result["ticket"] - def check_api_value(self, url, message, **kwargs): + def check_api_value(self, url: StopIteration, message: str, **kwargs: Dict) -> None: + """Perform simple threshold based check command.""" result = self.request(url) used = None @@ -197,15 +241,16 @@ def check_api_value(self, url, message, **kwargs): self.add_perfdata(kwargs.get("perfkey", "usage"), used_percent) if self.options.values_mb: - message += " {} {}".format(used, self.options.unit) + message += f" {used} {self.options.unit}" value = used else: - message += " {} {}".format(used_percent, "%") + message += f" {used_percent} %" value = used_percent self.check_thresholds(value, message) - def check_vm_status(self, idx, **kwargs): + def check_vm_status(self, idx: Union[str, int], **kwargs: str) -> None: + """Check status of virtual machine by vmid or name.""" url = self.get_url( "cluster/resources", ) @@ -223,22 +268,21 @@ def check_vm_status(self, idx, **kwargs): vm_type = "LXC" if vm["status"] != expected_state: - self.check_message = "{} '{}' is {} (expected: {})".format( - vm_type, vm["name"], vm["status"], expected_state + self.check_message = ( + f"{vm_type} '{vm['name']}' is {vm['status']} (expected: {expected_state})" ) if not self.options.ignore_vm_status: self.check_result = CheckState.CRITICAL else: if self.options.node and self.options.node != vm["node"]: self.check_message = ( - "{} '{}' 
is {}, but located on node '{}' instead of '{}'".format( - vm_type, vm["name"], expected_state, vm["node"], self.options.node - ) + f"{vm_type} '{vm['name']}' is {expected_state}, " + f"but located on node '{vm['node']}' instead of '{self.options.node}'" ) self.check_result = CheckState.WARNING else: - self.check_message = "{} '{}' is {} on node '{}'".format( - vm_type, vm["name"], expected_state, vm["node"] + self.check_message = ( + f"{vm_type} '{vm['name']}' is {expected_state} on node '{vm['node']}'" ) if vm["status"] == "running" and not only_status: @@ -275,11 +319,12 @@ def check_vm_status(self, idx, **kwargs): break if not found: - self.check_message = "VM or LXC '{}' not found".format(idx) + self.check_message = f"VM or LXC '{idx}' not found" self.check_result = CheckState.WARNING - def check_disks(self): - url = self.get_url("nodes/{}/disks".format(self.options.node)) + def check_disks(self) -> None: + """Check disk health on specific Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/disks") failed = [] unknown = [] @@ -299,31 +344,26 @@ def check_disks(self): failed.append({"serial": disk["serial"], "device": disk["devpath"]}) if disk["wearout"] != "N/A": - self.add_perfdata("wearout_{}".format(name), disk["wearout"]) + self.add_perfdata(f"wearout_{name}", disk["wearout"]) if failed: - self.check_message = "{} of {} disks failed the health test:\n".format( - len(failed), len(disks) - ) + self.check_message = f"{len(failed)} of {len(disks)} disks failed the health test:\n" for disk in failed: - self.check_message += "- {} with serial '{}'\n".format( - disk["device"], disk["serial"] - ) + self.check_message += f"- {disk['device']} with serial '{disk['serial']}'\n" if unknown: - self.check_message += "{} of {} disks have unknown health status:\n".format( - len(unknown), len(disks) + self.check_message += ( + f"{len(unknown)} of {len(disks)} disks have unknown health status:\n" ) for disk in unknown: - self.check_message += "- {} with 
serial '{}'\n".format( - disk["device"], disk["serial"] - ) + self.check_message += f"- {disk['device']} with serial '{disk['serial']}'\n" if not failed and not unknown: self.check_message = "All disks are healthy" - def check_replication(self): - url = self.get_url("nodes/{}/replication".format(self.options.node)) + def check_replication(self) -> None: + """Check replication status for either all or one specific vm / container.""" + url = self.get_url(f"nodes/{self.options.node}/replication") if self.options.vmid: data = self.request(url, params={"guest": self.options.vmid}) @@ -342,7 +382,7 @@ def check_replication(self): performance_data.append({"id": job["id"], "duration": job["duration"]}) if len(failed_jobs) > 0: - message = "Failed replication jobs on {}: ".format(self.options.node) + message = f"Failed replication jobs on {self.options.node}: " for job in failed_jobs: message = ( message @@ -353,15 +393,16 @@ def check_replication(self): self.check_message = message self.check_result = CheckState.WARNING else: - self.check_message = "No failed replication jobs on {}".format(self.options.node) + self.check_message = f"No failed replication jobs on {self.options.node}" self.check_result = CheckState.OK if len(performance_data) > 0: for metric in performance_data: self.add_perfdata("duration_" + metric["id"], metric["duration"], unit="s") - def check_services(self): - url = self.get_url("nodes/{}/services".format(self.options.node)) + def check_services(self) -> None: + """Check state of core services on Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/services") data = self.request(url) failed = {} @@ -375,14 +416,16 @@ def check_services(self): if failed: self.check_result = CheckState.CRITICAL - message = "{} services are not running:\n\n".format(len(failed)) - message += "\n".join(["- {} ({}) is not running".format(failed[i], i) for i in failed]) + message = f"{len(failed)} services are not running:\n\n" + for name, description in 
failed.items(): + message += f"- {description} ({name}) is not running\n" self.check_message = message else: self.check_message = "All services are running" - def check_subscription(self): - url = self.get_url("nodes/{}/subscription".format(self.options.node)) + def check_subscription(self) -> None: + """Check subscription status on Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/subscription") data = self.request(url) if data["status"] == "NotFound": @@ -399,11 +442,9 @@ def check_subscription(self): date_today = datetime.today() delta = (date_expire - date_today).days - message = "{} is valid until {}".format( - subscription_product_name, subscription_due_date - ) - message_warning_critical = "{} will expire in {} days ({})".format( - subscription_product_name, delta, subscription_due_date + message = f"{subscription_product_name} is valid until {subscription_due_date}" + message_warning_critical = ( + "{subscription_product_name} will expire in {delta} days ({subscription_due_date})" ) self.check_thresholds( @@ -414,8 +455,9 @@ def check_subscription(self): lowerValue=True, ) - def check_updates(self): - url = self.get_url("nodes/{}/apt/update".format(self.options.node)) + def check_updates(self) -> None: + """Check for package updates on Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/apt/update") count = len(self.request(url)) if count: @@ -427,7 +469,8 @@ def check_updates(self): else: self.check_message = "System up to date" - def check_cluster_status(self): + def check_cluster_status(self) -> None: + """Check if cluster is operational.""" url = self.get_url("cluster/status") data = self.request(url) @@ -450,11 +493,9 @@ def check_cluster_status(self): if node_count > nodes_online_count: diff = node_count - nodes_online_count self.check_result = CheckState.WARNING - self.check_message = "Cluster '{}' is healthy, but {} node(s) offline'".format( - cluster, diff - ) + self.check_message = f"Cluster '{cluster}' is 
healthy, but {diff} node(s) offline'" else: - self.check_message = "Cluster '{}' is healthy'".format(cluster) + self.check_message = f"Cluster '{cluster}' is healthy'" self.add_perfdata("nodes_total", node_count, unit="") self.add_perfdata("nodes_online", nodes_online_count, unit="") @@ -462,8 +503,9 @@ def check_cluster_status(self): self.check_result = CheckState.CRITICAL self.check_message = "Cluster is unhealthy - no quorum" - def check_zfs_fragmentation(self, name=None): - url = self.get_url("nodes/{}/disks/zfs".format(self.options.node)) + def check_zfs_fragmentation(self, name: Optional[str] = None) -> None: + """Check all or one specific ZFS pool for fragmentation.""" + url = self.get_url(f"nodes/{self.options.node}/disks/zfs") data = self.request(url) warnings = [] @@ -474,10 +516,10 @@ def check_zfs_fragmentation(self, name=None): if (name is not None and name == pool["name"]) or name is None: key = "fragmentation" if name is None: - key += "_{}".format(pool["name"]) + key += f"_{pool['name']}" self.add_perfdata(key, pool["frag"]) - threshold_name = "fragmentation_{}".format(pool["name"]) + threshold_name = f"fragmentation_{name}" threshold_warning = self.threshold_warning(threshold_name) threshold_critical = self.threshold_critical(threshold_name) @@ -492,7 +534,7 @@ def check_zfs_fragmentation(self, name=None): if not found: self.check_result = CheckState.UNKNOWN - self.check_message = "Could not fetch fragmentation of ZFS pool '{}'".format(name) + self.check_message = f"Could not fetch fragmentation of ZFS pool '{name}'" else: if warnings or critical: value = None @@ -507,36 +549,31 @@ def check_zfs_fragmentation(self, name=None): if name is not None: self.check_message = ( - "Fragmentation of ZFS pool '{}' is above thresholds: {} %".format( - name, value - ) + f"Fragmentation of ZFS pool '{name}' is above thresholds: {value} %" ) else: - message = "{} of {} ZFS pools are above fragmentation thresholds:\n\n".format( - len(warnings) + len(critical), 
len(data) + pool_above = len(warnings) + len(critical) + message = ( + f"{pool_above} of {len(data)} ZFS pools are above fragmentation " + "thresholds:\n\n" ) message += "\n".join( - [ - "- {} ({} %) is CRITICAL\n".format(pool["name"], pool["frag"]) - for pool in critical - ] + [f"- {pool['name']} ({pool['frag']} %) is CRITICAL\n" for pool in critical] ) message += "\n".join( - [ - "- {} ({} %) is WARNING\n".format(pool["name"], pool["frag"]) - for pool in warnings - ] + [f"- {pool['name']} ({pool['frag']} %) is WARNING\n" for pool in warnings] ) self.check_message = message else: self.check_result = CheckState.OK if name is not None: - self.check_message = "Fragmentation of ZFS pool '{}' is OK".format(name) + self.check_message = f"Fragmentation of ZFS pool '{name}' is OK" else: self.check_message = "Fragmentation of all ZFS pools is OK" - def check_zfs_health(self, name=None): - url = self.get_url("nodes/{}/disks/zfs".format(self.options.node)) + def check_zfs_health(self, name: Optional[str] = None) -> None: + """Check all or one specific ZFS pool for health.""" + url = self.get_url(f"nodes/{self.options.node}/disks/zfs") data = self.request(url) unhealthy = [] @@ -550,26 +587,24 @@ def check_zfs_health(self, name=None): if not found: self.check_result = CheckState.UNKNOWN - self.check_message = "Could not fetch health of ZFS pool '{}'".format(name) + self.check_message = f"Could not fetch health of ZFS pool '{name}'" else: if unhealthy: self.check_result = CheckState.CRITICAL - message = "{} ZFS pools are not healthy:\n\n".format(len(unhealthy)) + message = f"{len(unhealthy)} ZFS pools are not healthy:\n\n" message += "\n".join( - [ - "- {} ({}) is not healthy".format(pool["name"], pool["health"]) - for pool in unhealthy - ] + [f"- {pool['name']} ({pool['health']}) is not healthy" for pool in unhealthy] ) self.check_message = message else: self.check_result = CheckState.OK if name is not None: - self.check_message = "ZFS pool '{}' is healthy".format(name) + 
self.check_message = f"ZFS pool '{name}' is healthy" else: self.check_message = "All ZFS pools are healthy" - def check_ceph_health(self): + def check_ceph_health(self) -> None: + """Check health of CEPH cluster.""" url = self.get_url("cluster/ceph/status") data = self.request(url) ceph_health = data.get("health", {}) @@ -595,22 +630,21 @@ def check_ceph_health(self): self.check_result = CheckState.UNKNOWN self.check_message = "Ceph Cluster is in unknown state" - def check_storage(self, name): - # check if storage exists - url = self.get_url("nodes/{}/storage".format(self.options.node)) + def check_storage(self, name: str) -> None: + """Check if storage exists and return usage.""" + url = self.get_url(f"nodes/{self.options.node}/storage") data = self.request(url) if not any(s["storage"] == name for s in data): self.check_result = CheckState.CRITICAL - self.check_message = "Storage '{}' doesn't exist on node '{}'".format( - name, self.options.node - ) + self.check_message = f"Storage '{name}' doesn't exist on node '{self.options.node}'" return - url = self.get_url("nodes/{}/storage/{}/status".format(self.options.node, name)) - self.check_api_value(url, "Usage of storage '{}' is".format(name)) + url = self.get_url(f"nodes/{self.options.node}/storage/{name}/status") + self.check_api_value(url, f"Usage of storage '{name}' is") - def check_version(self): + def check_version(self) -> None: + """Check PVE version.""" url = self.get_url("version") data = self.request(url) if not data["version"]: @@ -621,40 +655,44 @@ def check_version(self): ): self.check_result = CheckState.CRITICAL self.check_message = ( - "Current pve version '{}' ({}) is lower than the min. required version '{}'".format( - data["version"], data["repoid"], self.options.min_version - ) + f"Current PVE version '{data['version']}' " + f"({data['repoid']}) is lower than the min. 
" + f"required version '{self.options.min_version}'" ) else: - self.check_message = "Your pve instance version '{}' ({}) is up to date".format( - data["version"], data["repoid"] + self.check_message = ( + f"Your PVE instance version '{data['version']}' ({data['repoid']}) is up to date" ) - def check_vzdump_backup(self, name=None): + def check_vzdump_backup(self, name: Optional[str] = None) -> None: + """Check for failed vzdump backup jobs.""" tasks_url = self.get_url("cluster/tasks") tasks = self.request(tasks_url) tasks = [t for t in tasks if t["type"] == "vzdump"] + # Filter by node id, if one is provided if self.options.node is not None: tasks = [t for t in tasks if t["node"] == self.options.node] + # Filter by timestamp, if provided delta = self.threshold_critical("delta") if delta is not None: - now = datetime.utcnow().timestamp() + now = datetime.now(datetime.UTC).timestamp() + tasks = [t for t in tasks if not delta.check(now - t["starttime"])] + # absent status = job still running tasks = [t for t in tasks if "status" in t] failed = len([t for t in tasks if t["status"] != "OK"]) success = len(tasks) - failed - self.check_message = "{} backup tasks successful, {} backup tasks failed".format( - success, failed - ) + self.check_message = f"{success} backup tasks successful, {failed} backup tasks failed" + if failed > 0: self.check_result = CheckState.CRITICAL else: self.check_result = CheckState.OK if delta is not None: - self.check_message += " within the last {}s".format(delta.value) + self.check_message += f" within the last {delta.value}s" nbu_url = self.get_url("cluster/backup-info/not-backed-up") not_backed_up = self.request(nbu_url) @@ -663,33 +701,43 @@ def check_vzdump_backup(self, name=None): if self.check_result not in [CheckState.CRITICAL, CheckState.UNKNOWN]: self.check_result = CheckState.WARNING self.check_message += ( - "\nThere are guests not covered by any backup schedule: {}".format(guest_ids) + f"\nThere are guests not covered by any backup 
schedule: {guest_ids}" ) - def check_memory(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_memory(self) -> None: + """Check memory usage of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "Memory usage is", key="memory") - def check_swap(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_swap(self) -> None: + """Check swap usage of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "Swap usage is", key="swap") - def check_cpu(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_cpu(self) -> None: + """Check cpu usage of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "CPU usage is", key="cpu") - def check_io_wait(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_io_wait(self) -> None: + """Check io wait of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "IO wait is", key="wait", perfkey="wait") - def check_thresholds(self, value, message, **kwargs): + def check_thresholds( + self, + values: Union[Dict[str, Union[int, float]], Union[int, float]], + message: str, + **kwargs: Dict, + ) -> None: + """Check numeric value against threshold for given metric name.""" is_warning = False is_critical = False - if not isinstance(value, dict): - value = {None: value} + if not isinstance(values, dict): + values = {None: values} - for metric, value in value.items(): + for metric, value in values.items(): value_warning = self.threshold_warning(metric) if value_warning is not None: is_warning = is_warning or value_warning.check( @@ -711,23 +759,29 @@ def check_thresholds(self, value, message, **kwargs): else: self.check_message = message - def scale_value(self, value): + def scale_value(self, value: Union[int, float]) -> float: 
+ """Scale value according to unit.""" if self.options.unit in self.UNIT_SCALE: return value / self.UNIT_SCALE[self.options.unit] - else: - assert "wrong unit" - def threshold_warning(self, name: str): + raise ValueError("wrong unit") + + def threshold_warning(self, name: str) -> CheckThreshold: + """Get warning threshold for metric name (empty if none).""" return self.options.threshold_warning.get( name, self.options.threshold_warning.get(None, None) ) - def threshold_critical(self, name: str): + def threshold_critical(self, name: str) -> CheckThreshold: + """Get critical threshold for metric name (empty if none).""" return self.options.threshold_critical.get( name, self.options.threshold_critical.get(None, None) ) - def get_value(self, value, total=None): + def get_value( + self, value: Union[int, float], total: Optional[Union[int, float]] = None + ) -> float: + """Get value scaled or as percentage.""" value = float(value) if total: @@ -737,10 +791,11 @@ def get_value(self, value, total=None): return round(value, 2) - def add_perfdata(self, name, value, **kwargs): + def add_perfdata(self, name: str, value: Union[int, float], **kwargs: Dict) -> None: + """Add metric to perfdata output.""" unit = kwargs.get("unit", "%") - perfdata = "{}={}{}".format(name, value, unit) + perfdata = f"{name}={value}{unit}" threshold_warning = self.threshold_warning(name) threshold_critical = self.threshold_critical(name) @@ -753,21 +808,23 @@ def add_perfdata(self, name, value, **kwargs): if threshold_critical: perfdata += str(threshold_critical.value) - perfdata += ";{}".format(kwargs.get("min", 0)) - perfdata += ";{}".format(kwargs.get("max", "")) + perfdata += ";" + str(kwargs.get("min", 0)) + perfdata += ";" + str(kwargs.get("max", "")) self.perfdata.append(perfdata) - def get_perfdata(self): + def get_perfdata(self) -> str: + """Get perfdata string.""" perfdata = "" - if len(self.perfdata): + if self.perfdata: perfdata = "|" perfdata += " ".join(self.perfdata) return perfdata - 
def check(self): + def check(self) -> None: + """Execute the real check command.""" self.check_result = CheckState.OK if self.options.mode == "cluster": @@ -817,12 +874,13 @@ def check(self): elif self.options.mode == "backup": self.check_vzdump_backup(self.options.name) else: - message = "Check mode '{}' not known".format(self.options.mode) + message = f"Check mode '{self.options.mode}' not known" self.output(CheckState.UNKNOWN, message) self.check_output() - def parse_args(self): + def parse_args(self) -> None: + """Parse CLI arguments.""" p = argparse.ArgumentParser(description="Check command for PVE hosts via API") api_opts = p.add_argument_group("API Options") @@ -953,14 +1011,20 @@ def parse_args(self): dest="threshold_critical", type=CheckThreshold.threshold_type, default={}, - help="Critical threshold for check value. Mutiple thresholds with name:value,name:value", + help=( + "Critical threshold for check value. " + "Mutiple thresholds with name:value,name:value" + ), ) check_opts.add_argument( "-M", dest="values_mb", action="store_true", default=False, - help="Values are shown in the unit which is set with --unit (if available). Thresholds are also treated in this unit", + help=( + "Values are shown in the unit which is set with --unit (if available). 
" + "Thresholds are also treated in this unit" + ), ) check_opts.add_argument( "-V", @@ -989,40 +1053,22 @@ def parse_args(self): "backup", ]: p.print_usage() - message = "{}: error: --mode {} requires node name (--node)".format( - p.prog, options.mode - ) + message = f"{p.prog}: error: --mode {options.mode} requires node name (--node)" self.output(CheckState.UNKNOWN, message) if not options.vmid and not options.name and options.mode in ("vm", "vm_status"): p.print_usage() - message = "{}: error: --mode {} requires either vm name (--name) or id (--vmid)".format( - p.prog, options.mode + message = ( + f"{p.prog}: error: --mode {options.mode} requires either " + "vm name (--name) or id (--vmid)", ) self.output(CheckState.UNKNOWN, message) if not options.name and options.mode == "storage": p.print_usage() - message = "{}: error: --mode {} requires storage name (--name)".format( - p.prog, options.mode - ) + message = f"{p.prog}: error: --mode {options.mode} requires storage name (--name)" self.output(CheckState.UNKNOWN, message) - def compare_thresholds(threshold_warning, threshold_critical, comparator): - ok = True - keys = set(list(threshold_warning.keys()) + list(threshold_critical.keys())) - for key in keys: - if (key in threshold_warning and key in threshold_critical) or ( - None in threshold_warning and None in threshold_critical - ): - ok = ok and comparator(threshold_warning[key], threshold_critical[key]) - elif key in threshold_warning and None in threshold_critical: - ok = ok and comparator(threshold_warning[key], threshold_critical[None]) - elif key in threshold_critical and None in threshold_warning: - ok = ok and comparator(threshold_warning[None], threshold_critical[key]) - - return ok - if options.threshold_warning and options.threshold_critical: if options.mode != "subscription" and not compare_thresholds( options.threshold_warning, options.threshold_critical, lambda w, c: w <= c @@ -1035,7 +1081,7 @@ def compare_thresholds(threshold_warning, 
threshold_critical, comparator): self.options = options - def __init__(self): + def __init__(self) -> None: self.options = {} self.ticket = None self.perfdata = [] @@ -1049,16 +1095,13 @@ def __init__(self): if self.options.api_insecure: # disable urllib3 warning about insecure requests - requests.packages.urllib3.disable_warnings( - requests.packages.urllib3.exceptions.InsecureRequestWarning - ) + requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) if self.options.api_password is not None: self.__cookies["PVEAuthCookie"] = self.get_ticket() elif self.options.api_token is not None: - self.__headers["Authorization"] = "PVEAPIToken={}!{}".format( - self.options.api_user, self.options.api_token - ) + token = f"{self.options.api_user}!{self.options.api_token}" + self.__headers["Authorization"] = f"PVEAPIToken={token}" pve = CheckPVE() From ec8dd61367819dc1a82f45e95fbf28cf9ce467f8 Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Sun, 12 May 2024 12:51:59 +0200 Subject: [PATCH 3/6] chore: Bump version to 1.3.0 and update year Signed-off-by: Nicolai Buchwitz --- check_pve.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/check_pve.py b/check_pve.py index bdbb598..895eb2a 100755 --- a/check_pve.py +++ b/check_pve.py @@ -3,9 +3,9 @@ # ------------------------------------------------------------------------------ # check_pve.py - A check plugin for Proxmox Virtual Environment (PVE). 
-# Copyright (C) 2018-2022 Nicolai Buchwitz +# Copyright (C) 2018-2024 Nicolai Buchwitz # -# Version: 1.2.2 +# Version: 1.3.0 # # ------------------------------------------------------------------------------ # This program is free software; you can redistribute it and/or @@ -129,7 +129,7 @@ def threshold_type(arg: str) -> Dict[str, "CheckThreshold"]: class CheckPVE: """Check command for Proxmox VE.""" - VERSION = "1.2.2" + VERSION = "1.3.0" API_URL = "https://{hostname}:{port}/api2/json/{command}" UNIT_SCALE = { "GB": 10**9, From eeadf51b1fe5acbb33cf9800872aa6513f5776fb Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Sun, 12 May 2024 13:08:42 +0200 Subject: [PATCH 4/6] doc: Replace CentOS with Rocky / Alma Unfortunately Red Hat's CentOS is no longer an open alternative to RHEL, so this replaces it in the setup instructions with Rocky / Alma Linux. Signed-off-by: Nicolai Buchwitz --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c49817a..aef71df 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,9 @@ This check command depends on **Python 3** and the following modules: apt install python3 python3-requests python3-packaging ``` -**Installation on Redhat 7 / CentOS 7** +**Installation on Rocky / Alma Linux 9** ``` -yum install python36 python36-requests python36-packaging +yum install python3 python3-requests python3-packaging ``` **Installation on FreeBSD** From 86805df4be170bd47b64df5026426cb2eb83feb2 Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Sun, 12 May 2024 13:20:09 +0200 Subject: [PATCH 5/6] fix(subscription): Make status lower case The status field was camel case in earlier versions of the API, but was somehow changed to lower case. To make it compatible with both versions, make the return value from the API lower case before comparison. 
Signed-off-by: Nicolai Buchwitz --- check_pve.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/check_pve.py b/check_pve.py index 895eb2a..29e63ce 100755 --- a/check_pve.py +++ b/check_pve.py @@ -428,13 +428,13 @@ def check_subscription(self) -> None: url = self.get_url(f"nodes/{self.options.node}/subscription") data = self.request(url) - if data["status"] == "NotFound": + if data["status"].lower() == "notfound": self.check_result = CheckState.WARNING self.check_message = "No valid subscription found" - if data["status"] == "Inactive": + if data["status"].lower() == "inactive": self.check_result = CheckState.CRITICAL self.check_message = "Subscription expired" - elif data["status"] == "Active": + elif data["status"].lower() == "active": subscription_due_date = data["nextduedate"] subscription_product_name = data["productname"] From c8acabd60d9d2bdaee9d1d5d44a013debc7c629f Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Sun, 12 May 2024 13:29:44 +0200 Subject: [PATCH 6/6] feat: Make check mode naming more consistent Introduce aliases for check modes to make naming more consistent: - io_wait: io-wait - vm_status: vm-status Signed-off-by: Nicolai Buchwitz --- README.md | 9 ++++----- check_pve.py | 8 +++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index aef71df..d739c7a 100644 --- a/README.md +++ b/README.md @@ -102,9 +102,9 @@ The ``icinga2`` folder contains the command definition and service examples for ``` usage: check_pve.py [-h] -e API_ENDPOINT [--api-port API_PORT] -u API_USER (-p API_PASSWORD | -t API_TOKEN) [-k] -m - {cluster,version,cpu,memory,swap,storage,io_wait,updates,services,subscription,vm,vm_status,replication,disk-health,ceph-health,zfs-health,zfs-fragmentation,backup} - [-n NODE] [--name NAME] [--vmid VMID] [--expected-vm-status {running,stopped,paused}] [--ignore-vm-status] [--ignore-service NAME] [--ignore-disk NAME] - [-w THRESHOLD_WARNING] [-c THRESHOLD_CRITICAL] [-M] [-V 
MIN_VERSION] [--unit {GB,MB,KB,GiB,MiB,KiB,B}] + {cluster,version,cpu,memory,swap,storage,io_wait,io-wait,updates,services,subscription,vm,vm_status,vm-status,replication,disk-health,ceph-health,zfs-health,zfs-fragmentation,backup} [-n NODE] [--name NAME] + [--vmid VMID] [--expected-vm-status {running,stopped,paused}] [--ignore-vm-status] [--ignore-service NAME] [--ignore-disk NAME] [-w THRESHOLD_WARNING] [-c THRESHOLD_CRITICAL] [-M] [-V MIN_VERSION] + [--unit {GB,MB,KB,GiB,MiB,KiB,B}] Check command for PVE hosts via API @@ -124,7 +124,7 @@ API Options: -k, --insecure Don't verify HTTPS certificate Check Options: - -m {cluster,version,cpu,memory,swap,storage,io_wait,updates,services,subscription,vm,vm_status,replication,disk-health,ceph-health,zfs-health,zfs-fragmentation,backup}, --mode {cluster,version,cpu,memory,swap,storage,io_wait,updates,services,subscription,vm,vm_status,replication,disk-health,ceph-health,zfs-health,zfs-fragmentation,backup} + -m {cluster,version,cpu,memory,swap,storage,io_wait,io-wait,updates,services,subscription,vm,vm_status,vm-status,replication,disk-health,ceph-health,zfs-health,zfs-fragmentation,backup}, --mode {cluster,version,cpu,memory,swap,storage,io_wait,io-wait,updates,services,subscription,vm,vm_status,vm-status,replication,disk-health,ceph-health,zfs-health,zfs-fragmentation,backup} Mode to use. -n NODE, --node NODE Node to check (necessary for all modes except cluster, version and backup) --name NAME Name of storage, vm, or container @@ -144,7 +144,6 @@ Check Options: The minimal pve version to check for. Any version lower than this will return CRITICAL. 
--unit {GB,MB,KB,GiB,MiB,KiB,B} Unit which is used for performance data and other values - ``` ## Check examples diff --git a/check_pve.py b/check_pve.py index 29e63ce..209cf6f 100755 --- a/check_pve.py +++ b/check_pve.py @@ -835,7 +835,7 @@ def check(self) -> None: self.check_memory() elif self.options.mode == "swap": self.check_swap() - elif self.options.mode == "io_wait": + elif self.options.mode in ("io_wait", "io-wait"): self.check_io_wait() elif self.options.mode == "disk-health": self.check_disks() @@ -849,8 +849,8 @@ def check(self) -> None: self.check_subscription() elif self.options.mode == "storage": self.check_storage(self.options.name) - elif self.options.mode in ["vm", "vm_status"]: - only_status = self.options.mode == "vm_status" + elif self.options.mode in ["vm", "vm_status", "vm-status"]: + only_status = self.options.mode in ["vm_status", "vm-status"] if self.options.name: idx = self.options.name @@ -936,11 +936,13 @@ def parse_args(self) -> None: "swap", "storage", "io_wait", + "io-wait", "updates", "services", "subscription", "vm", "vm_status", + "vm-status", "replication", "disk-health", "ceph-health",