Skip to content

Commit

Permalink
distributed performance metric collection framework
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan committed Jun 17, 2021
1 parent 1c0a644 commit a1ea770
Show file tree
Hide file tree
Showing 19 changed files with 575 additions and 104 deletions.
4 changes: 4 additions & 0 deletions delfin/cmd/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def main():

task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()

service.serve(task_server)
service.serve(leader_election)

service.wait()


Expand Down
82 changes: 81 additions & 1 deletion delfin/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
help='The backend server for distributed coordination.'),
cfg.IntOpt('expiration',
default=100,
help='The expiration(in second) of the lock.')
help='The expiration(in second) of the lock.'),
# Passed as the tooz coordinator ``timeout`` by the leader election
# coordinator; distinct from the lock ``expiration`` option above.
cfg.IntOpt('lease_timeout',
           default=15,
           help='The timeout(in second) of the leader election '
                'coordinator.'),
]

CONF = cfg.CONF
Expand Down Expand Up @@ -77,6 +80,10 @@ def start(self):
# NOTE(gouthamr): Tooz expects member_id as a byte string.
member_id = (self.prefix + self.agent_id).encode('ascii')

LOG.info('Started Coordinator (Agent ID: %(agent)s, prefix: '
'%(prefix)s)', {'agent': self.agent_id,
'prefix': self.prefix})

