Skip to content

Commit

Permalink
Refactor: leverage scheduler mixin (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
kares authored Jun 8, 2022
1 parent a6c6fa6 commit 6590dce
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 27 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.4.0
- Refactor: leverage scheduler mixin [#93](https://github.com/logstash-plugins/logstash-filter-translate/pull/93)

## 3.3.1
- Refactor: reading .csv for JRuby 9.3 compatibility [#94](https://github.com/logstash-plugins/logstash-filter-translate/pull/94)

Expand Down
26 changes: 5 additions & 21 deletions lib/logstash/filters/dictionary/file.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
# encoding: utf-8
require 'concurrent/atomic/atomic_boolean'
require 'rufus-scheduler'
require "logstash/util/loggable"
require "logstash/filters/fetch_strategy/file"

java_import 'java.util.concurrent.locks.ReentrantReadWriteLock'

module LogStash module Filters module Dictionary
class DictionaryFileError < StandardError; end

class File

include LogStash::Util::Loggable

def self.create(path, refresh_interval, refresh_behaviour, exact, regex)
if /\.y[a]?ml$/.match(path)
instance = YamlFile.new(path, refresh_interval, exact, regex)
Expand All @@ -30,14 +29,12 @@ def self.create(path, refresh_interval, refresh_behaviour, exact, regex)
end
end

include LogStash::Util::Loggable
attr_reader :dictionary, :fetch_strategy

def initialize(path, refresh_interval, exact, regex)
@dictionary_path = path
@refresh_interval = refresh_interval
@short_refresh = @refresh_interval <= 300
@stopping = Concurrent::AtomicBoolean.new # ported from jdbc_static, need a way to prevent a scheduled execution from running a load.
rw_lock = java.util.concurrent.locks.ReentrantReadWriteLock.new
@write_lock = rw_lock.writeLock
@dictionary = Hash.new
Expand All @@ -51,13 +48,6 @@ def initialize(path, refresh_interval, exact, regex)
end
@fetch_strategy = klass.new(*args)
load_dictionary(raise_exception = true)
stop_scheduler(initial = true)
start_scheduler unless @refresh_interval <= 0 # disabled, a scheduler interval of zero makes no sense
end

def stop_scheduler(initial = false)
@stopping.make_true unless initial
@scheduler.shutdown(:wait) if @scheduler
end

def load_dictionary(raise_exception=false)
Expand Down Expand Up @@ -88,13 +78,6 @@ def read_file_into_dictionary

private

def start_scheduler
@scheduler = Rufus::Scheduler.new
@scheduler.interval("#{@refresh_interval}s", :overlap => false) do
reload_dictionary
end
end

def merge_dictionary
@write_lock.lock
begin
Expand All @@ -116,14 +99,15 @@ def replace_dictionary
end
end

# scheduler executes this method, periodically
def reload_dictionary
return if @stopping.true?
if @short_refresh
load_dictionary if needs_refresh?
else
load_dictionary
end
end
public :reload_dictionary

def needs_refresh?
@dictionary_mtime != ::File.mtime(@dictionary_path).to_f
Expand Down
11 changes: 7 additions & 4 deletions lib/logstash/filters/translate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/deprecation_logger_support'
require 'logstash/plugin_mixins/scheduler'

require "logstash/filters/dictionary/memory"
require "logstash/filters/dictionary/file"
Expand Down Expand Up @@ -44,6 +45,8 @@ class Translate < LogStash::Filters::Base

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::Scheduler

config_name "translate"

# The name of the logstash event field containing the value to be compared for a
Expand Down Expand Up @@ -228,11 +231,11 @@ def register
else
@logger.debug? && @logger.debug("#{self.class.name}: Dictionary translation method - Fuzzy")
end
end # def register

def close
@lookup.stop_scheduler
end
if @lookup.respond_to?(:reload_dictionary) && @refresh_interval > 0 # a scheduler interval of zero makes no sense
scheduler.interval("#{@refresh_interval}s", overlap: false) { @lookup.reload_dictionary }
end
end # def register

def filter(event)
return unless @updater.test_for_inclusion(event, @override)
Expand Down
4 changes: 2 additions & 2 deletions logstash-filter-translate.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-translate'
s.version = '3.3.1'
s.version = '3.4.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Replaces field contents based on a hash or YAML file"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -24,7 +24,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.2'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-deprecation_logger_support', '~> 1.0'
s.add_runtime_dependency 'rufus-scheduler'
s.add_runtime_dependency "logstash-mixin-scheduler", '~> 1.0'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'rspec-sequencing'
Expand Down

0 comments on commit 6590dce

Please sign in to comment.