From 5941adfbb90a024de3a8480a063ed0e023cc0da9 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Wed, 16 Oct 2024 17:46:03 +0530 Subject: [PATCH] * Refactor to classes --- lib/logstash/outputs/kusto.rb | 18 +- .../outputs/kusto/custom_size_based_buffer.rb | 42 ++--- lib/logstash/outputs/kusto/ingestor.rb | 73 +++------ lib/logstash/outputs/kusto/interval.rb | 126 +++++++------- .../kusto/kustoLogstashConfiguration.rb | 155 ++++++++++++++++++ spec/outputs/kusto/ingestor_spec.rb | 97 ----------- spec/outputs/kusto/interval_spec.rb | 69 ++++++++ .../kusto/kustoLogstashConfiguration_spec.rb | 106 ++++++++++++ 8 files changed, 444 insertions(+), 242 deletions(-) create mode 100644 lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb delete mode 100755 spec/outputs/kusto/ingestor_spec.rb create mode 100644 spec/outputs/kusto/interval_spec.rb create mode 100755 spec/outputs/kusto/kustoLogstashConfiguration_spec.rb diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 6fa160b4..18a1d501 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -7,6 +7,7 @@ require 'logstash/outputs/kusto/ingestor' require 'logstash/outputs/kusto/interval' require 'logstash/outputs/kusto/custom_size_based_buffer' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' ## # This plugin sends messages to Azure Kusto in batches. @@ -91,13 +92,13 @@ def register max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) - - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor) - - # Deprecation warning for path - if @path - @logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.") - end + + kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping) + kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth) + kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) + @kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger) + @ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor) + end @@ -114,8 +115,7 @@ def multi_receive_encoded(events_and_encoded) def close @logger.info("Closing Kusto output plugin") - - begin + begin @buffer.shutdown unless @buffer.nil? @logger.info("Buffer shutdown") unless @buffer.nil? rescue => e diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 7c680266..6a012276 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -39,10 +39,10 @@ def initialize(max_size_mb, max_interval, &flush_callback) end end - def <<(event) - while buffer_full? do - sleep 0.1 - end + def <<(event) + while buffer_full? do + sleep 0.1 + end @pending_mutex.synchronize do @buffer_state[:pending_items] << event @@ -58,23 +58,23 @@ def shutdown clear_buffer_files end - private + private - def buffer_full? - @pending_mutex.synchronize do - @buffer_state[:pending_size] >= @buffer_config[:max_size] - end - end + def buffer_full? + @pending_mutex.synchronize do + @buffer_state[:pending_size] >= @buffer_config[:max_size] + end + end - def buffer_flush(options = {}) - force = options[:force] || options[:final] - final = options[:final] + def buffer_flush(options = {}) + force = options[:force] || options[:final] + final = options[:final] - if final - @flush_mutex.lock - elsif !@flush_mutex.try_lock - return 0 - end + if final + @flush_mutex.lock + elsif !@flush_mutex.try_lock + return 0 + end items_flushed = 0 @@ -85,7 +85,7 @@ def buffer_flush(options = {}) @pending_mutex.synchronize do return 0 if @buffer_state[:pending_size] == 0 - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] return 0 @@ -123,8 +123,8 @@ def buffer_flush(options = {}) @flush_mutex.unlock end - items_flushed - end + items_flushed + end def save_buffer_to_file(events) buffer_state_copy = { diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index e991d5a5..392cb0a1 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,38 +20,39 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) + def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool @logger = logger - validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth) + #Validate and assign + kusto_logstash_configuration.validate_config() + @kusto_logstash_configuration = kusto_logstash_configuration + @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) - # If there is managed identity, use it. This means the AppId and AppKey are empty/nil - # If there is CLI Auth, use that instead of managed identity - is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth) + + is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity # If it is system managed identity, propagate the system identity - is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity # Is it direct connection - is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn # Create a connection string kusto_connection_string = if is_managed_identity if is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id) end else - if cli_auth + if @kusto_logstash_configuration.kusto_auth.cli_auth @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*') - kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using app id and app key.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant) end end @logger.debug(Gem.loaded_specs.to_s) @@ -62,22 +63,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli tuple_utils = Java::org.apache.commons.lang3.tuple # kusto_connection_string.setClientVersionForTracing(name_for_tracing) version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown" - kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); + kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin if is_direct_conn kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end - @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) - if is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) + + if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided + @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) + @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @@ -86,38 +87,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli @logger.debug('Kusto resources are ready.') end - def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth) - # Add an additional validation and fail this upfront - if app_id.nil? && app_key.nil? && managed_identity_id.nil? - if cli_auth - @logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production') - else - @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') - raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') - end - end - if database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end - - if table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') - end - - if json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') - end - - if not(["https", "http"].include? proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') - end - - end - def upload_async(data) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") diff --git a/lib/logstash/outputs/kusto/interval.rb b/lib/logstash/outputs/kusto/interval.rb index 33046309..6ba8d4a4 100755 --- a/lib/logstash/outputs/kusto/interval.rb +++ b/lib/logstash/outputs/kusto/interval.rb @@ -5,77 +5,77 @@ require 'logstash/errors' class LogStash::Outputs::Kusto < LogStash::Outputs::Base - ## - # Bare-bones utility for running a block of code at an interval. - # - class Interval - ## - # Initializes a new Interval with the given arguments and starts it - # before returning it. - # - # @param interval [Integer] (see: Interval#initialize) - # @param procsy [#call] (see: Interval#initialize) - # - # @return [Interval] - # - def self.start(interval, procsy) - new(interval, procsy).tap(&:start) - end + ## + # Bare-bones utility for running a block of code at an interval. + # + class Interval + ## + # Initializes a new Interval with the given arguments and starts it + # before returning it. + # + # @param interval [Integer] (see: Interval#initialize) + # @param procsy [#call] (see: Interval#initialize) + # + # @return [Interval] + # + def self.start(interval, procsy) + new(interval, procsy).tap(&:start) + end - ## - # @param interval [Integer]: time in seconds to wait between calling the given proc - # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. - def initialize(interval, procsy) - @interval = interval - @procsy = procsy + ## + # @param interval [Integer]: time in seconds to wait between calling the given proc + # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. + def initialize(interval, procsy) + @interval = interval + @procsy = procsy - # Mutex, ConditionVariable, etc. - @mutex = Mutex.new - @sleeper = ConditionVariable.new - end + # Mutex, ConditionVariable, etc. + @mutex = Mutex.new + @sleeper = ConditionVariable.new + end - ## - # Starts the interval, or returns if it has already been started. - # - # @return [void] - def start - @mutex.synchronize do - return if @thread && @thread.alive? + ## + # Starts the interval, or returns if it has already been started. + # + # @return [void] + def start + @mutex.synchronize do + return if @thread && @thread.alive? - @thread = Thread.new { run } - end - end + @thread = Thread.new { run } + end + end - ## - # Stop the interval. - # Does not interrupt if execution is in-progress. - def stop - @mutex.synchronize do - @stopped = true - end + ## + # Stop the interval. + # Does not interrupt if execution is in-progress. + def stop + @mutex.synchronize do + @stopped = true + end - @thread && @thread.join - end + @thread && @thread.join + end - ## - # @return [Boolean] - def alive? - @thread && @thread.alive? - end + ## + # @return [Boolean] + def alive? + @thread && @thread.alive? + end - private + private - def run - @mutex.synchronize do - loop do - @sleeper.wait(@mutex, @interval) - break if @stopped + def run + @mutex.synchronize do + loop do + @sleeper.wait(@mutex, @interval) + break if @stopped - @procsy.call - end - end - ensure - @sleeper.broadcast - end - end + @procsy.call + end + end + ensure + @sleeper.broadcast + end + end end diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb new file mode 100644 index 00000000..0aecb874 --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -0,0 +1,155 @@ +# encoding: utf-8 +# A class just having all the configurations wrapped into a seperate object +module LogStash + module Outputs + module KustoInternal + class KustoLogstashConfiguration + FIELD_REF = /%\{[^}]+\}/ + def initialize(kusto_ingest,kusto_auth, kusto_proxy, logger) + @logger = logger + @kusto_ingest = kusto_ingest + @kusto_auth = kusto_auth + @kusto_proxy = kusto_proxy + @logger.info("Kusto configuration initialized.") + end # def initialize + + # Configuration + def kusto_ingest + @kusto_ingest + end + def kusto_auth + @kusto_auth + end + def kusto_proxy + @kusto_proxy + end + + def validate_config() + # Add an additional validation and fail this upfront + if @kusto_auth.app_id.to_s.empty? && @kusto_auth.managed_identity_id.to_s.empty? && !@kusto_auth.cli_auth + @logger.error('managed_identity_id is not provided, cli_auth is false and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end + # If proxy AAD is required and the proxy configuration is not provided - fail + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only can be used only when proxy is configured.', @kusto_proxy.proxy_aad_only) + raise LogStash::ConfigurationError.new('proxy_aad_only can be used only when proxy is configured.') + end + + if @kusto_ingest.database =~ FIELD_REF + @logger.error('database config value should not be dynamic.', @kusto_ingest.database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end + if @kusto_ingest.table =~ FIELD_REF + @logger.error('table config value should not be dynamic.', @kusto_ingest.table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + end + if @kusto_ingest.json_mapping =~ FIELD_REF + @logger.error('json_mapping config value should not be dynamic.', @kusto_ingest.json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + end + if not(["https", "http"].include? @kusto_proxy.proxy_protocol) + @logger.error('proxy_protocol has to be http or https.', @kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.',@kusto_proxy.proxy_host,@kusto_proxy.proxy_port,@kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.') + end + # If all validation pass then configuration is valid + return true + end #validate_config() + + end # class KustoLogstashConfiguration + class KustoAuthConfiguration + def initialize(app_id, app_key, app_tenant, managed_identity_id, cli_auth) + @app_id = app_id + @app_key = app_key + @app_tenant = app_tenant + @managed_identity_id = managed_identity_id + @cli_auth = cli_auth + @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty? && !cli_auth + @is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(kusto_auth.managed_identity_id) + end + # Authentication configuration + def app_id + @app_id + end + def app_key + @app_key + end + def app_tenant + @app_tenant + end + def managed_identity_id + @managed_identity_id + end + def is_managed_identity + @is_managed_identity + end + def cli_auth + @cli_auth + end + def is_system_assigned_managed_identity + @is_system_assigned_managed_identity + end + end # class KustoAuthConfiguration + class KustoProxyConfiguration + def initialize(proxy_host , proxy_port , proxy_protocol, proxy_aad_only) + @proxy_host = proxy_host + @proxy_port = proxy_port + @proxy_protocol = proxy_protocol + @proxy_aad_only = proxy_aad_only + # Is it direct connection + @is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + end + # proxy configuration + def proxy_host + @proxy_host + end + + def proxy_port + @proxy_port + end + + def proxy_protocol + @proxy_protocol + end + + def proxy_aad_only + @proxy_aad_only + end + + def is_direct_conn + @is_direct_conn + end + end # class KustoProxyConfiguration + class KustoIngestConfiguration + def initialize(ingest_url, database, table, json_mapping) + @ingest_url = ingest_url + @database = database + @table = table + @json_mapping = json_mapping + @is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) + end + # For ingestion + def ingest_url + @ingest_url + end + def database + @database + end + def table + @table + end + def json_mapping + @json_mapping + end + def is_mapping_ref_provided + @is_mapping_ref_provided + end + end # class KustoIngestionConfiguration + end # module KustoInternal + end # module Outputs +end # module LogStash diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb deleted file mode 100755 index a077549b..00000000 --- a/spec/outputs/kusto/ingestor_spec.rb +++ /dev/null @@ -1,97 +0,0 @@ -# encoding: utf-8 -require_relative "../../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/outputs/kusto/ingestor' - -describe LogStash::Outputs::Kusto::Ingestor do - - let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } - let(:app_id) { "myid" } - let(:app_key) { LogStash::Util::Password.new("mykey") } - let(:app_tenant) { "mytenant" } - let(:managed_identity) { "managed_identity" } - let(:database) { "mydatabase" } - let(:cliauth) { false } - let(:table) { "mytable" } - let(:proxy_host) { "localhost" } - let(:proxy_port) { 80 } - let(:proxy_protocol) { "http" } - let(:json_mapping) { "mymapping" } - let(:logger) { spy('logger') } - - describe 'Ingestor' do - - it 'does not throw an error when initializing' do - RSpec.configuration.reporter.message("Running test: does not throw an error when initializing") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.not_to raise_error - RSpec.configuration.reporter.message("Completed test: does not throw an error when initializing") - end - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow database to have some dynamic part' do - dynamic_name_array.each do |test_database| - it "with database: #{test_database}" do - RSpec.configuration.reporter.message("Running test: doesnt allow database to have some dynamic part with database: #{test_database}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, test_database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow database to have some dynamic part with database: #{test_database}") - end - end - end - - context 'doesnt allow table to have some dynamic part' do - dynamic_name_array.each do |test_table| - it "with table: #{test_table}" do - RSpec.configuration.reporter.message("Running test: doesnt allow table to have some dynamic part with table: #{test_table}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, test_table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow table to have some dynamic part with table: #{test_table}") - end - end - end - - context 'doesnt allow mapping to have some dynamic part' do - dynamic_name_array.each do |json_mapping| - it "with mapping: #{json_mapping}" do - RSpec.configuration.reporter.message("Running test: doesnt allow mapping to have some dynamic part with mapping: #{json_mapping}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow mapping to have some dynamic part with mapping: #{json_mapping}") - end - end - end - - context 'proxy protocol has to be http or https' do - it "with proxy protocol: socks" do - RSpec.configuration.reporter.message("Running test: proxy protocol has to be http or https with proxy protocol: socks") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, 'socks', logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: proxy protocol has to be http or https with proxy protocol: socks") - end - end - - context 'one of appid or managedid has to be provided' do - it "with empty managed identity and appid" do - RSpec.configuration.reporter.message("Running test: one of appid or managedid has to be provided with empty managed identity and appid") - expect { - ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "", cliauth, database, table, json_mapping, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: one of appid or managedid has to be provided with empty managed identity and appid") - end - end - end - -end diff --git a/spec/outputs/kusto/interval_spec.rb b/spec/outputs/kusto/interval_spec.rb new file mode 100644 index 00000000..ccd9cf46 --- /dev/null +++ b/spec/outputs/kusto/interval_spec.rb @@ -0,0 +1,69 @@ +# # spec/interval_test.rb +# require 'rspec' +# require 'logstash/outputs/kusto/interval' + + +# describe LogStash::Outputs::Kusto::Interval do +# let(:interval_time) { 1 } +# let(:procsy) { double("procsy", call: true) } + +# describe '#initialize' do +# it 'initializes with the correct interval and procsy' do +# interval = described_class.new(interval_time, procsy) +# expect(interval.instance_variable_get(:@interval)).to eq(interval_time) +# expect(interval.instance_variable_get(:@procsy)).to eq(procsy) +# end +# end + +# describe '#start' do +# it 'starts the interval thread' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# expect(interval.alive?).to be true +# interval.stop +# end + +# it 'does not start a new thread if already started' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# first_thread = interval.instance_variable_get(:@thread) +# interval.start +# second_thread = interval.instance_variable_get(:@thread) +# expect(first_thread).to eq(second_thread) +# interval.stop +# end +# end + +# describe '#stop' do +# it 'stops the interval thread' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# interval.stop +# expect(interval.alive?).to be false +# end +# end + +# describe '#alive?' do +# it 'returns true if the thread is alive' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# expect(interval.alive?).to be true +# interval.stop +# end + +# it 'returns false if the thread is not alive' do +# interval = described_class.new(interval_time, procsy) +# expect(interval.alive?).to be false +# end +# end + +# describe 'interval execution' do +# it 'calls the proc at the specified interval' do +# interval = described_class.new(interval_time, procsy) +# expect(procsy).to receive(:call).at_least(:twice) +# interval.start +# sleep(2.5) +# interval.stop +# end +# end +# end \ No newline at end of file diff --git a/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb new file mode 100755 index 00000000..cbb09ea7 --- /dev/null +++ b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb @@ -0,0 +1,106 @@ +# encoding: utf-8 +require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + +describe LogStash::Outputs::KustoInternal::KustoLogstashConfiguration do + + let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } + let(:app_id) { "myid" } + let(:app_key) { LogStash::Util::Password.new("mykey") } + let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } + let(:database) { "mydatabase" } + let(:cliauth) { false } + let(:table) { "mytable" } + let(:proxy_host) { "localhost" } + let(:proxy_port) { 80 } + let(:proxy_protocol) { "http" } + let(:json_mapping) { "mymapping" } + let(:delete_local) { false } + let(:logger) { spy(:logger) } + let(:proxy_aad_only) { false } + + let(:kusto_ingest_base) { LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) } + let(:kusto_auth_base) { LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cliauth) } + let(:kusto_proxy_base) { LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) } + + describe '#initialize' do + it 'does not throw an error when initializing' do + # note that this will cause an internal error since connection is being tried. + # however we still want to test that all the java stuff is working as expected + expect { + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.not_to raise_error + end + + dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] + + context 'doesnt allow database to have some dynamic part' do + dynamic_name_array.each do |test_database| + it "with database: #{test_database}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, test_database, table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow table to have some dynamic part' do + dynamic_name_array.each do |test_table| + it "with database: #{test_table}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, test_table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow mapping to have some dynamic part' do + dynamic_name_array.each do |json_mapping| + it "with database: #{json_mapping}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'proxy protocol has to be http or https' do + it "with proxy protocol: socks" do + expect { + kusto_proxy = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , 'socks', false) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'one of appid or managedid or cli_auth has to be provided' do + it "with empty managed identity and appid" do + expect { + kusto_auth = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new("", app_key, app_tenant, "", false) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'if proxy_aad is provided' do + it "proxy details should be provided" do + expect { + kusto_proxy = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new("" , "" , proxy_protocol, true) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end +end \ No newline at end of file