Skip to content

Commit

Permalink
Fix the serf watcher for synapse according to new base class and add …
Browse files Browse the repository at this point in the history
…tests
  • Loading branch information
Azhagu Selvan committed Feb 24, 2020
1 parent 88d0056 commit fa3e996
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 122 deletions.
159 changes: 67 additions & 92 deletions lib/synapse/service_watcher/serf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def start
# There is an edge case here:
# say that there have been several changes in the network in the last seconds,
# such as a massive rejoin after a network partition.
# The software might pick up a file dated from the beginning of the second and
# The software might pick up a file dated from the beginning of the second and
# it might then be updated at the end of the second, and it wouldn't be noticed
# by stat(2) because it only does second timestamps
#
Expand All @@ -41,141 +41,116 @@ def start
def watch
until @should_exit
begin
ctime = File.stat(@serf_members).ctime.to_i

if ctime > @last_ctime or (@last_discover < ctime + 10 and Time.new.to_i > ctime + 10)
@last_ctime = ctime
discover()
if is_it_time_yet?
if set_backends(discover())
log.info "serf backends have changed!"
end
end
rescue => e
log.warn "Error in watcher thread: #{e.inspect}"
log.warn e.backtrace
ensure
sleep_until_next_check()
end

sleep @cycle_delay
end

log.info "serf watcher exited successfully"
end

def stop
Thread.kill(@watcher)
# Must restart???
log.info "kill watcher for serf"
def sleep_until_next_check()
sleep(@cycle_delay)
end

#def ping?
# @zk.ping?
#end
def is_it_time_yet?
ctime = File.stat(@serf_members).ctime.to_i
if ctime > @last_ctime or (@last_discover < ctime + 10 and Time.new.to_i > ctime + 10)
@last_ctime = ctime
true
else
false
end
end

# find the current backends at the discovery path; sets @backends
def discover
log.info "discovering backends for service #{@name}"
@last_discover = Time.now.to_i

new_backends = []

# PUT A BEGIN HERE?
members_raw = File.read @serf_members
return if members_raw == @last_members_raw

members = false
# and sort to compare
new_backends = parse_members_json(@name, members_raw).sort! { |a,b| a.to_s <=> b.to_s }
new_backends
end

def parse_members_json(name, members_raw)
new_backends = []

begin
members = JSON.parse(members_raw)
rescue Exception => e
log.info "exception parsing json #{e.inspect}"
members = false
return new_backends
end

members = false unless members.is_a? Hash
return new_backends unless members.is_a? Hash

if members.has_key? 'members'
members = members['members']
else
members = false
return new_backends
end

members = false unless members.is_a? Array

new_backends = []

if members
# Now I do my pretty parsing

# please note that because of
# https://github.com/airbnb/smartstack-cookbook/blob/master/recipes/nerve.rb#L71
# the name won't just be the name you gave but name_port. this allows a same
# service to be on multiple ports of a same machine.

members.each do |member|
next unless member['status'] == 'alive'
member['tags'].each do |tag,data|
if tag =~ /^smart:#{@name}(|_[0-9]+)$/
host,port = data.split ':'

# Special trick
# If we have a all_backups_except_one option
# We use it here
# It makes every server except the one we specify (typically the current one)
# be the only one that doesn't have a 'backup' flag in haproxy. Useful for a
# scenario where we have a lot of slave servers (ie. mysql, sphinx) that have
# a local copy of data, and we prefer them, but want to fallback to the others
# in case of a problem

extra_haproxy_conf = ''

if @all_backups_except_one
if host != @all_backups_except_one
extra_haproxy_conf = 'backup'
end
return new_backends unless members.is_a? Array

# Now I do my pretty parsing

# please note that because of
# https://github.com/airbnb/smartstack-cookbook/blob/master/recipes/nerve.rb#L71
# the name won't just be the name you gave but name_port. this allows a same
# service to be on multiple ports of a same machine.
members.each do |member|
next unless member['status'] == 'alive'
member['tags'].each do |tag,data|
puts name, tag, data
if tag =~ /^smart:#{name}(|_[0-9]+)$/
host,port = data.split ':'

# Special trick
# If we have a all_backups_except_one option
# We use it here
# It makes every server except the one we specify (typically the current one)
# be the only one that doesn't have a 'backup' flag in haproxy. Useful for a
# scenario where we have a lot of slave servers (ie. mysql, sphinx) that have
# a local copy of data, and we prefer them, but want to fallback to the others
# in case of a problem

extra_haproxy_conf = ''

if @all_backups_except_one
if host != @all_backups_except_one
extra_haproxy_conf = 'backup'
end


new_backends << {
'name' => member['name'],
'host' => host,
'port' => port,
'extra_haproxy_conf' => extra_haproxy_conf,
}
log.debug "discovered backend #{member['name']} at #{host}:#{port} for service #{@name}"
end
end
end

