
Fix reconnect logging and timing depending on error type (#523)
* Fix reconnect logging and timing depending on error type

* cleanup flake8 errors

* add missing build-essential

* start at most one thread and close connection when reconnecting
drewkerrigan authored Dec 8, 2017
1 parent 4f99f98 commit f0c2433
Showing 2 changed files with 120 additions and 81 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -41,6 +41,7 @@ COPY requirements.txt /marathon-lb/

RUN set -x \
&& buildDeps=' \
build-essential \
gcc \
libcurl4-openssl-dev \
libffi-dev \
200 changes: 119 additions & 81 deletions marathon_lb.py
@@ -32,7 +32,6 @@
import sys
import threading
import time
import traceback
import datetime
from itertools import cycle
from operator import attrgetter
@@ -41,6 +40,7 @@

import dateutil.parser
import requests
import pycurl

from common import (get_marathon_auth_params, set_logging_args,
set_marathon_auth_args, setup_logging, cleanup_json)
@@ -227,16 +227,17 @@ def tasks(self):

def get_event_stream(self):
url = self.host + "/v2/events"
logger.info(
"SSE Active, trying fetch events from {0}".format(url))
return CurlHttpEventStream(url, self.__auth, self.__verify)

resp = CurlHttpEventStream(url, self.__auth, self.__verify)
def iter_events(self, stream):
logger.info(
"SSE Active, trying to fetch events from {0}".format(stream.url))

class Event(object):
def __init__(self, data):
self.data = data

for line in resp.iter_lines():
for line in stream.iter_lines():
if line.strip() != '':
for real_event_data in re.split(r'\r\n',
line.decode('utf-8')):
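
The refactor above splits stream creation (get_event_stream) from consumption (iter_events), so the reconnect loop can keep a reference to the stream and close its curl connection between attempts. A minimal consumption sketch, not the committed code; handle_event is a hypothetical callback standing in for processor.handle_event:

def consume_events(marathon, handle_event):
    # 'marathon' is the diff's Marathon client; 'handle_event' is a
    # hypothetical stand-in for processor.handle_event.
    stream = marathon.get_event_stream()   # only builds CurlHttpEventStream
    try:
        for event in marathon.iter_events(stream):  # logs, then yields events
            handle_event(event.data)
    finally:
        stream.curl.close()  # release the connection before reconnecting
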
@@ -1160,16 +1161,16 @@ def truncateMapFileIfExists(map_file):
os.ftruncate(fd, 0)
os.close(fd)


def generateAndValidateConfig(config, config_file, domain_map_array,
app_map_array, haproxy_map):
app_map_array, haproxy_map):
domain_map_file = os.path.join(os.path.dirname(config_file),
"domain2backend.map")
app_map_file = os.path.join(os.path.dirname(config_file),
"app2backend.map")

domain_map_string = str()
app_map_string = str()
runningConfig = str()

if haproxy_map:
domain_map_string = generateMapString(domain_map_array)
@@ -1178,15 +1179,13 @@ def generateAndValidateConfig(config, config_file, domain_map_array,
return writeConfigAndValidate(
config, config_file, domain_map_string, domain_map_file,
app_map_string, app_map_file, haproxy_map)



def compareWriteAndReloadConfig(config, config_file, domain_map_array,
app_map_array, haproxy_map):

changed = False
config_valid = False

# See if the last config on disk matches this, and if so don't reload
# haproxy
domain_map_file = os.path.join(os.path.dirname(config_file),
Expand Down Expand Up @@ -1249,7 +1248,7 @@ def compareWriteAndReloadConfig(config, config_file, domain_map_array,
changed = False
config_valid = True
logger.debug("skipping reload: config unchanged")

return changed, config_valid


@@ -1521,31 +1520,32 @@ def regenerate_config(marathon, config_file, groups, bind_http_https,
templater, haproxy_map, domain_map_array,
app_map_array, config_file)

(changed, config_valid) = compareWriteAndReloadConfig(generated_config, config_file,
domain_map_array, app_map_array, haproxy_map)
(changed, config_valid) = compareWriteAndReloadConfig(
generated_config, config_file, domain_map_array, app_map_array,
haproxy_map)

if changed and not config_valid:
apps = make_config_valid_and_regenerate(marathon, groups, bind_http_https, ssl_certs,
templater, haproxy_map, domain_map_array,
app_map_array, config_file)
apps = make_config_valid_and_regenerate(
marathon, groups, bind_http_https, ssl_certs, templater,
haproxy_map, domain_map_array, app_map_array, config_file)

return apps


# Build up a valid configuration by adding one app at a time and checking
# for valid config file after each app
def make_config_valid_and_regenerate(marathon, groups, bind_http_https, ssl_certs,
templater, haproxy_map, domain_map_array,
app_map_array, config_file):
def make_config_valid_and_regenerate(marathon, groups, bind_http_https,
ssl_certs, templater, haproxy_map,
domain_map_array, app_map_array,
config_file):
try:
start_time = time.time()

valid_marathon_apps = []
marathon_apps = marathon.list()
apps = []
excluded_ids = []

for app in marathon_apps:

valid_marathon_apps.append(app)
apps = get_apps(marathon, valid_marathon_apps)

@@ -1555,33 +1555,41 @@ def make_config_valid_and_regenerate(marathon, groups, bind_http_https, ssl_cert
generated_config = config(apps, groups, bind_http_https, ssl_certs,
templater, haproxy_map, domain_map_array,
app_map_array, config_file)

if not generateAndValidateConfig(generated_config, config_file,
domain_map_array, app_map_array, haproxy_map):
domain_map_array, app_map_array,
haproxy_map):
invalid_id = valid_marathon_apps[-1]["id"]
logger.warn("invalid configuration caused by app %s; it will be excluded", invalid_id)
logger.warn(
"invalid configuration caused by app %s; "
"it will be excluded", invalid_id)
excluded_ids.append(invalid_id)
del valid_marathon_apps[-1]

if len(valid_marathon_apps) == 0:
apps = []

if len(valid_marathon_apps) > 0:
logger.debug("regentrating valid config which excludes the following apps: %s", excluded_ids)
compareWriteAndReloadConfig(generated_config, config_file,
domain_map_array, app_map_array, haproxy_map)
logger.debug("regentrating valid config which excludes the"
"following apps: %s", excluded_ids)
compareWriteAndReloadConfig(generated_config,
config_file,
domain_map_array,
app_map_array, haproxy_map)
else:
logger.error("A valid config file could not be generated after excluding all apps! skipping reload")

logger.debug("regenerating while excluding invalid tasks finished, took %s seconds",
time.time() - start_time)
logger.error("A valid config file could not be generated after"
"excluding all apps! skipping reload")

logger.debug("regenerating while excluding invalid tasks finished, "
"took %s seconds",
time.time() - start_time)

return apps

except requests.exceptions.ConnectionError as e:
logger.error("Connection error({0}): {1}".format(
e.errno, e.strerror))
except:
except Exception:
logger.exception("Unexpected error!")


Expand All @@ -1603,28 +1611,36 @@ def __init__(self, marathon, config_file, groups,
self.__pending_reload = False
self.__haproxy_map = haproxy_map

self.__thread = None

# Fetch the base data
self.reset_from_tasks()

def start(self):
self.__stop = False
if self.__thread is not None and self.__thread.is_alive():
self.reset_from_tasks()
return
self.__thread = threading.Thread(target=self.try_reset)
self.__thread.start()

def try_reset(self):
with self.__condition:
logger.info('starting event processor thread')
logger.info('({}): starting event processor thread'.format(
threading.get_ident()))
while True:
self.__condition.acquire()

if self.__stop:
logger.info('stopping event processor thread')
logger.info('({}): stopping event processor thread'.format(
threading.get_ident()))
self.__condition.release()
return

if not self.__pending_reset and not self.__pending_reload:
if not self.__condition.wait(300):
logger.info('condition wait expired')
logger.info('({}): condition wait expired'.format(
threading.get_ident()))

pending_reset = self.__pending_reset
pending_reload = self.__pending_reload
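
start() is now idempotent: it spawns the event processor thread only if one is not already alive, and otherwise just asks the running thread to reset. A minimal sketch of that pattern, assuming a single condition variable guarding a pending flag (Worker and its members are illustrative, not marathon-lb classes):

import threading

class Worker:
    def __init__(self):
        self._thread = None
        self._cond = threading.Condition()
        self._pending = False

    def start(self):
        # Idempotent: if the processor thread is already alive, just
        # signal it to reset instead of spawning a second thread.
        if self._thread is not None and self._thread.is_alive():
            self.signal_reset()
            return
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def signal_reset(self):
        with self._cond:
            self._pending = True
            self._cond.notify()

    def _run(self):
        # Runs until the process exits; a stop flag as in stop() above
        # is omitted to keep the sketch short.
        while True:
            with self._cond:
                if not self._cond.wait_for(lambda: self._pending,
                                           timeout=300):
                    continue  # 300s passed with no work, like the wait above
                self._pending = False
            print("performing reset")  # stand-in for do_reset()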
@@ -1655,21 +1671,25 @@ def do_reset(self):
self.__templater,
self.__haproxy_map)

logger.debug("updating tasks finished, took %s seconds",
time.time() - start_time)
logger.debug("({0}): updating tasks finished, "
"took {1} seconds".format(
threading.get_ident(),
time.time() - start_time))
except requests.exceptions.ConnectionError as e:
logger.error("Connection error({0}): {1}".format(
e.errno, e.strerror))
except:
logger.error("({0}): Connection error({1}): {2}".format(
threading.get_ident(), e.errno, e.strerror))
except Exception:
logger.exception("Unexpected error!")

def do_reload(self):
try:
# Validate the existing config before reloading
logger.debug("attempting to reload existing config...")
logger.debug("({}): attempting to reload existing "
"config...".format(
threading.get_ident()))
if validateConfig(self.__config_file):
reloadConfig()
except:
except Exception:
logger.exception("Unexpected error!")

def stop(self):
@@ -1792,34 +1812,6 @@ def get_arg_parser():
return parser


def process_sse_events(marathon, processor):
try:
processor.start()
events = marathon.get_event_stream()
for event in events:
try:
# logger.info("received event: {0}".format(event))
# marathon might also send empty messages as keepalive...
if (event.data.strip() != ''):
# marathon sometimes sends more than one json per event
# e.g. {}\r\n{}\r\n\r\n
for real_event_data in re.split(r'\r\n', event.data):
data = load_json(real_event_data)
logger.info(
"received event of type {0}"
.format(data['eventType']))
processor.handle_event(data)
else:
logger.info("skipping empty message")
except:
print(event.data)
print("Unexpected error:", sys.exc_info()[0])
traceback.print_stack()
raise
finally:
processor.stop()


def load_json(data_str):
return cleanup_json(json.loads(data_str))

@@ -1891,21 +1883,67 @@ def load_json(data_str):
args.haproxy_map)
signal.signal(signal.SIGHUP, processor.handle_signal)
signal.signal(signal.SIGUSR1, processor.handle_signal)
backoff = 3
backoffFactor = 1.5
waitSeconds = 3
maxWaitSeconds = 300
waitResetSeconds = 600
while True:
stream_started = time.time()
currentWaitSeconds = random.random() * waitSeconds
stream = marathon.get_event_stream()
try:
process_sse_events(marathon, processor)
except:
# processor start is now idempotent and will start at
# most one thread
processor.start()
events = marathon.iter_events(stream)
for event in events:
if (event.data.strip() != ''):
# marathon sometimes sends more than one json per event
# e.g. {}\r\n{}\r\n\r\n
for real_event_data in re.split(r'\r\n', event.data):
data = load_json(real_event_data)
logger.info(
"received event of type {0}"
.format(data['eventType']))
processor.handle_event(data)
else:
logger.info("skipping empty message")
except pycurl.error as e:
errno, e_msg = e.args
# Error number 28:
# 'Operation too slow. Less than 1 bytes/sec transferred
# the last 300 seconds'
# This happens when there is no activity on the marathon
# event stream for the last 5 minutes. In this case we
# should immediately reconnect in case the connection to
# marathon died silently so that we miss as few events as
# possible.
if errno == 28:
m = 'Possible timeout detected: {}, reconnecting now...'
logger.info(m.format(e_msg))
currentWaitSeconds = 0
else:
logger.exception("Caught exception")
logger.error("Reconnecting in {}s...".format(
currentWaitSeconds))
except Exception:
logger.exception("Caught exception")
backoff = backoff * 1.5
if backoff > 300:
backoff = 300
logger.error("Reconnecting in {}s...".format(backoff))
# Reset the backoff if it's been more than 10 minutes
if time.time() - stream_started > 600:
backoff = 3
time.sleep(random.random() * backoff)
logger.error("Reconnecting in {}s...".format(
currentWaitSeconds))
# We must close the connection because we are calling
# get_event_stream on the next loop
stream.curl.close()
if currentWaitSeconds > 0:
# Increase the next waitSeconds by the backoff factor
waitSeconds = backoffFactor * waitSeconds
# Don't sleep any more than 5 minutes
if waitSeconds > maxWaitSeconds:
waitSeconds = maxWaitSeconds
# Reset the backoff if it's been more than 10 minutes
if (time.time() - stream_started) > waitResetSeconds:
waitSeconds = 3
time.sleep(currentWaitSeconds)
processor.stop()
else:
# Generate base config
regenerate_config(marathon,
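
The new reconnect policy in the loop above: each attempt sleeps a random fraction of waitSeconds (jitter), waitSeconds grows by backoffFactor up to maxWaitSeconds, the backoff resets once a stream has stayed up longer than waitResetSeconds, and curl error 28 (no bytes transferred for 300 seconds, usually a silently dead connection) triggers an immediate reconnect. A condensed sketch of that policy, not the committed code:

import random
import time

BACKOFF_FACTOR = 1.5
MAX_WAIT_SECONDS = 300
WAIT_RESET_SECONDS = 600

def next_delay(wait_seconds, stream_started, curl_errno=None):
    """Return (sleep_now, next_wait_seconds) for one reconnect attempt."""
    # curl errno 28 means the idle-stream timeout fired: retry at once.
    sleep_now = 0 if curl_errno == 28 else random.random() * wait_seconds
    if sleep_now > 0:
        # Only grow the backoff when we actually waited.
        wait_seconds = min(wait_seconds * BACKOFF_FACTOR, MAX_WAIT_SECONDS)
    if time.time() - stream_started > WAIT_RESET_SECONDS:
        wait_seconds = 3  # the stream stayed up long enough; start over
    return sleep_now, wait_seconds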
