From cea2b4de872f2ab2580fec08ff31bf387a7d848c Mon Sep 17 00:00:00 2001 From: tanguyvda Date: Mon, 16 Sep 2024 17:26:50 +0200 Subject: [PATCH 1/6] add warp10 metric sc --- .../warp10/warp10-events-apiv2.lua | 595 ++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 centreon-certified/warp10/warp10-events-apiv2.lua diff --git a/centreon-certified/warp10/warp10-events-apiv2.lua b/centreon-certified/warp10/warp10-events-apiv2.lua new file mode 100644 index 00000000..81c7362f --- /dev/null +++ b/centreon-certified/warp10/warp10-events-apiv2.lua @@ -0,0 +1,595 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") +local sc_metrics = require("centreon-stream-connectors-lib.sc_metrics") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "warp10_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/warp10-metrics.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + metric_boundaries = false, + metric_thresholds = false + }, + labels = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.warp10_address = params.warp10_http_address + self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + self.sc_params.params.metric_name_regex = params.metric_name_regex or "[.*]" + self.sc_params.params.metric_replacement_character = params.metric_replacement_character or "_" + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ") + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ") + local event = self.sc_event.event + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_host method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_host(metric) + self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_service(metric) + self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +------------------------------------------------------------------------------- +function EventQueue:format_metric_event(metric) + self.sc_logger:debug("[EventQueue:format_metric]: start real format metric ") + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// " .. broker.url_encode(metric.metric_name) .. self:build_labels(metric) .. self:build_attributes(metric) .. " " .. metric.value + } + + self:add() + self.sc_logger:debug("[EventQueue:format_metric]: end real format metric ") +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_metric_info("labels", labels_string, metric) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_metric_info("attributes", attributes_string, metric) + + if metric.min and metric.min == metric.min and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",min=" .. metric.min + end + + if metric.max and metric.max == metric.max and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",max=" .. metric.max + end + + -- add warning to attributes + if metric.warning_high and metric.warning_high == metric.warning_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.warning_high + end + + -- add critical to attributes + if metric.critical_high and metric.critical_high == metric.critical_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.critical_high + end + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_metric_info: build a string with all the required info for a metric. Info that can be either used as a metric label or metric attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common metric info +-- @param metric (table) the metric table +-- @return dimension_string (string) the dimension_string with all the required common metric info +function EventQueue:get_common_metric_info(dimension_type, dimension_string, metric) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + -- add metric instance + if metric.instance ~= "" and self.warp10_format[dimension_type].metric_instance then + dimension_string = dimension_string .. ",metric_instance=" .. broker.url_encode(metric.instance) + end + + -- add metric subinstances + if metric.subinstance[1] and self.warp10_format[dimension_type].metric_subinstances then + dimension_string = dimension_string .. ",metric_subinstances=" .. broker.url_encode(get_metric_subinstances_string(metric)) + end + + -- add metric unit + if metric.uom and self.warp10_format[dimension_type].metric_unit then + dimension_string = dimension_string .. ",uom=" .. broker.url_encode(metric.uom) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +function EventQueue:get_metric_subinstances_string(metric) + local subinstances_string = "" + + for _, subinstance in ipairs(metric.subinstance) do + if subinstances_string == "" then + subinstances_string = subinstance + else + subinstances_string = subinstances_string .. " " .. subinstance + end + end + + return subinstances_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + if not payload then + payload = event.data + else + payload = payload .. "\n" .. event.data + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + queue_metadata.headers = { + "Transfer-Encoding:chunked", + "X-Warp10-Token:" .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: warp10 address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_metrics = sc_metrics.new(event, queue.sc_params.params, queue.sc_common, queue.sc_broker, queue.sc_logger) + queue.sc_event = queue.sc_metrics.sc_event + + if queue.sc_event:is_valid_category() then + if queue.sc_metrics:is_valid_bbdo_element() then + -- format event if it is validated + if queue.sc_metrics:is_valid_metric_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file From 52c4d6bda8d594a339baacf7f6f6696612d3a29c Mon Sep 17 00:00:00 2001 From: tanguyvda Date: Mon, 16 Sep 2024 18:08:40 +0200 Subject: [PATCH 2/6] add warp10 event --- .../warp10/warp10-events-apiv2.lua | 133 +--- .../warp10/warp10-metrics-apiv2.lua | 595 ++++++++++++++++++ 2 files changed, 622 insertions(+), 106 deletions(-) create mode 100644 centreon-certified/warp10/warp10-metrics-apiv2.lua diff --git a/centreon-certified/warp10/warp10-events-apiv2.lua b/centreon-certified/warp10/warp10-events-apiv2.lua index 81c7362f..62f28f39 100644 --- a/centreon-certified/warp10/warp10-events-apiv2.lua +++ b/centreon-certified/warp10/warp10-events-apiv2.lua @@ -13,7 +13,6 @@ local sc_event = require("centreon-stream-connectors-lib.sc_event") local sc_params = require("centreon-stream-connectors-lib.sc_params") local sc_macros = require("centreon-stream-connectors-lib.sc_macros") local sc_flush = require("centreon-stream-connectors-lib.sc_flush") -local sc_metrics = require("centreon-stream-connectors-lib.sc_metrics") -------------------------------------------------------------------------------- -- Classe event_queue @@ -43,7 +42,7 @@ function EventQueue.new(params) self.fail = false -- set up log configuration - local logfile = params.logfile or "/var/log/centreon-broker/warp10-metrics.log" + local logfile = params.logfile or "/var/log/centreon-broker/warp10-events.log" local log_level = params.log_level or 1 -- initiate mandatory objects @@ -62,19 +61,11 @@ function EventQueue.new(params) attributes = { hg = false, sg = false, - metric_instance = false, - metric_subinstances = false, - metric_unit = false, poller = false, - metric_boundaries = false, - metric_thresholds = false }, labels = { hg = false, sg = false, - metric_instance = false, - metric_subinstances = false, - metric_unit = false, poller = false, _cmaas = false } @@ -95,8 +86,6 @@ function EventQueue.new(params) self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 -- just need to url encode the metric name so we don't need to filter out characters -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines - self.sc_params.params.metric_name_regex = params.metric_name_regex or "[.*]" - self.sc_params.params.metric_replacement_character = params.metric_replacement_character or "_" -- apply users params and check syntax of standard ones self.sc_params:param_override(params) @@ -208,6 +197,7 @@ function EventQueue:format_accepted_event() end self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") + self:add() end -------------------------------------------------------------------------------- @@ -215,52 +205,32 @@ end -------------------------------------------------------------------------------- function EventQueue:format_event_host() local event = self.sc_event.event - self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ") - self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } end -------------------------------------------------------------------------------- ---- EventQueue:format_event_service method -------------------------------------------------------------------------------- function EventQueue:format_event_service() - self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ") local event = self.sc_event.event - self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) -end --------------------------------------------------------------------------------- ----- EventQueue:format_metric_host method --- @param metric {table} a single metric data --------------------------------------------------------------------------------- -function EventQueue:format_metric_host(metric) - self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric ") - self:format_metric_event(metric) -end - --------------------------------------------------------------------------------- ----- EventQueue:format_metric_service method --- @param metric {table} a single metric data --------------------------------------------------------------------------------- -function EventQueue:format_metric_service(metric) - self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric ") - self:format_metric_event(metric) + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } end --------------------------------------------------------------------------------- ----- EventQueue:format_metric_service method --- @param metric {table} a single metric data -------------------------------------------------------------------------------- -function EventQueue:format_metric_event(metric) - self.sc_logger:debug("[EventQueue:format_metric]: start real format metric ") - local event = self.sc_event.event +-- makes no sense at the moment +-- function EventQueue:format_event_downtime() +-- local event = self.sc_event.event - self.sc_event.event.formated_event = { - data = event.last_check .. "000000// " .. broker.url_encode(metric.metric_name) .. self:build_labels(metric) .. self:build_attributes(metric) .. " " .. metric.value - } +-- self.sc_event.event.formated_event = { +-- data = event.last_check .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. event.state +-- } +-- end - self:add() - self.sc_logger:debug("[EventQueue:format_metric]: end real format metric ") -end -------------------------------------------------------------------------------- ---- EventQueue:build_labels method @@ -279,7 +249,7 @@ function EventQueue:build_labels(metric) labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id end - labels_string = self:get_common_metric_info("labels", labels_string, metric) + labels_string = self:get_common_event_info("labels", labels_string) -- param that we need here at Centreon on our own Warp10 if self.warp10_format.labels._cmaas then @@ -300,25 +270,7 @@ function EventQueue:build_attributes(metric) local event = self.sc_event.event local attributes_string = "" - attributes_string = self:get_common_metric_info("attributes", attributes_string, metric) - - if metric.min and metric.min == metric.min and self.warp10_format.attributes.metric_boundaries then - attributes_string = attributes_string .. ",min=" .. metric.min - end - - if metric.max and metric.max == metric.max and self.warp10_format.attributes.metric_boundaries then - attributes_string = attributes_string .. ",max=" .. metric.max - end - - -- add warning to attributes - if metric.warning_high and metric.warning_high == metric.warning_high and self.warp10_format.attributes.metric_thresholds then - attributes_string = attributes_string .. ",warning=" .. metric.warning_high - end - - -- add critical to attributes - if metric.critical_high and metric.critical_high == metric.critical_high and self.warp10_format.attributes.metric_thresholds then - attributes_string = attributes_string .. ",warning=" .. metric.critical_high - end + attributes_string = self:get_common_event_info("attributes", attributes_string) if attributes_string ~= "" then -- we trim because if the string is not empty we have a string that starts with a "," character @@ -328,12 +280,11 @@ function EventQueue:build_attributes(metric) return attributes_string end ---- get_common_metric_info: build a string with all the required info for a metric. Info that can be either used as a metric label or metric attribute +--- get_common_event_info: build a string with all the required info for a event. Info that can be either used as a event label or event attribute -- @param dimension_type (string) can be "attributes" or "labels" --- @param dimension_string (string) a string to which the function will concatenate common metric info --- @param metric (table) the metric table --- @return dimension_string (string) the dimension_string with all the required common metric info -function EventQueue:get_common_metric_info(dimension_type, dimension_string, metric) +-- @param dimension_string (string) a string to which the function will concatenate common event info +-- @return dimension_string (string) the dimension_string with all the required common event info +function EventQueue:get_common_event_info(dimension_type, dimension_string) local event = self.sc_event.event local params = self.sc_params.params @@ -347,21 +298,6 @@ function EventQueue:get_common_metric_info(dimension_type, dimension_string, met dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) end - -- add metric instance - if metric.instance ~= "" and self.warp10_format[dimension_type].metric_instance then - dimension_string = dimension_string .. ",metric_instance=" .. broker.url_encode(metric.instance) - end - - -- add metric subinstances - if metric.subinstance[1] and self.warp10_format[dimension_type].metric_subinstances then - dimension_string = dimension_string .. ",metric_subinstances=" .. broker.url_encode(get_metric_subinstances_string(metric)) - end - - -- add metric unit - if metric.uom and self.warp10_format[dimension_type].metric_unit then - dimension_string = dimension_string .. ",uom=" .. broker.url_encode(metric.uom) - end - return dimension_string end @@ -394,20 +330,6 @@ function EventQueue:get_sg_string(event) return servicegroups_string end -function EventQueue:get_metric_subinstances_string(metric) - local subinstances_string = "" - - for _, subinstance in ipairs(metric.subinstance) do - if subinstances_string == "" then - subinstances_string = subinstance - else - subinstances_string = subinstances_string .. " " .. subinstance - end - end - - return subinstances_string -end - -------------------------------------------------------------------------------- -- EventQueue:add, add an event to the sending queue -------------------------------------------------------------------------------- @@ -540,20 +462,19 @@ function write (event) end -- initiate event object - queue.sc_metrics = sc_metrics.new(event, queue.sc_params.params, queue.sc_common, queue.sc_broker, queue.sc_logger) - queue.sc_event = queue.sc_metrics.sc_event + queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker) if queue.sc_event:is_valid_category() then - if queue.sc_metrics:is_valid_bbdo_element() then + if queue.sc_event:is_valid_element() then -- format event if it is validated - if queue.sc_metrics:is_valid_metric_event() then + if queue.sc_event:is_valid_event() then queue:format_accepted_event() end - --- log why the event has been dropped + --- log why the event has been dropped else queue.sc_logger:debug("dropping event because element is not valid. Event element is: " .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) - end + end else queue.sc_logger:debug("dropping event because category is not valid. Event category is: " .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) diff --git a/centreon-certified/warp10/warp10-metrics-apiv2.lua b/centreon-certified/warp10/warp10-metrics-apiv2.lua new file mode 100644 index 00000000..81c7362f --- /dev/null +++ b/centreon-certified/warp10/warp10-metrics-apiv2.lua @@ -0,0 +1,595 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") +local sc_metrics = require("centreon-stream-connectors-lib.sc_metrics") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "warp10_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/warp10-metrics.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + metric_boundaries = false, + metric_thresholds = false + }, + labels = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.warp10_address = params.warp10_http_address + self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + self.sc_params.params.metric_name_regex = params.metric_name_regex or "[.*]" + self.sc_params.params.metric_replacement_character = params.metric_replacement_character or "_" + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ") + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ") + local event = self.sc_event.event + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_host method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_host(metric) + self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_service(metric) + self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +------------------------------------------------------------------------------- +function EventQueue:format_metric_event(metric) + self.sc_logger:debug("[EventQueue:format_metric]: start real format metric ") + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// " .. broker.url_encode(metric.metric_name) .. self:build_labels(metric) .. self:build_attributes(metric) .. " " .. metric.value + } + + self:add() + self.sc_logger:debug("[EventQueue:format_metric]: end real format metric ") +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_metric_info("labels", labels_string, metric) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_metric_info("attributes", attributes_string, metric) + + if metric.min and metric.min == metric.min and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",min=" .. metric.min + end + + if metric.max and metric.max == metric.max and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",max=" .. metric.max + end + + -- add warning to attributes + if metric.warning_high and metric.warning_high == metric.warning_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.warning_high + end + + -- add critical to attributes + if metric.critical_high and metric.critical_high == metric.critical_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.critical_high + end + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_metric_info: build a string with all the required info for a metric. Info that can be either used as a metric label or metric attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common metric info +-- @param metric (table) the metric table +-- @return dimension_string (string) the dimension_string with all the required common metric info +function EventQueue:get_common_metric_info(dimension_type, dimension_string, metric) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + -- add metric instance + if metric.instance ~= "" and self.warp10_format[dimension_type].metric_instance then + dimension_string = dimension_string .. ",metric_instance=" .. broker.url_encode(metric.instance) + end + + -- add metric subinstances + if metric.subinstance[1] and self.warp10_format[dimension_type].metric_subinstances then + dimension_string = dimension_string .. ",metric_subinstances=" .. broker.url_encode(get_metric_subinstances_string(metric)) + end + + -- add metric unit + if metric.uom and self.warp10_format[dimension_type].metric_unit then + dimension_string = dimension_string .. ",uom=" .. broker.url_encode(metric.uom) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +function EventQueue:get_metric_subinstances_string(metric) + local subinstances_string = "" + + for _, subinstance in ipairs(metric.subinstance) do + if subinstances_string == "" then + subinstances_string = subinstance + else + subinstances_string = subinstances_string .. " " .. subinstance + end + end + + return subinstances_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + if not payload then + payload = event.data + else + payload = payload .. "\n" .. event.data + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + queue_metadata.headers = { + "Transfer-Encoding:chunked", + "X-Warp10-Token:" .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: warp10 address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_metrics = sc_metrics.new(event, queue.sc_params.params, queue.sc_common, queue.sc_broker, queue.sc_logger) + queue.sc_event = queue.sc_metrics.sc_event + + if queue.sc_event:is_valid_category() then + if queue.sc_metrics:is_valid_bbdo_element() then + -- format event if it is validated + if queue.sc_metrics:is_valid_metric_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file From bea8ceaf45dcddd86c80dd32dee0ee97bcb3a38b Mon Sep 17 00:00:00 2001 From: tanguyvda Date: Tue, 17 Sep 2024 20:41:49 +0200 Subject: [PATCH 3/6] try to add a perf analysis --- .../sc_event.lua | 9 +++- .../sc_flush.lua | 52 ++++++++++++++++++- .../sc_params.lua | 3 ++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/modules/centreon-stream-connectors-lib/sc_event.lua b/modules/centreon-stream-connectors-lib/sc_event.lua index 3df94c61..9ac41c93 100644 --- a/modules/centreon-stream-connectors-lib/sc_event.lua +++ b/modules/centreon-stream-connectors-lib/sc_event.lua @@ -27,6 +27,8 @@ function sc_event.new(event, params, common, logger, broker) self.sc_broker = broker self.bbdo_version = self.sc_common:get_bbdo_version() + _PERFORMANCE_ANALYSIS.received_events = _PERFORMANCE_ANALYSIS.received_events + 1 + self.event.cache = {} setmetatable(self, { __index = ScEvent }) @@ -78,13 +80,18 @@ function ScEvent:is_valid_event() -- drop the event if it was not valid. Custom code do not have to work on already invalid events if not is_valid_event then + _PERFORMANCE_ANALYSIS.dropped_events = _PERFORMANCE_ANALYSIS.dropped_events + 1 return is_valid_event end -- run custom code if self.params.custom_code and type(self.params.custom_code) == "function" then self, is_valid_event = self.params.custom_code(self) - end + end + + if not is_valid_event then + _PERFORMANCE_ANALYSIS.dropped_events = _PERFORMANCE_ANALYSIS.dropped_events + 1 + end return is_valid_event end diff --git a/modules/centreon-stream-connectors-lib/sc_flush.lua b/modules/centreon-stream-connectors-lib/sc_flush.lua index 71ab72d3..6cf48771 100644 --- a/modules/centreon-stream-connectors-lib/sc_flush.lua +++ b/modules/centreon-stream-connectors-lib/sc_flush.lua @@ -17,6 +17,48 @@ local ScFlush = {} function sc_flush.new(params, logger) local self = {} + _PERFORMANCE_ANALYSIS = { + received_events = 0, + dropped_events = 0, + sent_events = 0, + number_of_flush = 0, + number_of_max_buffer_size_reached = 0, + last_log_time = os.time(), + log_analysis = function () + if _PERFORMANCE_ANALYSIS.last_log_time + params.analysis_frequency <= os.time() then + return + end + + self.sc_logger:notice("[ScFlush:PERFORMANCE_ANALYSIS]: " .. _PERFORMANCE_ANALYSIS:get_analysis_result()) + _PERFORMANCE_ANALYSIS:reset() + end, + get_analysis_result = function () + local analysis = nil + local optimal_max_buffer_size, optimal_max_all_queues_age + + -- max buffer size is too big and never reached + if _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached == 0 then + -- 3 events envoyés, max 5, number of flush = 3 + optimal_max_buffer_size = + + end, + increase_sent_counters = function () + _PERFORMANCE_ANALYSIS.sent_events = _PERFORMANCE_ANALYSIS.sent_events + counter + _PERFORMANCE_ANALYSIS.number_of_flush = _PERFORMANCE_ANALYSIS.number_of_flush + 1 + if params.max_buffer_size % _PERFORMANCE_ANALYSIS.sent_events == 0 then + _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached = _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached + 1 + end + end, + reset = function () + _PERFORMANCE_ANALYSIS.received_events = 0 + _PERFORMANCE_ANALYSIS.dropped_events = 0 + _PERFORMANCE_ANALYSIS.sent_events = 0 + _PERFORMANCE_ANALYSIS.number_of_flush = 0 + _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached = 0 + _PERFORMANCE_ANALYSIS.last_log_time = os.time() + end + } + -- create a default logger if it is not provided self.sc_logger = logger if not self.sc_logger then @@ -122,7 +164,7 @@ function ScFlush:get_queues_size() return queues_size end ---- flush_mixed_payload: flush a payload that contains various type of events (services mixed hosts for example) +--- flush_mixed_payload: flush a payload that contains various type of events (services mixed with hosts for example) -- @return boolean (boolean) true or false depending on the success of the operation function ScFlush:flush_mixed_payload(build_payload_method, send_method) local payload = nil @@ -138,6 +180,8 @@ function ScFlush:flush_mixed_payload(build_payload_method, send_method) -- send events if max buffer size is reached if counter >= self.params.max_buffer_size then + _PERFORMANCE_ANALYSIS:increase_sent_counters() + if not self:flush_payload(send_method, payload, self.queues.global_queues_metadata) then return false end @@ -149,6 +193,8 @@ function ScFlush:flush_mixed_payload(build_payload_method, send_method) end end + _PERFORMANCE_ANALYSIS:increase_sent_counters() + -- we need to empty all queues to not mess with broker retention if not self:flush_payload(send_method, payload, self.queues.global_queues_metadata) then return false @@ -174,6 +220,8 @@ function ScFlush:flush_homogeneous_payload(build_payload_method, send_method) -- send events if max buffer size is reached if counter >= self.params.max_buffer_size then + _PERFORMANCE_ANALYSIS:increase_sent_counters() + if not self:flush_payload( send_method, payload, @@ -188,6 +236,8 @@ function ScFlush:flush_homogeneous_payload(build_payload_method, send_method) end end + _PERFORMANCE_ANALYSIS:increase_sent_counters() + -- make sure there are no events left inside a specific queue if not self:flush_payload( send_method, diff --git a/modules/centreon-stream-connectors-lib/sc_params.lua b/modules/centreon-stream-connectors-lib/sc_params.lua index 0fe62e07..d4ab2032 100644 --- a/modules/centreon-stream-connectors-lib/sc_params.lua +++ b/modules/centreon-stream-connectors-lib/sc_params.lua @@ -125,6 +125,9 @@ function sc_params.new(common, logger) log_level = "", log_curl_commands = 0, + -- performance analysis parameters + analysis_frequency = 300, + -- metric metric_name_regex = "no_forbidden_character_to_replace", metric_replacement_character = "_", From 134c77201919129c2a0c0ea0f96405971f6e8f3b Mon Sep 17 00:00:00 2001 From: tanguyvda Date: Wed, 6 Nov 2024 07:58:44 +0100 Subject: [PATCH 4/6] start centreon-report --- .../centreon-report-events-apiv2.lua | 516 +++++++++++++++ .../centreon-report-metrics-apiv2.lua | 609 ++++++++++++++++++ 2 files changed, 1125 insertions(+) create mode 100644 centreon-certified/centreon-report/centreon-report-events-apiv2.lua create mode 100644 centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua diff --git a/centreon-certified/centreon-report/centreon-report-events-apiv2.lua b/centreon-certified/centreon-report/centreon-report-events-apiv2.lua new file mode 100644 index 00000000..62f28f39 --- /dev/null +++ b/centreon-certified/centreon-report/centreon-report-events-apiv2.lua @@ -0,0 +1,516 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "warp10_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/warp10-events.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + poller = false, + }, + labels = { + hg = false, + sg = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.warp10_address = params.warp10_http_address + self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") + self:add() +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } +end + +-- makes no sense at the moment +-- function EventQueue:format_event_downtime() +-- local event = self.sc_event.event + +-- self.sc_event.event.formated_event = { +-- data = event.last_check .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. event.state +-- } +-- end + + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_event_info("labels", labels_string) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_event_info("attributes", attributes_string) + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_event_info: build a string with all the required info for a event. Info that can be either used as a event label or event attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common event info +-- @return dimension_string (string) the dimension_string with all the required common event info +function EventQueue:get_common_event_info(dimension_type, dimension_string) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + if not payload then + payload = event.data + else + payload = payload .. "\n" .. event.data + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + queue_metadata.headers = { + "Transfer-Encoding:chunked", + "X-Warp10-Token:" .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: warp10 address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker) + + if queue.sc_event:is_valid_category() then + if queue.sc_event:is_valid_element() then + -- format event if it is validated + if queue.sc_event:is_valid_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file diff --git a/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua new file mode 100644 index 00000000..b7fa3c39 --- /dev/null +++ b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua @@ -0,0 +1,609 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") +local sc_metrics = require("centreon-stream-connectors-lib.sc_metrics") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "centreon_report_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/centreon-report-metrics.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + metric_boundaries = false, + metric_thresholds = false + }, + labels = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.warp10_address = params.centreon_report_http_address + self.sc_params.params.warp10_api_endpoint = params.centreon_report_api_endpoint or "/v1" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + self.sc_params.params.metric_name_regex = params.metric_name_regex or "[.*]" + self.sc_params.params.metric_replacement_character = params.metric_replacement_character or "_" + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ") + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ") + local event = self.sc_event.event + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_host method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_host(metric) + self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_service(metric) + self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +------------------------------------------------------------------------------- +function EventQueue:format_metric_event(metric) + self.sc_logger:debug("[EventQueue:format_metric]: start real format metric ") + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// " .. broker.url_encode(metric.metric_name) .. self:build_labels(metric) .. self:build_attributes(metric) .. " " .. metric.value + } + + self:add() + self.sc_logger:debug("[EventQueue:format_metric]: end real format metric ") +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_metric_info("labels", labels_string, metric) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_metric_info("attributes", attributes_string, metric) + + if metric.min and metric.min == metric.min and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",min=" .. metric.min + end + + if metric.max and metric.max == metric.max and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",max=" .. metric.max + end + + -- add warning to attributes + if metric.warning_high and metric.warning_high == metric.warning_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.warning_high + end + + -- add critical to attributes + if metric.critical_high and metric.critical_high == metric.critical_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.critical_high + end + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_metric_info: build a string with all the required info for a metric. Info that can be either used as a metric label or metric attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common metric info +-- @param metric (table) the metric table +-- @return dimension_string (string) the dimension_string with all the required common metric info +function EventQueue:get_common_metric_info(dimension_type, dimension_string, metric) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + -- add metric instance + if metric.instance ~= "" and self.warp10_format[dimension_type].metric_instance then + dimension_string = dimension_string .. ",metric_instance=" .. broker.url_encode(metric.instance) + end + + -- add metric subinstances + if metric.subinstance[1] and self.warp10_format[dimension_type].metric_subinstances then + dimension_string = dimension_string .. ",metric_subinstances=" .. broker.url_encode(get_metric_subinstances_string(metric)) + end + + -- add metric unit + if metric.uom and self.warp10_format[dimension_type].metric_unit then + dimension_string = dimension_string .. ",uom=" .. broker.url_encode(metric.uom) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +function EventQueue:get_metric_subinstances_string(metric) + local subinstances_string = "" + + for _, subinstance in ipairs(metric.subinstance) do + if subinstances_string == "" then + subinstances_string = subinstance + else + subinstances_string = subinstances_string .. " " .. subinstance + end + end + + return subinstances_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + local data_table = { + type = "metric", + timestamp = os.date("!%Y-%m-%dT%TZ",t), + version = "3.0.0", + contentType = "warp10/plaintext", + content = event.data + } + + if not payload then + payload = { + version = "3.0.0", + data = { + data_table + } + } + else + payload = table.insert(payload.data, data_table) + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + payload = broker.json_encode(payload) + local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + queue_metadata.headers = { + "Content-Type: application/vnd.centreon+json", + "x-api-key: " .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: centreon-report address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_metrics = sc_metrics.new(event, queue.sc_params.params, queue.sc_common, queue.sc_broker, queue.sc_logger) + queue.sc_event = queue.sc_metrics.sc_event + + if queue.sc_event:is_valid_category() then + if queue.sc_metrics:is_valid_bbdo_element() then + -- format event if it is validated + if queue.sc_metrics:is_valid_metric_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file From a0f4205504255c94de6827c088fff9d47cb235a5 Mon Sep 17 00:00:00 2001 From: tanguyvda Date: Wed, 6 Nov 2024 16:50:05 +0100 Subject: [PATCH 5/6] fix payload and version --- .../centreon-report/centreon-report-metrics-apiv2.lua | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua index b7fa3c39..941b9dc0 100644 --- a/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua +++ b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua @@ -86,6 +86,7 @@ function EventQueue.new(params) self.sc_params.params.warp10_api_endpoint = params.centreon_report_api_endpoint or "/v1" self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.version = params.version or "1.0.0" self.sc_params.params.cmaas = params.cmaas or "centreon" self.sc_params.params.accepted_categories = params.accepted_categories or "neb" self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" @@ -435,21 +436,21 @@ end function EventQueue:build_payload(payload, event) local data_table = { type = "metric", - timestamp = os.date("!%Y-%m-%dT%TZ",t), - version = "3.0.0", + timestamp = os.date("!%Y-%m-%dT%TZ", t), + version = self.sc_params.params.api_version, contentType = "warp10/plaintext", content = event.data } if not payload then payload = { - version = "3.0.0", + version = self.sc_params.params.api_version, data = { data_table } } else - payload = table.insert(payload.data, data_table) + table.insert(payload.data, data_table) end return payload From 04ff4d04d2fc2c34f6b5d12771e22b03d61772a8 Mon Sep 17 00:00:00 2001 From: tanguyvda Date: Wed, 13 Nov 2024 16:04:33 +0100 Subject: [PATCH 6/6] add centreon-report-events --- .../centreon-report-events-apiv2.lua | 132 ++++++++++++++---- .../centreon-report-metrics-apiv2.lua | 6 +- .../sc_event.lua | 4 +- 3 files changed, 109 insertions(+), 33 deletions(-) diff --git a/centreon-certified/centreon-report/centreon-report-events-apiv2.lua b/centreon-certified/centreon-report/centreon-report-events-apiv2.lua index 62f28f39..6edf48c0 100644 --- a/centreon-certified/centreon-report/centreon-report-events-apiv2.lua +++ b/centreon-certified/centreon-report/centreon-report-events-apiv2.lua @@ -36,13 +36,13 @@ function EventQueue.new(params) local mandatory_parameters = { "api_token", - "warp10_http_address" + "centreon_report_http_address" } self.fail = false -- set up log configuration - local logfile = params.logfile or "/var/log/centreon-broker/warp10-events.log" + local logfile = params.logfile or "/var/log/centreon-broker/centreon-report-events.log" local log_level = params.log_level or 1 -- initiate mandatory objects @@ -62,8 +62,10 @@ function EventQueue.new(params) hg = false, sg = false, poller = false, + bv = false, }, labels = { + bv = false, hg = false, sg = false, poller = false, @@ -73,13 +75,13 @@ function EventQueue.new(params) -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs self.sc_params.params.api_token = params.api_token - self.sc_params.params.warp10_address = params.warp10_http_address - self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update" + self.sc_params.params.centreon_report_http_address = params.centreon_report_http_address + self.sc_params.params.centreon_report_api_endpoint = params.centreon_report_api_endpoint or "/v1" self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" self.sc_params.params.cmaas = params.cmaas or "centreon" self.sc_params.params.accepted_categories = params.accepted_categories or "neb" - self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status,downtime" self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 self.sc_params.params.hard_only = params.hard_only or 0 self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 @@ -106,14 +108,12 @@ function EventQueue.new(params) self.format_event = { [categories.neb.id] = { [elements.host_status.id] = function () return self:format_event_host() end, - [elements.service_status.id] = function () return self:format_event_service() end - } - } - - self.format_metric = { - [categories.neb.id] = { - [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, - [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + [elements.service_status.id] = function () return self:format_event_service() end, + [elements.downtime.id] = function () return self:format_event_downtime() end, + [elements.acknowledgement.id] = function () return self:format_event_acknowledgement() end + }, + [categories.bam.id] = { + [elements.ba_status.id] = function () return self:format_event_ba() end } } @@ -222,22 +222,69 @@ function EventQueue:format_event_service() } end --- makes no sense at the moment --- function EventQueue:format_event_downtime() --- local event = self.sc_event.event +function EventQueue:format_event_downtime() + local event = self.sc_event.event + -- 0 = end of downtime, 1 = start of downtime + local downtime_step = 0 + local event_time = event.deletion_time + + if event.downtime_processing_step == "start" then + downtime_step = 1 + event_time = event.actual_start_time + end + + self.sc_event.event.formated_event = { + data = event_time .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. downtime_step + } +end + +function EventQueue:format_event_acknowledgement() + local event = self.sc_event.event + -- 0 = end of ack, 1 = start of ack + local ack_step = 0 + local event_time = event.deletion_time --- self.sc_event.event.formated_event = { --- data = event.last_check .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. event.state --- } --- end + if not event.deletion_time or event.deletion_time == 0 then + event_time = event.entry_time + ack_step = 1 + end + + self.sc_event.event.formated_event = { + data = event_time .. "000000// centreon:acknowledgement" .. self:build_labels() .. self:build_attributes() .. " " .. ack_step + } +end + +function EventQueue:format_event_ba() + local event = self.sc_event.event + + local attributes = "" + local labels = "ba_name=" .. event.cache.ba.ba_name .. ",resource_type=BA,ba_id=" .. event.ba_id + + -- add business views name in attributes if asked to + if event.cache.bvs and event.cache.bvs[1] and self.warp10_format.attributes.bv then + attributes = "{bvs=" .. broker.url_encode(self:get_bv_string(event)) .. "}" + end + + if self.warp10_format.labels._cmaas then + labels = labels .. ",_cmaas=" .. broker.url_encode(self.sc_params.params.cmaas) + end + + -- add business views name in labels if asked to + if event.cache.bvs and event.cache.bvs[1] and self.warp10_format.labels.bv then + labels = labels .. ",bvs=" .. broker.url_encode(self:get_bv_string(event)) + end + + self.sc_event.event.formated_event = { + data = event.last_state_change .. "000000// centreon:ba{" .. labels .. "}" .. attributes .. " " .. event.state + } +end -------------------------------------------------------------------------------- ---- EventQueue:build_labels method --- @param metric {table} a single metric data -- @return labels_string {string} a string with all labels -------------------------------------------------------------------------------- -function EventQueue:build_labels(metric) +function EventQueue:build_labels() local event = self.sc_event.event local params = self.sc_params.params local resource_type = "host" @@ -263,10 +310,9 @@ end -------------------------------------------------------------------------------- ---- EventQueue:build_attributes method --- @param metric {table} a single metric data -- @return tags {table} a string with all attributes -------------------------------------------------------------------------------- -function EventQueue:build_attributes(metric) +function EventQueue:build_attributes() local event = self.sc_event.event local attributes_string = "" @@ -330,6 +376,20 @@ function EventQueue:get_sg_string(event) return servicegroups_string end +function EventQueue:get_bv_string(event) + local bvs_string = "" + + for _, bv in ipairs(event.cache.bvs) do + if bvss_string == "" then + bvs_string = bvs.bv_name + else + bvs_string = bvs_string .. " " .. bv.bv_name + end + end + + return bvs_string +end + -------------------------------------------------------------------------------- -- EventQueue:add, add an event to the sending queue -------------------------------------------------------------------------------- @@ -355,10 +415,23 @@ end -- @return payload {string} json encoded string -------------------------------------------------------------------------------- function EventQueue:build_payload(payload, event) + local data_table = { + type = "metric", + timestamp = os.date("!%Y-%m-%dT%TZ", t), + version = self.sc_params.params.api_version, + contentType = "warp10/plaintext", + content = event.data + } + if not payload then - payload = event.data + payload = { + version = self.sc_params.params.api_version, + data = { + data_table + } + } else - payload = payload .. "\n" .. event.data + table.insert(payload.data, data_table) end return payload @@ -367,10 +440,11 @@ end function EventQueue:send_data(payload, queue_metadata) self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") - local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + payload = broker.json_encode(payload) + local url = tostring(self.sc_params.params.centreon_report_http_address) .. tostring(self.sc_params.params.centreon_report_api_endpoint) queue_metadata.headers = { - "Transfer-Encoding:chunked", - "X-Warp10-Token:" .. self.sc_params.params.api_token + "Content-Type: application/vnd.centreon+json", + "x-api-key: " .. self.sc_params.params.api_token } self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) diff --git a/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua index 941b9dc0..1656275e 100644 --- a/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua +++ b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua @@ -82,8 +82,8 @@ function EventQueue.new(params) -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs self.sc_params.params.api_token = params.api_token - self.sc_params.params.warp10_address = params.centreon_report_http_address - self.sc_params.params.warp10_api_endpoint = params.centreon_report_api_endpoint or "/v1" + self.sc_params.params.centreon_report_http_address = params.centreon_report_http_address + self.sc_params.params.centreon_report_api_endpoint = params.centreon_report_api_endpoint or "/v1" self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" self.sc_params.params.version = params.version or "1.0.0" @@ -460,7 +460,7 @@ function EventQueue:send_data(payload, queue_metadata) self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") payload = broker.json_encode(payload) - local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + local url = tostring(self.sc_params.params.centreon_report_http_address) .. tostring(self.sc_params.params.centreon_report_api_endpoint) queue_metadata.headers = { "Content-Type: application/vnd.centreon+json", "x-api-key: " .. self.sc_params.params.api_token diff --git a/modules/centreon-stream-connectors-lib/sc_event.lua b/modules/centreon-stream-connectors-lib/sc_event.lua index 9ac41c93..f8d497e4 100644 --- a/modules/centreon-stream-connectors-lib/sc_event.lua +++ b/modules/centreon-stream-connectors-lib/sc_event.lua @@ -1021,7 +1021,7 @@ function ScEvent:is_valid_acknowledgement_event() return true end ---- is_vaid_downtime_event: check if the event is a valid downtime event +--- is_valid_downtime_event: check if the event is a valid downtime event -- return true|false (boolean) function ScEvent:is_valid_downtime_event() -- return false if the event is one of all the "fake" start or end downtime event received from broker @@ -1329,6 +1329,7 @@ function ScEvent:is_valid_downtime_event_start() end -- end compat patch + self.event.downtime_processing_step = "start" return true end @@ -1347,6 +1348,7 @@ function ScEvent:is_valid_downtime_event_end() end -- end compat patch + self.event.downtime_processing_step = "end" return true end