Skip to content

Commit

Permalink
leader election framework for performance metric collection
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan committed Jun 3, 2021
1 parent c46cdef commit b6df07d
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 7 deletions.
4 changes: 4 additions & 0 deletions delfin/cmd/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def main():

task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()

service.serve(task_server)
service.serve(leader_election)

service.wait()


Expand Down
46 changes: 46 additions & 0 deletions delfin/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
help='The backend type for distributed coordination.'
'Backend could be redis, mysql, zookeeper and so on.'
'For more supported backend, please check Tooz'),
cfg.StrOpt('leader_plugin',
default='tooz',
help='The leader election framework plugin options.'
'Currently only Tooz plugin is supported'),
cfg.StrOpt('backend_user',
default='',
help='The backend user for distributed coordination.'),
Expand Down Expand Up @@ -77,6 +81,10 @@ def start(self):
# NOTE(gouthamr): Tooz expects member_id as a byte string.
member_id = (self.prefix + self.agent_id).encode('ascii')

LOG.info('Started Coordinator (Agent ID: %(agent)s, prefix: '
'%(prefix)s)', {'agent': self.agent_id,
'prefix': self.prefix})

backend_url = _get_redis_backend_url()
self.coordinator = coordination.get_coordinator(
backend_url, member_id,
Expand Down Expand Up @@ -112,6 +120,44 @@ def get_lock(self, name):
LOCK_COORDINATOR = Coordinator(prefix='delfin-')


class LeaderElectionCoordinator:
    """Bind a backend coordinator to a single leader-election group.

    Every method forwards to the wrapped ``coordinator`` object, passing
    ``self.group`` wherever the backend call takes a group identifier.
    """

    def __init__(self, group, coordinator=None):
        """Store the group identifier and the coordinator to delegate to.

        :param group: identifier of the election group.
        :param coordinator: backend coordinator object. All other methods
            dereference it, so it must be set before they are called.
        """
        self.group = group
        self.coordinator = coordinator

    def ensure_group(self):
        """Create ``self.group`` unless the backend already lists it."""
        existing_groups = self.coordinator.get_groups().get()

        # A plain membership test replaces the previous ``list.index`` call
        # wrapped in ``except Exception``: that probe treated *any* failure
        # as "group is missing" and went on to create the group.
        if self.group in existing_groups:
            LOG.info("Leader group already exist")
            return

        # Create the group and fetch the result of the request.
        self.coordinator.create_group(self.group).get()

    def join_group(self):
        """Join ``self.group`` and fetch the result of the join request."""
        self.coordinator.join_group(self.group).get()

    def register_leader_callback(self, callback):
        """Register ``callback`` for leader election on ``self.group``.

        :param callback: callable handed to the coordinator's
            ``watch_elected_as_leader``.
        :returns: whatever ``watch_elected_as_leader`` returns.
        """
        return self.coordinator.watch_elected_as_leader(self.group, callback)

    def start_heartbeat(self):
        """Call the coordinator's ``heartbeat`` and return its result."""
        return self.coordinator.heartbeat()

    def start_leader_watch(self):
        """Call the coordinator's ``run_watchers`` and return its result."""
        return self.coordinator.run_watchers()

    def cleanup(self):
        """Stand down as leader of ``self.group``, then leave the group.

        The value returned by ``leave_group`` is not inspected here.
        """
        self.coordinator.stand_down_group_leader(self.group)
        self.coordinator.leave_group(self.group)


class Lock(locking.Lock):
"""Lock with dynamic name.
Expand Down
Empty file.
92 changes: 92 additions & 0 deletions delfin/leader_election/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright 2020 The SODA Authors.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Leader election interface definitions"""

import six
import abc


class LeaderCallback:
    """Holder for the two hooks used by leader election."""

    def __init__(self):
        # Called when this participant is elected as leader.
        self.on_started_leading = None

        # Called when the leader gives up its leadership.
        self.on_stopped_leading = None

    @staticmethod
    def register(on_started_leading, on_stop_leading):
        """Build a ``LeaderCallback`` populated with the given hooks.

        :param on_started_leading: stored as ``on_started_leading``.
        :param on_stop_leading: stored as ``on_stopped_leading``.
        :returns: the new ``LeaderCallback`` instance.
        """
        hooks = LeaderCallback()
        hooks.on_started_leading, hooks.on_stopped_leading = (
            on_started_leading, on_stop_leading)
        return hooks


# @six.add_metaclass(abc.ABCMeta)
# class LeaderElectionLock:
#
# @abc.abstractmethod
# def get(self):
# """Get returns the LeaderElectionRecord"""
# pass
#
# @abc.abstractmethod
# def create(self, record):
# """Create attempts to create a LeaderElectionRecord"""
# pass
#
# @abc.abstractmethod
# def update(self, record):
# """Update will update an existing LeaderElectionRecord"""
# pass
#
# @abc.abstractmethod
# def identity(self):
# """Identity will return the lock's identity"""
# pass
#
# @abc.abstractmethod
# def describe(self):
# """Describe is used to convert details on current resource lock
# into a string"""
# pass


@six.add_metaclass(abc.ABCMeta)
class LeaderElector:
    """Abstract base class for leader election implementations."""

    def __init__(self, callbacks, election_key):
        """Store the election key and the callbacks object.

        :param callbacks: object carrying the leadership hooks referred to
            by ``run`` (``on_started_leading`` / ``on_stopped_leading``).
        :param election_key: key identifying the election.
        """
        self.election_key = election_key
        self.callbacks = callbacks

    @abc.abstractmethod
    def run(self):
        """Kick off leader election.

        Contract for implementations:
        call ``callbacks.on_started_leading`` once elected as leader,
        call ``callbacks.on_stopped_leading`` once leadership is lost,
        and return once the leader loses its leadership.
        """

    def cleanup(self):
        """Clean up leader election residue; the base version does nothing.
        """
        return None
Empty file.
56 changes: 56 additions & 0 deletions delfin/leader_election/tooz/leader_elector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2020 The SODA Authors.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Leader elector implementing lease-based leader election"""

from delfin.leader_election.interface import LeaderElector
from delfin.coordination import LeaderElectionCoordinator
from delfin.coordination import LOCK_COORDINATOR
from oslo_log import log

import time

LOG = log.getLogger(__name__)

class Elector(LeaderElector):
    """Leader elector driven through the shared ``LOCK_COORDINATOR``."""

    def __init__(self, callbacks, election_key):
        """Initialise the elector without touching the backend.

        :param callbacks: object whose ``on_started_leading`` attribute is
            registered as the leader callback in ``run``.
        :param election_key: text key; it is ASCII-encoded before being
            stored as ``self.election_key``.
        """
        election_key = election_key.encode('ascii')
        super(Elector, self).__init__(callbacks, election_key)
        # Built lazily on the first call to run().
        self.coordinator = None

    def run(self):
        """Join the election group and keep heartbeating / watching.

        On first use the shared ``LOCK_COORDINATOR`` is started and wrapped
        in a ``LeaderElectionCoordinator`` for ``self.election_key``.

        As written, the loop below has no exit condition, so this method
        does not return and ``callbacks.on_stopped_leading`` is never
        invoked from here.
        """
        if not self.coordinator:
            LOCK_COORDINATOR.start()
            self.coordinator = LeaderElectionCoordinator(
                self.election_key, LOCK_COORDINATOR.coordinator)

        # Group setup is repeated on every run() so that a run() following
        # cleanup() (which leaves the group) joins the group again.
        self.coordinator.ensure_group()
        self.coordinator.join_group()
        self.coordinator.register_leader_callback(
            self.callbacks.on_started_leading)

        while True:
            LOG.info("sending heartbeat started")
            timeout = self.coordinator.start_heartbeat()
            LOG.info("started leader watch")
            self.coordinator.start_leader_watch()
            LOG.info("Return from leader watch")
            # Sleep for half of the value returned by the heartbeat call
            # before the next round.
            time.sleep(timeout/2)

    def cleanup(self):
        """Release group leadership/membership if a coordinator exists.

        Safe to call before ``run`` has created the coordinator; in that
        case there is nothing to clean up and the call is a no-op instead
        of failing with ``AttributeError`` on ``None``.
        """
        if not self.coordinator:
            return
        self.coordinator.cleanup()
82 changes: 77 additions & 5 deletions delfin/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from delfin import coordination
from delfin import rpc
from delfin.task_manager.scheduler import schedule_manager
from delfin.leader_election.interface import LeaderCallback
from delfin.leader_election.tooz.leader_elector import Elector

LOG = log.getLogger(__name__)

Expand Down Expand Up @@ -269,7 +271,76 @@ def create(cls, host=None, binary=None, topic=None,

def start(self):
super(TaskService, self).start()
schedule_manager.SchedulerManager().start()


class LeaderElectionService(service.Service):
    """Leader election service for distributed system

    The service takes callback functions and leader election unique
    key to synchronize leaders in distributed environment
    """

    def __init__(self, callback, key, *args, **kwargs):
        """Store the callback object and the election key.

        :param callback: callbacks object handed to the elector.
        :param key: leader election key handed to the elector.

        Extra positional/keyword arguments are accepted but not used; the
        base class initialiser is called without arguments.
        """
        super(LeaderElectionService, self).__init__()
        self.callback = callback
        self.key = key
        # Created in start(); stop() checks it before cleaning up.
        self.leader_elector = None

    def start(self):
        """Start leader election service

        Creates the elector and then loops forever, so this method does
        not return.
        """

        # TODO(Amit): Create a factory pattern for plugin object
        self.leader_elector = Elector(self.callback, self.key)

        while True:
            # start/restart participating in leader election
            self.leader_elector.run()

            # cleanup when the leadership is lost
            self.leader_elector.cleanup()

    def __getattr__(self, key):
        """Delegate unknown attribute lookups to a ``manager`` attribute.

        Nothing in this class assigns ``manager``; when it is absent the
        lookup is made on ``None`` and raises ``AttributeError``.
        """
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, *args, **kwargs):
        """Instantiates class and passes back application object.

        The service is wired to a ``SchedulerManager``: its ``start`` and
        ``stop`` methods become the leadership callbacks. Extra arguments
        are forwarded to the constructor.
        """

        # create callback object
        scheduler_mgr = schedule_manager.SchedulerManager()
        callback = LeaderCallback.register(
            on_started_leading=scheduler_mgr.start,
            on_stop_leading=scheduler_mgr.stop)

        # maintain a unique key for metric collection leader election
        leader_election_key = "performance-metric-collection-key"

        service_obj = cls(callback, leader_election_key,
                          *args, **kwargs)

        return service_obj

    def kill(self):
        """Destroy the service object in the datastore."""
        self.stop()

    def stop(self):
        """Clean up the elector (best effort), then stop the base service.

        Cleanup failures must not prevent shutdown, so they are logged and
        otherwise ignored rather than silently discarded.
        """
        try:
            # cleanup when the leadership is lost
            if self.leader_elector:
                self.leader_elector.cleanup()
        except Exception as err:
            LOG.warning('Leader elector cleanup failed during stop: %s',
                        err)

        super(LeaderElectionService, self).stop()

    def wait(self):
        """Intentionally does nothing."""
        pass


class WSGIService(service.ServiceBase):
Expand Down Expand Up @@ -396,10 +467,11 @@ def process_launcher():

def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError('serve() can only be called once')
_launcher = service.launch(CONF, server, workers=workers,
restart_method='mutate')
if not _launcher:
_launcher = service.Launcher(CONF, restart_method='mutate')
# raise RuntimeError('serve() can only be called once')

_launcher.launch_service(server, workers=workers)


def wait():
Expand Down
5 changes: 4 additions & 1 deletion delfin/task_manager/scheduler/schedule_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SchedulerManager(object):
def __init__(self):
self.schedule_instance = scheduler.Scheduler.get_instance()

def start(self):
def start(self, *args, **kwargs):
""" Initialise the schedulers for periodic job creation
"""
ctxt = context.get_admin_context()
Expand All @@ -50,3 +50,6 @@ def start(self):
six.text_type(e))
else:
self.schedule_instance.start()

def stop(self, *args, **kwargs):
    """Cleanup periodic jobs

    NOTE(review): this is currently a stub with no statements, so no
    periodic job is actually cleaned up. Positional and keyword arguments
    are accepted and ignored -- presumably so the method can be used as a
    leader-election callback; confirm against the caller.
    """
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Routes>=2.3.1 # MIT
six>=1.10.0 # MIT
SQLAlchemy>=1.3.0 # MIT
stevedore>=1.20.0 # Apache-2.0
tooz>=1.58.0 # Apache-2.0
tooz==2.8.0 # Apache-2.0
WebOb>=1.7.1 # MIT
pysnmp>=4.4.11 # BSD
redis>=3.3.8 # MIT
Expand Down

0 comments on commit b6df07d

Please sign in to comment.