backend_url = _get_redis_backend_url()
self.coordinator = coordination.get_coordinator(
backend_url, member_id,
Expand Down Expand Up @@ -112,6 +119,79 @@ def get_lock(self, name):
LOCK_COORDINATOR = Coordinator(prefix='delfin-')


class LeaderElectionCoordinator(Coordinator):
    """Coordinator specialised for leader election.

    Connects to the coordination back end using
    ``CONF.coordination.lease_timeout`` and exposes thin helpers around
    the tooz group, leader-watch and heartbeat calls.
    """

    def __init__(self, agent_id=None):
        super(LeaderElectionCoordinator, self).__init__(
            agent_id=agent_id, prefix="leader_election")
        # Election group identifier; assigned by ensure_group().
        self.group = None

    def start(self):
        """Connect to coordination back end.

        Returns immediately when the coordinator is already started.
        """
        if self.started:
            return

        # NOTE(gouthamr): Tooz expects member_id as a byte string.
        member_id = (self.prefix + "-" + self.agent_id).encode('ascii')

        backend_url = _get_redis_backend_url()
        self.coordinator = coordination.get_coordinator(
            backend_url, member_id,
            timeout=CONF.coordination.lease_timeout)
        self.coordinator.start()
        self.started = True

        # Log only after the connection succeeded so the message cannot
        # claim a start that actually failed.
        LOG.info('Started Coordinator (Agent ID: %(agent)s, '
                 'prefix: %(prefix)s)', {'agent': self.agent_id,
                                         'prefix': self.prefix})

    def ensure_group(self, group):
        """Create ``group`` on the back end if missing and remember it.

        :param group: identifier of the leader election group
        """
        # request groups
        existing_groups = self.coordinator.get_groups().get()
        if group in existing_groups:
            LOG.info("Leader group already exist")
        else:
            try:
                # create a group if not exist
                self.coordinator.create_group(group).get()
            except coordination.GroupAlreadyExist:
                # Another member created the group between the listing
                # and the create request; the group exists, which is
                # all this method needs.
                LOG.info("Leader group already exist")

        self.group = group

    def join_group(self):
        """Join the group set by ensure_group(); no-op if none is set."""
        if self.group:
            request = self.coordinator.join_group(self.group)
            request.get()

    def register_on_start_leading_callback(self, callback):
        """Register ``callback`` with watch_elected_as_leader for the group.

        :param callback: callable handed to the tooz coordinator
        :returns: whatever ``watch_elected_as_leader`` returns
        """
        return self.coordinator.watch_elected_as_leader(self.group, callback)

    def start_heartbeat(self):
        """Call the coordinator's ``heartbeat()`` and return its result."""
        return self.coordinator.heartbeat()

    def start_leader_watch(self):
        """Call the coordinator's ``run_watchers()`` and return its result."""
        return self.coordinator.run_watchers()

    def stop(self):
        """Disconnect from coordination back end."""
        if self.started:
            # stop coordinator
            self.coordinator.stop()
            self.coordinator = None
            self.started = False

        LOG.info('Stopped Coordinator (Agent ID: %(agent)s)',
                 {'agent': self.agent_id})

    def is_still_leader(self):
        """Tell whether the first acquired lock is still owned.

        :returns: ``False`` when no coordinator is connected or no lock
            is held, otherwise the first lock's ``is_still_owner()``
        """
        coordinator = getattr(self, 'coordinator', None)
        if coordinator is None:
            # stop() resets the coordinator to None; a disconnected
            # member cannot be the leader.
            return False

        # NOTE(review): relies on the driver's private ``_acquired_locks``
        # attribute; only the first entry is consulted.
        for acquired_lock in coordinator._acquired_locks:
            return acquired_lock.is_still_owner()
        return False


class Lock(locking.Lock):
"""Lock with dynamic name.
Expand Down
10 changes: 6 additions & 4 deletions delfin/drivers/fake_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import datetime
import decorator
import math
import six
import random
import time

import decorator
import six
from eventlet import greenthread
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
Expand Down Expand Up @@ -80,7 +82,7 @@ def wait_random(low, high):
def _wait(f, *a, **k):
rd = random.randint(0, 100)
secs = low + (high - low) * rd / 100
time.sleep(secs)
greenthread.sleep(secs)
return f(*a, **k)

return _wait
Expand Down
Empty file.
50 changes: 50 additions & 0 deletions delfin/leader_election/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2021 The SODA Authors.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from delfin.leader_election.tooz.callback import ToozLeaderElectionCallback
from delfin.leader_election.tooz.leader_elector import Elector
from delfin.task_manager.scheduler.schedule_manager import SchedulerManager

LEADER_ELECTION_KEY = "delfin-performance-metric-collection"


class LeaderElectionFactory:
    """Factory that builds leader electors for the supported plugins."""

    @staticmethod
    def construct_elector(plugin, leader_key=None):
        """
        Construct leader election elector based on specified plugin

        :param string plugin: required plugin for leader election;
            only "tooz" is supported
        :param string leader_key: optional election key; falls back to
            LEADER_ELECTION_KEY when empty or not given
        :returns: an Elector built from the scheduler start/stop callbacks
        :raises ValueError: if ``plugin`` is not a supported plugin
        """
        # Reject unknown plugins first so no scheduler manager is
        # created for a request that cannot be served.
        if plugin != "tooz":
            raise ValueError(
                "Unsupported leader election plugin: %r" % (plugin,))

        # maintain a unique key for metric collection leader election
        leader_election_key = leader_key or LEADER_ELECTION_KEY

        scheduler_mgr = SchedulerManager()

        # create callback object
        callback = ToozLeaderElectionCallback.register(
            on_leading_callback=scheduler_mgr.start,
            on_stop_callback=scheduler_mgr.stop)

        return Elector(callback, leader_election_key)
72 changes: 72 additions & 0 deletions delfin/leader_election/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2021 The SODA Authors.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Leader election interface defined"""

import six
import abc


@six.add_metaclass(abc.ABCMeta)
class LeaderCallback:
    """Abstract holder for the two leadership-change callbacks.

    Subclasses implement ``on_started_leading`` and
    ``on_stopped_leading``; the callables themselves are attached with
    ``register``.
    """

    def __init__(self):
        # on_started_leading is called when elected as leader
        self.on_started_leading_callback = None

        # on_stopped_leading is called when Leader give up its leadership
        self.on_stopped_leading_callback = None

    @abc.abstractmethod
    def on_started_leading(self, *args, **kwargs):
        """Hook for gaining leadership; must be implemented by subclasses."""

    @abc.abstractmethod
    def on_stopped_leading(self, *args, **kwargs):
        """Hook for losing leadership; must be implemented by subclasses."""

    @classmethod
    def register(cls, on_leading_callback, on_stop_callback):
        """Create an instance of ``cls`` carrying the given callbacks.

        :param on_leading_callback: stored as on_started_leading_callback
        :param on_stop_callback: stored as on_stopped_leading_callback
        :returns: the newly created instance
        """
        instance = cls()
        instance.on_started_leading_callback = on_leading_callback
        instance.on_stopped_leading_callback = on_stop_callback
        return instance


@six.add_metaclass(abc.ABCMeta)
class LeaderElector:
    """Abstract base class for leader election implementations."""

    def __init__(self, callbacks, election_key):
        """Store the callbacks object and the key identifying the election.

        :param callbacks: object providing the leadership callbacks
        :param election_key: key of the election this elector takes part in
        """
        self.callbacks = callbacks
        self.election_key = election_key

    @abc.abstractmethod
    def run(self):
        """Kick start leader election.

        Invoke callback.on_started_leading once elected as leader and
        callback.on_stopped_leading once leadership is lost.
        run returns once the leader loses its leadership.
        """

    @abc.abstractmethod
    def cleanup(self):
        """Cleanup leader election residue."""
Empty file.
28 changes: 28 additions & 0 deletions delfin/leader_election/tooz/callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2021 The SODA Authors.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from delfin.leader_election.interface import LeaderCallback


class ToozLeaderElectionCallback(LeaderCallback):
    """LeaderCallback that forwards each hook to its stored callable.

    Arguments received by the hooks are accepted but not passed on; the
    stored callables are invoked without arguments.
    """

    def on_started_leading(self, *args, **kwargs):
        """Invoke the stored started-leading callable; return its result."""
        started_handler = self.on_started_leading_callback
        return started_handler()

    def on_stopped_leading(self, *args, **kwargs):
        """Invoke the stored stopped-leading callable; return its result."""
        stopped_handler = self.on_stopped_leading_callback
        return stopped_handler()
Loading

0 comments on commit a1ea770

Please sign in to comment.