From b624d014920930bf079f86ef1e8bb5dec4ec9c6a Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Mon, 11 Nov 2024 18:57:42 +0530 Subject: [PATCH] *Make some fixes for retries and upload --- lib/logstash/outputs/kusto.rb | 1 - lib/logstash/outputs/kusto/ingestor.rb | 61 +++++++++++++------------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index b53b648..893dba7 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -136,7 +136,6 @@ def close @logger.error("Error stopping ingestor: #{e.message}") @logger.error(e.backtrace.join("\n")) end - @logger.info("Kusto output plugin Closed") end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index f06a282..0f9a6c2 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -21,6 +21,8 @@ class Ingestor FIELD_REF = /%\{[^}]+\}/ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL) + @retry_count = 3 + @retry_delay = 10 @workers_pool = threadpool @logger = logger #Validate and assign @@ -85,6 +87,15 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) end + # retry_policy = Java::com.azure.storage.common.policy + # duration = Java::java.time.Duration.ofSeconds(5) + + # fixed_delay_options = Java::com.azure.core.http.policy.FixedDelayOptions.new(1,duration) + # retry_options = Java::com.azure.core.http.policy.RetryOptions.new(fixed_delay_options) + # req_retry_options = Java::com.azure.storage.common.policy.RequestRetryOptions.fromRetryOptions(retry_options, Java::java.time.Duration.ofSeconds(10), "") + + # queued_ingest_client = @kusto_client.to_java(Java::com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl) + # queued_ingest_client.setQueueRequestOptions(req_retry_options) @logger.debug('Kusto resources are ready.') end @@ -92,44 +103,32 @@ def upload_async(data) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") end - exception = nil @workers_pool.post do - LogStash::Util.set_thread_name("Kusto to ingest data") - begin - upload(data) - rescue => e - @logger.error('Error during async upload.', exception: e.class, message: e.message, backtrace: e.backtrace) - exception = e - end + LogStash::Util.set_thread_name("Kusto to ingest data #{JRuby.reference(Thread.current).native_thread.id}") + upload(data) end - # Wait for the task to complete and check for exceptions - @workers_pool.shutdown - @workers_pool.wait_for_termination - - raise exception if exception - rescue Exception => e - @logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e end def upload(data) - @logger.debug("Sending data to Kusto") - if data.size > 0 - begin - data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - rescue => e - @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e + begin + @logger.debug("Sending data to Kusto") + if data.size > 0 + data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) + result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + else + @logger.warn("Data is empty and is not ingested.") + end + @logger.debug("Data sent to Kusto.") + rescue => e + if tries < @retry_count + tries += 1 + logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) + sleep @retry_delay + retry + else + logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) end - else - @logger.warn("Data is empty and is not ingested.") end - - @logger.debug("Data sent to Kusto.") - rescue => e - @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e end def stop