From a8f77057a9b91ee1bff2ea00ebd71a74f5ad8c18 Mon Sep 17 00:00:00 2001 From: Hypo Date: Mon, 6 Apr 2020 01:14:41 +0800 Subject: [PATCH 1/8] init celery worker closes #778 --- .../expr => deploy/guacamole}/guacctl.sh | 0 open-hackathon-server/requirements.txt | 5 + .../src/hackathon/docker/__init__.py | 4 - .../hackathon/docker/docker_formation_base.py | 28 -- .../src/hackathon/docker/hosted_docker.py | 249 ------------------ .../src/hackathon/expr/docker_expr_starter.py | 106 -------- .../src/hackathon/worker/__init__.py | 41 +++ .../src/hackathon/worker/expr_tasks.py | 36 +++ 8 files changed, 82 insertions(+), 387 deletions(-) rename {open-hackathon-server/src/hackathon/expr => deploy/guacamole}/guacctl.sh (100%) delete mode 100644 open-hackathon-server/src/hackathon/docker/__init__.py delete mode 100644 open-hackathon-server/src/hackathon/docker/docker_formation_base.py delete mode 100644 open-hackathon-server/src/hackathon/docker/hosted_docker.py delete mode 100644 open-hackathon-server/src/hackathon/expr/docker_expr_starter.py create mode 100644 open-hackathon-server/src/hackathon/worker/__init__.py create mode 100644 open-hackathon-server/src/hackathon/worker/expr_tasks.py diff --git a/open-hackathon-server/src/hackathon/expr/guacctl.sh b/deploy/guacamole/guacctl.sh similarity index 100% rename from open-hackathon-server/src/hackathon/expr/guacctl.sh rename to deploy/guacamole/guacctl.sh diff --git a/open-hackathon-server/requirements.txt b/open-hackathon-server/requirements.txt index f36305528..a9ec942f0 100644 --- a/open-hackathon-server/requirements.txt +++ b/open-hackathon-server/requirements.txt @@ -1,3 +1,4 @@ +amqp==2.5.2 aniso8601==8.0.0 APScheduler==3.0.3 attrs==19.3.0 @@ -15,7 +16,9 @@ azure-servicebus==0.20.1 azure-servicemanagement-legacy==0.20.1 azure-storage==0.20.2 Beaker==1.11.0 +billiard==3.6.3.0 cachetools==4.0.0 +celery==4.4.2 certifi==2019.11.28 chardet==3.0.4 click==7.1.1 @@ -30,6 +33,7 @@ idna==2.9 importlib-metadata==1.5.0 itsdangerous==1.1.0 Jinja2==2.11.1 +kombu==4.6.8 kubernetes==9.0.0 lxml==4.3.0 mailthon==0.1.1 @@ -62,6 +66,7 @@ subprocess32==3.5.4 tzlocal==2.0.0 urllib3==1.25.8 validictory==1.0.0 +vine==1.3.0 wcwidth==0.1.8 websocket-client==0.57.0 Werkzeug==0.15.3 diff --git a/open-hackathon-server/src/hackathon/docker/__init__.py b/open-hackathon-server/src/hackathon/docker/__init__.py deleted file mode 100644 index 99d0ef8a4..000000000 --- a/open-hackathon-server/src/hackathon/docker/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" diff --git a/open-hackathon-server/src/hackathon/docker/docker_formation_base.py b/open-hackathon-server/src/hackathon/docker/docker_formation_base.py deleted file mode 100644 index da741bd54..000000000 --- a/open-hackathon-server/src/hackathon/docker/docker_formation_base.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -import abc - - -class DockerFormationBase(object, metaclass=abc.ABCMeta): - @abc.abstractmethod - def start(self, unit, **kwargs): - """start a docker container""" - return - - @abc.abstractmethod - def stop(self, name, **kwargs): - """stop a docker container""" - return - - @abc.abstractmethod - def delete(self, name, **kwargs): - """delete a docker container""" - return - - @abc.abstractmethod - def report_health(self): - """report health status""" - return \ No newline at end of file diff --git a/open-hackathon-server/src/hackathon/docker/hosted_docker.py b/open-hackathon-server/src/hackathon/docker/hosted_docker.py deleted file mode 100644 index 4c5ae23c5..000000000 --- a/open-hackathon-server/src/hackathon/docker/hosted_docker.py +++ /dev/null @@ -1,249 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -import sys - -sys.path.append("..") - -from threading import Lock -import json -import requests -from datetime import timedelta - -from hackathon import RequiredFeature, Component, Context -from hackathon.hmongo.models import DockerContainer, DockerHostServer -from hackathon.constants import HEALTH, HEALTH_STATUS, HACKATHON_CONFIG, CLOUD_PROVIDER - -import collections -def flatten(x): - result = [] - for el in x: - if isinstance(x, collections.Iterable) and not isinstance(el, str): - result.extend(flatten(el)) - else: - result.append(el) - return result - -class HostedDockerFormation(Component): - hackathon_template_manager = RequiredFeature("hackathon_template_manager") - hackathon_manager = RequiredFeature("hackathon_manager") - expr_manager = RequiredFeature("expr_manager") - """ - Docker resource management based on docker remote api v1.18 - Host resource are required. - """ - application_json = {'content-type': 'application/json'} - docker_host_manager = RequiredFeature("docker_host_manager") - - def __init__(self): - self.lock = Lock() - - def report_health(self): - """Report health of DockerHostServers - - :rtype: dict - :return health status item of docker. OK when all servers running, Warning if some of them working, - Error if no server running - """ - try: - # TODO skip hackathons that are offline or ended - hosts = self.db.find_all_objects(DockerHostServer) - alive = 0 - for host in hosts: - if self.ping(host): - alive += 1 - if alive == len(hosts): - return { - HEALTH.STATUS: HEALTH_STATUS.OK - } - elif alive > 0: - return { - HEALTH.STATUS: HEALTH_STATUS.WARNING, - HEALTH.DESCRIPTION: 'at least one docker host servers are down' - } - else: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: 'all docker host servers are down' - } - except Exception as e: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: e.message - } - - def create_container(self, docker_host, container_config, container_name): - """ - only create a container, in this step, we cannot start a container. - :param docker_host: - :param container_config: - :param container_name: - :return: - """ - containers_url = '%s/containers/create?name=%s' % (self.__get_vm_url(docker_host), container_name) - req = requests.post(containers_url, data=json.dumps(container_config), headers=self.application_json) - self.log.debug(req.content) - # todo check the http code first - container = json.loads(req.content) - if container is None: - raise AssertionError("container is none") - return container - - def start_container(self, docker_host, container_id): - """ - start a container - :param docker_host: - :param container_id: - :return: - """ - url = '%s/containers/%s/start' % (self.__get_vm_url(docker_host), container_id) - req = requests.post(url, headers=self.application_json) - self.log.debug(req.content) - - def stop_container(self, host_server, container_name): - """ - delete a container - :param name: - :param docker_host: - :return: - """ - containers_url = '%s/containers/%s?force=1' % (self.__get_vm_url(host_server), container_name) - req = requests.delete(containers_url) - return req - - def pull_image(self, context): - # todo fix pull_image? - docker_host_id, image_name, tag = context.docker_host, context.image_name, context.tag - docker_host = self.db.find_first_object_by(DockerHostServer, id=docker_host_id) - if not docker_host: - return - pull_image_url = self.__get_vm_url(docker_host) + "/images/create?fromImage=" + image_name + '&tag=' + tag - self.log.debug(" send request to pull image:" + pull_image_url) - return requests.post(pull_image_url) - - def get_pulled_images(self, docker_host): - get_images_url = self.__get_vm_url(docker_host) + "/images/json?all=0" - current_images_info = json.loads(requests.get(get_images_url).content) # [{},{},{}] - current_images_tags = [x['RepoTags'] for x in current_images_info] # [[],[],[]] - return flatten(current_images_tags) # [ imange:tag, image:tag ] - - def ensure_images(self): - hackathons = self.hackathon_manager.get_online_hackathons() - list(map(lambda h: self.__ensure_images_for_hackathon(h), hackathons)) - - def is_container_running(self, docker_container): - """check container's running status on docker host - - if status is Running or Restarting returns True , else returns False - - :type docker_container: DockerContainer - :param docker_container: the container that you want to check - - :type: bool - :return True: the container running status is running or restarting , else returns False - - """ - docker_host = docker_container.host_server - if docker_host: - container_info = self.__get_container_info_by_container_id(docker_host, docker_container.container_id) - if container_info is None: - return False - return container_info['State']['Running'] or container_info['State']['Restarting'] - else: - return False - - def ping(self, docker_host, timeout=20): - """Ping docker host to check running status - - :type docker_host : DockerHostServer - :param docker_host: the hots that you want to check docker service running status - - :type: bool - :return: True: running status is OK, else return False - - """ - try: - ping_url = '%s/_ping' % self.__get_vm_url(docker_host) - req = requests.get(ping_url, timeout=timeout) - return req.status_code == 200 and req.content == 'OK' - except Exception as e: - self.log.error(e) - return False - - def get_containers_detail_by_ve(self, virtual_environment): - """Get all containers' detail from "Database" filtered by related virtual_environment - - :rtype: dict - :return: get the info of all containers - - """ - container = virtual_environment.docker_container - if container: - return self.util.make_serializable(container.to_mongo().to_dict()) - return {} - - def list_containers(self, docker_host, timeout=20): - """ - return: json(as list form) through "Docker restful API" - """ - containers_url = '%s/containers/json' % self.__get_vm_url(docker_host) - req = requests.get(containers_url, timeout=timeout) - self.log.debug(req.content) - return self.util.convert(json.loads(req.content)) - - def get_container_by_name(self, container_name, docker_host): - containers = self.list_containers(docker_host) - return next((c for c in containers if container_name in c["Names"] or '/' + container_name in c["Names"]), None) - - # --------------------------------------------- helper function ---------------------------------------------# - - def __get_vm_url(self, docker_host): - return 'http://%s:%d' % (docker_host.public_dns, docker_host.public_docker_api_port) - - def __get_schedule_job_id(self, hackathon): - return "pull_images_for_hackathon_%s" % hackathon.id - - def __ensure_images_for_hackathon(self, hackathon): - job_id = self.__get_schedule_job_id(hackathon) - job_exist = self.scheduler.has_job(job_id) - if hackathon.event_end_time < self.util.get_now(): - if job_exist: - self.scheduler.remove_job(job_id) - return - else: - if job_exist: - self.log.debug("job %s existed" % job_id) - else: - self.log.debug( - "adding schedule job to ensure images for hackathon %s" % hackathon.name) - next_run_time = self.util.get_now() + timedelta(seconds=3) - context = Context(hackathon_id=hackathon.id) - self.scheduler.add_interval(feature="hackathon_template_manager", - method="pull_images_for_hackathon", - id=job_id, - context=context, - next_run_time=next_run_time, - minutes=60) - - def __get_container_info_by_container_id(self, docker_host, container_id): - """get a container info by container_id from a docker host - - :param: the docker host which you want to search container from - - :type container_id: str|unicode - :param as a parameter that you want to search container though docker remote API - - :return dic object of the container info if not None - """ - try: - get_container_url = self.__get_vm_url(docker_host) + "/containers/%s/json?all=0" % container_id - req = requests.get(get_container_url) - if 300 > req.status_code >= 200: - container_info = json.loads(req.content) - return container_info - return None - except Exception as ex: - self.log.error(ex) - return None diff --git a/open-hackathon-server/src/hackathon/expr/docker_expr_starter.py b/open-hackathon-server/src/hackathon/expr/docker_expr_starter.py deleted file mode 100644 index 46b7fbcb3..000000000 --- a/open-hackathon-server/src/hackathon/expr/docker_expr_starter.py +++ /dev/null @@ -1,106 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - - -import sys - -sys.path.append("..") - -import random -import string -import pexpect -from os.path import abspath, dirname, realpath - -from hackathon.hmongo.models import Experiment, VirtualEnvironment -from hackathon.constants import VE_PROVIDER, VEStatus, VERemoteProvider, EStatus -from hackathon.expr.expr_starter import ExprStarter - - -class DockerExprStarter(ExprStarter): - def _internal_rollback(self, context): - # currently rollback share the same process as stop - self._internal_stop_expr(context) - - def _stop_virtual_environment(self, virtual_environment, experiment, context): - pass - - def _internal_start_expr(self, context): - for unit in context.template_content.units: - try: - self.__start_virtual_environment(context, unit) - except Exception as e: - self.log.error(e) - self._on_virtual_environment_failed(context) - - def _internal_start_virtual_environment(self, context): - raise NotImplementedError() - - def _get_docker_proxy(self): - raise NotImplementedError() - - def _internal_stop_expr(self, context): - expr = Experiment.objects(id=context.experiment_id).first() - if not expr: - return - - if len(expr.virtual_environments) == 0: - expr.status = EStatus.ROLL_BACKED - expr.save() - return - - # delete containers and change expr status - for ve in expr.virtual_environments: - context = context.copy() # create new context for every virtual_environment - context.virtual_environment_name = ve.name - self._stop_virtual_environment(ve, expr, context) - - def __start_virtual_environment(self, context, docker_template_unit): - origin_name = docker_template_unit.get_name() - prefix = str(context.experiment_id)[0:9] - suffix = "".join(random.sample(string.ascii_letters + string.digits, 8)) - new_name = '%s-%s-%s' % (prefix, origin_name, suffix.lower()) - docker_template_unit.set_name(new_name) - self.log.debug("starting to start container: %s" % new_name) - - # db document for VirtualEnvironment - ve = VirtualEnvironment(provider=VE_PROVIDER.DOCKER, - name=new_name, - image=docker_template_unit.get_image_with_tag(), - status=VEStatus.INIT, - remote_provider=VERemoteProvider.Guacamole) - # create a new context for current ve only - context = context.copy() - experiment = Experiment.objects(id=context.experiment_id).no_dereference().only("virtual_environments").first() - experiment.virtual_environments.append(ve) - experiment.save() - - # start container remotely , use hosted docker - context.virtual_environment_name = ve.name - context.unit = docker_template_unit - self._internal_start_virtual_environment(context) - - def _enable_guacd_file_transfer(self, context): - """ - This function should be invoked after container is started in hosted_docker.py - :param ve: virtual environment - """ - expr = Experiment.objects(id=context.experiment_id).no_dereference().first() - virtual_env = expr.virtual_environments.get(name=context.virtual_environment_name) - remote = virtual_env.remote_paras - - p = pexpect.spawn("scp -P %s %s %s@%s:/usr/local/sbin/guacctl" % - (remote["port"], - abspath("%s/../expr/guacctl" % dirname(realpath(__file__))), - remote["username"], - remote["hostname"])) - i = p.expect([pexpect.TIMEOUT, 'yes/no', 'password: ']) - if i == 1: - p.sendline("yes") - i = p.expect([pexpect.TIMEOUT, 'password:']) - - if i != 0: - p.sendline(remote["password"]) - p.expect(pexpect.EOF) - p.close() diff --git a/open-hackathon-server/src/hackathon/worker/__init__.py b/open-hackathon-server/src/hackathon/worker/__init__.py new file mode 100644 index 000000000..5a1d71b42 --- /dev/null +++ b/open-hackathon-server/src/hackathon/worker/__init__.py @@ -0,0 +1,41 @@ +import os +import logging +from celery import Celery, Task + +from hackathon.util import safe_get_config + +LOG = logging.getLogger(__name__) + +celery_always_eager = os.getenv("DEBUG", "false") == "true" +LOG.debug("Create celery app and celery_always_eager is {}".format( + celery_always_eager)) + + +class OhpTask(Task): + def run(self, *args, **kwargs): + pass + + def on_failure(self, exc, task_id, args, kwargs, einfo): + super(OhpTask, self).on_failure(exc, task_id, args, kwargs, einfo) + + def on_success(self, retval, task_id, args, kwargs): + super(OhpTask, self).on_success(retval, task_id, args, kwargs) + + +celery_app = Celery( + "ohp_worker", + include=[ + "worker.expr_tasks", + ], + task_cls=OhpTask) +celery_app.conf.update( + CELERY_BROKER_URL=safe_get_config("CELERY_BROKER_URL", ""), + CELERY_RESULT_BACKEND=safe_get_config("CELERY_RESULT_BACKEND", ""), + CELERY_DEFAULT_EXCHANGE='oph', + CELERYD_CONCURRENCY=20, + CELERYD_MAX_TASKS_PER_CHILD=50, + CELERY_DEFAULT_QUEUE='ohp.tasks.queue', + CELERY_DEFAULT_ROUTING_KEY='ohp.tasks', + CELERY_ALWAYS_EAGER=celery_always_eager, + CELERY_TASK_ALWAYS_EAGER=celery_always_eager, +) diff --git a/open-hackathon-server/src/hackathon/worker/expr_tasks.py b/open-hackathon-server/src/hackathon/worker/expr_tasks.py new file mode 100644 index 000000000..66af88fad --- /dev/null +++ b/open-hackathon-server/src/hackathon/worker/expr_tasks.py @@ -0,0 +1,36 @@ +from hackathon.hmongo.models import Experiment, Hackathon +from hackathon.constants import EStatus +from hackathon import RequiredFeature + +from . import celery_app as worker + +template_library = RequiredFeature("template_library") + + +@worker.task(name="batch_start_expr") +def pre_alloc_expr(hackathon_id, template, count): + template_content = template_library.load_template(template) + hackathon = Hackathon.objects(id=hackathon_id).first() + + for i in range(count): + expr = Experiment(status=EStatus.INIT, + template=template, + virtual_environments=[], + hackathon=hackathon) + expr.save() + start_new_expr.delay(hackathon_id, str(expr.id), template_content) + + +@worker.task(name="start_new_expr") +def start_new_expr(hackathon_id, expr_id, template_content): + pass + + +@worker.task(name="stop_expr") +def stop_expr(hackathon_id, expr_id): + pass + + +@worker.task(name="clean_resources") +def clean_resources(hackathon_id): + pass From cc78503ec56fc75799037bc4d16685c300695e85 Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 13:52:32 +0800 Subject: [PATCH 2/8] init cloud providers --- .../src/hackathon/cloud_providers/__init__.py | 29 +++++ .../src/hackathon/cloud_providers/k8s.py | 18 +++ .../src/hackathon/cloud_providers/util.py | 30 +++++ .../src/hackathon/expr/expr_mgr.py | 105 +++++------------- .../src/hackathon/expr/expr_starter.py | 5 +- .../src/hackathon/expr/k8s_expr_starter.py | 2 +- .../src/hackathon/worker/expr_tasks.py | 14 --- .../src/tests/apitest/test_experiment_api.py | 15 ++- 8 files changed, 119 insertions(+), 99 deletions(-) create mode 100644 open-hackathon-server/src/hackathon/cloud_providers/__init__.py create mode 100644 open-hackathon-server/src/hackathon/cloud_providers/k8s.py create mode 100644 open-hackathon-server/src/hackathon/cloud_providers/util.py diff --git a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py new file mode 100644 index 000000000..b8c8e9bdc --- /dev/null +++ b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py @@ -0,0 +1,29 @@ +__all__ = ["Provider"] + +_provider_classes = {} + + +class Provider: + def __init__(self, name, provider_cfg): + self.p_class = _provider_classes[name] + self.p = self.p_class(provider_cfg) + + self.p.connect() + + def create_expr(self, hackathon, experiment, template_content): + pass + + def start_expr(self, hackathon, experiment, template_content): + # TODO + pass + + def pause_expr(self, hackathon, experiment, template_content): + # TODO + pass + + def delete_expr(self, hackathon, experiment, template_content): + pass + + +def registry_provider(name, provider): + _provider_classes[name] = provider diff --git a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py new file mode 100644 index 000000000..68b7c8ce7 --- /dev/null +++ b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py @@ -0,0 +1,18 @@ +from .util import BaseProvider + + +class K8sProvider(BaseProvider): + def connect(self): + pass + + def create_instance(self, ins_cfg): + pass + + def start_instance(self, ins_cfg): + pass + + def pause_instance(self, ins_cfg): + pass + + def delete_instance(self, ins_cfg): + pass diff --git a/open-hackathon-server/src/hackathon/cloud_providers/util.py b/open-hackathon-server/src/hackathon/cloud_providers/util.py new file mode 100644 index 000000000..926ebd9ed --- /dev/null +++ b/open-hackathon-server/src/hackathon/cloud_providers/util.py @@ -0,0 +1,30 @@ +import abc +from collections import namedtuple + + +class BaseProvider(abc.ABC): + def __init__(self, cfg): + self.cfg = cfg + + @abc.abstractmethod + def connect(self): + pass + + @abc.abstractmethod + def create_instance(self, ins_cfg): + pass + + @abc.abstractmethod + def start_instance(self, ins_cfg): + pass + + @abc.abstractmethod + def pause_instance(self, ins_cfg): + pass + + @abc.abstractmethod + def delete_instance(self, ins_cfg): + pass + + +Instance = namedtuple("Instance", field_names=[]) diff --git a/open-hackathon-server/src/hackathon/expr/expr_mgr.py b/open-hackathon-server/src/hackathon/expr/expr_mgr.py index a008e9f97..02015c2d9 100644 --- a/open-hackathon-server/src/hackathon/expr/expr_mgr.py +++ b/open-hackathon-server/src/hackathon/expr/expr_mgr.py @@ -12,10 +12,10 @@ from mongoengine import Q from hackathon import Component, RequiredFeature, Context -from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, ReservedUser, \ - HACK_NOTICE_EVENT, HACK_NOTICE_CATEGORY, CLOUD_PROVIDER, HACKATHON_CONFIG -from hackathon.hmongo.models import Experiment, User, Hackathon, UserHackathon, Template +from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, CLOUD_PROVIDER, HACKATHON_CONFIG +from hackathon.hmongo.models import Experiment, User, Hackathon from hackathon.hackathon_response import not_found, ok +from hackathon.worker.expr_tasks import start_new_expr, stop_expr, clean_resources __all__ = ["ExprManager"] @@ -78,9 +78,8 @@ def stop_expr(self, expr_id): self.log.debug("begin to stop %s" % str(expr_id)) expr = Experiment.objects(id=expr_id).first() if expr is not None: - starter = self.get_starter(expr.hackathon, expr.template) - if starter: - starter.stop_expr(Context(experiment_id=expr.id, experiment=expr)) + hackathon = expr.hackathon + stop_expr.delay(str(hackathon.id), str(expr.id)) self.log.debug("experiment %s ended success" % expr_id) return ok('OK') else: @@ -88,22 +87,9 @@ def stop_expr(self, expr_id): def get_expr_status_and_confirm_starting(self, expr_id): expr = Experiment.objects(id=expr_id).first() - if expr: - return self.__report_expr_status(expr, isToConfirmExprStarting=True) - else: + if not expr: return not_found('Experiment Not found') - - def check_expr_status(self, experiment): - # update experiment status - virtual_environment_list = experiment.virtual_environments - if all(x.status == VEStatus.RUNNING for x in virtual_environment_list) \ - and len(virtual_environment_list) == experiment.template.virtual_environment_count: - experiment.status = EStatus.RUNNING - experiment.save() - try: - self.template_library.template_verified(experiment.template.id) - except: - pass + return self.__report_expr_status(expr, isToConfirmExprStarting=True) def get_expr_list_by_hackathon_id(self, hackathon, context): # get a list of all experiments' detail @@ -175,7 +161,7 @@ def pre_allocate_expr(self, context): remain_num = min(allowed_currency, pre_num) - start_num self.log.debug( "no starting template: %s , remain num is %d ... " % (template.name, remain_num)) - self.start_pre_alloc_exprs(None, template.name, hackathon.name, remain_num) + self.start_pre_alloc_exprs(template.name, hackathon.name, remain_num) break except Exception as e: self.log.error(e) @@ -210,68 +196,39 @@ def __verify_hackathon(self, hackathon_name): else: raise NotFound("Hackathon with name %s not found" % hackathon_name) - def get_starter(self, hackathon, template): - # load expr starter - starter = None - if not hackathon or not template: - return starter - - # TODO Interim workaround for kubernetes, need real implementation - if hackathon.config.get('cloud_provider') == CLOUD_PROVIDER.KUBERNETES: - return RequiredFeature("k8s_service") - - if template.provider == VE_PROVIDER.DOCKER: - raise NotImplementedError() - elif template.provider == VE_PROVIDER.AZURE: - raise NotImplementedError() - elif template.provider == VE_PROVIDER.K8S: - starter = RequiredFeature("k8s_service") - - return starter - def __start_new_expr(self, hackathon, template, user): - starter = self.get_starter(hackathon, template) - - if not starter: - raise PreconditionFailed("either template not supported or hackathon resource not configured") - - context = starter.start_expr(Context( + expr = Experiment( + status=EStatus.INIT, template=template, user=user, - hackathon=hackathon, - pre_alloc_enabled=False)) + virtual_environments=[], + hackathon=hackathon) + expr.save() + + template_content = self.template_library.load_template(template) + start_new_expr.delay(str(hackathon.id), str(expr.id), template_content) - return self.__report_expr_status(context.experiment) + return self.__report_expr_status(expr) - def start_pre_alloc_exprs(self, user, template_name, hackathon_name=None, pre_alloc_num=0): + def start_pre_alloc_exprs(self, template_name, hackathon_name=None, pre_alloc_num=0): self.log.debug("start_pre_alloc_exprs: %d " % pre_alloc_num) if pre_alloc_num == 0: return hackathon = self.__verify_hackathon(hackathon_name) template = self.__verify_template(hackathon, template_name) - - starter = self.get_starter(hackathon, template) - if not starter: - raise PreconditionFailed("either template not supported or hackathon resource not configured") + template_content = self.template_library.load_template(template) while pre_alloc_num > 0: - context = starter.start_expr(Context( - template=template, - user=user, - hackathon=hackathon, - pre_alloc_enabled=True)) - - if context == None: - self.log.debug("pre_alloc_num left: %d " % pre_alloc_num) - break - else: - self.__report_expr_status(context.experiment) - pre_alloc_num -= 1 + expr = Experiment(status=EStatus.INIT, + template=template, + virtual_environments=[], + hackathon=hackathon) + expr.save() + start_new_expr.delay(str(hackathon.id), str(expr.id), template_content) + pre_alloc_num -= 1 - def on_expr_started(self, experiment): - hackathon = experiment.hackathon - user = experiment.user + self.log.debug("start_pre_alloc_exprs: finish") def __report_expr_status(self, expr, isToConfirmExprStarting=False): # todo check whether need to restart Window-expr if it shutdown @@ -399,12 +356,8 @@ def roll_back(self, expr_id): self.log.warn("rollback failed due to experiment not found") return - starter = self.get_starter(expr.hackathon, expr.template) - if not starter: - self.log.warn("rollback failed due to no starter found") - return - - return starter.rollback(Context(experiment=expr)) + hackathon = expr.hackathon + stop_expr.delay(str(hackathon.id), str(expr.id)) def __get_expr_with_detail(self, experiment): self.__check_expr_real_status(experiment) diff --git a/open-hackathon-server/src/hackathon/expr/expr_starter.py b/open-hackathon-server/src/hackathon/expr/expr_starter.py index 1d7e60236..0042446ad 100644 --- a/open-hackathon-server/src/hackathon/expr/expr_starter.py +++ b/open-hackathon-server/src/hackathon/expr/expr_starter.py @@ -34,15 +34,12 @@ def start_expr(self, context): expr.save() template_content = self.template_library.load_template(context.template) - expr.status = EStatus.STARTING - expr.save() - # context contains complex object, we need create another serializable one with only simple fields new_context = Context(template_content=template_content, template_name=context.template.name, hackathon_id=context.hackathon.id, experiment_id=expr.id, - pre_alloc_enabled = context.pre_alloc_enabled) + pre_alloc_enabled=context.pre_alloc_enabled) if context.get("user", None): new_context.user_id = context.user.id if self._internal_start_expr(new_context): diff --git a/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py b/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py index 8b9d1de02..35c5e32d1 100644 --- a/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py +++ b/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py @@ -51,7 +51,7 @@ def _internal_start_expr(self, context): status=VEStatus.INIT, remote_provider=VERemoteProvider.Guacamole)) - experiment.status = EStatus.INIT + experiment.status = EStatus.STARTING experiment.save() self.log.debug("virtual_environments %s created, creating k8s..." % _env_name) self.__schedule_start(context) diff --git a/open-hackathon-server/src/hackathon/worker/expr_tasks.py b/open-hackathon-server/src/hackathon/worker/expr_tasks.py index 66af88fad..b23523e07 100644 --- a/open-hackathon-server/src/hackathon/worker/expr_tasks.py +++ b/open-hackathon-server/src/hackathon/worker/expr_tasks.py @@ -7,20 +7,6 @@ template_library = RequiredFeature("template_library") -@worker.task(name="batch_start_expr") -def pre_alloc_expr(hackathon_id, template, count): - template_content = template_library.load_template(template) - hackathon = Hackathon.objects(id=hackathon_id).first() - - for i in range(count): - expr = Experiment(status=EStatus.INIT, - template=template, - virtual_environments=[], - hackathon=hackathon) - expr.save() - start_new_expr.delay(hackathon_id, str(expr.id), template_content) - - @worker.task(name="start_new_expr") def start_new_expr(hackathon_id, expr_id, template_content): pass diff --git a/open-hackathon-server/src/tests/apitest/test_experiment_api.py b/open-hackathon-server/src/tests/apitest/test_experiment_api.py index 60f026f91..71b4e3cbb 100644 --- a/open-hackathon-server/src/tests/apitest/test_experiment_api.py +++ b/open-hackathon-server/src/tests/apitest/test_experiment_api.py @@ -2,8 +2,15 @@ class TestUserExperiment(ApiTestCase): - def test_start_experiment(self): - pass - def test_stop_experiment(self): - pass + def test_start_experiment(self, user1): + self.client.post("/api/user/experiment") + + def test_stop_experiment(self, user1): + self.client.delete("/api/user/experiment") + + def test_get_experiment(self, user1): + self.client.get("/api/user/experiment") + + def test_experiment_heart_beat(self, user1): + self.client.put("/api/user/experiment") From 1ef77457e633771f00581877f1b08fe0f222efa0 Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 15:52:04 +0800 Subject: [PATCH 3/8] add k8s cloud provider --- .../src/hackathon/cloud_providers/__init__.py | 3 + .../src/hackathon/cloud_providers/k8s.py | 329 +++++++++++++++++- .../src/hackathon/cloud_providers/util.py | 4 + .../src/tests/worker/__init__.py | 0 4 files changed, 332 insertions(+), 4 deletions(-) create mode 100644 open-hackathon-server/src/tests/worker/__init__.py diff --git a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py index b8c8e9bdc..8323c9afb 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py @@ -24,6 +24,9 @@ def pause_expr(self, hackathon, experiment, template_content): def delete_expr(self, hackathon, experiment, template_content): pass + def wait_for_ready(self): + pass + def registry_provider(name, provider): _provider_classes[name] = provider diff --git a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py index 68b7c8ce7..8151559e8 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py @@ -1,18 +1,339 @@ +import time +import logging +import yaml as yaml_tool +from urllib3 import disable_warnings +from urllib3.exceptions import InsecureRequestWarning, HTTPError +from kubernetes import client +from kubernetes.client.rest import ApiException + +from hackathon.constants import HEALTH, HEALTH_STATUS, K8S_DEPLOYMENT_STATUS + from .util import BaseProvider +LOG = logging.getLogger(__name__) +DEFAULT_CONNECT_TIMEOUT = 60 +DEFAULT_RESYNC_TIME = 10 + +# 忽略自签名 ca 的告警日志 +disable_warnings(InsecureRequestWarning) + + +class K8sResourceError(Exception): + err_id = "" + err_msg_format = "{}: {}" + + def __init__(self, err_msg): + self._err_msg = err_msg + + @property + def err_msg(self): + return self.err_msg_format.format(self.err_id, self._err_msg) + + +class YmlParseError(K8sResourceError): + err_id = "K8s Yaml parse error" + + +class EnvError(K8sResourceError): + err_id = "K8s environment error" + + +class DeploymentError(EnvError): + err_id = "K8s deployment error" + + +class ServiceError(EnvError): + err_id = "K8s service error" + + +class StatefulSetError(EnvError): + err_id = "K8s StatefulSet error" + + +class PVCError(EnvError): + err_id = "K8s PersistentVolumeClaims error" + class K8sProvider(BaseProvider): - def connect(self): - pass + def __init__(self, cfg): + super(K8sProvider, self).__init__(cfg) + + self.namespace = cfg['namespace'] + self.api_url = cfg['api_url'] + self.token = cfg['token'] + + configuration = client.Configuration() + configuration.host = self.api_url + configuration.api_key['authorization'] = 'bearer ' + self.token + + # TODO support ca + configuration.verify_ssl = False + self.configuration = configuration + + self.api_client = None + + def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT): + report = self.report_health(timeout) + return report[HEALTH.STATUS] == HEALTH_STATUS.OK + + def report_health(self, timeout=20): + try: + api_instance = client.CoreV1Api(self.api_client) + api_instance.list_namespaced_pod(self.namespace, timeout_seconds=timeout) + return {HEALTH.STATUS: HEALTH_STATUS.OK} + except ApiException as e: + LOG.error("connect k8s api server error: {}".format(e)) + return { + HEALTH.STATUS: HEALTH_STATUS.ERROR, + HEALTH.DESCRIPTION: "Get Pod info error: {}".format(e), + } + except HTTPError: + return { + HEALTH.STATUS: HEALTH_STATUS.ERROR, + HEALTH.DESCRIPTION: "Connect K8s ApiServer {} error: connection timeout".format(self.api_url), + } def create_instance(self, ins_cfg): - pass + try: + for pvc in ins_cfg.persistent_volume_claims: + self.create_k8s_pvc(pvc) + + for i, s in enumerate(ins_cfg.services): + svc_name = self.create_k8s_service(s) + # overwrite service config and get the public port from K8s + ins_cfg.services[i] = yaml_tool.dump(self.get_service_by_name(svc_name)) + + for d in ins_cfg.deployments: + self.create_k8s_deployment(d) + + for s in ins_cfg.stateful_sets: + self.create_k8s_statefulset(s) + except Exception as e: + LOG.error("k8s_service_start_failed: {}".format(e)) + return ins_cfg def start_instance(self, ins_cfg): + # TODO pass def pause_instance(self, ins_cfg): + # TODO pass def delete_instance(self, ins_cfg): - pass + for d in ins_cfg.deployments: + self.delete_k8s_deployment(d['metadata']['name']) + + for pvc in ins_cfg.persistent_volume_claims: + self.delete_k8s_pvc(pvc['metadata']['name']) + + for s in ins_cfg.stateful_sets: + self.delete_k8s_statefulset(s['metadata']['name']) + + for s in ins_cfg.services: + self.delete_k8s_service(s['metadata']['name']) + + def wait_instance_ready(self, ins_cfg, timeout=None): + start_at = time.time() + while True: + all_ready = True + for d in ins_cfg.deployments: + if all_ready and self.get_deployment_status(d['metadata']['name']) != K8S_DEPLOYMENT_STATUS.AVAILABLE: + all_ready = False + continue + + for s in ins_cfg.stateful_sets: + # TODO check statfulSet status + pass + + if all_ready: + return all_ready + + time.sleep(DEFAULT_RESYNC_TIME) + if timeout is not None and time.time() > start_at + timeout: + raise RuntimeError("Start deployment error: Timeout") + + ### + # Deployment + ### + + def list_deployments(self, labels=None, timeout=20): + _deployments = [] + kwargs = {"timeout_seconds": timeout, "watch": False} + if labels and isinstance(labels, dict): + label_selector = ",".join(["{}={}".format(k, v) for k, v in list(labels.items())]) + kwargs['label_selector'] = label_selector + + apps_v1_group = client.AppsV1Api(self.api_client) + try: + ret = apps_v1_group.list_namespaced_deployment(self.namespace, **kwargs) + except ApiException as e: + LOG.error(e) + return [] + + for i in ret.items: + _deployments.append(i.metadata.name) + return _deployments + + def deployment_exists(self, name): + return self.get_deployment_by_name(name, need_raise=False) is not None + + def create_k8s_deployment(self, yaml): + api_instance = client.AppsV1Api(self.api_client) + if isinstance(yaml, str) or isinstance(yaml, str): + # Only support ONE deployment yaml + yaml = yaml_tool.load(yaml) + assert isinstance(yaml, dict), "Start a deployment without legal yaml." + metadata = yaml.get("metadata", {}) + deploy_name = metadata.get("name") + + try: + if self.get_deployment_by_name(deploy_name, need_raise=False): + raise DeploymentError("Deployment name was existed.") + + api_instance.create_namespaced_deployment(self.namespace, yaml, async_req=False) + except ApiException as e: + LOG.error("Start deployment error: {}".format(e)) + raise DeploymentError("Start deployment error: {}".format(e)) + return deploy_name + + def get_deployment_by_name(self, deployment_name, need_raise=True): + api_instance = client.AppsV1Api(self.api_client) + try: + _deploy = api_instance.read_namespaced_deployment(deployment_name, self.namespace) + except ApiException: + if need_raise: + raise DeploymentError("Deplotment {} not found".format(deployment_name)) + return None + return _deploy + + def get_deployment_status(self, deployment_name): + _deploy = self.get_deployment_by_name(deployment_name) + _status = _deploy.status + if not _status.replicas: + return K8S_DEPLOYMENT_STATUS.PAUSE + if _status.replicas == _status.available_replicas: + return K8S_DEPLOYMENT_STATUS.AVAILABLE + if _status.unavailable_replicas > 0: + return K8S_DEPLOYMENT_STATUS.ERROR + return K8S_DEPLOYMENT_STATUS.PENDING + + def start_k8s_deployment(self, deployment_name): + _deploy = self.get_deployment_by_name(deployment_name) + api_instance = client.AppsV1Api(self.api_client) + if not _deploy: + raise DeploymentError("Deployment {} not found".format(deployment_name)) + + _spec = _deploy.spec + if _spec.replicas > 0: + return deployment_name + _spec.replicas = 1 + api_instance.patch_namespaced_deployment(deployment_name, self.namespace, _deploy) + LOG.info("Started existed deployment: {}".format(deployment_name)) + + def pause_k8s_deployment(self, deployment_name): + _deploy = self.get_deployment_by_name(deployment_name) + _spec = _deploy.spec + _spec.replicas = 0 + api_instance = client.AppsV1Api(self.api_client) + try: + api_instance.patch_namespaced_deployment(deployment_name, self.namespace, _deploy) + except ApiException as e: + LOG.error("Pause deployment error: {}".format(e)) + raise DeploymentError("Pause {} error {}".format(deployment_name, e)) + LOG.info("Paused existed deployment: {}".format(deployment_name)) + + def delete_k8s_deployment(self, deployment_name): + api_instance = client.AppsV1Api(self.api_client) + try: + api_instance.delete_namespaced_deployment(deployment_name, self.namespace) + except ApiException as e: + LOG.error("Delete deployment error: {}".format(e)) + raise DeploymentError("Delete {} error {}".format(deployment_name, e)) + LOG.info("Deleted existed deployment: {}".format(deployment_name)) + + ### + # Service + ### + + def get_service_by_name(self, service_name, need_raise=True): + api_instance = client.CoreV1Api(self.api_client) + try: + _svc = api_instance.read_namespaced_service(service_name, self.namespace) + except ApiException: + if need_raise: + raise ServiceError("Service {} not found".format(service_name)) + return None + return _svc.to_dict() + + def create_k8s_service(self, yaml): + if isinstance(yaml, str) or isinstance(yaml, str): + # Only support ONE deployment yaml + yaml = yaml_tool.load(yaml) + assert isinstance(yaml, dict), "Create a service without legal yaml." + + api_instance = client.CoreV1Api(self.api_client) + try: + svc = api_instance.create_namespaced_service(self.namespace, yaml) + return svc.to_dict()['metadata']['name'] + except ApiException as e: + LOG.error("Create service error: {}".format(e)) + raise ServiceError("Create service error: {}".format(e)) + + def delete_k8s_service(self, service_name): + api_instance = client.CoreV1Api(self.api_client) + try: + api_instance.delete_namespaced_service(service_name, self.namespace) + except ApiException as e: + LOG.error("Delete service error: {}".format(e)) + raise ServiceError("Delete service error: {}".format(e)) + + ### + # StatefulSet + ### + + def create_k8s_statefulset(self, yaml): + if isinstance(yaml, str) or isinstance(yaml, str): + # Only support ONE deployment yaml + yaml = yaml_tool.load(yaml) + assert isinstance(yaml, dict), "Create a statefulset without legal yaml." + + api_instance = client.AppsV1Api(self.api_client) + try: + api_instance.create_namespaced_stateful_set(self.namespace, yaml) + except ApiException as e: + LOG.error("Create StatefulSet error: {}".format(e)) + raise StatefulSetError("Create StatefulSet error: {}".format(e)) + + def delete_k8s_statefulset(self, statefulset_name): + api_instance = client.AppsV1Api(self.api_client) + try: + api_instance.delete_namespaced_stateful_set(statefulset_name, self.namespace) + except ApiException as e: + LOG.error("Delete StatefulSet error: {}".format(e)) + raise StatefulSetError("Delete StatefulSet error: {}".format(e)) + + ### + # PersistentVolumeClaim + ### + + def create_k8s_pvc(self, yaml): + if isinstance(yaml, str) or isinstance(yaml, str): + # Only support ONE deployment yaml + yaml = yaml_tool.load(yaml) + assert isinstance(yaml, dict), "Create a PVC without legal yaml." + + api_instance = client.CoreV1Api(self.api_client) + try: + api_instance.create_namespaced_persistent_volume_claim(self.namespace, yaml) + except ApiException as e: + LOG.error("Create PVC error: {}".format(e)) + raise PVCError("Create PVC error: {}".format(e)) + + def delete_k8s_pvc(self, pvc_name): + api_instance = client.CoreV1Api(self.api_client) + try: + api_instance.delete_namespaced_persistent_volume_claim(pvc_name, self.namespace) + except ApiException as e: + LOG.error("Delete PVC error: {}".format(e)) + raise PVCError("Delete PVC error: {}".format(e)) diff --git a/open-hackathon-server/src/hackathon/cloud_providers/util.py b/open-hackathon-server/src/hackathon/cloud_providers/util.py index 926ebd9ed..ca2177281 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/util.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/util.py @@ -26,5 +26,9 @@ def pause_instance(self, ins_cfg): def delete_instance(self, ins_cfg): pass + @abc.abstractmethod + def wait_instance_ready(self, ins_cfg, timeout=None): + pass + Instance = namedtuple("Instance", field_names=[]) diff --git a/open-hackathon-server/src/tests/worker/__init__.py b/open-hackathon-server/src/tests/worker/__init__.py new file mode 100644 index 000000000..e69de29bb From 3027911b6bba80c6176712534c1e8d067b667418 Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 17:39:35 +0800 Subject: [PATCH 4/8] del starter and k8s adapter --- .../src/hackathon/__init__.py | 3 +- .../src/hackathon/cloud_providers/__init__.py | 103 +++++- .../src/hackathon/cloud_providers/k8s.py | 21 +- .../src/hackathon/cloud_providers/util.py | 10 +- .../src/hackathon/expr/__init__.py | 1 - .../src/hackathon/expr/expr_starter.py | 114 ------ .../src/hackathon/expr/k8s_expr_starter.py | 334 ------------------ .../src/hackathon/hk8s/__init__.py | 8 - .../src/hackathon/hk8s/errors.py | 34 -- .../src/hackathon/hk8s/k8s_service_adapter.py | 239 ------------- .../src/hackathon/hk8s/service_adapter.py | 23 -- .../src/hackathon/hmongo/models.py | 30 ++ .../template/docker_template_unit.py | 4 +- .../hackathon/template/template_content.py | 16 +- .../src/hackathon/template/template_unit.py | 5 +- 15 files changed, 154 insertions(+), 791 deletions(-) delete mode 100644 open-hackathon-server/src/hackathon/expr/expr_starter.py delete mode 100644 open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py delete mode 100644 open-hackathon-server/src/hackathon/hk8s/__init__.py delete mode 100644 open-hackathon-server/src/hackathon/hk8s/errors.py delete mode 100644 open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py delete mode 100644 open-hackathon-server/src/hackathon/hk8s/service_adapter.py diff --git a/open-hackathon-server/src/hackathon/__init__.py b/open-hackathon-server/src/hackathon/__init__.py index ab858d3ed..3d43f8ea6 100644 --- a/open-hackathon-server/src/hackathon/__init__.py +++ b/open-hackathon-server/src/hackathon/__init__.py @@ -179,9 +179,8 @@ def init_db(): def init_expr_components(): - from .expr import ExprManager, K8SExprStarter + from .expr import ExprManager factory.provide("expr_manager", ExprManager) - factory.provide("k8s_service", K8SExprStarter) def init_voice_verify(): diff --git a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py index 8323c9afb..0eebfdd16 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py @@ -1,32 +1,111 @@ +import time +import random +import string +import logging + +from hackathon.hmongo.models import K8sEnvironment, VirtualEnvironment +from hackathon.constants import VE_PROVIDER, EStatus, VEStatus, VERemoteProvider + __all__ = ["Provider"] +LOG = logging.getLogger(__name__) + _provider_classes = {} +class ProviderError(Exception): + pass + + class Provider: - def __init__(self, name, provider_cfg): - self.p_class = _provider_classes[name] + def __init__(self, provider_name, provider_cfg): + if provider_name not in _provider_classes: + raise ProviderError("{} provider not found", provider_name) + self.provider_name = provider_name + self.p_class = _provider_classes[provider_name] self.p = self.p_class(provider_cfg) - self.p.connect() + try: + self.p.connect() + except Exception as e: + raise ProviderError("connect Provider Platform error: {}".format(e)) - def create_expr(self, hackathon, experiment, template_content): - pass + def create_expr(self, experiment, template_content): + LOG.info("creating experiment {}".format(experiment.id)) + start_at = time.time() + experiment.status = EStatus.STARTING + experiment.save() + + try: + new_env = self.p.create_instance(self._get_ve_config(experiment, template_content)) + experiment.virtual_environments[0] = new_env + experiment.save() - def start_expr(self, hackathon, experiment, template_content): + self.wait_for_ready() + except Exception as e: + LOG.error("wait for start error: {}".format(e)) + experiment.status = EStatus.FAILED + else: + experiment.virtual_environments[0].status = VEStatus.RUNNING + experiment.status = EStatus.RUNNING + experiment.save() + LOG.info("create expr spend time: {}".format(time.time() - start_at)) + + def start_expr(self, experiment, template_content): # TODO pass - def pause_expr(self, hackathon, experiment, template_content): + def pause_expr(self, experiment, template_content): # TODO pass - def delete_expr(self, hackathon, experiment, template_content): - pass + def delete_expr(self, experiment, template_content): + LOG.info("deleting experiment {}".format(experiment.id)) + + try: + self.p.delete_instance(self._get_ve_config(experiment, template_content)) + except Exception as e: + LOG.error("delete expr error: {}".format(e)) + experiment.status = EStatus.FAILED + else: + experiment.status = EStatus.STOPPED + experiment.save() + LOG.info("experiment {} deleted".format(experiment.id)) def wait_for_ready(self): - pass + self.p.wait_instance_ready() + + def _get_ve_config(self, experiment, template_content): + if not experiment.virtual_environments or len(experiment.virtual_environments) == 0: + experiment.virtual_environments = [self._init_ve_config(experiment, template_content)] + experiment.save() + + return experiment.virtual_environments[0] + + def _init_ve_config(self, experiment, template_content): + if self.provider_name == VE_PROVIDER.K8S: + hackathon = experiment.hackathon + unit = template_content.unit + env_name = "{}-{}".format(hackathon.name, "".join(random.sample(string.ascii_lowercase, 6))) + yaml_content = unit.gen_k8s_yaml(env_name) + k8s_env = K8sEnvironment.load_from_yaml(env_name, yaml_content) + return VirtualEnvironment( + provider=VE_PROVIDER.K8S, + name=env_name, + status=VEStatus.INIT, + remote_provider=VERemoteProvider.Guacamole, + k8s_resource=k8s_env, + ) + raise ProviderError("not found {} instance config".format(self.provider_name)) + + +def registry_provider(provider_name, provider_class): + _provider_classes[provider_name] = provider_class + + +def _registry(): + from .k8s import K8sProvider + registry_provider(VE_PROVIDER.K8S, K8sProvider) -def registry_provider(name, provider): - _provider_classes[name] = provider +_registry() diff --git a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py index 8151559e8..a15d06fe9 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py @@ -93,7 +93,8 @@ def report_health(self, timeout=20): HEALTH.DESCRIPTION: "Connect K8s ApiServer {} error: connection timeout".format(self.api_url), } - def create_instance(self, ins_cfg): + def create_instance(self, ve_cfg): + ins_cfg = ve_cfg.k8s_resource try: for pvc in ins_cfg.persistent_volume_claims: self.create_k8s_pvc(pvc) @@ -112,15 +113,16 @@ def create_instance(self, ins_cfg): LOG.error("k8s_service_start_failed: {}".format(e)) return ins_cfg - def start_instance(self, ins_cfg): + def start_instance(self, ve_cfg): # TODO pass - def pause_instance(self, ins_cfg): + def pause_instance(self, ve_cfg): # TODO pass - def delete_instance(self, ins_cfg): + def delete_instance(self, ve_cfg): + ins_cfg = ve_cfg.k8s_resource for d in ins_cfg.deployments: self.delete_k8s_deployment(d['metadata']['name']) @@ -133,7 +135,8 @@ def delete_instance(self, ins_cfg): for s in ins_cfg.services: self.delete_k8s_service(s['metadata']['name']) - def wait_instance_ready(self, ins_cfg, timeout=None): + def wait_instance_ready(self, ve_cfg, timeout=None): + ins_cfg = ve_cfg.k8s_resource start_at = time.time() while True: all_ready = True @@ -182,7 +185,7 @@ def create_k8s_deployment(self, yaml): api_instance = client.AppsV1Api(self.api_client) if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Start a deployment without legal yaml." metadata = yaml.get("metadata", {}) deploy_name = metadata.get("name") @@ -269,7 +272,7 @@ def get_service_by_name(self, service_name, need_raise=True): def create_k8s_service(self, yaml): if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Create a service without legal yaml." api_instance = client.CoreV1Api(self.api_client) @@ -295,7 +298,7 @@ def delete_k8s_service(self, service_name): def create_k8s_statefulset(self, yaml): if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Create a statefulset without legal yaml." api_instance = client.AppsV1Api(self.api_client) @@ -320,7 +323,7 @@ def delete_k8s_statefulset(self, statefulset_name): def create_k8s_pvc(self, yaml): if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Create a PVC without legal yaml." api_instance = client.CoreV1Api(self.api_client) diff --git a/open-hackathon-server/src/hackathon/cloud_providers/util.py b/open-hackathon-server/src/hackathon/cloud_providers/util.py index ca2177281..3904ca05b 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/util.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/util.py @@ -11,23 +11,23 @@ def connect(self): pass @abc.abstractmethod - def create_instance(self, ins_cfg): + def create_instance(self, ve_cfg): pass @abc.abstractmethod - def start_instance(self, ins_cfg): + def start_instance(self, ve_cfg): pass @abc.abstractmethod - def pause_instance(self, ins_cfg): + def pause_instance(self, ve_cfg): pass @abc.abstractmethod - def delete_instance(self, ins_cfg): + def delete_instance(self, ve_cfg): pass @abc.abstractmethod - def wait_instance_ready(self, ins_cfg, timeout=None): + def wait_instance_ready(self, ve_cfg, timeout=None): pass diff --git a/open-hackathon-server/src/hackathon/expr/__init__.py b/open-hackathon-server/src/hackathon/expr/__init__.py index b585885b0..dcf6412db 100644 --- a/open-hackathon-server/src/hackathon/expr/__init__.py +++ b/open-hackathon-server/src/hackathon/expr/__init__.py @@ -4,4 +4,3 @@ """ from hackathon.expr.expr_mgr import ExprManager -from hackathon.expr.k8s_expr_starter import K8SExprStarter diff --git a/open-hackathon-server/src/hackathon/expr/expr_starter.py b/open-hackathon-server/src/hackathon/expr/expr_starter.py deleted file mode 100644 index 0042446ad..000000000 --- a/open-hackathon-server/src/hackathon/expr/expr_starter.py +++ /dev/null @@ -1,114 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -import sys - -sys.path.append("..") - -from hackathon import Component, RequiredFeature, Context -from hackathon.hmongo.models import Experiment -from hackathon.constants import EStatus, VEStatus - -__all__ = ["ExprStarter"] - - -class ExprStarter(Component): - """Base for experiment starter""" - - template_library = RequiredFeature("template_library") - - def start_expr(self, context): - """To start a new Experiment asynchronously - - :type context: Context - :param context: the execution context. - - """ - expr = Experiment(status=EStatus.INIT, - template=context.template, - user=context.user, - virtual_environments=[], - hackathon=context.hackathon) - expr.save() - - template_content = self.template_library.load_template(context.template) - # context contains complex object, we need create another serializable one with only simple fields - new_context = Context(template_content=template_content, - template_name=context.template.name, - hackathon_id=context.hackathon.id, - experiment_id=expr.id, - pre_alloc_enabled=context.pre_alloc_enabled) - if context.get("user", None): - new_context.user_id = context.user.id - if self._internal_start_expr(new_context): - new_context.experiment = expr - return new_context - self.rollback(new_context) - return None - - def stop_expr(self, context): - """Stop experiment asynchronously - - :type context: Context - :param context: the execution context. - - """ - return self._internal_stop_expr(context) - - def rollback(self, context): - """cancel/rollback a expr which is in error state - - :type context: Context - :param context: the execution context. - - """ - return self._internal_rollback(context) - - def _internal_start_expr(self, context): - raise NotImplementedError() - - def _internal_stop_expr(self, context): - raise NotImplementedError() - - def _internal_rollback(self, context): - raise NotImplementedError() - - def _on_virtual_environment_failed(self, context): - self.rollback(context) - - def _on_virtual_environment_success(self, context): - expr = Experiment.objects(id=context.experiment_id).no_dereference() \ - .only("status", "virtual_environments").first() - if all(ve.status == VEStatus.RUNNING for ve in expr.virtual_environments): - expr.status = EStatus.RUNNING - expr.save() - self._on_expr_started(context) - - self._hooks_on_virtual_environment_success(context) - - def _on_virtual_environment_stopped(self, context): - expr = Experiment.objects(id=context.experiment_id).no_dereference() \ - .only("status", "virtual_environments").first() - ve = expr.virtual_environments.get(name=context.virtual_environment_name) - ve.status = VEStatus.STOPPED - - if all(ve.status == VEStatus.STOPPED for ve in expr.virtual_environments): - expr.status = EStatus.STOPPED - expr.save() - - def _on_virtual_environment_unexpected_error(self, context): - self.log.warn("experiment unexpected error: " + context.experiment_id) - expr = Experiment.objects(id=context.experiment_id).no_dereference() \ - .only("status", "virtual_environments").first() - if "virtual_environment_name" in context: - expr.virtual_environments.get(name=context.virtual_environment_name).status = VEStatus.UNEXPECTED_ERROR - expr.save() - - def _hooks_on_virtual_environment_success(self, context): - pass - - def _on_expr_started(self, context): - # send notice - pass diff --git a/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py b/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py deleted file mode 100644 index 35c5e32d1..000000000 --- a/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py +++ /dev/null @@ -1,334 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" -import yaml -import time -import string -import random - -from hackathon.expr.expr_starter import ExprStarter -from hackathon.hmongo.models import K8sEnvironment -from hackathon.hmongo.models import Hackathon, VirtualEnvironment, Experiment -from hackathon.constants import (VE_PROVIDER, VERemoteProvider, VEStatus, EStatus) -from hackathon.hackathon_response import internal_server_error -from hackathon.constants import K8S_DEPLOYMENT_STATUS -from hackathon.template.template_constants import K8S_UNIT -from hackathon.hk8s.k8s_service_adapter import K8SServiceAdapter - - -class K8SExprStarter(ExprStarter): - def _internal_start_expr(self, context): - hackathon = Hackathon.objects.get(id=context.hackathon_id) - experiment = Experiment.objects.get(id=context.experiment_id) - template_content = context.template_content - - if not experiment or not hackathon: - return internal_server_error('Failed starting k8s: experiment or hackathon not found.') - - user = experiment.user or None - _virtual_envs = [] - _env_name = str(hackathon.name + "-" + template_content.name).lower() - if user: - _virtual_envs = experiment.virtual_environments - _env_name += str("-" + user.name).lower() - _env_name = "{}-{}".format(_env_name, "".join(random.sample(string.lowercase, 6))) - - try: - if not _virtual_envs: - # Get None VirtualEnvironment, create new one: - labels = { - "hacking.kaiyuanshe.cn/hackathon": str(hackathon.id), - "hacking.kaiyuanshe.cn/experiment": str(experiment.id), - "hacking.kaiyuanshe.cn/virtual_environment": _env_name, - } - k8s_env = self.__create_useful_k8s_resource(_env_name, template_content, labels) - - experiment.virtual_environments.append(VirtualEnvironment( - provider=VE_PROVIDER.K8S, - name=_env_name, - k8s_resource=k8s_env, - status=VEStatus.INIT, - remote_provider=VERemoteProvider.Guacamole)) - - experiment.status = EStatus.STARTING - experiment.save() - self.log.debug("virtual_environments %s created, creating k8s..." % _env_name) - self.__schedule_start(context) - except Exception as e: - self.log.error(e) - experiment.status = EStatus.FAILED - experiment.save() - return False - - return True - - def _internal_stop_expr(self, context): - experiment = Experiment.objects.get(id=context.experiment_id) - if not experiment: - return internal_server_error('Failed stop k8s: experiment not found.') - try: - self.__schedule_stop(context) - except Exception as e: - self.log.error(e) - experiment.status = EStatus.FAILED - experiment.save() - return internal_server_error('Failed stopping k8s') - - def _internal_rollback(self, context): - self.__schedule_stop(context) - - def __schedule_start(self, ctx): - self.scheduler.add_once("k8s_service", "schedule_start_k8s_service", context=ctx, - id="schedule_setup_" + str(ctx.experiment_id), seconds=0) - - def __schedule_stop(self, ctx): - self.scheduler.add_once("k8s_service", "schedule_stop_k8s_service", context=ctx, - id="schedule_stop_" + str(ctx.experiment_id), seconds=0) - - def schedule_start_k8s_service(self, context): - experiment = Experiment.objects.get(id=context.experiment_id) - virtual_env = experiment.virtual_environments[0] - k8s_resource = virtual_env.k8s_resource - adapter = self.__get_adapter_from_ctx(K8SServiceAdapter, context) - - try: - for pvc in k8s_resource.persistent_volume_claims: - adapter.create_k8s_pvc(pvc) - - for i, s in enumerate(k8s_resource.services): - svc_name = adapter.create_k8s_service(s) - # overwrite service config and get the public port from K8s - k8s_resource.services[i] = yaml.dump(adapter.get_service_by_name(svc_name)) - - for d in k8s_resource.deployments: - adapter.create_k8s_deployment(d) - - for s in k8s_resource.stateful_sets: - adapter.create_k8s_statefulset(s) - - self.__wait_for_k8s_ready(adapter, k8s_resource.deployments, k8s_resource.stateful_sets) - self.__config_endpoint(experiment, k8s_resource.services) - except Exception as e: - self.log.error("k8s_service_start_failed: {}".format(e)) - - def schedule_stop_k8s_service(self, context): - experiment = Experiment.objects.get(id=context.experiment_id) - virtual_envs = experiment.virtual_environments - adapter = self.__get_adapter_from_ctx(K8SServiceAdapter, context) - try: - for virtual_env in virtual_envs: - for d in virtual_env.deployments: - adapter.delete_k8s_deployment(d['metadata']['name']) - - for pvc in virtual_env.persistent_volume_claims: - adapter.delete_k8s_pvc(pvc['metadata']['name']) - - for s in virtual_env.stateful_sets: - adapter.delete_k8s_statefulset(s['metadata']['name']) - - for s in virtual_env.services: - adapter.delete_k8s_service(s['metadata']['name']) - - self.log.debug("k8s_service_stop: {}".format(context)) - except Exception as e: - self.log.error("k8s_service_stop_failed: {}".format(e)) - experiment.delete() - - @staticmethod - def __create_useful_k8s_resource(env_name, template_content, labels): - """ helper func to generate available and unique resources yaml - - Currently supported resources - - Deployment - - Service - - StatefulSet - - PersistentVolumeClaim - - :param env_name: - :param template_content: - :param labels: - :return: - """ - - k8s_env = K8sEnvironment( - name=env_name, - deployments=[ - yaml.dump(TemplateRender(env_name, "deployment", d, labels).render()) - for d in template_content.get_resource("deployment") - ], - services=[ - yaml.dump(TemplateRender(env_name, "service", s, labels).render()) - for s in template_content.get_resource("service") - ], - statefulsets=[ - yaml.dump(TemplateRender(env_name, "statefulset", s, labels).render()) - for s in template_content.get_resource("statefulset") - ], - persistent_volume_claims=[ - yaml.dump(TemplateRender(env_name, "statefulset", p, labels).render()) - for p in template_content.get_resource("persistentvolumeclaim") - ], - ) - - return k8s_env - - @staticmethod - def __wait_for_k8s_ready(adapter, deployments, stateful_sets): - # TODO Sleep for ready is NOT GOOD IDEA, Why not use watch api? - # Wait up to 30 minutes - end_time = int(time.time()) + 60 * 60 * 30 - - for d_yaml in deployments: - d = yaml.load(d_yaml) - while adapter.get_deployment_status(d['metadata']['name']) != K8S_DEPLOYMENT_STATUS.AVAILABLE: - time.sleep(1) - if int(time.time()) > end_time: - raise RuntimeError("Start deployment error: Timeout") - - for s in stateful_sets: - # TODO check statfulSet status - pass - - def __config_endpoint(self, expr, services): - self.log.debug("experiment started %s successfully. Setting remote parameters." % expr.id) - # set experiment status - # update the status of virtual environment - virtual_env = expr.virtual_environments[0] - template = expr.template - cluster = template.k8s_cluster - ingress = cluster.ingress - if not ingress or not services: - self.log.info("Has no endpoint config") - return - assert isinstance(ingress, list) - svc = None - for s_yaml in services: - s = yaml.load(s_yaml) - if s['spec'].get("type") == "NodePort": - svc = s - break - if not svc: - return - - ports = svc['spec'].get("ports", []) - if not ports: - self.log.info("Has no endpoint config") - return - public_port = ports[0].get("node_port") - if not public_port: - return - - gc = { - K8S_UNIT.REMOTE_PARAMETER_NAME: virtual_env.name, - K8S_UNIT.REMOTE_PARAMETER_DISPLAY_NAME: svc['metadata']['name'], - K8S_UNIT.REMOTE_PARAMETER_HOST_NAME: random.choice(ingress), - K8S_UNIT.REMOTE_PARAMETER_PROTOCOL: "vnc", - K8S_UNIT.REMOTE_PARAMETER_PORT: public_port, - # K8S_UNIT.REMOTE_PARAMETER_USER_NAME: "", - # K8S_UNIT.REMOTE_PARAMETER_PASSWORD: "", - } - self.log.debug("expriment %s remote parameters: %s" % (expr.id, str(gc))) - virtual_env.remote_paras = gc - - virtual_env.status = VEStatus.RUNNING - expr.status = EStatus.RUNNING - expr.save() - - @staticmethod - def __get_adapter_from_ctx(adapter_class, context): - template_content = context.template_content - cluster = template_content.cluster_info - return adapter_class(cluster.api_url, cluster.token, cluster.namespace) - - -class TemplateRender: - """ - Types: - - Deployment - - Service - - StatefulSet - - PersistentVolumeClaim - """ - - def __init__(self, resource_name, resource_type, yml, labels): - self.resource_name = resource_name - self.resource_type = str(resource_type).lower() - self.yaml = yml - self.labels = labels - - def render(self): - - if self.resource_type == "deployment": - return self.__render_deploy() - - if self.resource_type == "service": - return self.__render_svc() - - if self.resource_type == "statefulset": - return self.__render_stateful_set() - - if self.resource_type == "persistentvolumeclaim": - return self.__render_pvc() - - def __render_deploy(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - deploy_labels = metadata.get("labels") or {} - deploy_labels.update(self.labels) - metadata['labels'] = deploy_labels - - spec = self.yaml['spec'] - pod_template = spec['template'] - pod_metadata = pod_template['metadata'] - pod_labels = pod_metadata.get("labels") or {} - pod_labels.update(self.labels) - pod_metadata['labels'] = pod_labels - - if "selector" in spec: - match_labels = spec['selector'].get("matchLabels") or {} - match_labels.update(self.labels) - spec['selector']['matchLabels'] = match_labels - - # Make sure PVC is no conflict - if "volumes" in spec: - for v in spec['volumes']: - if "persistentVolumeClaim" not in v: - continue - pvc = v['persistentVolumeClaim'] - pvc['claimName'] = "{}-{}".format(self.resource_name, pvc['claimName']) - return self.yaml - - def __render_svc(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - svc_labels = metadata.get("labels") or {} - svc_labels.update(self.labels) - metadata['labels'] = svc_labels - - spec = self.yaml['spec'] - label_selector = spec.get("selector") or {} - label_selector.update(self.labels) - return self.yaml - - def __render_stateful_set(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - ss_labels = metadata.get("labels") or {} - ss_labels.update(self.labels) - metadata['labels'] = ss_labels - - spec = self.yaml['spec'] - if "selector" in spec: - match_labels = spec['selector'].get("matchLabels") or {} - match_labels.update(self.labels) - spec['selector']['matchLabels'] = match_labels - return self.yaml - - def __render_pvc(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - pvc_labels = metadata.get("labels") or {} - pvc_labels.update(self.labels) - metadata['labels'] = pvc_labels - return self.yaml diff --git a/open-hackathon-server/src/hackathon/hk8s/__init__.py b/open-hackathon-server/src/hackathon/hk8s/__init__.py deleted file mode 100644 index e9378c2b6..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -__author__ = "KaiYuanShe" - -from kubernetes import client, config, utils diff --git a/open-hackathon-server/src/hackathon/hk8s/errors.py b/open-hackathon-server/src/hackathon/hk8s/errors.py deleted file mode 100644 index 329c2d5b6..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/errors.py +++ /dev/null @@ -1,34 +0,0 @@ -class K8sResourceError(Exception): - err_id = "" - err_msg_format = "{}: {}" - - def __init__(self, err_msg): - self._err_msg = err_msg - - @property - def err_msg(self): - return self.err_msg_format.format(self.err_id, self._err_msg) - - -class YmlParseError(K8sResourceError): - err_id = "K8s Yaml parse error" - - -class EnvError(K8sResourceError): - err_id = "K8s environment error" - - -class DeploymentError(EnvError): - err_id = "K8s deployment error" - - -class ServiceError(EnvError): - err_id = "K8s service error" - - -class StatefulSetError(EnvError): - err_id = "K8s StatefulSet error" - - -class PVCError(EnvError): - err_id = "K8s PersistentVolumeClaims error" diff --git a/open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py b/open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py deleted file mode 100644 index 0b67dcf0b..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py +++ /dev/null @@ -1,239 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -from urllib3 import disable_warnings -from urllib3.exceptions import InsecureRequestWarning, HTTPError - -import yaml as yaml_tool -from kubernetes import client -from kubernetes.client.rest import ApiException - -from hackathon.constants import HEALTH, HEALTH_STATUS, K8S_DEPLOYMENT_STATUS -from .service_adapter import ServiceAdapter - -from .errors import DeploymentError, ServiceError, StatefulSetError, PVCError - -__all__ = ["K8SServiceAdapter"] -disable_warnings(InsecureRequestWarning) - - -class K8SServiceAdapter(ServiceAdapter): - def __init__(self, api_url, token, namespace): - configuration = client.Configuration() - configuration.host = api_url - configuration.api_key['authorization'] = 'bearer ' + token - # FIXME import ca cert file? - configuration.verify_ssl = False - - self.namespace = namespace - self.api_url = api_url - self.api_client = client.ApiClient(configuration) - super(K8SServiceAdapter, self).__init__(self.api_client) - - def ping(self, timeout=20): - report = self.report_health(timeout) - return report[HEALTH.STATUS] == HEALTH_STATUS.OK - - def report_health(self, timeout=20): - try: - api_instance = client.CoreV1Api(self.api_client) - api_instance.list_namespaced_pod(self.namespace, timeout_seconds=timeout) - return {HEALTH.STATUS: HEALTH_STATUS.OK} - except ApiException as e: - self.log.error(e) - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: "Get Pod info error: {}".format(e), - } - except HTTPError: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: "Connect K8s ApiServer {} error: connection timeout".format(self.api_url), - } - - ### - # Deployment - ### - - def list_deployments(self, labels=None, timeout=20): - _deployments = [] - kwargs = {"timeout_seconds": timeout, "watch": False} - if labels and isinstance(labels, dict): - label_selector = ",".join(["{}={}".format(k, v) for k, v in list(labels.items())]) - kwargs['label_selector'] = label_selector - - apps_v1_group = client.AppsV1Api(self.api_client) - try: - ret = apps_v1_group.list_namespaced_deployment(self.namespace, **kwargs) - except ApiException as e: - self.log.error(e) - return [] - - for i in ret.items: - _deployments.append(i.metadata.name) - return _deployments - - def deployment_exists(self, name): - return self.get_deployment_by_name(name, need_raise=False) is not None - - def create_k8s_deployment(self, yaml): - api_instance = client.AppsV1Api(self.api_client) - if isinstance(yaml, str) or isinstance(yaml, str): - # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) - assert isinstance(yaml, dict), "Start a deployment without legal yaml." - metadata = yaml.get("metadata", {}) - deploy_name = metadata.get("name") - - try: - if self.get_deployment_by_name(deploy_name, need_raise=False): - raise DeploymentError("Deployment name was existed.") - - api_instance.create_namespaced_deployment(self.namespace, yaml, async_req=False) - except ApiException as e: - self.log.error("Start deployment error: {}".format(e)) - raise DeploymentError("Start deployment error: {}".format(e)) - return deploy_name - - def get_deployment_by_name(self, deployment_name, need_raise=True): - api_instance = client.AppsV1Api(self.api_client) - try: - _deploy = api_instance.read_namespaced_deployment(deployment_name, self.namespace) - except ApiException: - if need_raise: - raise DeploymentError("Deplotment {} not found".format(deployment_name)) - return None - return _deploy - - def get_deployment_status(self, deployment_name): - _deploy = self.get_deployment_by_name(deployment_name) - _status = _deploy.status - if not _status.replicas: - return K8S_DEPLOYMENT_STATUS.PAUSE - if _status.replicas == _status.available_replicas: - return K8S_DEPLOYMENT_STATUS.AVAILABLE - if _status.unavailable_replicas > 0: - return K8S_DEPLOYMENT_STATUS.ERROR - return K8S_DEPLOYMENT_STATUS.PENDING - - def start_k8s_deployment(self, deployment_name): - _deploy = self.get_deployment_by_name(deployment_name) - api_instance = client.AppsV1Api(self.api_client) - if not _deploy: - raise DeploymentError("Deployment {} not found".format(deployment_name)) - - _spec = _deploy.spec - if _spec.replicas > 0: - return deployment_name - _spec.replicas = 1 - api_instance.patch_namespaced_deployment(deployment_name, self.namespace, _deploy) - self.log.info("Started existed deployment: {}".format(deployment_name)) - - def pause_k8s_deployment(self, deployment_name): - _deploy = self.get_deployment_by_name(deployment_name) - _spec = _deploy.spec - _spec.replicas = 0 - api_instance = client.AppsV1Api(self.api_client) - try: - api_instance.patch_namespaced_deployment(deployment_name, self.namespace, _deploy) - except ApiException as e: - self.log.error("Pause deployment error: {}".format(e)) - raise DeploymentError("Pause {} error {}".format(deployment_name, e)) - self.log.info("Paused existed deployment: {}".format(deployment_name)) - - def delete_k8s_deployment(self, deployment_name): - api_instance = client.AppsV1Api(self.api_client) - try: - api_instance.delete_namespaced_deployment(deployment_name, self.namespace) - except ApiException as e: - self.log.error("Delete deployment error: {}".format(e)) - raise DeploymentError("Delete {} error {}".format(deployment_name, e)) - self.log.info("Deleted existed deployment: {}".format(deployment_name)) - - ### - # Service - ### - - def get_service_by_name(self, service_name, need_raise=True): - api_instance = client.CoreV1Api(self.api_client) - try: - _svc = api_instance.read_namespaced_service(service_name, self.namespace) - except ApiException: - if need_raise: - raise ServiceError("Service {} not found".format(service_name)) - return None - return _svc.to_dict() - - def create_k8s_service(self, yaml): - if isinstance(yaml, str) or isinstance(yaml, str): - # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) - assert isinstance(yaml, dict), "Create a service without legal yaml." - - api_instance = client.CoreV1Api(self.api_client) - try: - svc = api_instance.create_namespaced_service(self.namespace, yaml) - return svc.to_dict()['metadata']['name'] - except ApiException as e: - self.log.error("Create service error: {}".format(e)) - raise ServiceError("Create service error: {}".format(e)) - - def delete_k8s_service(self, service_name): - api_instance = client.CoreV1Api(self.api_client) - try: - api_instance.delete_namespaced_service(service_name, self.namespace) - except ApiException as e: - self.log.error("Delete service error: {}".format(e)) - raise ServiceError("Delete service error: {}".format(e)) - - ### - # StatefulSet - ### - - def create_k8s_statefulset(self, yaml): - if isinstance(yaml, str) or isinstance(yaml, str): - # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) - assert isinstance(yaml, dict), "Create a statefulset without legal yaml." - - api_instance = client.AppsV1Api(self.api_client) - try: - api_instance.create_namespaced_stateful_set(self.namespace, yaml) - except ApiException as e: - self.log.error("Create StatefulSet error: {}".format(e)) - raise StatefulSetError("Create StatefulSet error: {}".format(e)) - - def delete_k8s_statefulset(self, statefulset_name): - api_instance = client.AppsV1Api(self.api_client) - try: - api_instance.delete_namespaced_stateful_set(statefulset_name, self.namespace) - except ApiException as e: - self.log.error("Delete StatefulSet error: {}".format(e)) - raise StatefulSetError("Delete StatefulSet error: {}".format(e)) - - ### - # PersistentVolumeClaim - ### - - def create_k8s_pvc(self, yaml): - if isinstance(yaml, str) or isinstance(yaml, str): - # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) - assert isinstance(yaml, dict), "Create a PVC without legal yaml." - - api_instance = client.CoreV1Api(self.api_client) - try: - api_instance.create_namespaced_persistent_volume_claim(self.namespace, yaml) - except ApiException as e: - self.log.error("Create PVC error: {}".format(e)) - raise PVCError("Create PVC error: {}".format(e)) - - def delete_k8s_pvc(self, pvc_name): - api_instance = client.CoreV1Api(self.api_client) - try: - api_instance.delete_namespaced_persistent_volume_claim(pvc_name, self.namespace) - except ApiException as e: - self.log.error("Delete PVC error: {}".format(e)) - raise PVCError("Delete PVC error: {}".format(e)) diff --git a/open-hackathon-server/src/hackathon/hk8s/service_adapter.py b/open-hackathon-server/src/hackathon/hk8s/service_adapter.py deleted file mode 100644 index db87c56e4..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/service_adapter.py +++ /dev/null @@ -1,23 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -__author__ = "rapidhere" -__all__ = ["ServiceAdapter"] - -from hackathon import Component - - -class ServiceAdapter(Component): - """the abstract ServiceAdapter with proxy pattern - - this adapter delegate the method and properties to the inner proxy, so make - the adpater has all functions that adaptee has - """ - - def __init__(self, service): - self.service = service - - def __getattr__(self, name): - return getattr(self.service, name) diff --git a/open-hackathon-server/src/hackathon/hmongo/models.py b/open-hackathon-server/src/hackathon/hmongo/models.py index e2ea4efaa..b41b4084c 100644 --- a/open-hackathon-server/src/hackathon/hmongo/models.py +++ b/open-hackathon-server/src/hackathon/hmongo/models.py @@ -3,6 +3,7 @@ This file is covered by the LICENSING file in the root of this project. """ +import yaml import hashlib from mongoengine import QuerySet, DateTimeField, DynamicDocument, EmbeddedDocument, StringField, \ BooleanField, IntField, DynamicEmbeddedDocument, EmbeddedDocumentListField, URLField, ListField, \ @@ -466,6 +467,35 @@ class K8sEnvironment(DynamicEmbeddedDocument): services = ListField() stateful_sets = ListField() + @classmethod + def load_from_yaml(cls, name, content): + deployments = [] + services = [] + + # TODO + persistent_volume_claims = [] + stateful_sets = [] + + resources = yaml.safe_load_all(content) + + for r in resources: + if 'Kind' not in r: + continue + + if r['Kind'] == "Deployment": + deployments.append(r) + + if r['Kind'] == "Service": + services.append(r) + + return cls( + name=name, + deployments=deployments, + persistent_volume_claims=persistent_volume_claims, + services=services, + stateful_sets=stateful_sets, + ) + class VirtualEnvironment(DynamicEmbeddedDocument): """ diff --git a/open-hackathon-server/src/hackathon/template/docker_template_unit.py b/open-hackathon-server/src/hackathon/template/docker_template_unit.py index f4a4b0d72..da360932f 100644 --- a/open-hackathon-server/src/hackathon/template/docker_template_unit.py +++ b/open-hackathon-server/src/hackathon/template/docker_template_unit.py @@ -60,7 +60,9 @@ class DockerTemplateUnit(TemplateUnit): """ def __init__(self, config): - super(DockerTemplateUnit, self).__init__(VE_PROVIDER.DOCKER) + # Conversion provider type to K8s + # all docker container will be created on K8s + super(DockerTemplateUnit, self).__init__(VE_PROVIDER.K8S) self.image = DOCKER_UNIT.IMAGE net_configs = config.get(DOCKER_UNIT.NET_CONFIG, []) diff --git a/open-hackathon-server/src/hackathon/template/template_content.py b/open-hackathon-server/src/hackathon/template/template_content.py index 2f0189c47..c909adf63 100644 --- a/open-hackathon-server/src/hackathon/template/template_content.py +++ b/open-hackathon-server/src/hackathon/template/template_content.py @@ -22,9 +22,9 @@ class TemplateContent: def __init__(self, name, description, environment_config): self.name = name self.description = description - self.environment = self.__load_environment(environment_config) + self.unit = self.__load_unit(environment_config) - self.provider = self.environment.provider + self.provider = self.unit.provider # TODO delete self.resource = defaultdict(list) @@ -40,25 +40,25 @@ def load_from_template(cls, template): def docker_image(self): if self.provider != VE_PROVIDER.DOCKER: return "" - return self.environment.image + return self.unit.image @property def network_configs(self): if self.provider != VE_PROVIDER.DOCKER: return [] - return self.environment.network_configs + return self.unit.network_configs @property def yml_template(self): if self.provider != VE_PROVIDER.K8S: return "" - return self.environment.yml_template + return self.unit.yml_template @property def template_args(self): if self.provider != VE_PROVIDER.K8S: return {} - return self.environment.template_args + return self.unit.template_args def is_valid(self): if self.provider is None: @@ -68,10 +68,10 @@ def is_valid(self): return True if self.provider == VE_PROVIDER.K8S: - return self.environment.is_valid() is True + return self.unit.is_valid() is True @classmethod - def __load_environment(cls, environment_config): + def __load_unit(cls, environment_config): provider = int(environment_config[TEMPLATE.VIRTUAL_ENVIRONMENT_PROVIDER]) if provider == VE_PROVIDER.DOCKER: return DockerTemplateUnit(environment_config) diff --git a/open-hackathon-server/src/hackathon/template/template_unit.py b/open-hackathon-server/src/hackathon/template/template_unit.py index 883b56eb0..93fb338ca 100644 --- a/open-hackathon-server/src/hackathon/template/template_unit.py +++ b/open-hackathon-server/src/hackathon/template/template_unit.py @@ -3,8 +3,10 @@ This file is covered by the LICENSING file in the root of this project. """ +import abc -class TemplateUnit(object): + +class TemplateUnit(abc.ABC): """Unit of TemplateContent. Each unit represents a virtual_environment that can be started or stopped independently""" @@ -16,5 +18,6 @@ def __init__(self, provider): """ self.provider = provider + @abc.abstractmethod def gen_k8s_yaml(self, expr_name): raise NotImplemented From 9a77bd00d9a8da63062fb12474b2b1a58cfe14bc Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 18:22:24 +0800 Subject: [PATCH 5/8] update celery worker --- .../src/hackathon/cloud_providers/__init__.py | 31 +++++----- .../src/hackathon/cloud_providers/k8s.py | 39 ++++--------- .../src/hackathon/expr/expr_mgr.py | 4 -- .../hackathon/template/template_content.py | 10 ++-- .../src/hackathon/worker/expr_tasks.py | 57 ++++++++++++++++--- 5 files changed, 81 insertions(+), 60 deletions(-) diff --git a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py index 0eebfdd16..fd1830725 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py @@ -6,7 +6,7 @@ from hackathon.hmongo.models import K8sEnvironment, VirtualEnvironment from hackathon.constants import VE_PROVIDER, EStatus, VEStatus, VERemoteProvider -__all__ = ["Provider"] +__all__ = ["Provider", "ProviderError"] LOG = logging.getLogger(__name__) @@ -18,11 +18,11 @@ class ProviderError(Exception): class Provider: - def __init__(self, provider_name, provider_cfg): - if provider_name not in _provider_classes: - raise ProviderError("{} provider not found", provider_name) - self.provider_name = provider_name - self.p_class = _provider_classes[provider_name] + def __init__(self, provider_type, provider_cfg): + if provider_type not in _provider_classes: + raise ProviderError("{} provider not found", provider_type) + self.provider_type = provider_type + self.p_class = _provider_classes[provider_type] self.p = self.p_class(provider_cfg) try: @@ -51,19 +51,19 @@ def create_expr(self, experiment, template_content): experiment.save() LOG.info("create expr spend time: {}".format(time.time() - start_at)) - def start_expr(self, experiment, template_content): + def start_expr(self, experiment): # TODO pass - def pause_expr(self, experiment, template_content): + def pause_expr(self, experiment): # TODO pass - def delete_expr(self, experiment, template_content): + def delete_expr(self, experiment): LOG.info("deleting experiment {}".format(experiment.id)) try: - self.p.delete_instance(self._get_ve_config(experiment, template_content)) + self.p.delete_instance(experiment.virtual_environments[0]) except Exception as e: LOG.error("delete expr error: {}".format(e)) experiment.status = EStatus.FAILED @@ -75,6 +75,9 @@ def delete_expr(self, experiment, template_content): def wait_for_ready(self): self.p.wait_instance_ready() + def get_remote_config(self): + pass + def _get_ve_config(self, experiment, template_content): if not experiment.virtual_environments or len(experiment.virtual_environments) == 0: experiment.virtual_environments = [self._init_ve_config(experiment, template_content)] @@ -83,7 +86,7 @@ def _get_ve_config(self, experiment, template_content): return experiment.virtual_environments[0] def _init_ve_config(self, experiment, template_content): - if self.provider_name == VE_PROVIDER.K8S: + if self.provider_type == VE_PROVIDER.K8S: hackathon = experiment.hackathon unit = template_content.unit env_name = "{}-{}".format(hackathon.name, "".join(random.sample(string.ascii_lowercase, 6))) @@ -96,11 +99,11 @@ def _init_ve_config(self, experiment, template_content): remote_provider=VERemoteProvider.Guacamole, k8s_resource=k8s_env, ) - raise ProviderError("not found {} instance config".format(self.provider_name)) + raise ProviderError("not found {} instance config".format(self.provider_type)) -def registry_provider(provider_name, provider_class): - _provider_classes[provider_name] = provider_class +def registry_provider(provider_type, provider_class): + _provider_classes[provider_type] = provider_class def _registry(): diff --git a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py index a15d06fe9..3506afd98 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/k8s.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py @@ -18,7 +18,11 @@ disable_warnings(InsecureRequestWarning) -class K8sResourceError(Exception): +class K8sError(Exception): + pass + + +class K8sResourceError(K8sError): err_id = "" err_msg_format = "{}: {}" @@ -30,27 +34,19 @@ def err_msg(self): return self.err_msg_format.format(self.err_id, self._err_msg) -class YmlParseError(K8sResourceError): - err_id = "K8s Yaml parse error" - - -class EnvError(K8sResourceError): - err_id = "K8s environment error" - - -class DeploymentError(EnvError): +class DeploymentError(K8sResourceError): err_id = "K8s deployment error" -class ServiceError(EnvError): +class ServiceError(K8sResourceError): err_id = "K8s service error" -class StatefulSetError(EnvError): +class StatefulSetError(K8sResourceError): err_id = "K8s StatefulSet error" -class PVCError(EnvError): +class PVCError(K8sResourceError): err_id = "K8s PersistentVolumeClaims error" @@ -73,25 +69,14 @@ def __init__(self, cfg): self.api_client = None def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT): - report = self.report_health(timeout) - return report[HEALTH.STATUS] == HEALTH_STATUS.OK - - def report_health(self, timeout=20): try: api_instance = client.CoreV1Api(self.api_client) api_instance.list_namespaced_pod(self.namespace, timeout_seconds=timeout) - return {HEALTH.STATUS: HEALTH_STATUS.OK} + return True except ApiException as e: - LOG.error("connect k8s api server error: {}".format(e)) - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: "Get Pod info error: {}".format(e), - } + raise K8sError("Connect k8s api server error: {}".format(e)) except HTTPError: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: "Connect K8s ApiServer {} error: connection timeout".format(self.api_url), - } + raise K8sError("Connect K8s ApiServer {} error: connection timeout".format(self.api_url)) def create_instance(self, ve_cfg): ins_cfg = ve_cfg.k8s_resource diff --git a/open-hackathon-server/src/hackathon/expr/expr_mgr.py b/open-hackathon-server/src/hackathon/expr/expr_mgr.py index 02015c2d9..3cce440a4 100644 --- a/open-hackathon-server/src/hackathon/expr/expr_mgr.py +++ b/open-hackathon-server/src/hackathon/expr/expr_mgr.py @@ -2,10 +2,6 @@ """ This file is covered by the LICENSING file in the root of this project. """ - -import sys - -sys.path.append("..") from datetime import timedelta from werkzeug.exceptions import PreconditionFailed, NotFound diff --git a/open-hackathon-server/src/hackathon/template/template_content.py b/open-hackathon-server/src/hackathon/template/template_content.py index c909adf63..0da51b049 100644 --- a/open-hackathon-server/src/hackathon/template/template_content.py +++ b/open-hackathon-server/src/hackathon/template/template_content.py @@ -25,6 +25,7 @@ def __init__(self, name, description, environment_config): self.unit = self.__load_unit(environment_config) self.provider = self.unit.provider + self.provider_cfg = {} # TODO delete self.resource = defaultdict(list) @@ -34,7 +35,9 @@ def __init__(self, name, description, environment_config): @classmethod def load_from_template(cls, template): env_cfg = template.unit_config() - return TemplateContent(template.name, template.description, env_cfg) + provider_cfg = template.k8s_cluster + content = TemplateContent(template.name, template.description, env_cfg) + content.provider_cfg.update(provider_cfg.dic) @property def docker_image(self): @@ -80,11 +83,6 @@ def __load_unit(cls, environment_config): else: raise Exception("unsupported virtual environment provider") - # todo delete - def get_resource(self, resource_type): - # always return a list of resource desc dict or empty - return self.resource[resource_type] - # FIXME deprecated this when support K8s ONLY def to_dict(self): dic = { diff --git a/open-hackathon-server/src/hackathon/worker/expr_tasks.py b/open-hackathon-server/src/hackathon/worker/expr_tasks.py index b23523e07..b5202b795 100644 --- a/open-hackathon-server/src/hackathon/worker/expr_tasks.py +++ b/open-hackathon-server/src/hackathon/worker/expr_tasks.py @@ -1,22 +1,61 @@ -from hackathon.hmongo.models import Experiment, Hackathon -from hackathon.constants import EStatus +import logging + from hackathon import RequiredFeature +from hackathon.hmongo.models import Experiment, Hackathon +from hackathon.template.template_content import TemplateContent +from hackathon.cloud_providers import Provider, ProviderError from . import celery_app as worker template_library = RequiredFeature("template_library") +LOG = logging.getLogger(__name__) @worker.task(name="start_new_expr") def start_new_expr(hackathon_id, expr_id, template_content): - pass + hackathon = Hackathon.objects(id=hackathon_id).first() + if not hackathon: + LOG.error("start new expr without hackathon") + experiment = Experiment.objects(id=expr_id).first() + if not experiment: + LOG.error("start new expr without experiment") + if hackathon.templates is None or len(hackathon.templates) == 0: + LOG.error("start new expr without template") -@worker.task(name="stop_expr") -def stop_expr(hackathon_id, expr_id): - pass + unit = template_content.unit + provider = _try_connect(unit.provider, template_content.provider_cfg) + if not provider: + return + provider.create_expr(experiment, template_content) -@worker.task(name="clean_resources") -def clean_resources(hackathon_id): - pass + # todo init Guacamole config + _ = provider.get_remote_config() + + +@worker.task(name="stop_expr") +def stop_expr(hackathon_id, expr_id): + hackathon = Hackathon.objects(id=hackathon_id).first() + if not hackathon: + LOG.error("stop expr without hackathon") + experiment = Experiment.objects(id=expr_id).first() + if not experiment: + LOG.error("stop expr without experiment") + + template = experiment.template + template_content = TemplateContent.load_from_template(template) + unit = template_content.unit + provider = _try_connect(unit.provider, template_content.provider_cfg) + if not provider: + return + + provider.delete_expr(experiment) + + +def _try_connect(provider, provider_cfg): + try: + return Provider(provider, provider_cfg) + except ProviderError as e: + LOG.error("init provider error: {}".format(e)) + return None From 2b021bafc6b314aa526f8c50289da72fc89e15bd Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 18:34:07 +0800 Subject: [PATCH 6/8] fix typo --- .../src/hackathon/cloud_providers/__init__.py | 8 ++++---- open-hackathon-server/src/hackathon/worker/__init__.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py index fd1830725..16f75ff6c 100644 --- a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py @@ -102,13 +102,13 @@ def _init_ve_config(self, experiment, template_content): raise ProviderError("not found {} instance config".format(self.provider_type)) -def registry_provider(provider_type, provider_class): +def register_provider(provider_type, provider_class): _provider_classes[provider_type] = provider_class -def _registry(): +def _register(): from .k8s import K8sProvider - registry_provider(VE_PROVIDER.K8S, K8sProvider) + register_provider(VE_PROVIDER.K8S, K8sProvider) -_registry() +_register() diff --git a/open-hackathon-server/src/hackathon/worker/__init__.py b/open-hackathon-server/src/hackathon/worker/__init__.py index 5a1d71b42..d565bf7ed 100644 --- a/open-hackathon-server/src/hackathon/worker/__init__.py +++ b/open-hackathon-server/src/hackathon/worker/__init__.py @@ -31,7 +31,7 @@ def on_success(self, retval, task_id, args, kwargs): celery_app.conf.update( CELERY_BROKER_URL=safe_get_config("CELERY_BROKER_URL", ""), CELERY_RESULT_BACKEND=safe_get_config("CELERY_RESULT_BACKEND", ""), - CELERY_DEFAULT_EXCHANGE='oph', + CELERY_DEFAULT_EXCHANGE='ohp', CELERYD_CONCURRENCY=20, CELERYD_MAX_TASKS_PER_CHILD=50, CELERY_DEFAULT_QUEUE='ohp.tasks.queue', From 9c90f6ee43506bb8488cd9f8035bc4708f3492aa Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 21:02:07 +0800 Subject: [PATCH 7/8] add check_and_pre_allocate_expr --- .../src/hackathon/constants.py | 7 +- .../src/hackathon/expr/expr_mgr.py | 45 ++----------- .../src/hackathon/hack/hackathon_manager.py | 67 ++----------------- .../src/hackathon/hmongo/models.py | 22 +++++- .../src/hackathon/worker/expr_tasks.py | 1 - .../src/hackathon/worker/hackathon_tasks.py | 49 ++++++++++++++ 6 files changed, 84 insertions(+), 107 deletions(-) create mode 100644 open-hackathon-server/src/hackathon/worker/hackathon_tasks.py diff --git a/open-hackathon-server/src/hackathon/constants.py b/open-hackathon-server/src/hackathon/constants.py index 40ecf655b..f51a5d9bd 100644 --- a/open-hackathon-server/src/hackathon/constants.py +++ b/open-hackathon-server/src/hackathon/constants.py @@ -76,8 +76,11 @@ class HACKATHON_CONFIG: RECYCLE_MINUTES = "recycle_minutes" PRE_ALLOCATE_ENABLED = "pre_allocate_enabled" PRE_ALLOCATE_NUMBER = "pre_allocate_number" + + # TODO delete this: Use MQ and Worker to limit the number of concurrency PRE_ALLOCATE_INTERVAL_SECONDS = "pre_allocate_interval_second" PRE_ALLOCATE_CONCURRENT = "pre_allocate_concurrent" + FREEDOM_TEAM = "freedom_team" CLOUD_PROVIDER = "cloud_provider" DEV_PLAN_REQUIRED = "dev_plan_required" @@ -110,7 +113,6 @@ class VE_PROVIDER: K8S = 3 - class EStatus: """Status for db model Experiment""" INIT = 0 @@ -164,7 +166,6 @@ class ReservedUser: DefaultSuperAdmin = 1 - class ADStatus: """ For status in db model @@ -275,7 +276,6 @@ class DHS_QUERY_STATE: FAILED = 2 - class ServiceDeploymentSlot: """ the slot of service deployment @@ -288,7 +288,6 @@ class ServiceDeploymentSlot: STAGING = 'staging' - class TCPProtocol: """ the protocol of TCP layer diff --git a/open-hackathon-server/src/hackathon/expr/expr_mgr.py b/open-hackathon-server/src/hackathon/expr/expr_mgr.py index 3cce440a4..9a49a3415 100644 --- a/open-hackathon-server/src/hackathon/expr/expr_mgr.py +++ b/open-hackathon-server/src/hackathon/expr/expr_mgr.py @@ -11,7 +11,7 @@ from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, CLOUD_PROVIDER, HACKATHON_CONFIG from hackathon.hmongo.models import Experiment, User, Hackathon from hackathon.hackathon_response import not_found, ok -from hackathon.worker.expr_tasks import start_new_expr, stop_expr, clean_resources +from hackathon.worker.expr_tasks import start_new_expr, stop_expr __all__ = ["ExprManager"] @@ -129,40 +129,6 @@ def scheduler_recycle_expr(self): except Exception as e: self.log.error(e) - def pre_allocate_expr(self, context): - # TODO: too complex, not check - hackathon_id = context.hackathon_id - self.log.debug("executing pre_allocate_expr for hackathon %s " % hackathon_id) - hackathon = Hackathon.objects(id=hackathon_id).first() - hackathon_templates = hackathon.templates - for template in hackathon_templates: - try: - template = template - pre_num = int(hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_NUMBER, 1)) - query = Q(status=EStatus.STARTING) | Q(status=EStatus.RUNNING) - curr_num = Experiment.objects(user=None, hackathon=hackathon, template=template).filter(query).count() - self.log.debug("pre_alloc_exprs: pre_num is %d, curr_num is %d, remain_num is %d " % - (pre_num, curr_num, pre_num - curr_num)) - - # TODO Should support VE_PROVIDER.K8S only in future after k8s Template is supported - # if template.provider == VE_PROVIDER.K8S: - if curr_num < pre_num: - start_num = Experiment.objects(user=None, template=template, status=EStatus.STARTING).count() - allowed_currency = int(hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_CONCURRENT, 1)) - if start_num >= allowed_currency: - self.log.debug( - "there are already %d Experiments starting, will check later ... " % allowed_currency) - return - else: - remain_num = min(allowed_currency, pre_num) - start_num - self.log.debug( - "no starting template: %s , remain num is %d ... " % (template.name, remain_num)) - self.start_pre_alloc_exprs(template.name, hackathon.name, remain_num) - break - except Exception as e: - self.log.error(e) - self.log.error("check default experiment failed") - def assign_expr_to_admin(self, expr): """assign expr to admin to trun expr into pre_allocate_expr @@ -208,23 +174,24 @@ def __start_new_expr(self, hackathon, template, user): def start_pre_alloc_exprs(self, template_name, hackathon_name=None, pre_alloc_num=0): self.log.debug("start_pre_alloc_exprs: %d " % pre_alloc_num) + try_create = 0 if pre_alloc_num == 0: - return + return try_create hackathon = self.__verify_hackathon(hackathon_name) template = self.__verify_template(hackathon, template_name) template_content = self.template_library.load_template(template) - while pre_alloc_num > 0: + while pre_alloc_num > try_create: expr = Experiment(status=EStatus.INIT, template=template, virtual_environments=[], hackathon=hackathon) expr.save() start_new_expr.delay(str(hackathon.id), str(expr.id), template_content) - pre_alloc_num -= 1 - + try_create += 1 self.log.debug("start_pre_alloc_exprs: finish") + return try_create def __report_expr_status(self, expr, isToConfirmExprStarting=False): # todo check whether need to restart Window-expr if it shutdown diff --git a/open-hackathon-server/src/hackathon/hack/hackathon_manager.py b/open-hackathon-server/src/hackathon/hack/hackathon_manager.py index 4b7f9dbdc..c720cc41a 100644 --- a/open-hackathon-server/src/hackathon/hack/hackathon_manager.py +++ b/open-hackathon-server/src/hackathon/hack/hackathon_manager.py @@ -23,6 +23,7 @@ FILE_TYPE, HACK_TYPE, HACKATHON_STAT, DockerHostServerStatus, HACK_NOTICE_CATEGORY, HACK_NOTICE_EVENT, \ ORGANIZATION_TYPE, CLOUD_PROVIDER from hackathon import RequiredFeature, Component, Context +from hackathon.worker.hackathon_tasks import check_and_pre_allocate_expr docker_host_manager = RequiredFeature("docker_host_manager") __all__ = ["HackathonManager"] @@ -272,7 +273,7 @@ def update_hackathon(self, args): # basic xss prevention if 'description' in update_items and update_items['description']: - #update_items['description'] = self.cleaner.clean_html(update_items['description']) + # update_items['description'] = self.cleaner.clean_html(update_items['description']) self.log.debug("hackathon description :" + update_items['description']) hackathon.modify(**update_items) @@ -653,13 +654,13 @@ def get_hackathon_notice_list(self, body): is_read_filter = Q() order_by_condition = '-update_time' - if hackathon_name: #list notices that belong to specfic hackathon + if hackathon_name: # list notices that belong to specfic hackathon hackathon = Hackathon.objects(name=hackathon_name).only('name').first() if hackathon: hackathon_filter = Q(hackathon=hackathon) else: return not_found('hackathon_name not found') - else: #only list online hackathons' notices or notices that not belong to any hackathon + else: # only list online hackathons' notices or notices that not belong to any hackathon online_hackathon = Hackathon.objects(status=HACK_STATUS.ONLINE) hackathon_filter = Q(hackathon__in=online_hackathon) | Q(hackathon=None) @@ -726,47 +727,13 @@ def schedule_pre_allocate_expr_job(self): next_run_time=next_run_time, minutes=20) - def __is_pre_allocate_enabled(self, hackathon): - if hackathon.event_end_time < self.util.get_now(): - return False - # using registration time for better test before event_start_time - if hackathon.registration_start_time > self.util.get_now(): - return False - if hackathon.status != HACK_STATUS.ONLINE: - return False - if hackathon.config.get(HACKATHON_CONFIG.CLOUD_PROVIDER, CLOUD_PROVIDER.NONE) == CLOUD_PROVIDER.NONE: - return False - return hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_ENABLED, False) - def check_hackathon_for_pre_allocate_expr(self): """Check all hackathon for pre-allocate Add an interval job for hackathon if it's pre-allocate is enabled. Otherwise try to remove the schedule job """ - hackathon_list = Hackathon.objects() - for hack in hackathon_list: - job_id = "pre_allocate_expr_" + str(hack.id) - is_job_exists = self.scheduler.has_job(job_id) - if self.__is_pre_allocate_enabled(hack): - if is_job_exists: - self.log.debug("pre_allocate job already exists for hackathon %s" % str(hack.name)) - continue - - self.log.debug("add pre_allocate job for hackathon %s" % str(hack.name)) - next_run_time = self.util.get_now() + timedelta(seconds=(20 * random.random())) - pre_allocate_interval = self.__get_pre_allocate_interval(hack) - self.scheduler.add_interval(feature="expr_manager", - method="pre_allocate_expr", - id=job_id, - context=Context(hackathon_id=hack.id), - next_run_time=next_run_time, - seconds=pre_allocate_interval - ) - elif is_job_exists: - self.log.debug("remove job for hackathon %s since pre_allocate is disabled" % str(hack.id)) - self.scheduler.remove_job(job_id) - return True + check_and_pre_allocate_expr.delay() def hackathon_online(self, hackathon): req = ok() @@ -825,7 +792,7 @@ def __get_hackathon_detail(self, hackathon, user=None): detail["user"] = self.user_manager.user_display_info(user) detail["user"]["is_admin"] = user.is_super or ( - user_hackathon and user_hackathon.role == HACK_USER_TYPE.ADMIN) + user_hackathon and user_hackathon.role == HACK_USER_TYPE.ADMIN) # TODO: we need to review those items one by one to decide the API output # asset = self.db.find_all_objects_by(UserHackathonAsset, user_id=user.id, hackathon_id=hackathon.id) @@ -928,28 +895,6 @@ def __create_hackathon(self, creator, context): return new_hack - def __get_pre_allocate_interval(self, hackathon): - interval = self.get_basic_property(hackathon, HACKATHON_CONFIG.PRE_ALLOCATE_INTERVAL_SECONDS) - if interval: - return int(interval) - else: - return 300 + random.random() * 50 - - def __get_hackathon_configs(self, hackathon): - - def __internal_get_config(): - configs = {} - for c in hackathon.configs.all(): - configs[c.key] = c.value - return configs - - cache_key = self.__get_config_cache_key(hackathon) - return self.cache.get_cache(key=cache_key, createfunc=__internal_get_config) - - def __get_hackathon_organizers(self, hackathon): - organizers = self.db.find_all_objects_by(HackathonOrganizer, hackathon_id=hackathon.id) - return [o.dic() for o in organizers] - def __parse_update_items(self, args, hackathon): """Parse properties that need to update diff --git a/open-hackathon-server/src/hackathon/hmongo/models.py b/open-hackathon-server/src/hackathon/hmongo/models.py index b41b4084c..89f0a2895 100644 --- a/open-hackathon-server/src/hackathon/hmongo/models.py +++ b/open-hackathon-server/src/hackathon/hmongo/models.py @@ -5,12 +5,13 @@ import yaml import hashlib +from datetime import datetime from mongoengine import QuerySet, DateTimeField, DynamicDocument, EmbeddedDocument, StringField, \ BooleanField, IntField, DynamicEmbeddedDocument, EmbeddedDocumentListField, URLField, ListField, \ EmbeddedDocumentField, ReferenceField, UUIDField, DictField, DynamicField, PULL from hackathon.util import get_now, make_serializable -from hackathon.constants import TEMPLATE_STATUS, HACK_USER_TYPE, VE_PROVIDER +from hackathon.constants import TEMPLATE_STATUS, HACK_USER_TYPE, VE_PROVIDER, HACK_STATUS, HACKATHON_CONFIG from hackathon.hmongo.pagination import Pagination from hackathon import app @@ -265,7 +266,7 @@ class Hackathon(HDocumentBase): location = StringField() description = StringField() banners = ListField() - status = IntField(default=0) # 0-new 1-online 2-offline 3-apply-online + status = IntField(default=HACK_STATUS.DRAFT) # 0-new 1-online 2-offline 3-apply-online creator = ReferenceField(User) config = DictField() # max_enrollment, auto_approve, login_provider type = IntField(default=1) # enum.HACK_TYPE @@ -286,6 +287,23 @@ class Hackathon(HDocumentBase): def __init__(self, **kwargs): super(Hackathon, self).__init__(**kwargs) + @property + def in_progress(self): + now = datetime.utcnow() + if self.event_end_time < now: + return False + if self.registration_start_time > now: + return False + if self.status != HACK_STATUS.ONLINE: + return False + return True + + @property + def enable_pre_allocate(self): + if not self.in_progress: + return False + return self.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_ENABLED, False) + class UserHackathon(HDocumentBase): user = ReferenceField(User) diff --git a/open-hackathon-server/src/hackathon/worker/expr_tasks.py b/open-hackathon-server/src/hackathon/worker/expr_tasks.py index b5202b795..1df9fa2a2 100644 --- a/open-hackathon-server/src/hackathon/worker/expr_tasks.py +++ b/open-hackathon-server/src/hackathon/worker/expr_tasks.py @@ -7,7 +7,6 @@ from . import celery_app as worker -template_library = RequiredFeature("template_library") LOG = logging.getLogger(__name__) diff --git a/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py b/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py new file mode 100644 index 000000000..c19800e01 --- /dev/null +++ b/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py @@ -0,0 +1,49 @@ +import logging +from mongoengine import Q + +from hackathon import RequiredFeature +from hackathon.hmongo.models import Experiment, Hackathon +from hackathon.template.template_content import TemplateContent +from hackathon.cloud_providers import Provider, ProviderError +from hackathon.constants import HACK_STATUS, HACKATHON_CONFIG, EStatus + +from . import celery_app as worker +from .expr_tasks import start_new_expr + +LOG = logging.getLogger(__name__) + +expr_manager = RequiredFeature("expr_manager") + + +@worker.task(name="check_and_pre_allocate_expr") +def check_and_pre_allocate_expr(): + hackathon_list = Hackathon.objects(status=HACK_STATUS.ONLINE) + for hack in hackathon_list: + if not hack.enable_pre_allocate: + continue + + LOG.debug("add pre_allocate job for hackathon %s" % str(hack.name)) + pre_allocate_expr.delay(str(hack.id)) + + +@worker.task(name="pre_allocate_expr") +def pre_allocate_expr(hackathon_id): + LOG.info("pre_allocate_expr for hackathon {}".format(hackathon_id)) + hackathon = Hackathon.objects(id=hackathon_id).first() + if not hackathon: + LOG.error("pre_allocate_expr without hackathon") + return + if not hackathon.templates or len(hackathon.templates) == 0: + LOG.error("hackathon has no template") + + template = hackathon.templates[0] + pre_num = int(hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_NUMBER, 1)) + query = Q(status=EStatus.STARTING) | Q(status=EStatus.RUNNING) + curr_num = Experiment.objects(user=None, hackathon=hackathon, template=template).filter(query).count() + + if curr_num >= pre_num: + LOG.info("start 0 expr, finish") + return + + try_create = expr_manager.start_pre_alloc_exprs(template.name, hackathon.name, pre_num - curr_num) + LOG.info("start {} expr, finish".format(try_create)) From 6352adb42e3f2cc3bf4b0f7373eaa76080eed66c Mon Sep 17 00:00:00 2001 From: Hypo Date: Sun, 19 Apr 2020 21:03:25 +0800 Subject: [PATCH 8/8] clear code --- open-hackathon-server/src/hackathon/expr/expr_mgr.py | 6 +++--- .../src/hackathon/hack/hackathon_manager.py | 11 +++-------- open-hackathon-server/src/hackathon/views/__init__.py | 5 ----- .../src/hackathon/worker/hackathon_tasks.py | 3 --- 4 files changed, 6 insertions(+), 19 deletions(-) diff --git a/open-hackathon-server/src/hackathon/expr/expr_mgr.py b/open-hackathon-server/src/hackathon/expr/expr_mgr.py index 9a49a3415..74deec46c 100644 --- a/open-hackathon-server/src/hackathon/expr/expr_mgr.py +++ b/open-hackathon-server/src/hackathon/expr/expr_mgr.py @@ -7,9 +7,9 @@ from werkzeug.exceptions import PreconditionFailed, NotFound from mongoengine import Q -from hackathon import Component, RequiredFeature, Context -from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, CLOUD_PROVIDER, HACKATHON_CONFIG -from hackathon.hmongo.models import Experiment, User, Hackathon +from hackathon import Component, RequiredFeature +from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, HACKATHON_CONFIG +from hackathon.hmongo.models import Experiment, User from hackathon.hackathon_response import not_found, ok from hackathon.worker.expr_tasks import start_new_expr, stop_expr diff --git a/open-hackathon-server/src/hackathon/hack/hackathon_manager.py b/open-hackathon-server/src/hackathon/hack/hackathon_manager.py index c720cc41a..5f9fa2a7d 100644 --- a/open-hackathon-server/src/hackathon/hack/hackathon_manager.py +++ b/open-hackathon-server/src/hackathon/hack/hackathon_manager.py @@ -2,15 +2,10 @@ """ This file is covered by the LICENSING file in the root of this project. """ - -import random -import sys - -sys.path.append("..") import uuid from datetime import timedelta -from werkzeug.exceptions import PreconditionFailed, InternalServerError, BadRequest +from werkzeug.exceptions import PreconditionFailed, InternalServerError from flask import g, request import lxml from lxml.html.clean import Cleaner @@ -20,8 +15,8 @@ Organization, Award, Team from hackathon.hackathon_response import internal_server_error, ok, not_found, general_error, HTTP_CODE, bad_request from hackathon.constants import HACKATHON_CONFIG, HACK_USER_TYPE, HACK_STATUS, HACK_USER_STATUS, HTTP_HEADER, \ - FILE_TYPE, HACK_TYPE, HACKATHON_STAT, DockerHostServerStatus, HACK_NOTICE_CATEGORY, HACK_NOTICE_EVENT, \ - ORGANIZATION_TYPE, CLOUD_PROVIDER + HACK_TYPE, HACKATHON_STAT, DockerHostServerStatus, HACK_NOTICE_CATEGORY, HACK_NOTICE_EVENT, \ + CLOUD_PROVIDER from hackathon import RequiredFeature, Component, Context from hackathon.worker.hackathon_tasks import check_and_pre_allocate_expr diff --git a/open-hackathon-server/src/hackathon/views/__init__.py b/open-hackathon-server/src/hackathon/views/__init__.py index 04af9f365..230eee09d 100644 --- a/open-hackathon-server/src/hackathon/views/__init__.py +++ b/open-hackathon-server/src/hackathon/views/__init__.py @@ -2,11 +2,6 @@ """ This file is covered by the LICENSING file in the root of this project. """ - -import sys - -sys.path.append("..") - from hackathon import api from hackathon.views.resources import * diff --git a/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py b/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py index c19800e01..f948cecc1 100644 --- a/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py +++ b/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py @@ -3,12 +3,9 @@ from hackathon import RequiredFeature from hackathon.hmongo.models import Experiment, Hackathon -from hackathon.template.template_content import TemplateContent -from hackathon.cloud_providers import Provider, ProviderError from hackathon.constants import HACK_STATUS, HACKATHON_CONFIG, EStatus from . import celery_app as worker -from .expr_tasks import start_new_expr LOG = logging.getLogger(__name__)