diff --git a/centreon-certified/bsm/bsm-events-apiv2.lua b/centreon-certified/bsm/bsm-events-apiv2.lua
new file mode 100644
index 00000000..94f548e4
--- /dev/null
+++ b/centreon-certified/bsm/bsm-events-apiv2.lua
@@ -0,0 +1,372 @@
+--
+-- Copyright © 2021 Centreon
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the Licensself.sc_event.event.
+-- You may obtain a copy of the License at
+--
+-- http://www.apachself.sc_event.event.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the Licensself.sc_event.event.
+--
+-- For more information : contact@centreon.com
+--
+-- To work you need to provide to this script a Broker stream connector output configuration
+-- with the following informations:
+--
+-- source_ci (string): Name of the transmiter, usually Centreon server name
+-- http_server_url (string): the full HTTP URL. Default: https://my.bsm.server:30005/bsmc/rest/events/ws-centreon/.
+-- http_proxy_string (string): the full proxy URL if needed to reach the BSM server. Default: empty.
+-- log_path (string): the log file to use
+-- log_level (number): the log level (0, 1, 2, 3) where 3 is the maximum level. 0 logs almost nothing. 1 logs only the beginning of the script and errors. 2 logs a reasonable amount of verbosself.sc_event.event. 3 logs almost everything possible, to be used only for debug. Recommended value in production: 1.
+-- max_buffer_size (number): how many events to store before sending them to the server.
+-- max_buffer_age (number): flush the events when the specified time (in second) is reached (even if max_size is not reached).
+
+-- Libraries
+local curl = require "cURL"
+
+-- Centreon lua core libraries
+local sc_common = require("centreon-stream-connectors-lib.sc_common")
+local sc_logger = require("centreon-stream-connectors-lib.sc_logger")
+local sc_broker = require("centreon-stream-connectors-lib.sc_broker")
+local sc_event = require("centreon-stream-connectors-lib.sc_event")
+local sc_params = require("centreon-stream-connectors-lib.sc_params")
+local sc_macros = require("centreon-stream-connectors-lib.sc_macros")
+local sc_flush = require("centreon-stream-connectors-lib.sc_flush")
+
+
+--------------------------------------------------------------------------------
+-- EventQueue class
+--------------------------------------------------------------------------------
+
+local EventQueue = {}
+EventQueue.__index = EventQueue
+
+--------------------------------------------------------------------------------
+-- Constructor
+-- @param conf The table given by the init() function and returned from the GUI
+-- @return the new EventQueue
+--------------------------------------------------------------------------------
+
+function EventQueue.new(params)
+ local self = {}
+ self.fail = false
+
+ local mandatory_parameters = {
+ "http_server_url"
+ }
+
+ -- set up log configuration
+ local logfile = params.logfile or "/var/log/centreon-broker/bsm_event-apiv2.log"
+ local log_level = params.log_level or 1
+
+ -- initiate mandatory objects
+ self.sc_logger = sc_logger.new(logfile, log_level)
+ self.sc_common = sc_common.new(self.sc_logger)
+ self.sc_broker = sc_broker.new(self.sc_logger)
+ self.sc_params = sc_params.new(self.sc_common, self.sc_logger)
+
+ -- checking mandatory parameters and setting a fail flag
+ if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then
+ self.fail = true
+ end
+
+ -- overriding default parameters for this stream connector if the default values doesn't suit the basic needs
+ self.sc_params.params.accepted_categories = params.accepted_categories or "neb"
+ self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status"
+ self.sc_params.params.source_ci = params.source_ci or "Centreon"
+ self.sc_params.params.max_output_length = params.max_output_length or 1024
+
+ -- apply users params and check syntax of standard ones
+ self.sc_params:param_override(params)
+ self.sc_params:check_params()
+
+ self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger)
+ self.format_template = self.sc_params:load_event_format_file(true)
+ self.sc_params:build_accepted_elements_info()
+ self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger)
+
+ local categories = self.sc_params.params.bbdo.categories
+ local elements = self.sc_params.params.bbdo.elements
+
+ self.format_event = {
+ [categories.neb.id] = {
+ [elements.host_status.id] = function () return self:format_event_host() end,
+ [elements.service_status.id] = function () return self:format_event_service() end
+ },
+ [categories.bam.id] = {}
+ }
+
+ self.send_data_method = {
+ [1] = function (payload) return self:send_data(payload) end
+ }
+
+ self.build_payload_method = {
+ [1] = function (payload, event) return self:build_payload(payload, event) end
+ }
+
+ -- return EventQueue object
+ setmetatable(self, { __index = EventQueue })
+ return self
+end
+
+--------------------------------------------------------------------------------
+---- EventQueue:format_event method
+---------------------------------------------------------------------------------
+function EventQueue:format_accepted_event()
+ local category = self.sc_event.event.category
+ local element = self.sc_event.event.element
+ local template = self.sc_params.params.format_template[category][element]
+ self.sc_logger:debug("[EventQueue:format_event]: starting format event")
+ self.sc_event.event.formated_event = {}
+
+ if self.format_template and template ~= nil and template ~= "" then
+ self.sc_event.event.formated_event = self.sc_macros:replace_sc_macro(template, self.sc_event.event, true)
+ else
+ -- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file
+ if not self.format_event[category][element] then
+ self.sc_logger:error("[format_event]: You are trying to format an event with category: "
+ .. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: "
+ .. tostring(self.sc_params.params.reverse_element_mapping[category][element])
+ .. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element")
+ else
+ self.format_event[category][element]()
+ end
+ end
+
+ self:add()
+ self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished")
+end
+
+-- Format XML file with host infoamtion
+function EventQueue:format_event_host()
+ local xml_host_severity = self.sc_broker:get_severity(self.sc_event.event.host_id)
+ local xml_url = self.sc_common:ifnil_or_empty(self.sc_event.event.cache.host.action_url, 'no action url for this host')
+ local xml_notes = self.sc_common:ifnil_or_empty(self.sc_event.event.cache.host.notes, 'no notes found on host')
+
+ if xml_host_severity == false then
+ xml_host_severity = 0
+ end
+
+ self.sc_event.event.formated_event = {
+ hostname = self.sc_event.event.cache.host.name,
+ host_severity = xml_host_severity,
+ host_notes = xml_notes,
+ url = xml_url,
+ source_ci = self.sc_common:ifnil_or_empty(self.source_ci, 'Centreon'),
+ source_host_id = self.sc_common:ifnil_or_empty(self.sc_event.event.host_id, 0),
+ scheduled_downtime_depth = self.sc_common:ifnil_or_empty(self.sc_event.event.scheduled_downtime_depth, 0)
+ }
+end
+
+-- Format XML file with service infoamtion
+function EventQueue:format_event_service()
+ local xml_url = self.sc_common:ifnil_or_empty(self.sc_event.event.cache.host.notes_url, 'no url for this service')
+ local xml_service_severity = self.sc_broker:get_severity(self.sc_event.event.host_id, self.sc_event.event.service_id)
+
+ if xml_service_severity == false then
+ xml_service_severity = 0
+ end
+
+ self.sc_event.event.formated_event = {
+ hostname = self.sc_event.event.cache.host.name,
+ svc_desc = self.sc_event.event.cache.service.description,
+ state = self.sc_event.event.state,
+ last_update = self.sc_event.event.last_update,
+ output = string.match(self.sc_event.event.output, "^(.*)\n"),
+ service_severity = xml_service_severity,
+ url = xml_url,
+ source_host_id = self.sc_common:ifnil_or_empty(self.sc_event.event.host_id, 0),
+ source_svc_id = self.sc_common:ifnil_or_empty(self.sc_event.event.service_id, 0),
+ scheduled_downtime_depth = self.sc_common:ifnil_or_empty(self.sc_event.event.scheduled_downtime_depth, 0)
+ }
+end
+
+--------------------------------------------------------------------------------
+-- EventQueue:add method
+-- @param e An event
+--------------------------------------------------------------------------------
+
+function EventQueue:add()
+ -- store event in self.events lists
+ local category = self.sc_event.event.category
+ local element = self.sc_event.event.element
+
+ self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category])
+ .. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element]))
+
+ self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events))
+ self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event
+
+ self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events)
+ .. "max is: " .. tostring(self.sc_params.params.max_buffer_size))
+end
+
+--------------------------------------------------------------------------------
+-- EventQueue:build_payload, concatenate data so it is ready to be sent
+-- @param payload {string} json encoded string
+-- @param event {table} the event that is going to be added to the payload
+-- @return payload {string} json encoded string
+--------------------------------------------------------------------------------
+function EventQueue:build_payload(payload, event)
+ if not payload then
+ payload = ""
+ for index, xml_str in pairs(event) do
+ payload = payload .. "<" .. tostring(index) .. ">" .. tostring(self.sc_common:xml_escape(xml_str)) .. "" .. tostring(index) .. ">"
+ end
+ payload = payload .. ""
+
+ else
+ payload = payload .. ""
+ for index, xml_str in pairs(event) do
+ payload = payload .. "<" .. tostring(index) .. ">" .. tostring(self.sc_common:xml_escape(xml_str)) .. "" .. tostring(index) .. ">"
+ end
+ payload = payload .. ""
+ end
+
+ return payload
+end
+
+function EventQueue:send_data(payload)
+ self.sc_logger:debug("[EventQueue:send_data]: Starting to send data")
+
+ -- write payload in the logfile for test purpose
+ if self.sc_params.params.send_data_test == 1 then
+ self.sc_logger:notice("[send_data]: " .. tostring(payload))
+ return true
+ end
+
+ self.sc_logger:info("[EventQueue:send_data]: Going to send the following xml " .. tostring(payload))
+ self.sc_logger:info("[EventQueue:send_data]: BSM Http Server URL is: \"" .. tostring(self.sc_params.params.http_server_url .. "\""))
+
+ local http_response_body = ""
+ local http_request = curl.easy()
+ :setopt_url(self.sc_params.params.http_server_url)
+ :setopt_writefunction(
+ function (response)
+ http_response_body = http_response_body .. tostring(response)
+ end
+ )
+ :setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout)
+ :setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection)
+ :setopt(
+ curl.OPT_HTTPHEADER,
+ {
+ "Content-Type: text/xml",
+ "content-length: " .. string.len(payload)
+ }
+ )
+
+ -- set proxy address configuration
+ if (self.sc_params.params.proxy_address ~= '') then
+ if (self.sc_params.params.proxy_port ~= '') then
+ http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port)
+ else
+ self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used")
+ end
+ end
+
+ -- set proxy user configuration
+ if (self.sc_params.params.proxy_username ~= '') then
+ if (self.sc_params.params.proxy_password ~= '') then
+ http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password)
+ else
+ self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used")
+ end
+ end
+
+ -- adding the HTTP POST data
+ http_request:setopt_postfields(payload)
+ -- performing the HTTP request
+ http_request:perform()
+ -- collecting results
+ http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE)
+ http_request:close()
+
+ -- Handling the return code
+ local retval = false
+ if http_response_code == 202 or http_response_code == 200 then
+ self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code))
+ retval = true
+ else
+ self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body))
+ end
+ return retval
+end
+
+--------------------------------------------------------------------------------
+-- Required functions for Broker StreamConnector
+--------------------------------------------------------------------------------
+
+local queue
+
+-- Fonction init()
+function init(conf)
+ queue = EventQueue.new(conf)
+end
+
+-- Fonction write()
+function write(event)
+ -- skip event if a mandatory parameter is missing
+ if queue.fail then
+ queue.sc_logger:error("Skipping event because a mandatory parameter is not set")
+ return false
+ end
+
+ -- initiate event object
+ queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker)
+
+ if queue.sc_event:is_valid_category() then
+ if queue.sc_event:is_valid_element() then
+ -- format event if it is validated
+ if queue.sc_event:is_valid_event() then
+ queue:format_accepted_event()
+ end
+
+ --- log why the event has been dropped
+ else
+ queue.sc_logger:debug("dropping event because element is not valid. Event element is: "
+ .. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element]))
+ end
+ else
+ queue.sc_logger:debug("dropping event because category is not valid. Event category is: "
+ .. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category]))
+ end
+
+ return flush()
+end
+
+-- flush method is called by broker every now and then (more often when broker has nothing else to do)
+function flush()
+ local queues_size = queue.sc_flush:get_queues_size()
+
+ -- nothing to flush
+ if queues_size == 0 then
+ return true
+ end
+
+ -- flush all queues because last global flush is too old
+ if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then
+ if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then
+ return false
+ end
+
+ return true
+ end
+
+ -- flush queues because too many events are stored in them
+ if queues_size > queue.sc_params.params.max_buffer_size then
+ if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then
+ return false
+ end
+
+ return true
+ end
+
+ -- there are events in the queue but they were not ready to be send
+ return false
+end