# and sort to compare
new_backends.sort! { |a,b| a.to_s <=> b.to_s }


new_backends_s = new_backends.to_s
if new_backends_s == @last_backends_s
# we got the same result as last time - no need to reconfigure
log.info "serf members list for #{@name} returned identical results - not reconfiguring"
return
end
@last_backends_s = new_backends_s
end

if new_backends.empty?
if @default_servers.empty?
log.warn "no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}"
else
log.warn "no backends for service #{@name}; using default servers: #{@default_servers.inspect}"
@backends = @default_servers
new_backends << {
'name' => member['name'],
'host' => host,
'port' => port,
'extra_haproxy_conf' => extra_haproxy_conf,
}
log.debug "discovered backend #{member['name']} at #{host}:#{port} for service #{name}"
end
end
else
log.info "discovered #{new_backends.length} backends for service #{@name}"
@backends = new_backends
@synapse.reconfigure!
end
return new_backends
end

private
#WTF is the use of this??
def validate_discovery_opts
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \
unless @discovery['method'] == 'serf'
raise ArgumentError, "invalid discovery method #{@discovery['method']}" unless @discovery['method'] == 'serf'
end

end
end
30 changes: 0 additions & 30 deletions spec/lib/synapse/serf_spec.rb

This file was deleted.

120 changes: 120 additions & 0 deletions spec/lib/synapse/service_watcher_serf_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
require 'spec_helper'
require 'synapse/service_watcher/serf'

describe Synapse::ServiceWatcher::SerfWatcher do
let(:mock_synapse) do
mock_synapse = instance_double(Synapse::Synapse)
mockgenerator = Synapse::ConfigGenerator::BaseGenerator.new()
allow(mock_synapse).to receive(:available_generators).and_return({
'haproxy' => mockgenerator
})
allow(mock_synapse).to receive(:reconfigure!).and_return(true)
mock_synapse
end
let(:discovery) { { 'method' => 'serf', 'hosts' => 'somehost','path' => 'some/path' } }

let(:config) do
{
'name' => 'test',
'haproxy' => {},
'discovery'=> discovery
}
end

context 'SerfWatcher' do
subject { Synapse::ServiceWatcher::SerfWatcher.new(config, mock_synapse) }
it 'should validate' do
expect(subject.send(:validate_discovery_opts)).to be_nil
end
context 'watch' do
it 'should discover new backends' do
fake_backends = [1,2,3]
expect(subject).to receive(:discover).and_return(fake_backends)
expect(subject).to receive(:is_it_time_yet?).and_return(true)
expect(subject).to receive(:set_backends).with(fake_backends) { subject.stop }
expect(subject).to receive(:sleep_until_next_check)
subject.send(:watch)
end

it 'sleeps until next check if discover_instances fails' do
expect(subject).to receive(:is_it_time_yet?).and_return(true)
expect(subject).to receive(:discover) do
subject.stop
raise "discover failed"
end
expect(subject).to receive(:sleep_until_next_check)
subject.send(:watch)
end
end

context 'parse_members_json' do
it 'should parse json and return backends' do
members_json = <<-eos
{
"members": [
{
"name": "ip-10-0-2-7",
"addr": "10.0.2.7:7946",
"port": 7946,
"tags": {
"smart:espresso-quality-v4_test_9529": "10.0.2.7:9529"
},
"status": "alive",
"protocol": {
"max": 4,
"min": 2,
"version": 4
}
},
{
"name": "ip-10-0-2-118",
"addr": "10.0.2.118:7946",
"port": 7946,
"tags": {
"smart:espresso-pa-v1-scores_test_9528": "10.0.2.118:9528"
},
"status": "alive",
"protocol": {
"max": 4,
"min": 2,
"version": 4
}
},
{
"name": "ip-10-0-2-221",
"addr": "10.0.2.221:7946",
"port": 7946,
"tags": {
"smart:espresso-coordinator_test_9520": "10.0.2.221:9520"
},
"status": "alive",
"protocol": {
"max": 4,
"min": 2,
"version": 4
}
},
{
"name": "ip-10-0-2-156",
"addr": "10.0.2.156:7946",
"port": 7946,
"tags": {
"smart:espresso-captions-v3_test_9525": "10.0.2.156:9525"
},
"status": "alive",
"protocol": {
"max": 4,
"min": 2,
"version": 4
}
}
]
}
eos
expect(subject.parse_members_json('espresso-coordinator_test', members_json)).to eq [{"name"=>"ip-10-0-2-221","host"=>"10.0.2.221","port"=>"9520","extra_haproxy_conf"=>""}]
expect(subject.parse_members_json('espresso-captions-v3_test', members_json)).to eq [{"name"=>"ip-10-0-2-156","host"=>"10.0.2.156","port"=>"9525","extra_haproxy_conf"=>""}]
expect(subject.parse_members_json('non-existent', members_json)).to eq []
end
end
end
end

0 comments on commit fa3e996

Please sign in to comment.