From a1ea77070b2b40e713b8ad486bf79c1a3f0fb878 Mon Sep 17 00:00:00 2001
From: Amit Roushan
Date: Tue, 8 Jun 2021 07:46:12 +0530
Subject: [PATCH] distributed performance metric collection framework

---
 delfin/cmd/task.py                            |   4 +
 delfin/coordination.py                        |  82 +++++++++++-
 delfin/drivers/fake_storage/__init__.py       |  10 +-
 delfin/leader_election/__init__.py            |   0
 delfin/leader_election/factory.py             |  50 ++++++++
 delfin/leader_election/interface.py           |  72 +++++++++++
 delfin/leader_election/tooz/__init__.py       |   0
 delfin/leader_election/tooz/callback.py       |  28 ++++
 delfin/leader_election/tooz/leader_elector.py | 120 ++++++++++++++++++
 delfin/service.py                             |  72 ++++++++++-
 .../scheduler/schedule_manager.py             |  85 +++++++++----
 delfin/task_manager/scheduler/scheduler.py    |  28 ----
 .../failed_performance_collection_handler.py  |   5 +-
 .../telemetry/failed_telemetry_job.py         |  33 ++++-
 .../schedulers/telemetry/telemetry_job.py     |  61 +++++----
 .../telemetry/test_failed_telemetry_job.py    |  11 +-
 .../telemetry/test_telemetry_job.py           |   8 +-
 .../task_manager/scheduler/test_scheduler.py  |   8 +-
 requirements.txt                              |   2 +-
 19 files changed, 575 insertions(+), 104 deletions(-)
 create mode 100644 delfin/leader_election/__init__.py
 create mode 100644 delfin/leader_election/factory.py
 create mode 100644 delfin/leader_election/interface.py
 create mode 100644 delfin/leader_election/tooz/__init__.py
 create mode 100644 delfin/leader_election/tooz/callback.py
 create mode 100644 delfin/leader_election/tooz/leader_elector.py
 delete mode 100644 delfin/task_manager/scheduler/scheduler.py

diff --git a/delfin/cmd/task.py b/delfin/cmd/task.py
index d878fff81..f0e2f3ca2 100644
--- a/delfin/cmd/task.py
+++ b/delfin/cmd/task.py
@@ -44,7 +44,11 @@ def main():
     task_server = service.TaskService.create(binary='delfin-task',
                                              coordination=True)
+    leader_election = service.LeaderElectionService.create()
+
     service.serve(task_server)
+    service.serve(leader_election)
+
     service.wait()
diff --git a/delfin/coordination.py b/delfin/coordination.py
index 4b115e351..3c65e02b0 100644
--- a/delfin/coordination.py
+++ b/delfin/coordination.py
@@ -45,7 +45,10 @@
                help='The backend server for distributed coordination.'),
     cfg.IntOpt('expiration',
                default=100,
-               help='The expiration(in second) of the lock.')
+               help='The expiration(in second) of the lock.'),
+    cfg.IntOpt('lease_timeout',
+               default=15,
+               help='The timeout(in seconds) of the leader election lease.'),
 ]

 CONF = cfg.CONF
@@ -77,6 +80,10 @@ def start(self):
         # NOTE(gouthamr): Tooz expects member_id as a byte string.
         member_id = (self.prefix + self.agent_id).encode('ascii')

+        LOG.info('Started Coordinator (Agent ID: %(agent)s, prefix: '
+                 '%(prefix)s)', {'agent': self.agent_id,
+                                 'prefix': self.prefix})
+
         backend_url = _get_redis_backend_url()
         self.coordinator = coordination.get_coordinator(
             backend_url, member_id,
@@ -112,6 +119,79 @@ def get_lock(self, name):

 LOCK_COORDINATOR = Coordinator(prefix='delfin-')


+class LeaderElectionCoordinator(Coordinator):
+
+    def __init__(self, agent_id=None):
+        super(LeaderElectionCoordinator, self). \
+            __init__(agent_id=agent_id, prefix="leader_election")
+        self.group = None
+
+    def start(self):
+        """Connect to coordination back end."""
+        if self.started:
+            return
+
+        # NOTE(gouthamr): Tooz expects member_id as a byte string.
+        member_id = (self.prefix + "-" + self.agent_id).encode('ascii')
+        LOG.info('Started Coordinator (Agent ID: %(agent)s, '
+                 'prefix: %(prefix)s)', {'agent': self.agent_id,
+                                         'prefix': self.prefix})
+
+        backend_url = _get_redis_backend_url()
+        self.coordinator = coordination.get_coordinator(
+            backend_url, member_id,
+            timeout=CONF.coordination.lease_timeout)
+        self.coordinator.start()
+        self.started = True
+
+    def ensure_group(self, group):
+        # request groups
+        req = self.coordinator.get_groups()
+        groups = req.get()
+        try:
+            # check if the group exists
+            groups.index(group)
+        except Exception:
+            # create the group if it does not exist
+            request = self.coordinator.create_group(group)
+            request.get()
+        else:
+            LOG.info("Leader group already exists")
+
+        self.group = group
+
+    def join_group(self):
+        # Join a group
+        if self.group:
+            request = self.coordinator.join_group(self.group)
+            request.get()
+
+    def register_on_start_leading_callback(self, callback):
+        return self.coordinator.watch_elected_as_leader(self.group, callback)
+
+    def start_heartbeat(self):
+        return self.coordinator.heartbeat()
+
+    def start_leader_watch(self):
+        return self.coordinator.run_watchers()
+
+    def stop(self):
+        """Disconnect from coordination back end."""
+        if self.started:
+            # stop coordinator
+            self.coordinator.stop()
+            self.coordinator = None
+            self.started = False
+
+            LOG.info('Stopped Coordinator (Agent ID: %(agent)s)',
+                     {'agent': self.agent_id})
+
+    def is_still_leader(self):
+        for acquired_lock in self.coordinator._acquired_locks:
+            return acquired_lock.is_still_owner()
+        return False
+
+
 class Lock(locking.Lock):
     """Lock with dynamic name.
diff --git a/delfin/drivers/fake_storage/__init__.py b/delfin/drivers/fake_storage/__init__.py
index e5165e3cb..5962178f7 100644
--- a/delfin/drivers/fake_storage/__init__.py
+++ b/delfin/drivers/fake_storage/__init__.py
@@ -12,12 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import random
 import datetime
-import decorator
 import math
-import six
+import random
 import time
+
+import decorator
+import six
+from eventlet import greenthread
 from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import uuidutils
@@ -80,7 +82,7 @@ def wait_random(low, high):
     def _wait(f, *a, **k):
         rd = random.randint(0, 100)
         secs = low + (high - low) * rd / 100
-        time.sleep(secs)
+        greenthread.sleep(secs)
         return f(*a, **k)

     return _wait
diff --git a/delfin/leader_election/__init__.py b/delfin/leader_election/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/delfin/leader_election/factory.py b/delfin/leader_election/factory.py
new file mode 100644
index 000000000..f1b7c3fbc
--- /dev/null
+++ b/delfin/leader_election/factory.py
+# Copyright 2021 The SODA Authors.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from delfin.leader_election.tooz.callback import ToozLeaderElectionCallback
+from delfin.leader_election.tooz.leader_elector import Elector
+from delfin.task_manager.scheduler.schedule_manager import SchedulerManager
+
+LEADER_ELECTION_KEY = "delfin-performance-metric-collection"
+
+
+class LeaderElectionFactory:
+
+    @staticmethod
+    def construct_elector(plugin, leader_key=None):
+        """Construct a leader election elector for the specified plugin
+
+        :param string plugin: required plugin for leader election
+        :param string leader_key: optional key overriding the default
+        """
+        # maintain a unique key for metric collection leader election
+        leader_election_key = LEADER_ELECTION_KEY
+        if leader_key:
+            leader_election_key = leader_key
+
+        scheduler_mgr = SchedulerManager()
+
+        if plugin == "tooz":
+            # create callback object
+            callback = ToozLeaderElectionCallback.register(
+                on_leading_callback=scheduler_mgr.start,
+                on_stop_callback=scheduler_mgr.stop)
+
+            return Elector(callback, leader_election_key)
+        else:
+            raise ValueError(plugin)
diff --git a/delfin/leader_election/interface.py b/delfin/leader_election/interface.py
new file mode 100644
index 000000000..6aad79828
--- /dev/null
+++ b/delfin/leader_election/interface.py
+# Copyright 2021 The SODA Authors.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Leader election interface definition"""
+
+import abc
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class LeaderCallback:
+
+    def __init__(self):
+        self.on_started_leading_callback = None
+        """on_started_leading is called when elected as leader"""
+
+        self.on_stopped_leading_callback = None
+        """on_stopped_leading is called when the leader gives up leadership"""
+
+    @abc.abstractmethod
+    def on_started_leading(self, *args, **kwargs):
+        pass
+
+    @abc.abstractmethod
+    def on_stopped_leading(self, *args, **kwargs):
+        pass
+
+    @classmethod
+    def register(cls, on_leading_callback, on_stop_callback):
+        callback = cls()
+        callback.on_started_leading_callback = on_leading_callback
+        callback.on_stopped_leading_callback = on_stop_callback
+        return callback
+
+
+@six.add_metaclass(abc.ABCMeta)
+class LeaderElector:
+
+    def __init__(self, callbacks, election_key):
+        self.callbacks = callbacks
+        self.election_key = election_key
+
+    @abc.abstractmethod
+    def run(self):
+        """Kick start leader election.
+        invokes callbacks.on_started_leading once elected as leader
+        invokes callbacks.on_stopped_leading once leadership is lost
+
+        run returns once the leader loses its leadership
+        """
+        pass
+
+    @abc.abstractmethod
+    def cleanup(self):
+        """Clean up leader election residue
+        """
+        pass
diff --git a/delfin/leader_election/tooz/__init__.py b/delfin/leader_election/tooz/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/delfin/leader_election/tooz/callback.py b/delfin/leader_election/tooz/callback.py
new file mode 100644
index 000000000..2016cb3f2
--- /dev/null
+++ b/delfin/leader_election/tooz/callback.py
+# Copyright 2021 The SODA Authors.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and
+# limitations under the License.
+
+from delfin.leader_election.interface import LeaderCallback
+
+
+class ToozLeaderElectionCallback(LeaderCallback):
+
+    def on_started_leading(self, *args, **kwargs):
+        return self.on_started_leading_callback()
+
+    def on_stopped_leading(self, *args, **kwargs):
+        return self.on_stopped_leading_callback()
diff --git a/delfin/leader_election/tooz/leader_elector.py b/delfin/leader_election/tooz/leader_elector.py
new file mode 100644
index 000000000..b5335c9dc
--- /dev/null
+++ b/delfin/leader_election/tooz/leader_elector.py
+# Copyright 2021 The SODA Authors.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Lease-based leader election elector"""
+
+import threading
+
+from oslo_log import log
+from oslo_utils import timeutils
+
+from delfin.coordination import LeaderElectionCoordinator
+from delfin.leader_election.interface import LeaderElector
+
+LOG = log.getLogger(__name__)
+
+
+class Elector(LeaderElector):
+
+    def __init__(self, callbacks, leader_election_key):
+        key = leader_election_key.encode('ascii')
+        super(Elector, self).__init__(callbacks, key)
+
+        self._coordinator = None
+        self.leader = False
+        self._stop = threading.Event()
+        self._runner = None
+        self._beats = 0
+
+    def run(self):
+        if self._coordinator:
+            return
+
+        self._stop.clear()
+        self._beats = 0
+
+        self._coordinator = LeaderElectionCoordinator()
+        # start leader coordinator
+        self._coordinator.start()
+
+        self._coordinator.ensure_group(self.election_key)
+        # join coordinator
+        self._coordinator.join_group()
+
+        # register callback for elected leader
+        self._coordinator. \
+            register_on_start_leading_callback(self.
+                                               callbacks.on_started_leading)
+
+        # register internal callback to notify being a leader
+        self._coordinator. \
+            register_on_start_leading_callback(self.set_leader_callback)
+
+        while not self._stop.is_set():
+            with timeutils.StopWatch() as w:
+                LOG.debug("sending heartbeats for leader election")
+                wait_until_next_beat = self._coordinator.start_heartbeat()
+
+            ran_for = w.elapsed()
+            has_to_sleep_for = wait_until_next_beat - ran_for
+            if has_to_sleep_for < 0:
+                LOG.warning(
+                    "Heartbeating took too long to execute (it ran for"
+                    " %0.2f seconds which is %0.2f seconds longer than"
+                    " the next heartbeat idle time). This may cause"
+                    " timeouts (in locks, leadership, ...) to"
+                    " happen (which will not end well).", ran_for,
+                    ran_for - wait_until_next_beat)
+            self._beats += 1
+
+            # check if the coordinator is still the leader
+            if self.leader and not self._coordinator.is_still_leader():
+                self.on_stopped_leading()
+                self.leader = False
+                return
+            self._coordinator.start_leader_watch()
+
+            if self.leader:
+                # adjust time for leader
+                has_to_sleep_for = has_to_sleep_for / 2
+
+            LOG.debug('resting after leader watch as leader=%(leader)s '
+                      'for heartbeat timeout of %(timeout)s sec',
+                      {'timeout': has_to_sleep_for, 'leader': self.leader})
+
+            self._stop.wait(has_to_sleep_for)
+
+    def set_leader_callback(self, *args, **kwargs):
+        self.leader = True
+
+    def cleanup(self):
+        # stop heartbeat
+        self._stop.set()
+
+        if self._coordinator:
+            # stop coordinator
+            self._coordinator.stop()
+            self._coordinator = None
+
+        # leadership cleanup
+        if self.leader:
+            self.on_stopped_leading()
+            self.leader = False
+
+    def on_stopped_leading(self):
+        self.callbacks.on_stopped_leading()
diff --git a/delfin/service.py b/delfin/service.py
index 9b321aa7f..6ad6b6f19 100644
--- a/delfin/service.py
+++ b/delfin/service.py
@@ -33,7 +33,7 @@
 from delfin import context
 from delfin import coordination
 from delfin import rpc
-from delfin.task_manager.scheduler import schedule_manager
+from delfin.leader_election.factory import LeaderElectionFactory

 LOG = log.getLogger(__name__)
@@ -69,6 +69,10 @@
     cfg.PortOpt('trap_receiver_port',
                 default=162,
                 help='Port at which trap receiver listens.'),
+    cfg.StrOpt('leader_election_plugin',
+               default="tooz",
+               help='Supported plugin for leader election. Options: '
+                    'tooz(Default)'),
 ]

 CONF = cfg.CONF
@@ -269,7 +273,62 @@ def create(cls, host=None, binary=None, topic=None,

     def start(self):
         super(TaskService, self).start()
-        schedule_manager.SchedulerManager().start()
+
+
+class LeaderElectionService(service.Service):
+    """Leader election service for distributed systems
+
+    The service takes callback functions and a unique leader election
+    key to synchronize leaders in a distributed environment
+    """
+
+    def __init__(self, leader_elector, *args, **kwargs):
+        super(LeaderElectionService, self).__init__()
+        self.leader_elector = leader_elector
+
+    def start(self):
+        """Start the leader election service
+        """
+        while True:
+            # start/restart participating in leader election
+            self.leader_elector.run()
+
+            # clean up, then start participating for leadership again
+            self.leader_elector.cleanup()
+
+    def __getattr__(self, key):
+        leader = self.__dict__.get('leader', None)
+        return getattr(leader, key)
+
+    @classmethod
+    def create(cls, *args, **kwargs):
+        """Instantiates class and passes back application object.
+        """
+        leader_elector = LeaderElectionFactory.construct_elector(
+            CONF.leader_election_plugin)
+
+        service_obj = cls(leader_elector, *args, **kwargs)
+
+        return service_obj
+
+    def kill(self):
+        """Destroy the service object in the datastore."""
+        self.stop()
+
+    def stop(self, graceful=False):
+        # Try to shut the connection down, but if we get any sort of
+        # errors, go ahead and ignore them, as we're shutting down anyway
+        try:
+            # clean up when losing leadership
+            if self.leader_elector:
+                self.leader_elector.cleanup()
+        except Exception:
+            pass
+
+        super(LeaderElectionService, self).stop(graceful)
+
+    def wait(self):
+        pass


 class WSGIService(service.ServiceBase):
@@ -396,10 +455,11 @@ def process_launcher():

 def serve(server, workers=None):
     global _launcher
-    if _launcher:
-        raise RuntimeError('serve() can only be called once')
-    _launcher = service.launch(CONF, server, workers=workers,
-                               restart_method='mutate')
+    if not _launcher:
+        _launcher = service.Launcher(CONF, restart_method='mutate')
+        # raise RuntimeError('serve() can only be called once')
+
+    _launcher.launch_service(server, workers=workers)


 def wait():
diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py
index bc0a92dfd..dcfbded10 100644
--- a/delfin/task_manager/scheduler/schedule_manager.py
+++ b/delfin/task_manager/scheduler/schedule_manager.py
@@ -15,38 +15,79 @@
 from datetime import datetime

 import six
+from apscheduler.schedulers.background import BackgroundScheduler
 from oslo_log import log
+from oslo_utils import importutils
 from oslo_utils import uuidutils

 from delfin import context
-from delfin.common.constants import TelemetryCollection
-from delfin.task_manager.scheduler import scheduler
-from delfin.task_manager.scheduler.schedulers.telemetry import telemetry_job
+from delfin import utils
+from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \
+    import FailedTelemetryJob
+from delfin.task_manager.scheduler.schedulers.telemetry.telemetry_job import \
+    TelemetryJob

 LOG = log.getLogger(__name__)

+SCHEDULER_BOOT_JOBS = [
+    TelemetryJob.__module__ + '.' + TelemetryJob.__name__,
+    FailedTelemetryJob.__module__ + '.' +
+    FailedTelemetryJob.__name__
+]
+
+
+@six.add_metaclass(utils.Singleton)
 class SchedulerManager(object):

-    def __init__(self):
-        self.schedule_instance = scheduler.Scheduler.get_instance()
+    def __init__(self, scheduler=None):
+        if not scheduler:
+            scheduler = BackgroundScheduler()
+        self.scheduler = scheduler
+        self.scheduler_started = False
+
+        self.boot_jobs = dict()
+        self.boot_jobs_scheduled = False
+        self.ctx = context.get_admin_context()

     def start(self):
         """ Initialise the schedulers for periodic job creation
         """
-        ctxt = context.get_admin_context()
-        try:
-
-            # Create a jobs for periodic scheduling
-            periodic_scheduler_job_id = uuidutils.generate_uuid()
-            self.schedule_instance.add_job(
-                telemetry_job.TelemetryJob(ctxt), 'interval', args=[ctxt],
-                seconds=TelemetryCollection.PERIODIC_JOB_INTERVAL,
-                next_run_time=datetime.now(),
-                id=periodic_scheduler_job_id)
-        except Exception as e:
-            # TODO: Currently failure of scheduler is failing task manager
-            # start flow, it is logged and ignored.
-            LOG.error("Failed to initialize periodic tasks, reason: %s.",
-                      six.text_type(e))
-        else:
-            self.schedule_instance.start()
+        if not self.scheduler_started:
+            self.scheduler.start()
+            self.scheduler_started = True
+
+        if not self.boot_jobs_scheduled:
+            try:
+                for job in SCHEDULER_BOOT_JOBS:
+                    job_class = importutils.import_class(job)
+                    job_instance = job_class(self.ctx)
+
+                    # Create a job for periodic scheduling
+                    job_id = uuidutils.generate_uuid()
+                    self.scheduler.add_job(job_instance, 'interval',
+                                           seconds=job_class.job_interval(),
+                                           next_run_time=datetime.now(),
+                                           id=job_id)
+                    # bookkeeping of boot jobs
+                    self.boot_jobs[job_id] = job_instance
+
+            except Exception as e:
+                # TODO: A scheduler failure still fails the task manager
+                # start flow; it is logged and re-raised here.
+                LOG.error("Failed to initialize periodic tasks, reason: %s.",
+                          six.text_type(e))
+                raise e
+
+    def stop(self):
+        """Cleanup periodic jobs"""
+
+        for job_id, job in self.boot_jobs.items():
+            self.scheduler.remove_job(job_id)
+            job.stop()
+        self.boot_jobs.clear()
+        self.boot_jobs_scheduled = False
+
+        # if self.scheduler_started:
+        #     self.scheduler.shutdown()
+        #     self.scheduler_started = False
+
+    def get_scheduler(self):
+        return self.scheduler
diff --git a/delfin/task_manager/scheduler/scheduler.py b/delfin/task_manager/scheduler/scheduler.py
deleted file mode 100644
index 9a00046dc..000000000
--- a/delfin/task_manager/scheduler/scheduler.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Copyright 2021 The SODA Authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import six
-from apscheduler.schedulers.background import BackgroundScheduler
-
-from delfin import utils
-
-
-@six.add_metaclass(utils.Singleton)
-class Scheduler:
-    @staticmethod
-    def get_instance():
-        return Scheduler().background_scheduler
-
-    def __init__(self):
-        self.background_scheduler = BackgroundScheduler()
diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py
index 9dee9e1a7..86b1e5793 100644
--- a/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py
+++ b/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py
@@ -23,7 +23,7 @@
 from delfin.db.sqlalchemy.models import Task
 from delfin.i18n import _
 from delfin.task_manager import rpcapi as task_rpcapi
-from delfin.task_manager.scheduler import scheduler
+from delfin.task_manager.scheduler import schedule_manager
 from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask

 LOG = log.getLogger(__name__)
@@ -42,7 +42,8 @@ def __init__(self, ctx, failed_task_id, storage_id, args, job_id,
         self.start_time = start_time
         self.end_time = end_time
         self.task_rpcapi = task_rpcapi.TaskAPI()
-        self.scheduler_instance = scheduler.Scheduler.get_instance()
+        self.scheduler_instance = \
+            schedule_manager.SchedulerManager().get_scheduler()
         self.result = TelemetryJobStatus.FAILED_JOB_STATUS_INIT

     @staticmethod
diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py b/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py
index 6291d2b17..0bfb56179 100644
--- a/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py
+++ b/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py
@@ -20,26 +20,30 @@
 from oslo_utils import uuidutils

 from delfin import db
-from delfin import utils
 from delfin.common.constants import TelemetryJobStatus, TelemetryCollection
 from delfin.db.sqlalchemy.models import FailedTask
 from delfin.exception import TaskNotFound
-from delfin.task_manager.scheduler import scheduler
+from delfin.task_manager.scheduler import schedule_manager

 LOG = log.getLogger(__name__)


-@six.add_metaclass(utils.Singleton)
 class FailedTelemetryJob(object):

     def __init__(self, ctx):
         # create the object of periodic scheduler
-        self.scheduler = scheduler.Scheduler.get_instance()
+        self.scheduler = schedule_manager.SchedulerManager().get_scheduler()
         self.ctx = ctx
+        self.stopped = False
+        self.job_ids = set()

     def __call__(self):
         """
         :return:
         """
+
+        if self.stopped:
+            return
+
         try:
             # Remove jobs from scheduler when marked for delete
             filters = {'deleted': True}
@@ -48,8 +52,7 @@ def __call__(self):
                       "in this cycle:%s" % len(failed_tasks))
             for failed_task in failed_tasks:
                 job_id = failed_task['job_id']
-                if job_id and self.scheduler.get_job(job_id):
-                    self.scheduler.remove_job(job_id)
+                self.remove_scheduled_job(job_id)
                 db.failed_task_delete(self.ctx, failed_task['id'])
         except Exception as e:
             LOG.error("Failed to remove periodic scheduling job , reason: %s.",
                       six.text_type(e))
@@ -112,6 +115,7 @@ def __call__(self):
                     instance, 'interval',
                     seconds=failed_task[FailedTask.interval.name],
                     next_run_time=datetime.now(), id=job_id)
+                self.job_ids.add(job_id)

         except Exception as e:
             LOG.error("Failed to schedule retry tasks for performance "
@@ -121,4 +125,19 @@ def __call__(self):

     def _teardown_task(self, ctx, failed_task_id, job_id):
         db.failed_task_delete(ctx, failed_task_id)
-        self.scheduler.remove_job(job_id)
+        self.remove_scheduled_job(job_id)
+
+    def remove_scheduled_job(self, job_id):
+        if job_id in self.job_ids:
+            self.job_ids.remove(job_id)
+        if job_id and self.scheduler.get_job(job_id):
+            self.scheduler.remove_job(job_id)
+
+    def stop(self):
+        self.stopped = True
+        for job_id in self.job_ids.copy():
+            self.remove_scheduled_job(job_id)
+
+    @classmethod
+    def job_interval(cls):
+        return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL
diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py b/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py
index 448a13822..d64a49c33 100644
--- a/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py
+++ b/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py
@@ -21,44 +21,50 @@
 from delfin import db
 from delfin.common.constants import TelemetryCollection
-from delfin.task_manager.scheduler import scheduler
-from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \
-    import FailedTelemetryJob
+from delfin.task_manager.scheduler import schedule_manager

 LOG = log.getLogger(__name__)


 class TelemetryJob(object):
-    def __init__(self, ctxt):
-        self.scheduler = scheduler.Scheduler.get_instance()
+    def __init__(self, ctx):
+        self.ctx = ctx
+        self.scheduler = schedule_manager.SchedulerManager().get_scheduler()

         # Reset last run time of tasks to restart scheduling and
         # start the failed task job
-        task_list = db.task_get_all(ctxt)
+        task_list = db.task_get_all(ctx)
         for task in task_list:
-            db.task_update(ctxt, task['id'], {'last_run_time': None})
-        self._schedule_failed_telemetry_job_handler(ctxt)
+            db.task_update(ctx, task['id'], {'last_run_time': None})

-    def __call__(self, ctx):
+        self.stopped = False
+        self.job_ids = set()
+
+    def __call__(self):
         """
         Schedule the collection tasks based on interval
         """
+
+        if self.stopped:
+            # If the job is stopped, return immediately
+            return
+
         try:
             # Remove jobs from scheduler when marked for delete
             filters = {'deleted': True}
-            tasks = db.task_get_all(ctx, filters=filters)
+            tasks = db.task_get_all(self.ctx, filters=filters)
             LOG.debug("Total tasks found deleted "
                       "in this cycle:%s" % len(tasks))
             for task in tasks:
                 job_id = task['job_id']
                 if job_id and self.scheduler.get_job(job_id):
-                    self.scheduler.remove_job(job_id)
-                db.task_delete(ctx, task['id'])
+                    self.remove_scheduled_job(job_id)
+                db.task_delete(self.ctx, task['id'])
         except Exception as e:
             LOG.error("Failed to remove periodic scheduling job , reason: %s.",
                       six.text_type(e))
         try:
             filters = {'last_run_time': None}
-            tasks = db.task_get_all(ctx, filters=filters)
+            tasks = db.task_get_all(self.ctx, filters=filters)
             LOG.debug("Schedule performance collection triggered: total "
                       "tasks to be handled:%s" % len(tasks))
             for task in tasks:
@@ -74,14 +80,17 @@ def __call__(self, ctx):
                     .strftime('%Y-%m-%d %H:%M:%S')

                 collection_class = importutils.import_class(task['method'])
-                instance = collection_class.get_instance(ctx, task_id)
+                instance = collection_class.get_instance(self.ctx, task_id)
                 self.scheduler.add_job(
                     instance, 'interval', seconds=task['interval'],
                     next_run_time=next_collection_time, id=job_id)

+                # job bookkeeping
+                self.job_ids.add(job_id)
+
                 update_task_dict = {'job_id': job_id,
                                     'last_run_time': last_run_time}
-                db.task_update(ctx, task_id, update_task_dict)
+                db.task_update(self.ctx, task_id, update_task_dict)
                 LOG.info('Periodic collection task triggered for for task id: '
                          '%s ' % task['id'])
         except Exception as e:
@@ -90,10 +99,18 @@ def __call__(self, ctx):
             else:
                 LOG.debug("Periodic collection task Scheduling completed.")

-    def _schedule_failed_telemetry_job_handler(self, ctx):
-        periodic_scheduler_job_id = uuidutils.generate_uuid()
-        self.scheduler.add_job(
-            FailedTelemetryJob(ctx), 'interval',
-            seconds=TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL,
-            next_run_time=datetime.now(),
-            id=periodic_scheduler_job_id)
+    def stop(self):
+        self.stopped = True
+        for job_id in self.job_ids.copy():
+            self.remove_scheduled_job(job_id)
+        LOG.info("Stopping telemetry jobs")
+
+    @classmethod
+    def job_interval(cls):
+        return TelemetryCollection.PERIODIC_JOB_INTERVAL
+
+    def remove_scheduled_job(self, job_id):
+        if job_id in self.job_ids:
+            self.job_ids.remove(job_id)
+        if job_id and self.scheduler.get_job(job_id):
+            self.scheduler.remove_job(job_id)
diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py
index 6b45d6fd2..3be557dc6 100644
--- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py
+++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py
@@ -33,7 +33,7 @@
     FailedTask.id.name: 43,
     FailedTask.retry_count.name: 0,
     FailedTask.result.name: "Init",
-    FailedTask.job_id.name: None,
+    FailedTask.job_id.name: "fake_job_id",
    FailedTask.task_id.name: uuidutils.generate_uuid(),
     FailedTask.method.name:
         FailedPerformanceCollectionHandler.__module__ + '.' +
@@ -75,10 +75,13 @@ def test_failed_job_scheduling(self, mock_add_job):

     @mock.patch(
         'apscheduler.schedulers.background.BackgroundScheduler.remove_job')
+    @mock.patch(
+        'apscheduler.schedulers.background.BackgroundScheduler.get_job')
     @mock.patch.object(db, 'failed_task_delete')
     @mock.patch.object(db, 'failed_task_get_all')
     def test_failed_job_with_max_retry(self, mock_failed_get_all,
                                        mock_failed_task_delete,
+                                       mock_get_job,
                                        mock_remove_job):
         # configure to return entry with max retry count
         failed_jobs = fake_failed_jobs.copy()
@@ -90,9 +93,11 @@ def test_failed_job_with_max_retry(self, mock_failed_get_all,
+        mock_get_job.return_value = True
+
         # call failed job scheduling
         failed_job()

         # entry get deleted and job get removed
         self.assertEqual(mock_failed_task_delete.call_count, 2)
-        self.assertEqual(mock_remove_job.call_count, 1)
+        self.assertEqual(mock_remove_job.call_count, 2)

     @mock.patch(
         'apscheduler.schedulers.background.BackgroundScheduler.get_job')
@@ -134,4 +139,4 @@ def test_failed_job_scheduling_with_no_task(self, mock_failed_get_all,

         # entry get deleted and job get removed
         self.assertEqual(mock_failed_task_delete.call_count, 2)
-        self.assertEqual(mock_remove_job.call_count, 1)
+        self.assertEqual(mock_remove_job.call_count, 0)
diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py
index b002c8101..54e6600e2 100644
--- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py
+++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py
@@ -79,8 +79,8 @@ def test_telemetry_job_scheduling(self, mock_add_job):
         ctx = context.get_admin_context()
         telemetry_job = TelemetryJob(ctx)
         # call telemetry job scheduling
-        telemetry_job(ctx)
-        self.assertEqual(mock_add_job.call_count, 2)
+        telemetry_job()
+        self.assertEqual(mock_add_job.call_count, 1)

     @mock.patch.object(db, 'task_get_all',
                        mock.Mock(return_value=Incorrect_telemetry_jobs))
@@ -96,7 +96,7 @@ def test_telemetry_job_scheduling_exception(self, mock_log_error):
         ctx = context.get_admin_context()
         telemetry_job = TelemetryJob(ctx)
         # call telemetry job scheduling
-        telemetry_job(ctx)
+        telemetry_job()
         self.assertEqual(mock_log_error.call_count, 2)

     @mock.patch.object(db, 'task_delete',
@@ -115,5 +115,5 @@ def test_telemetry_removal_success(self, mock_log_error):
         ctx = context.get_admin_context()
         telemetry_job = TelemetryJob(ctx)
         # call telemetry job scheduling
-        telemetry_job(ctx)
+        telemetry_job()
         self.assertEqual(mock_log_error.call_count, 1)
diff --git a/delfin/tests/unit/task_manager/scheduler/test_scheduler.py b/delfin/tests/unit/task_manager/scheduler/test_scheduler.py
index 394e005ba..0460e67d9 100644
--- a/delfin/tests/unit/task_manager/scheduler/test_scheduler.py
+++ b/delfin/tests/unit/task_manager/scheduler/test_scheduler.py
@@ -15,16 +15,16 @@
 from apscheduler.schedulers.background import BackgroundScheduler

 from delfin import test
-from delfin.task_manager.scheduler import scheduler
+from delfin.task_manager.scheduler import schedule_manager


 class TestScheduler(test.TestCase):

-    def test_scheduler_singleton(self):
-        first_instance = scheduler.Scheduler.get_instance()
+    def test_scheduler_manager_singleton(self):
+        first_instance = schedule_manager.SchedulerManager().get_scheduler()
         self.assertIsInstance(first_instance, BackgroundScheduler)

-        second_instance = scheduler.Scheduler.get_instance()
+        second_instance = schedule_manager.SchedulerManager().get_scheduler()
         self.assertIsInstance(second_instance, BackgroundScheduler)

         self.assertEqual(first_instance, second_instance)
diff --git a/requirements.txt b/requirements.txt
index 96a3f5aed..528895108 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -30,7 +30,7 @@ Routes>=2.3.1 # MIT
 six>=1.10.0 # MIT
 SQLAlchemy>=1.3.0 # MIT
 stevedore>=1.20.0 # Apache-2.0
-tooz>=1.58.0 # Apache-2.0
+tooz==2.8.0 # Apache-2.0
 WebOb>=1.7.1 # MIT
 pysnmp>=4.4.11 # BSD
 redis>=3.3.8 # MIT
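
Note (editor's addition, not part of the patch): a minimal usage sketch of how
the new pieces compose, assuming a reachable tooz coordination backend (Redis,
per delfin/coordination.py) is configured. LeaderElectionFactory wires
SchedulerManager.start/stop into a tooz-backed Elector, and
LeaderElectionService.start() drives the same loop shown in delfin/service.py:

    from delfin.leader_election.factory import LeaderElectionFactory

    # "tooz" is the only plugin the factory accepts in this patch; it
    # registers SchedulerManager.start as the elected-as-leader callback
    # and SchedulerManager.stop as the lost-leadership callback.
    elector = LeaderElectionFactory.construct_elector("tooz")
    try:
        while True:
            elector.run()      # blocks, heartbeating, until leadership is lost
            elector.cleanup()  # drop election state, then re-enter the race
    except KeyboardInterrupt:
        elector.cleanup()

Only one delfin-task process wins the election at a time, so only the leader's
SchedulerManager boots the TelemetryJob/FailedTelemetryJob schedulers; the
others keep heartbeating and take over if the leader's lease lapses.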