diff --git a/centreon-certified/centreon-report/centreon-report-events-apiv2.lua b/centreon-certified/centreon-report/centreon-report-events-apiv2.lua new file mode 100644 index 00000000..6edf48c0 --- /dev/null +++ b/centreon-certified/centreon-report/centreon-report-events-apiv2.lua @@ -0,0 +1,590 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "centreon_report_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/centreon-report-events.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + poller = false, + bv = false, + }, + labels = { + bv = false, + hg = false, + sg = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.centreon_report_http_address = params.centreon_report_http_address + self.sc_params.params.centreon_report_api_endpoint = params.centreon_report_api_endpoint or "/v1" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status,downtime" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end, + [elements.downtime.id] = function () return self:format_event_downtime() end, + [elements.acknowledgement.id] = function () return self:format_event_acknowledgement() end + }, + [categories.bam.id] = { + [elements.ba_status.id] = function () return self:format_event_ba() end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") + self:add() +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } +end + +function EventQueue:format_event_downtime() + local event = self.sc_event.event + -- 0 = end of downtime, 1 = start of downtime + local downtime_step = 0 + local event_time = event.deletion_time + + if event.downtime_processing_step == "start" then + downtime_step = 1 + event_time = event.actual_start_time + end + + self.sc_event.event.formated_event = { + data = event_time .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. downtime_step + } +end + +function EventQueue:format_event_acknowledgement() + local event = self.sc_event.event + -- 0 = end of ack, 1 = start of ack + local ack_step = 0 + local event_time = event.deletion_time + + if not event.deletion_time or event.deletion_time == 0 then + event_time = event.entry_time + ack_step = 1 + end + + self.sc_event.event.formated_event = { + data = event_time .. "000000// centreon:acknowledgement" .. self:build_labels() .. self:build_attributes() .. " " .. ack_step + } +end + +function EventQueue:format_event_ba() + local event = self.sc_event.event + + local attributes = "" + local labels = "ba_name=" .. event.cache.ba.ba_name .. ",resource_type=BA,ba_id=" .. event.ba_id + + -- add business views name in attributes if asked to + if event.cache.bvs and event.cache.bvs[1] and self.warp10_format.attributes.bv then + attributes = "{bvs=" .. broker.url_encode(self:get_bv_string(event)) .. "}" + end + + if self.warp10_format.labels._cmaas then + labels = labels .. ",_cmaas=" .. broker.url_encode(self.sc_params.params.cmaas) + end + + -- add business views name in labels if asked to + if event.cache.bvs and event.cache.bvs[1] and self.warp10_format.labels.bv then + labels = labels .. ",bvs=" .. broker.url_encode(self:get_bv_string(event)) + end + + self.sc_event.event.formated_event = { + data = event.last_state_change .. "000000// centreon:ba{" .. labels .. "}" .. attributes .. " " .. event.state + } +end + + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels() + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_event_info("labels", labels_string) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes() + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_event_info("attributes", attributes_string) + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_event_info: build a string with all the required info for a event. Info that can be either used as a event label or event attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common event info +-- @return dimension_string (string) the dimension_string with all the required common event info +function EventQueue:get_common_event_info(dimension_type, dimension_string) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +function EventQueue:get_bv_string(event) + local bvs_string = "" + + for _, bv in ipairs(event.cache.bvs) do + if bvss_string == "" then + bvs_string = bvs.bv_name + else + bvs_string = bvs_string .. " " .. bv.bv_name + end + end + + return bvs_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + local data_table = { + type = "metric", + timestamp = os.date("!%Y-%m-%dT%TZ", t), + version = self.sc_params.params.api_version, + contentType = "warp10/plaintext", + content = event.data + } + + if not payload then + payload = { + version = self.sc_params.params.api_version, + data = { + data_table + } + } + else + table.insert(payload.data, data_table) + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + payload = broker.json_encode(payload) + local url = tostring(self.sc_params.params.centreon_report_http_address) .. tostring(self.sc_params.params.centreon_report_api_endpoint) + queue_metadata.headers = { + "Content-Type: application/vnd.centreon+json", + "x-api-key: " .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: warp10 address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker) + + if queue.sc_event:is_valid_category() then + if queue.sc_event:is_valid_element() then + -- format event if it is validated + if queue.sc_event:is_valid_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file diff --git a/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua new file mode 100644 index 00000000..1656275e --- /dev/null +++ b/centreon-certified/centreon-report/centreon-report-metrics-apiv2.lua @@ -0,0 +1,610 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") +local sc_metrics = require("centreon-stream-connectors-lib.sc_metrics") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "centreon_report_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/centreon-report-metrics.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + metric_boundaries = false, + metric_thresholds = false + }, + labels = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.centreon_report_http_address = params.centreon_report_http_address + self.sc_params.params.centreon_report_api_endpoint = params.centreon_report_api_endpoint or "/v1" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.version = params.version or "1.0.0" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + self.sc_params.params.metric_name_regex = params.metric_name_regex or "[.*]" + self.sc_params.params.metric_replacement_character = params.metric_replacement_character or "_" + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ") + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ") + local event = self.sc_event.event + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_host method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_host(metric) + self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_service(metric) + self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +------------------------------------------------------------------------------- +function EventQueue:format_metric_event(metric) + self.sc_logger:debug("[EventQueue:format_metric]: start real format metric ") + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// " .. broker.url_encode(metric.metric_name) .. self:build_labels(metric) .. self:build_attributes(metric) .. " " .. metric.value + } + + self:add() + self.sc_logger:debug("[EventQueue:format_metric]: end real format metric ") +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_metric_info("labels", labels_string, metric) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_metric_info("attributes", attributes_string, metric) + + if metric.min and metric.min == metric.min and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",min=" .. metric.min + end + + if metric.max and metric.max == metric.max and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",max=" .. metric.max + end + + -- add warning to attributes + if metric.warning_high and metric.warning_high == metric.warning_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.warning_high + end + + -- add critical to attributes + if metric.critical_high and metric.critical_high == metric.critical_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.critical_high + end + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_metric_info: build a string with all the required info for a metric. Info that can be either used as a metric label or metric attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common metric info +-- @param metric (table) the metric table +-- @return dimension_string (string) the dimension_string with all the required common metric info +function EventQueue:get_common_metric_info(dimension_type, dimension_string, metric) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + -- add metric instance + if metric.instance ~= "" and self.warp10_format[dimension_type].metric_instance then + dimension_string = dimension_string .. ",metric_instance=" .. broker.url_encode(metric.instance) + end + + -- add metric subinstances + if metric.subinstance[1] and self.warp10_format[dimension_type].metric_subinstances then + dimension_string = dimension_string .. ",metric_subinstances=" .. broker.url_encode(get_metric_subinstances_string(metric)) + end + + -- add metric unit + if metric.uom and self.warp10_format[dimension_type].metric_unit then + dimension_string = dimension_string .. ",uom=" .. broker.url_encode(metric.uom) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +function EventQueue:get_metric_subinstances_string(metric) + local subinstances_string = "" + + for _, subinstance in ipairs(metric.subinstance) do + if subinstances_string == "" then + subinstances_string = subinstance + else + subinstances_string = subinstances_string .. " " .. subinstance + end + end + + return subinstances_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + local data_table = { + type = "metric", + timestamp = os.date("!%Y-%m-%dT%TZ", t), + version = self.sc_params.params.api_version, + contentType = "warp10/plaintext", + content = event.data + } + + if not payload then + payload = { + version = self.sc_params.params.api_version, + data = { + data_table + } + } + else + table.insert(payload.data, data_table) + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + payload = broker.json_encode(payload) + local url = tostring(self.sc_params.params.centreon_report_http_address) .. tostring(self.sc_params.params.centreon_report_api_endpoint) + queue_metadata.headers = { + "Content-Type: application/vnd.centreon+json", + "x-api-key: " .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: centreon-report address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_metrics = sc_metrics.new(event, queue.sc_params.params, queue.sc_common, queue.sc_broker, queue.sc_logger) + queue.sc_event = queue.sc_metrics.sc_event + + if queue.sc_event:is_valid_category() then + if queue.sc_metrics:is_valid_bbdo_element() then + -- format event if it is validated + if queue.sc_metrics:is_valid_metric_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file diff --git a/centreon-certified/warp10/warp10-events-apiv2.lua b/centreon-certified/warp10/warp10-events-apiv2.lua new file mode 100644 index 00000000..62f28f39 --- /dev/null +++ b/centreon-certified/warp10/warp10-events-apiv2.lua @@ -0,0 +1,516 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "warp10_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/warp10-events.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + poller = false, + }, + labels = { + hg = false, + sg = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.warp10_address = params.warp10_http_address + self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") + self:add() +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// centreon:status" .. self:build_labels() .. self:build_attributes() .. " " .. event.state + } +end + +-- makes no sense at the moment +-- function EventQueue:format_event_downtime() +-- local event = self.sc_event.event + +-- self.sc_event.event.formated_event = { +-- data = event.last_check .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. event.state +-- } +-- end + + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_event_info("labels", labels_string) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_event_info("attributes", attributes_string) + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_event_info: build a string with all the required info for a event. Info that can be either used as a event label or event attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common event info +-- @return dimension_string (string) the dimension_string with all the required common event info +function EventQueue:get_common_event_info(dimension_type, dimension_string) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + if not payload then + payload = event.data + else + payload = payload .. "\n" .. event.data + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + queue_metadata.headers = { + "Transfer-Encoding:chunked", + "X-Warp10-Token:" .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: warp10 address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker) + + if queue.sc_event:is_valid_category() then + if queue.sc_event:is_valid_element() then + -- format event if it is validated + if queue.sc_event:is_valid_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file diff --git a/centreon-certified/warp10/warp10-metrics-apiv2.lua b/centreon-certified/warp10/warp10-metrics-apiv2.lua new file mode 100644 index 00000000..81c7362f --- /dev/null +++ b/centreon-certified/warp10/warp10-metrics-apiv2.lua @@ -0,0 +1,595 @@ +#!/usr/bin/lua +-------------------------------------------------------------------------------- +-- Centreon Broker Warp10 Connector Events +-------------------------------------------------------------------------------- + + +-- Libraries +local curl = require "cURL" +local sc_common = require("centreon-stream-connectors-lib.sc_common") +local sc_logger = require("centreon-stream-connectors-lib.sc_logger") +local sc_broker = require("centreon-stream-connectors-lib.sc_broker") +local sc_event = require("centreon-stream-connectors-lib.sc_event") +local sc_params = require("centreon-stream-connectors-lib.sc_params") +local sc_macros = require("centreon-stream-connectors-lib.sc_macros") +local sc_flush = require("centreon-stream-connectors-lib.sc_flush") +local sc_metrics = require("centreon-stream-connectors-lib.sc_metrics") + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +-------------------------------------------------------------------------------- +-- Classe event_queue +-------------------------------------------------------------------------------- + +local EventQueue = {} +EventQueue.__index = EventQueue + +-------------------------------------------------------------------------------- +---- Constructor +---- @param conf The table given by the init() function and returned from the GUI +---- @return the new EventQueue +---------------------------------------------------------------------------------- + +function EventQueue.new(params) + local self = {} + + local mandatory_parameters = { + "api_token", + "warp10_http_address" + } + + self.fail = false + + -- set up log configuration + local logfile = params.logfile or "/var/log/centreon-broker/warp10-metrics.log" + local log_level = params.log_level or 1 + + -- initiate mandatory objects + self.sc_logger = sc_logger.new(logfile, log_level) + self.sc_common = sc_common.new(self.sc_logger) + self.sc_broker = sc_broker.new(self.sc_logger) + self.sc_params = sc_params.new(self.sc_common, self.sc_logger) + + -- checking mandatory parameters and setting a fail flag + if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then + self.fail = true + end + + -- this will tell which data is going to be part of the metric event. This is to avoid having thousands of params. It is at a cost of a slightly more complexe parameter + self.warp10_format = { + attributes = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + metric_boundaries = false, + metric_thresholds = false + }, + labels = { + hg = false, + sg = false, + metric_instance = false, + metric_subinstances = false, + metric_unit = false, + poller = false, + _cmaas = false + } + } + + -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs + self.sc_params.params.api_token = params.api_token + self.sc_params.params.warp10_address = params.warp10_http_address + self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update" + self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or "" + self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or "" + self.sc_params.params.cmaas = params.cmaas or "centreon" + self.sc_params.params.accepted_categories = params.accepted_categories or "neb" + self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" + self.sc_params.params.max_buffer_size = params.max_buffer_size or 30 + self.sc_params.params.hard_only = params.hard_only or 0 + self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0 + self.sc_params.params.enable_service_status_dedup = params.enable_service_status_dedup or 0 + -- just need to url encode the metric name so we don't need to filter out characters + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format#lines + self.sc_params.params.metric_name_regex = params.metric_name_regex or "[.*]" + self.sc_params.params.metric_replacement_character = params.metric_replacement_character or "_" + + -- apply users params and check syntax of standard ones + self.sc_params:param_override(params) + self.sc_params:check_params() + self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) + + -- only load the custom code file, not executed yet + if self.sc_params.load_custom_code_file and not self.sc_params:load_custom_code_file(self.sc_params.params.custom_code_file) then + self.sc_logger:error("[EventQueue:new]: couldn't successfully load the custom code file: " .. tostring(self.sc_params.params.custom_code_file)) + end + + self.sc_params:build_accepted_elements_info() + self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) + + local categories = self.sc_params.params.bbdo.categories + local elements = self.sc_params.params.bbdo.elements + + self.format_event = { + [categories.neb.id] = { + [elements.host_status.id] = function () return self:format_event_host() end, + [elements.service_status.id] = function () return self:format_event_service() end + } + } + + self.format_metric = { + [categories.neb.id] = { + [elements.host_status.id] = function (metric) return self:format_metric_host(metric) end, + [elements.service_status.id] = function (metric) return self:format_metric_service(metric) end + } + } + + self.send_data_method = { + [1] = function (payload, queue_metadata) return self:send_data(payload, queue_metadata) end + } + + self.build_payload_method = { + [1] = function (payload, event) return self:build_payload(payload, event) end + } + + -- return EventQueue object + setmetatable(self, { __index = EventQueue }) + self:update_warp10_format() + return self +end + +--- update_war10_format: set to true a flag for all labels and attributes that we should put in the event +function EventQueue:update_warp10_format() + local params = self.sc_params.params + + -- metric labels part + if params.warp10_accepted_labels ~= "" then + local warp10_accepted_labels = self.sc_common:split(params.warp10_accepted_labels) + + for index, accepted_label in ipairs(warp10_accepted_labels) do + if self.warp10_format.labels[accepted_label] ~= nil then + self.warp10_format.labels[accepted_label] = true + else + + local possible_labels = "" + for label_name, label_value in pairs(self.warp10_format.labels) do + possible_labels = possible_labels .. label_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_label) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_labels)) + end + end + end + + -- metric attributes part + if params.warp10_accepted_attributes ~= "" then + local warp10_accepted_attributes = self.sc_common:split(params.warp10_accepted_attributes) + + for index, accepted_attribute in pairs(warp10_accepted_attributes) do + if self.warp10_format.attributes[accepted_attribute] ~= nil then + self.warp10_format.attributes[accepted_attribute] = true + else + local possible_attributes = "" + for attribute_name, attribute_value in pairs(self.warp10_format.attributes) do + possible_attributes = possible_attributes .. attribute_name .. ", " + end + + self.sc_logger:error("[EventQueue:update_warp10_format]: Label: " .. tostring(accepted_attribute) + .. " is not a valid label. It is going to be ignored. Here is the list of possible labels: " .. tostring(possible_attributes)) + end + end + end + + +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_accepted_event method +-------------------------------------------------------------------------------- +function EventQueue:format_accepted_event() + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:format_event]: starting format event") + + -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file + if not self.format_event[category][element] then + self.sc_logger:error("[format_event]: You are trying to format an event with category: " + .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " + .. tostring(self.sc_params.params.reverse_element_mapping[category][element]) + .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") + else + self.format_event[category][element]() + end + + self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_host method +-------------------------------------------------------------------------------- +function EventQueue:format_event_host() + local event = self.sc_event.event + self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ") + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_event_service method +-------------------------------------------------------------------------------- +function EventQueue:format_event_service() + self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ") + local event = self.sc_event.event + self.sc_metrics:build_metric(self.format_metric[event.category][event.element]) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_host method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_host(metric) + self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +-------------------------------------------------------------------------------- +function EventQueue:format_metric_service(metric) + self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric ") + self:format_metric_event(metric) +end + +-------------------------------------------------------------------------------- +---- EventQueue:format_metric_service method +-- @param metric {table} a single metric data +------------------------------------------------------------------------------- +function EventQueue:format_metric_event(metric) + self.sc_logger:debug("[EventQueue:format_metric]: start real format metric ") + local event = self.sc_event.event + + self.sc_event.event.formated_event = { + data = event.last_check .. "000000// " .. broker.url_encode(metric.metric_name) .. self:build_labels(metric) .. self:build_attributes(metric) .. " " .. metric.value + } + + self:add() + self.sc_logger:debug("[EventQueue:format_metric]: end real format metric ") +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_labels method +-- @param metric {table} a single metric data +-- @return labels_string {string} a string with all labels +-------------------------------------------------------------------------------- +function EventQueue:build_labels(metric) + local event = self.sc_event.event + local params = self.sc_params.params + local resource_type = "host" + local labels_string = "host_name=" .. broker.url_encode(event.cache.host.name) .. ",host_id=" .. event.host_id + + -- add service name in labels + if event.cache.service and event.cache.service.description then + resource_type = "service" + labels_string = labels_string .. ",service_name=" .. broker.url_encode(event.cache.service.description) .. ",service_id=" .. event.service_id + end + + labels_string = self:get_common_metric_info("labels", labels_string, metric) + + -- param that we need here at Centreon on our own Warp10 + if self.warp10_format.labels._cmaas then + labels_string = labels_string .. ",_cmaas=" .. broker.url_encode(params.cmaas) + end + + labels_string = labels_string .. ",resource_type=" .. resource_type + + return "{" .. labels_string .. "}" +end + +-------------------------------------------------------------------------------- +---- EventQueue:build_attributes method +-- @param metric {table} a single metric data +-- @return tags {table} a string with all attributes +-------------------------------------------------------------------------------- +function EventQueue:build_attributes(metric) + local event = self.sc_event.event + local attributes_string = "" + + attributes_string = self:get_common_metric_info("attributes", attributes_string, metric) + + if metric.min and metric.min == metric.min and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",min=" .. metric.min + end + + if metric.max and metric.max == metric.max and self.warp10_format.attributes.metric_boundaries then + attributes_string = attributes_string .. ",max=" .. metric.max + end + + -- add warning to attributes + if metric.warning_high and metric.warning_high == metric.warning_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.warning_high + end + + -- add critical to attributes + if metric.critical_high and metric.critical_high == metric.critical_high and self.warp10_format.attributes.metric_thresholds then + attributes_string = attributes_string .. ",warning=" .. metric.critical_high + end + + if attributes_string ~= "" then + -- we trim because if the string is not empty we have a string that starts with a "," character + return "{" .. self.sc_common:trim(attributes_string, ",") .. "}" + end + + return attributes_string +end + +--- get_common_metric_info: build a string with all the required info for a metric. Info that can be either used as a metric label or metric attribute +-- @param dimension_type (string) can be "attributes" or "labels" +-- @param dimension_string (string) a string to which the function will concatenate common metric info +-- @param metric (table) the metric table +-- @return dimension_string (string) the dimension_string with all the required common metric info +function EventQueue:get_common_metric_info(dimension_type, dimension_string, metric) + local event = self.sc_event.event + local params = self.sc_params.params + + -- add hostgroups + if event.cache.hostgroups and event.cache.hostgroups[1] and self.warp10_format[dimension_type].hg then + dimension_string = dimension_string .. ",hostgroups=" .. broker.url_encode(self:get_hg_string(event)) + end + + -- add servicegroups name + if event.cache.servicegroups and event.cache.servicegroups[1] and self.warp10_format[dimension_type].sg then + dimension_string = dimension_string .. ",servicegroups=" .. broker.url_encode(self:get_sg_string(event)) + end + + -- add metric instance + if metric.instance ~= "" and self.warp10_format[dimension_type].metric_instance then + dimension_string = dimension_string .. ",metric_instance=" .. broker.url_encode(metric.instance) + end + + -- add metric subinstances + if metric.subinstance[1] and self.warp10_format[dimension_type].metric_subinstances then + dimension_string = dimension_string .. ",metric_subinstances=" .. broker.url_encode(get_metric_subinstances_string(metric)) + end + + -- add metric unit + if metric.uom and self.warp10_format[dimension_type].metric_unit then + dimension_string = dimension_string .. ",uom=" .. broker.url_encode(metric.uom) + end + + return dimension_string +end + +function EventQueue:get_hg_string(event) + local hostgroups_string = "" + + for _, hostgroup in ipairs(event.cache.hostgroups) do + if hostgroups_string == "" then + hostgroups_string = hostgroup.group_name + else + hostgroups_string = hostgroups_string .. " " .. hostgroup.group_name + end + end + + return hostgroups_string +end + + +function EventQueue:get_sg_string(event) + local servicegroups_string = "" + + for _, servicegroup in ipairs(event.cache.servicegroups) do + if servicegroups_string == "" then + servicegroups_string = servicegroup.group_name + else + servicegroups_string = servicegroups_string .. " " .. servicegroup.group_name + end + end + + return servicegroups_string +end + +function EventQueue:get_metric_subinstances_string(metric) + local subinstances_string = "" + + for _, subinstance in ipairs(metric.subinstance) do + if subinstances_string == "" then + subinstances_string = subinstance + else + subinstances_string = subinstances_string .. " " .. subinstance + end + end + + return subinstances_string +end + +-------------------------------------------------------------------------------- +-- EventQueue:add, add an event to the sending queue +-------------------------------------------------------------------------------- +function EventQueue:add() + -- store event in self.events lists + local category = self.sc_event.event.category + local element = self.sc_event.event.element + + self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) + .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) + + self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) + self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event + + self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) + .. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) +end + +-------------------------------------------------------------------------------- +-- EventQueue:build_payload, concatenate data so it is ready to be sent +-- @param payload {string} json encoded string +-- @param event {table} the event that is going to be added to the payload +-- @return payload {string} json encoded string +-------------------------------------------------------------------------------- +function EventQueue:build_payload(payload, event) + if not payload then + payload = event.data + else + payload = payload .. "\n" .. event.data + end + + return payload +end + +function EventQueue:send_data(payload, queue_metadata) + self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") + + local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint) + queue_metadata.headers = { + "Transfer-Encoding:chunked", + "X-Warp10-Token:" .. self.sc_params.params.api_token + } + + self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload) + + -- write payload in the logfile for test purpose + if self.sc_params.params.send_data_test == 1 then + self.sc_logger:notice("[send_data]: " .. tostring(payload)) + return true + end + + self.sc_logger:info("[EventQueue:send_data]: Going to send the following json " .. tostring(payload)) + self.sc_logger:info("[EventQueue:send_data]: warp10 address is: " .. tostring(url)) + + local http_response_body = "" + local http_request = curl.easy() + :setopt_url(url) + :setopt_writefunction( + function (response) + http_response_body = http_response_body .. tostring(response) + end + ) + :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) + :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) + :setopt(curl.OPT_HTTPHEADER, queue_metadata.headers) + + -- set proxy address configuration + if (self.sc_params.params.proxy_address ~= '') then + if (self.sc_params.params.proxy_port ~= '') then + http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") + end + end + + -- set proxy user configuration + if (self.sc_params.params.proxy_username ~= '') then + if (self.sc_params.params.proxy_password ~= '') then + http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) + else + self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") + end + end + + -- adding the HTTP POST data + http_request:setopt_postfields(payload) + + -- performing the HTTP request + http_request:perform() + + -- collecting results + http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) + + http_request:close() + + -- Handling the return code + local retval = false + -- https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress#response-status-code other than 200 is not good + if http_response_code == 200 then + self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) + retval = true + else + self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) + end + + return retval +end + +-------------------------------------------------------------------------------- +-- Required functions for Broker StreamConnector +-------------------------------------------------------------------------------- + +local queue + +-- Fonction init() +function init(conf) + queue = EventQueue.new(conf) +end + +-- -------------------------------------------------------------------------------- +-- write, +-- @param {table} event, the event from broker +-- @return {boolean} +-------------------------------------------------------------------------------- +function write (event) + -- skip event if a mandatory parameter is missing + if queue.fail then + queue.sc_logger:error("Skipping event because a mandatory parameter is not set") + return false + end + + -- initiate event object + queue.sc_metrics = sc_metrics.new(event, queue.sc_params.params, queue.sc_common, queue.sc_broker, queue.sc_logger) + queue.sc_event = queue.sc_metrics.sc_event + + if queue.sc_event:is_valid_category() then + if queue.sc_metrics:is_valid_bbdo_element() then + -- format event if it is validated + if queue.sc_metrics:is_valid_metric_event() then + queue:format_accepted_event() + end + --- log why the event has been dropped + else + queue.sc_logger:debug("dropping event because element is not valid. Event element is: " + .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) + end + else + queue.sc_logger:debug("dropping event because category is not valid. Event category is: " + .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) + end + + return flush() +end + + +-- flush method is called by broker every now and then (more often when broker has nothing else to do) +function flush() + local queues_size = queue.sc_flush:get_queues_size() + + -- nothing to flush + if queues_size == 0 then + return true + end + + -- flush all queues because last global flush is too old + if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- flush queues because too many events are stored in them + if queues_size > queue.sc_params.params.max_buffer_size then + if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then + return false + end + + return true + end + + -- there are events in the queue but they were not ready to be send + return false +end \ No newline at end of file diff --git a/modules/centreon-stream-connectors-lib/sc_event.lua b/modules/centreon-stream-connectors-lib/sc_event.lua index 3df94c61..f8d497e4 100644 --- a/modules/centreon-stream-connectors-lib/sc_event.lua +++ b/modules/centreon-stream-connectors-lib/sc_event.lua @@ -27,6 +27,8 @@ function sc_event.new(event, params, common, logger, broker) self.sc_broker = broker self.bbdo_version = self.sc_common:get_bbdo_version() + _PERFORMANCE_ANALYSIS.received_events = _PERFORMANCE_ANALYSIS.received_events + 1 + self.event.cache = {} setmetatable(self, { __index = ScEvent }) @@ -78,13 +80,18 @@ function ScEvent:is_valid_event() -- drop the event if it was not valid. Custom code do not have to work on already invalid events if not is_valid_event then + _PERFORMANCE_ANALYSIS.dropped_events = _PERFORMANCE_ANALYSIS.dropped_events + 1 return is_valid_event end -- run custom code if self.params.custom_code and type(self.params.custom_code) == "function" then self, is_valid_event = self.params.custom_code(self) - end + end + + if not is_valid_event then + _PERFORMANCE_ANALYSIS.dropped_events = _PERFORMANCE_ANALYSIS.dropped_events + 1 + end return is_valid_event end @@ -1014,7 +1021,7 @@ function ScEvent:is_valid_acknowledgement_event() return true end ---- is_vaid_downtime_event: check if the event is a valid downtime event +--- is_valid_downtime_event: check if the event is a valid downtime event -- return true|false (boolean) function ScEvent:is_valid_downtime_event() -- return false if the event is one of all the "fake" start or end downtime event received from broker @@ -1322,6 +1329,7 @@ function ScEvent:is_valid_downtime_event_start() end -- end compat patch + self.event.downtime_processing_step = "start" return true end @@ -1340,6 +1348,7 @@ function ScEvent:is_valid_downtime_event_end() end -- end compat patch + self.event.downtime_processing_step = "end" return true end diff --git a/modules/centreon-stream-connectors-lib/sc_flush.lua b/modules/centreon-stream-connectors-lib/sc_flush.lua index 71ab72d3..6cf48771 100644 --- a/modules/centreon-stream-connectors-lib/sc_flush.lua +++ b/modules/centreon-stream-connectors-lib/sc_flush.lua @@ -17,6 +17,48 @@ local ScFlush = {} function sc_flush.new(params, logger) local self = {} + _PERFORMANCE_ANALYSIS = { + received_events = 0, + dropped_events = 0, + sent_events = 0, + number_of_flush = 0, + number_of_max_buffer_size_reached = 0, + last_log_time = os.time(), + log_analysis = function () + if _PERFORMANCE_ANALYSIS.last_log_time + params.analysis_frequency <= os.time() then + return + end + + self.sc_logger:notice("[ScFlush:PERFORMANCE_ANALYSIS]: " .. _PERFORMANCE_ANALYSIS:get_analysis_result()) + _PERFORMANCE_ANALYSIS:reset() + end, + get_analysis_result = function () + local analysis = nil + local optimal_max_buffer_size, optimal_max_all_queues_age + + -- max buffer size is too big and never reached + if _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached == 0 then + -- 3 events envoyés, max 5, number of flush = 3 + optimal_max_buffer_size = + + end, + increase_sent_counters = function () + _PERFORMANCE_ANALYSIS.sent_events = _PERFORMANCE_ANALYSIS.sent_events + counter + _PERFORMANCE_ANALYSIS.number_of_flush = _PERFORMANCE_ANALYSIS.number_of_flush + 1 + if params.max_buffer_size % _PERFORMANCE_ANALYSIS.sent_events == 0 then + _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached = _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached + 1 + end + end, + reset = function () + _PERFORMANCE_ANALYSIS.received_events = 0 + _PERFORMANCE_ANALYSIS.dropped_events = 0 + _PERFORMANCE_ANALYSIS.sent_events = 0 + _PERFORMANCE_ANALYSIS.number_of_flush = 0 + _PERFORMANCE_ANALYSIS.number_of_max_buffer_size_reached = 0 + _PERFORMANCE_ANALYSIS.last_log_time = os.time() + end + } + -- create a default logger if it is not provided self.sc_logger = logger if not self.sc_logger then @@ -122,7 +164,7 @@ function ScFlush:get_queues_size() return queues_size end ---- flush_mixed_payload: flush a payload that contains various type of events (services mixed hosts for example) +--- flush_mixed_payload: flush a payload that contains various type of events (services mixed with hosts for example) -- @return boolean (boolean) true or false depending on the success of the operation function ScFlush:flush_mixed_payload(build_payload_method, send_method) local payload = nil @@ -138,6 +180,8 @@ function ScFlush:flush_mixed_payload(build_payload_method, send_method) -- send events if max buffer size is reached if counter >= self.params.max_buffer_size then + _PERFORMANCE_ANALYSIS:increase_sent_counters() + if not self:flush_payload(send_method, payload, self.queues.global_queues_metadata) then return false end @@ -149,6 +193,8 @@ function ScFlush:flush_mixed_payload(build_payload_method, send_method) end end + _PERFORMANCE_ANALYSIS:increase_sent_counters() + -- we need to empty all queues to not mess with broker retention if not self:flush_payload(send_method, payload, self.queues.global_queues_metadata) then return false @@ -174,6 +220,8 @@ function ScFlush:flush_homogeneous_payload(build_payload_method, send_method) -- send events if max buffer size is reached if counter >= self.params.max_buffer_size then + _PERFORMANCE_ANALYSIS:increase_sent_counters() + if not self:flush_payload( send_method, payload, @@ -188,6 +236,8 @@ function ScFlush:flush_homogeneous_payload(build_payload_method, send_method) end end + _PERFORMANCE_ANALYSIS:increase_sent_counters() + -- make sure there are no events left inside a specific queue if not self:flush_payload( send_method, diff --git a/modules/centreon-stream-connectors-lib/sc_params.lua b/modules/centreon-stream-connectors-lib/sc_params.lua index 0fe62e07..d4ab2032 100644 --- a/modules/centreon-stream-connectors-lib/sc_params.lua +++ b/modules/centreon-stream-connectors-lib/sc_params.lua @@ -125,6 +125,9 @@ function sc_params.new(common, logger) log_level = "", log_curl_commands = 0, + -- performance analysis parameters + analysis_frequency = 300, + -- metric metric_name_regex = "no_forbidden_character_to_replace", metric_replacement_character = "_",