diff --git a/open-hackathon-server/src/hackathon/expr/guacctl.sh b/deploy/guacamole/guacctl.sh similarity index 100% rename from open-hackathon-server/src/hackathon/expr/guacctl.sh rename to deploy/guacamole/guacctl.sh diff --git a/open-hackathon-server/requirements.txt b/open-hackathon-server/requirements.txt index f36305528..a9ec942f0 100644 --- a/open-hackathon-server/requirements.txt +++ b/open-hackathon-server/requirements.txt @@ -1,3 +1,4 @@ +amqp==2.5.2 aniso8601==8.0.0 APScheduler==3.0.3 attrs==19.3.0 @@ -15,7 +16,9 @@ azure-servicebus==0.20.1 azure-servicemanagement-legacy==0.20.1 azure-storage==0.20.2 Beaker==1.11.0 +billiard==3.6.3.0 cachetools==4.0.0 +celery==4.4.2 certifi==2019.11.28 chardet==3.0.4 click==7.1.1 @@ -30,6 +33,7 @@ idna==2.9 importlib-metadata==1.5.0 itsdangerous==1.1.0 Jinja2==2.11.1 +kombu==4.6.8 kubernetes==9.0.0 lxml==4.3.0 mailthon==0.1.1 @@ -62,6 +66,7 @@ subprocess32==3.5.4 tzlocal==2.0.0 urllib3==1.25.8 validictory==1.0.0 +vine==1.3.0 wcwidth==0.1.8 websocket-client==0.57.0 Werkzeug==0.15.3 diff --git a/open-hackathon-server/src/hackathon/__init__.py b/open-hackathon-server/src/hackathon/__init__.py index ab858d3ed..3d43f8ea6 100644 --- a/open-hackathon-server/src/hackathon/__init__.py +++ b/open-hackathon-server/src/hackathon/__init__.py @@ -179,9 +179,8 @@ def init_db(): def init_expr_components(): - from .expr import ExprManager, K8SExprStarter + from .expr import ExprManager factory.provide("expr_manager", ExprManager) - factory.provide("k8s_service", K8SExprStarter) def init_voice_verify(): diff --git a/open-hackathon-server/src/hackathon/cloud_providers/__init__.py b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py new file mode 100644 index 000000000..16f75ff6c --- /dev/null +++ b/open-hackathon-server/src/hackathon/cloud_providers/__init__.py @@ -0,0 +1,114 @@ +import time +import random +import string +import logging + +from hackathon.hmongo.models import K8sEnvironment, VirtualEnvironment +from hackathon.constants import VE_PROVIDER, EStatus, VEStatus, VERemoteProvider + +__all__ = ["Provider", "ProviderError"] + +LOG = logging.getLogger(__name__) + +_provider_classes = {} + + +class ProviderError(Exception): + pass + + +class Provider: + def __init__(self, provider_type, provider_cfg): + if provider_type not in _provider_classes: + raise ProviderError("{} provider not found", provider_type) + self.provider_type = provider_type + self.p_class = _provider_classes[provider_type] + self.p = self.p_class(provider_cfg) + + try: + self.p.connect() + except Exception as e: + raise ProviderError("connect Provider Platform error: {}".format(e)) + + def create_expr(self, experiment, template_content): + LOG.info("creating experiment {}".format(experiment.id)) + start_at = time.time() + experiment.status = EStatus.STARTING + experiment.save() + + try: + new_env = self.p.create_instance(self._get_ve_config(experiment, template_content)) + experiment.virtual_environments[0] = new_env + experiment.save() + + self.wait_for_ready() + except Exception as e: + LOG.error("wait for start error: {}".format(e)) + experiment.status = EStatus.FAILED + else: + experiment.virtual_environments[0].status = VEStatus.RUNNING + experiment.status = EStatus.RUNNING + experiment.save() + LOG.info("create expr spend time: {}".format(time.time() - start_at)) + + def start_expr(self, experiment): + # TODO + pass + + def pause_expr(self, experiment): + # TODO + pass + + def delete_expr(self, experiment): + LOG.info("deleting experiment {}".format(experiment.id)) + + try: + self.p.delete_instance(experiment.virtual_environments[0]) + except Exception as e: + LOG.error("delete expr error: {}".format(e)) + experiment.status = EStatus.FAILED + else: + experiment.status = EStatus.STOPPED + experiment.save() + LOG.info("experiment {} deleted".format(experiment.id)) + + def wait_for_ready(self): + self.p.wait_instance_ready() + + def get_remote_config(self): + pass + + def _get_ve_config(self, experiment, template_content): + if not experiment.virtual_environments or len(experiment.virtual_environments) == 0: + experiment.virtual_environments = [self._init_ve_config(experiment, template_content)] + experiment.save() + + return experiment.virtual_environments[0] + + def _init_ve_config(self, experiment, template_content): + if self.provider_type == VE_PROVIDER.K8S: + hackathon = experiment.hackathon + unit = template_content.unit + env_name = "{}-{}".format(hackathon.name, "".join(random.sample(string.ascii_lowercase, 6))) + yaml_content = unit.gen_k8s_yaml(env_name) + k8s_env = K8sEnvironment.load_from_yaml(env_name, yaml_content) + return VirtualEnvironment( + provider=VE_PROVIDER.K8S, + name=env_name, + status=VEStatus.INIT, + remote_provider=VERemoteProvider.Guacamole, + k8s_resource=k8s_env, + ) + raise ProviderError("not found {} instance config".format(self.provider_type)) + + +def register_provider(provider_type, provider_class): + _provider_classes[provider_type] = provider_class + + +def _register(): + from .k8s import K8sProvider + register_provider(VE_PROVIDER.K8S, K8sProvider) + + +_register() diff --git a/open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py similarity index 61% rename from open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py rename to open-hackathon-server/src/hackathon/cloud_providers/k8s.py index 0b67dcf0b..3506afd98 100644 --- a/open-hackathon-server/src/hackathon/hk8s/k8s_service_adapter.py +++ b/open-hackathon-server/src/hackathon/cloud_providers/k8s.py @@ -1,57 +1,145 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - +import time +import logging +import yaml as yaml_tool from urllib3 import disable_warnings from urllib3.exceptions import InsecureRequestWarning, HTTPError - -import yaml as yaml_tool from kubernetes import client from kubernetes.client.rest import ApiException from hackathon.constants import HEALTH, HEALTH_STATUS, K8S_DEPLOYMENT_STATUS -from .service_adapter import ServiceAdapter -from .errors import DeploymentError, ServiceError, StatefulSetError, PVCError +from .util import BaseProvider -__all__ = ["K8SServiceAdapter"] +LOG = logging.getLogger(__name__) +DEFAULT_CONNECT_TIMEOUT = 60 +DEFAULT_RESYNC_TIME = 10 + +# 忽略自签名 ca 的告警日志 disable_warnings(InsecureRequestWarning) -class K8SServiceAdapter(ServiceAdapter): - def __init__(self, api_url, token, namespace): +class K8sError(Exception): + pass + + +class K8sResourceError(K8sError): + err_id = "" + err_msg_format = "{}: {}" + + def __init__(self, err_msg): + self._err_msg = err_msg + + @property + def err_msg(self): + return self.err_msg_format.format(self.err_id, self._err_msg) + + +class DeploymentError(K8sResourceError): + err_id = "K8s deployment error" + + +class ServiceError(K8sResourceError): + err_id = "K8s service error" + + +class StatefulSetError(K8sResourceError): + err_id = "K8s StatefulSet error" + + +class PVCError(K8sResourceError): + err_id = "K8s PersistentVolumeClaims error" + + +class K8sProvider(BaseProvider): + def __init__(self, cfg): + super(K8sProvider, self).__init__(cfg) + + self.namespace = cfg['namespace'] + self.api_url = cfg['api_url'] + self.token = cfg['token'] + configuration = client.Configuration() - configuration.host = api_url - configuration.api_key['authorization'] = 'bearer ' + token - # FIXME import ca cert file? - configuration.verify_ssl = False + configuration.host = self.api_url + configuration.api_key['authorization'] = 'bearer ' + self.token - self.namespace = namespace - self.api_url = api_url - self.api_client = client.ApiClient(configuration) - super(K8SServiceAdapter, self).__init__(self.api_client) + # TODO support ca + configuration.verify_ssl = False + self.configuration = configuration - def ping(self, timeout=20): - report = self.report_health(timeout) - return report[HEALTH.STATUS] == HEALTH_STATUS.OK + self.api_client = None - def report_health(self, timeout=20): + def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT): try: api_instance = client.CoreV1Api(self.api_client) api_instance.list_namespaced_pod(self.namespace, timeout_seconds=timeout) - return {HEALTH.STATUS: HEALTH_STATUS.OK} + return True except ApiException as e: - self.log.error(e) - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: "Get Pod info error: {}".format(e), - } + raise K8sError("Connect k8s api server error: {}".format(e)) except HTTPError: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: "Connect K8s ApiServer {} error: connection timeout".format(self.api_url), - } + raise K8sError("Connect K8s ApiServer {} error: connection timeout".format(self.api_url)) + + def create_instance(self, ve_cfg): + ins_cfg = ve_cfg.k8s_resource + try: + for pvc in ins_cfg.persistent_volume_claims: + self.create_k8s_pvc(pvc) + + for i, s in enumerate(ins_cfg.services): + svc_name = self.create_k8s_service(s) + # overwrite service config and get the public port from K8s + ins_cfg.services[i] = yaml_tool.dump(self.get_service_by_name(svc_name)) + + for d in ins_cfg.deployments: + self.create_k8s_deployment(d) + + for s in ins_cfg.stateful_sets: + self.create_k8s_statefulset(s) + except Exception as e: + LOG.error("k8s_service_start_failed: {}".format(e)) + return ins_cfg + + def start_instance(self, ve_cfg): + # TODO + pass + + def pause_instance(self, ve_cfg): + # TODO + pass + + def delete_instance(self, ve_cfg): + ins_cfg = ve_cfg.k8s_resource + for d in ins_cfg.deployments: + self.delete_k8s_deployment(d['metadata']['name']) + + for pvc in ins_cfg.persistent_volume_claims: + self.delete_k8s_pvc(pvc['metadata']['name']) + + for s in ins_cfg.stateful_sets: + self.delete_k8s_statefulset(s['metadata']['name']) + + for s in ins_cfg.services: + self.delete_k8s_service(s['metadata']['name']) + + def wait_instance_ready(self, ve_cfg, timeout=None): + ins_cfg = ve_cfg.k8s_resource + start_at = time.time() + while True: + all_ready = True + for d in ins_cfg.deployments: + if all_ready and self.get_deployment_status(d['metadata']['name']) != K8S_DEPLOYMENT_STATUS.AVAILABLE: + all_ready = False + continue + + for s in ins_cfg.stateful_sets: + # TODO check statfulSet status + pass + + if all_ready: + return all_ready + + time.sleep(DEFAULT_RESYNC_TIME) + if timeout is not None and time.time() > start_at + timeout: + raise RuntimeError("Start deployment error: Timeout") ### # Deployment @@ -68,7 +156,7 @@ def list_deployments(self, labels=None, timeout=20): try: ret = apps_v1_group.list_namespaced_deployment(self.namespace, **kwargs) except ApiException as e: - self.log.error(e) + LOG.error(e) return [] for i in ret.items: @@ -82,7 +170,7 @@ def create_k8s_deployment(self, yaml): api_instance = client.AppsV1Api(self.api_client) if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Start a deployment without legal yaml." metadata = yaml.get("metadata", {}) deploy_name = metadata.get("name") @@ -93,7 +181,7 @@ def create_k8s_deployment(self, yaml): api_instance.create_namespaced_deployment(self.namespace, yaml, async_req=False) except ApiException as e: - self.log.error("Start deployment error: {}".format(e)) + LOG.error("Start deployment error: {}".format(e)) raise DeploymentError("Start deployment error: {}".format(e)) return deploy_name @@ -129,7 +217,7 @@ def start_k8s_deployment(self, deployment_name): return deployment_name _spec.replicas = 1 api_instance.patch_namespaced_deployment(deployment_name, self.namespace, _deploy) - self.log.info("Started existed deployment: {}".format(deployment_name)) + LOG.info("Started existed deployment: {}".format(deployment_name)) def pause_k8s_deployment(self, deployment_name): _deploy = self.get_deployment_by_name(deployment_name) @@ -139,18 +227,18 @@ def pause_k8s_deployment(self, deployment_name): try: api_instance.patch_namespaced_deployment(deployment_name, self.namespace, _deploy) except ApiException as e: - self.log.error("Pause deployment error: {}".format(e)) + LOG.error("Pause deployment error: {}".format(e)) raise DeploymentError("Pause {} error {}".format(deployment_name, e)) - self.log.info("Paused existed deployment: {}".format(deployment_name)) + LOG.info("Paused existed deployment: {}".format(deployment_name)) def delete_k8s_deployment(self, deployment_name): api_instance = client.AppsV1Api(self.api_client) try: api_instance.delete_namespaced_deployment(deployment_name, self.namespace) except ApiException as e: - self.log.error("Delete deployment error: {}".format(e)) + LOG.error("Delete deployment error: {}".format(e)) raise DeploymentError("Delete {} error {}".format(deployment_name, e)) - self.log.info("Deleted existed deployment: {}".format(deployment_name)) + LOG.info("Deleted existed deployment: {}".format(deployment_name)) ### # Service @@ -169,7 +257,7 @@ def get_service_by_name(self, service_name, need_raise=True): def create_k8s_service(self, yaml): if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Create a service without legal yaml." api_instance = client.CoreV1Api(self.api_client) @@ -177,7 +265,7 @@ def create_k8s_service(self, yaml): svc = api_instance.create_namespaced_service(self.namespace, yaml) return svc.to_dict()['metadata']['name'] except ApiException as e: - self.log.error("Create service error: {}".format(e)) + LOG.error("Create service error: {}".format(e)) raise ServiceError("Create service error: {}".format(e)) def delete_k8s_service(self, service_name): @@ -185,7 +273,7 @@ def delete_k8s_service(self, service_name): try: api_instance.delete_namespaced_service(service_name, self.namespace) except ApiException as e: - self.log.error("Delete service error: {}".format(e)) + LOG.error("Delete service error: {}".format(e)) raise ServiceError("Delete service error: {}".format(e)) ### @@ -195,14 +283,14 @@ def delete_k8s_service(self, service_name): def create_k8s_statefulset(self, yaml): if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Create a statefulset without legal yaml." api_instance = client.AppsV1Api(self.api_client) try: api_instance.create_namespaced_stateful_set(self.namespace, yaml) except ApiException as e: - self.log.error("Create StatefulSet error: {}".format(e)) + LOG.error("Create StatefulSet error: {}".format(e)) raise StatefulSetError("Create StatefulSet error: {}".format(e)) def delete_k8s_statefulset(self, statefulset_name): @@ -210,7 +298,7 @@ def delete_k8s_statefulset(self, statefulset_name): try: api_instance.delete_namespaced_stateful_set(statefulset_name, self.namespace) except ApiException as e: - self.log.error("Delete StatefulSet error: {}".format(e)) + LOG.error("Delete StatefulSet error: {}".format(e)) raise StatefulSetError("Delete StatefulSet error: {}".format(e)) ### @@ -220,14 +308,14 @@ def delete_k8s_statefulset(self, statefulset_name): def create_k8s_pvc(self, yaml): if isinstance(yaml, str) or isinstance(yaml, str): # Only support ONE deployment yaml - yaml = yaml_tool.load(yaml) + yaml = yaml_tool.safe_load(yaml) assert isinstance(yaml, dict), "Create a PVC without legal yaml." api_instance = client.CoreV1Api(self.api_client) try: api_instance.create_namespaced_persistent_volume_claim(self.namespace, yaml) except ApiException as e: - self.log.error("Create PVC error: {}".format(e)) + LOG.error("Create PVC error: {}".format(e)) raise PVCError("Create PVC error: {}".format(e)) def delete_k8s_pvc(self, pvc_name): @@ -235,5 +323,5 @@ def delete_k8s_pvc(self, pvc_name): try: api_instance.delete_namespaced_persistent_volume_claim(pvc_name, self.namespace) except ApiException as e: - self.log.error("Delete PVC error: {}".format(e)) + LOG.error("Delete PVC error: {}".format(e)) raise PVCError("Delete PVC error: {}".format(e)) diff --git a/open-hackathon-server/src/hackathon/cloud_providers/util.py b/open-hackathon-server/src/hackathon/cloud_providers/util.py new file mode 100644 index 000000000..3904ca05b --- /dev/null +++ b/open-hackathon-server/src/hackathon/cloud_providers/util.py @@ -0,0 +1,34 @@ +import abc +from collections import namedtuple + + +class BaseProvider(abc.ABC): + def __init__(self, cfg): + self.cfg = cfg + + @abc.abstractmethod + def connect(self): + pass + + @abc.abstractmethod + def create_instance(self, ve_cfg): + pass + + @abc.abstractmethod + def start_instance(self, ve_cfg): + pass + + @abc.abstractmethod + def pause_instance(self, ve_cfg): + pass + + @abc.abstractmethod + def delete_instance(self, ve_cfg): + pass + + @abc.abstractmethod + def wait_instance_ready(self, ve_cfg, timeout=None): + pass + + +Instance = namedtuple("Instance", field_names=[]) diff --git a/open-hackathon-server/src/hackathon/constants.py b/open-hackathon-server/src/hackathon/constants.py index 40ecf655b..f51a5d9bd 100644 --- a/open-hackathon-server/src/hackathon/constants.py +++ b/open-hackathon-server/src/hackathon/constants.py @@ -76,8 +76,11 @@ class HACKATHON_CONFIG: RECYCLE_MINUTES = "recycle_minutes" PRE_ALLOCATE_ENABLED = "pre_allocate_enabled" PRE_ALLOCATE_NUMBER = "pre_allocate_number" + + # TODO delete this: Use MQ and Worker to limit the number of concurrency PRE_ALLOCATE_INTERVAL_SECONDS = "pre_allocate_interval_second" PRE_ALLOCATE_CONCURRENT = "pre_allocate_concurrent" + FREEDOM_TEAM = "freedom_team" CLOUD_PROVIDER = "cloud_provider" DEV_PLAN_REQUIRED = "dev_plan_required" @@ -110,7 +113,6 @@ class VE_PROVIDER: K8S = 3 - class EStatus: """Status for db model Experiment""" INIT = 0 @@ -164,7 +166,6 @@ class ReservedUser: DefaultSuperAdmin = 1 - class ADStatus: """ For status in db model @@ -275,7 +276,6 @@ class DHS_QUERY_STATE: FAILED = 2 - class ServiceDeploymentSlot: """ the slot of service deployment @@ -288,7 +288,6 @@ class ServiceDeploymentSlot: STAGING = 'staging' - class TCPProtocol: """ the protocol of TCP layer diff --git a/open-hackathon-server/src/hackathon/docker/__init__.py b/open-hackathon-server/src/hackathon/docker/__init__.py deleted file mode 100644 index 99d0ef8a4..000000000 --- a/open-hackathon-server/src/hackathon/docker/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" diff --git a/open-hackathon-server/src/hackathon/docker/docker_formation_base.py b/open-hackathon-server/src/hackathon/docker/docker_formation_base.py deleted file mode 100644 index da741bd54..000000000 --- a/open-hackathon-server/src/hackathon/docker/docker_formation_base.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -import abc - - -class DockerFormationBase(object, metaclass=abc.ABCMeta): - @abc.abstractmethod - def start(self, unit, **kwargs): - """start a docker container""" - return - - @abc.abstractmethod - def stop(self, name, **kwargs): - """stop a docker container""" - return - - @abc.abstractmethod - def delete(self, name, **kwargs): - """delete a docker container""" - return - - @abc.abstractmethod - def report_health(self): - """report health status""" - return \ No newline at end of file diff --git a/open-hackathon-server/src/hackathon/docker/hosted_docker.py b/open-hackathon-server/src/hackathon/docker/hosted_docker.py deleted file mode 100644 index 4c5ae23c5..000000000 --- a/open-hackathon-server/src/hackathon/docker/hosted_docker.py +++ /dev/null @@ -1,249 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -import sys - -sys.path.append("..") - -from threading import Lock -import json -import requests -from datetime import timedelta - -from hackathon import RequiredFeature, Component, Context -from hackathon.hmongo.models import DockerContainer, DockerHostServer -from hackathon.constants import HEALTH, HEALTH_STATUS, HACKATHON_CONFIG, CLOUD_PROVIDER - -import collections -def flatten(x): - result = [] - for el in x: - if isinstance(x, collections.Iterable) and not isinstance(el, str): - result.extend(flatten(el)) - else: - result.append(el) - return result - -class HostedDockerFormation(Component): - hackathon_template_manager = RequiredFeature("hackathon_template_manager") - hackathon_manager = RequiredFeature("hackathon_manager") - expr_manager = RequiredFeature("expr_manager") - """ - Docker resource management based on docker remote api v1.18 - Host resource are required. - """ - application_json = {'content-type': 'application/json'} - docker_host_manager = RequiredFeature("docker_host_manager") - - def __init__(self): - self.lock = Lock() - - def report_health(self): - """Report health of DockerHostServers - - :rtype: dict - :return health status item of docker. OK when all servers running, Warning if some of them working, - Error if no server running - """ - try: - # TODO skip hackathons that are offline or ended - hosts = self.db.find_all_objects(DockerHostServer) - alive = 0 - for host in hosts: - if self.ping(host): - alive += 1 - if alive == len(hosts): - return { - HEALTH.STATUS: HEALTH_STATUS.OK - } - elif alive > 0: - return { - HEALTH.STATUS: HEALTH_STATUS.WARNING, - HEALTH.DESCRIPTION: 'at least one docker host servers are down' - } - else: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: 'all docker host servers are down' - } - except Exception as e: - return { - HEALTH.STATUS: HEALTH_STATUS.ERROR, - HEALTH.DESCRIPTION: e.message - } - - def create_container(self, docker_host, container_config, container_name): - """ - only create a container, in this step, we cannot start a container. - :param docker_host: - :param container_config: - :param container_name: - :return: - """ - containers_url = '%s/containers/create?name=%s' % (self.__get_vm_url(docker_host), container_name) - req = requests.post(containers_url, data=json.dumps(container_config), headers=self.application_json) - self.log.debug(req.content) - # todo check the http code first - container = json.loads(req.content) - if container is None: - raise AssertionError("container is none") - return container - - def start_container(self, docker_host, container_id): - """ - start a container - :param docker_host: - :param container_id: - :return: - """ - url = '%s/containers/%s/start' % (self.__get_vm_url(docker_host), container_id) - req = requests.post(url, headers=self.application_json) - self.log.debug(req.content) - - def stop_container(self, host_server, container_name): - """ - delete a container - :param name: - :param docker_host: - :return: - """ - containers_url = '%s/containers/%s?force=1' % (self.__get_vm_url(host_server), container_name) - req = requests.delete(containers_url) - return req - - def pull_image(self, context): - # todo fix pull_image? - docker_host_id, image_name, tag = context.docker_host, context.image_name, context.tag - docker_host = self.db.find_first_object_by(DockerHostServer, id=docker_host_id) - if not docker_host: - return - pull_image_url = self.__get_vm_url(docker_host) + "/images/create?fromImage=" + image_name + '&tag=' + tag - self.log.debug(" send request to pull image:" + pull_image_url) - return requests.post(pull_image_url) - - def get_pulled_images(self, docker_host): - get_images_url = self.__get_vm_url(docker_host) + "/images/json?all=0" - current_images_info = json.loads(requests.get(get_images_url).content) # [{},{},{}] - current_images_tags = [x['RepoTags'] for x in current_images_info] # [[],[],[]] - return flatten(current_images_tags) # [ imange:tag, image:tag ] - - def ensure_images(self): - hackathons = self.hackathon_manager.get_online_hackathons() - list(map(lambda h: self.__ensure_images_for_hackathon(h), hackathons)) - - def is_container_running(self, docker_container): - """check container's running status on docker host - - if status is Running or Restarting returns True , else returns False - - :type docker_container: DockerContainer - :param docker_container: the container that you want to check - - :type: bool - :return True: the container running status is running or restarting , else returns False - - """ - docker_host = docker_container.host_server - if docker_host: - container_info = self.__get_container_info_by_container_id(docker_host, docker_container.container_id) - if container_info is None: - return False - return container_info['State']['Running'] or container_info['State']['Restarting'] - else: - return False - - def ping(self, docker_host, timeout=20): - """Ping docker host to check running status - - :type docker_host : DockerHostServer - :param docker_host: the hots that you want to check docker service running status - - :type: bool - :return: True: running status is OK, else return False - - """ - try: - ping_url = '%s/_ping' % self.__get_vm_url(docker_host) - req = requests.get(ping_url, timeout=timeout) - return req.status_code == 200 and req.content == 'OK' - except Exception as e: - self.log.error(e) - return False - - def get_containers_detail_by_ve(self, virtual_environment): - """Get all containers' detail from "Database" filtered by related virtual_environment - - :rtype: dict - :return: get the info of all containers - - """ - container = virtual_environment.docker_container - if container: - return self.util.make_serializable(container.to_mongo().to_dict()) - return {} - - def list_containers(self, docker_host, timeout=20): - """ - return: json(as list form) through "Docker restful API" - """ - containers_url = '%s/containers/json' % self.__get_vm_url(docker_host) - req = requests.get(containers_url, timeout=timeout) - self.log.debug(req.content) - return self.util.convert(json.loads(req.content)) - - def get_container_by_name(self, container_name, docker_host): - containers = self.list_containers(docker_host) - return next((c for c in containers if container_name in c["Names"] or '/' + container_name in c["Names"]), None) - - # --------------------------------------------- helper function ---------------------------------------------# - - def __get_vm_url(self, docker_host): - return 'http://%s:%d' % (docker_host.public_dns, docker_host.public_docker_api_port) - - def __get_schedule_job_id(self, hackathon): - return "pull_images_for_hackathon_%s" % hackathon.id - - def __ensure_images_for_hackathon(self, hackathon): - job_id = self.__get_schedule_job_id(hackathon) - job_exist = self.scheduler.has_job(job_id) - if hackathon.event_end_time < self.util.get_now(): - if job_exist: - self.scheduler.remove_job(job_id) - return - else: - if job_exist: - self.log.debug("job %s existed" % job_id) - else: - self.log.debug( - "adding schedule job to ensure images for hackathon %s" % hackathon.name) - next_run_time = self.util.get_now() + timedelta(seconds=3) - context = Context(hackathon_id=hackathon.id) - self.scheduler.add_interval(feature="hackathon_template_manager", - method="pull_images_for_hackathon", - id=job_id, - context=context, - next_run_time=next_run_time, - minutes=60) - - def __get_container_info_by_container_id(self, docker_host, container_id): - """get a container info by container_id from a docker host - - :param: the docker host which you want to search container from - - :type container_id: str|unicode - :param as a parameter that you want to search container though docker remote API - - :return dic object of the container info if not None - """ - try: - get_container_url = self.__get_vm_url(docker_host) + "/containers/%s/json?all=0" % container_id - req = requests.get(get_container_url) - if 300 > req.status_code >= 200: - container_info = json.loads(req.content) - return container_info - return None - except Exception as ex: - self.log.error(ex) - return None diff --git a/open-hackathon-server/src/hackathon/expr/__init__.py b/open-hackathon-server/src/hackathon/expr/__init__.py index b585885b0..dcf6412db 100644 --- a/open-hackathon-server/src/hackathon/expr/__init__.py +++ b/open-hackathon-server/src/hackathon/expr/__init__.py @@ -4,4 +4,3 @@ """ from hackathon.expr.expr_mgr import ExprManager -from hackathon.expr.k8s_expr_starter import K8SExprStarter diff --git a/open-hackathon-server/src/hackathon/expr/docker_expr_starter.py b/open-hackathon-server/src/hackathon/expr/docker_expr_starter.py deleted file mode 100644 index 46b7fbcb3..000000000 --- a/open-hackathon-server/src/hackathon/expr/docker_expr_starter.py +++ /dev/null @@ -1,106 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - - -import sys - -sys.path.append("..") - -import random -import string -import pexpect -from os.path import abspath, dirname, realpath - -from hackathon.hmongo.models import Experiment, VirtualEnvironment -from hackathon.constants import VE_PROVIDER, VEStatus, VERemoteProvider, EStatus -from hackathon.expr.expr_starter import ExprStarter - - -class DockerExprStarter(ExprStarter): - def _internal_rollback(self, context): - # currently rollback share the same process as stop - self._internal_stop_expr(context) - - def _stop_virtual_environment(self, virtual_environment, experiment, context): - pass - - def _internal_start_expr(self, context): - for unit in context.template_content.units: - try: - self.__start_virtual_environment(context, unit) - except Exception as e: - self.log.error(e) - self._on_virtual_environment_failed(context) - - def _internal_start_virtual_environment(self, context): - raise NotImplementedError() - - def _get_docker_proxy(self): - raise NotImplementedError() - - def _internal_stop_expr(self, context): - expr = Experiment.objects(id=context.experiment_id).first() - if not expr: - return - - if len(expr.virtual_environments) == 0: - expr.status = EStatus.ROLL_BACKED - expr.save() - return - - # delete containers and change expr status - for ve in expr.virtual_environments: - context = context.copy() # create new context for every virtual_environment - context.virtual_environment_name = ve.name - self._stop_virtual_environment(ve, expr, context) - - def __start_virtual_environment(self, context, docker_template_unit): - origin_name = docker_template_unit.get_name() - prefix = str(context.experiment_id)[0:9] - suffix = "".join(random.sample(string.ascii_letters + string.digits, 8)) - new_name = '%s-%s-%s' % (prefix, origin_name, suffix.lower()) - docker_template_unit.set_name(new_name) - self.log.debug("starting to start container: %s" % new_name) - - # db document for VirtualEnvironment - ve = VirtualEnvironment(provider=VE_PROVIDER.DOCKER, - name=new_name, - image=docker_template_unit.get_image_with_tag(), - status=VEStatus.INIT, - remote_provider=VERemoteProvider.Guacamole) - # create a new context for current ve only - context = context.copy() - experiment = Experiment.objects(id=context.experiment_id).no_dereference().only("virtual_environments").first() - experiment.virtual_environments.append(ve) - experiment.save() - - # start container remotely , use hosted docker - context.virtual_environment_name = ve.name - context.unit = docker_template_unit - self._internal_start_virtual_environment(context) - - def _enable_guacd_file_transfer(self, context): - """ - This function should be invoked after container is started in hosted_docker.py - :param ve: virtual environment - """ - expr = Experiment.objects(id=context.experiment_id).no_dereference().first() - virtual_env = expr.virtual_environments.get(name=context.virtual_environment_name) - remote = virtual_env.remote_paras - - p = pexpect.spawn("scp -P %s %s %s@%s:/usr/local/sbin/guacctl" % - (remote["port"], - abspath("%s/../expr/guacctl" % dirname(realpath(__file__))), - remote["username"], - remote["hostname"])) - i = p.expect([pexpect.TIMEOUT, 'yes/no', 'password: ']) - if i == 1: - p.sendline("yes") - i = p.expect([pexpect.TIMEOUT, 'password:']) - - if i != 0: - p.sendline(remote["password"]) - p.expect(pexpect.EOF) - p.close() diff --git a/open-hackathon-server/src/hackathon/expr/expr_mgr.py b/open-hackathon-server/src/hackathon/expr/expr_mgr.py index a008e9f97..74deec46c 100644 --- a/open-hackathon-server/src/hackathon/expr/expr_mgr.py +++ b/open-hackathon-server/src/hackathon/expr/expr_mgr.py @@ -2,20 +2,16 @@ """ This file is covered by the LICENSING file in the root of this project. """ - -import sys - -sys.path.append("..") from datetime import timedelta from werkzeug.exceptions import PreconditionFailed, NotFound from mongoengine import Q -from hackathon import Component, RequiredFeature, Context -from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, ReservedUser, \ - HACK_NOTICE_EVENT, HACK_NOTICE_CATEGORY, CLOUD_PROVIDER, HACKATHON_CONFIG -from hackathon.hmongo.models import Experiment, User, Hackathon, UserHackathon, Template +from hackathon import Component, RequiredFeature +from hackathon.constants import EStatus, VERemoteProvider, VE_PROVIDER, VEStatus, HACKATHON_CONFIG +from hackathon.hmongo.models import Experiment, User from hackathon.hackathon_response import not_found, ok +from hackathon.worker.expr_tasks import start_new_expr, stop_expr __all__ = ["ExprManager"] @@ -78,9 +74,8 @@ def stop_expr(self, expr_id): self.log.debug("begin to stop %s" % str(expr_id)) expr = Experiment.objects(id=expr_id).first() if expr is not None: - starter = self.get_starter(expr.hackathon, expr.template) - if starter: - starter.stop_expr(Context(experiment_id=expr.id, experiment=expr)) + hackathon = expr.hackathon + stop_expr.delay(str(hackathon.id), str(expr.id)) self.log.debug("experiment %s ended success" % expr_id) return ok('OK') else: @@ -88,22 +83,9 @@ def stop_expr(self, expr_id): def get_expr_status_and_confirm_starting(self, expr_id): expr = Experiment.objects(id=expr_id).first() - if expr: - return self.__report_expr_status(expr, isToConfirmExprStarting=True) - else: + if not expr: return not_found('Experiment Not found') - - def check_expr_status(self, experiment): - # update experiment status - virtual_environment_list = experiment.virtual_environments - if all(x.status == VEStatus.RUNNING for x in virtual_environment_list) \ - and len(virtual_environment_list) == experiment.template.virtual_environment_count: - experiment.status = EStatus.RUNNING - experiment.save() - try: - self.template_library.template_verified(experiment.template.id) - except: - pass + return self.__report_expr_status(expr, isToConfirmExprStarting=True) def get_expr_list_by_hackathon_id(self, hackathon, context): # get a list of all experiments' detail @@ -147,40 +129,6 @@ def scheduler_recycle_expr(self): except Exception as e: self.log.error(e) - def pre_allocate_expr(self, context): - # TODO: too complex, not check - hackathon_id = context.hackathon_id - self.log.debug("executing pre_allocate_expr for hackathon %s " % hackathon_id) - hackathon = Hackathon.objects(id=hackathon_id).first() - hackathon_templates = hackathon.templates - for template in hackathon_templates: - try: - template = template - pre_num = int(hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_NUMBER, 1)) - query = Q(status=EStatus.STARTING) | Q(status=EStatus.RUNNING) - curr_num = Experiment.objects(user=None, hackathon=hackathon, template=template).filter(query).count() - self.log.debug("pre_alloc_exprs: pre_num is %d, curr_num is %d, remain_num is %d " % - (pre_num, curr_num, pre_num - curr_num)) - - # TODO Should support VE_PROVIDER.K8S only in future after k8s Template is supported - # if template.provider == VE_PROVIDER.K8S: - if curr_num < pre_num: - start_num = Experiment.objects(user=None, template=template, status=EStatus.STARTING).count() - allowed_currency = int(hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_CONCURRENT, 1)) - if start_num >= allowed_currency: - self.log.debug( - "there are already %d Experiments starting, will check later ... " % allowed_currency) - return - else: - remain_num = min(allowed_currency, pre_num) - start_num - self.log.debug( - "no starting template: %s , remain num is %d ... " % (template.name, remain_num)) - self.start_pre_alloc_exprs(None, template.name, hackathon.name, remain_num) - break - except Exception as e: - self.log.error(e) - self.log.error("check default experiment failed") - def assign_expr_to_admin(self, expr): """assign expr to admin to trun expr into pre_allocate_expr @@ -210,68 +158,40 @@ def __verify_hackathon(self, hackathon_name): else: raise NotFound("Hackathon with name %s not found" % hackathon_name) - def get_starter(self, hackathon, template): - # load expr starter - starter = None - if not hackathon or not template: - return starter - - # TODO Interim workaround for kubernetes, need real implementation - if hackathon.config.get('cloud_provider') == CLOUD_PROVIDER.KUBERNETES: - return RequiredFeature("k8s_service") - - if template.provider == VE_PROVIDER.DOCKER: - raise NotImplementedError() - elif template.provider == VE_PROVIDER.AZURE: - raise NotImplementedError() - elif template.provider == VE_PROVIDER.K8S: - starter = RequiredFeature("k8s_service") - - return starter - def __start_new_expr(self, hackathon, template, user): - starter = self.get_starter(hackathon, template) - - if not starter: - raise PreconditionFailed("either template not supported or hackathon resource not configured") - - context = starter.start_expr(Context( + expr = Experiment( + status=EStatus.INIT, template=template, user=user, - hackathon=hackathon, - pre_alloc_enabled=False)) + virtual_environments=[], + hackathon=hackathon) + expr.save() - return self.__report_expr_status(context.experiment) + template_content = self.template_library.load_template(template) + start_new_expr.delay(str(hackathon.id), str(expr.id), template_content) - def start_pre_alloc_exprs(self, user, template_name, hackathon_name=None, pre_alloc_num=0): + return self.__report_expr_status(expr) + + def start_pre_alloc_exprs(self, template_name, hackathon_name=None, pre_alloc_num=0): self.log.debug("start_pre_alloc_exprs: %d " % pre_alloc_num) + try_create = 0 if pre_alloc_num == 0: - return + return try_create hackathon = self.__verify_hackathon(hackathon_name) template = self.__verify_template(hackathon, template_name) + template_content = self.template_library.load_template(template) - starter = self.get_starter(hackathon, template) - if not starter: - raise PreconditionFailed("either template not supported or hackathon resource not configured") - - while pre_alloc_num > 0: - context = starter.start_expr(Context( - template=template, - user=user, - hackathon=hackathon, - pre_alloc_enabled=True)) - - if context == None: - self.log.debug("pre_alloc_num left: %d " % pre_alloc_num) - break - else: - self.__report_expr_status(context.experiment) - pre_alloc_num -= 1 - - def on_expr_started(self, experiment): - hackathon = experiment.hackathon - user = experiment.user + while pre_alloc_num > try_create: + expr = Experiment(status=EStatus.INIT, + template=template, + virtual_environments=[], + hackathon=hackathon) + expr.save() + start_new_expr.delay(str(hackathon.id), str(expr.id), template_content) + try_create += 1 + self.log.debug("start_pre_alloc_exprs: finish") + return try_create def __report_expr_status(self, expr, isToConfirmExprStarting=False): # todo check whether need to restart Window-expr if it shutdown @@ -399,12 +319,8 @@ def roll_back(self, expr_id): self.log.warn("rollback failed due to experiment not found") return - starter = self.get_starter(expr.hackathon, expr.template) - if not starter: - self.log.warn("rollback failed due to no starter found") - return - - return starter.rollback(Context(experiment=expr)) + hackathon = expr.hackathon + stop_expr.delay(str(hackathon.id), str(expr.id)) def __get_expr_with_detail(self, experiment): self.__check_expr_real_status(experiment) diff --git a/open-hackathon-server/src/hackathon/expr/expr_starter.py b/open-hackathon-server/src/hackathon/expr/expr_starter.py deleted file mode 100644 index 1d7e60236..000000000 --- a/open-hackathon-server/src/hackathon/expr/expr_starter.py +++ /dev/null @@ -1,117 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -import sys - -sys.path.append("..") - -from hackathon import Component, RequiredFeature, Context -from hackathon.hmongo.models import Experiment -from hackathon.constants import EStatus, VEStatus - -__all__ = ["ExprStarter"] - - -class ExprStarter(Component): - """Base for experiment starter""" - - template_library = RequiredFeature("template_library") - - def start_expr(self, context): - """To start a new Experiment asynchronously - - :type context: Context - :param context: the execution context. - - """ - expr = Experiment(status=EStatus.INIT, - template=context.template, - user=context.user, - virtual_environments=[], - hackathon=context.hackathon) - expr.save() - - template_content = self.template_library.load_template(context.template) - expr.status = EStatus.STARTING - expr.save() - - # context contains complex object, we need create another serializable one with only simple fields - new_context = Context(template_content=template_content, - template_name=context.template.name, - hackathon_id=context.hackathon.id, - experiment_id=expr.id, - pre_alloc_enabled = context.pre_alloc_enabled) - if context.get("user", None): - new_context.user_id = context.user.id - if self._internal_start_expr(new_context): - new_context.experiment = expr - return new_context - self.rollback(new_context) - return None - - def stop_expr(self, context): - """Stop experiment asynchronously - - :type context: Context - :param context: the execution context. - - """ - return self._internal_stop_expr(context) - - def rollback(self, context): - """cancel/rollback a expr which is in error state - - :type context: Context - :param context: the execution context. - - """ - return self._internal_rollback(context) - - def _internal_start_expr(self, context): - raise NotImplementedError() - - def _internal_stop_expr(self, context): - raise NotImplementedError() - - def _internal_rollback(self, context): - raise NotImplementedError() - - def _on_virtual_environment_failed(self, context): - self.rollback(context) - - def _on_virtual_environment_success(self, context): - expr = Experiment.objects(id=context.experiment_id).no_dereference() \ - .only("status", "virtual_environments").first() - if all(ve.status == VEStatus.RUNNING for ve in expr.virtual_environments): - expr.status = EStatus.RUNNING - expr.save() - self._on_expr_started(context) - - self._hooks_on_virtual_environment_success(context) - - def _on_virtual_environment_stopped(self, context): - expr = Experiment.objects(id=context.experiment_id).no_dereference() \ - .only("status", "virtual_environments").first() - ve = expr.virtual_environments.get(name=context.virtual_environment_name) - ve.status = VEStatus.STOPPED - - if all(ve.status == VEStatus.STOPPED for ve in expr.virtual_environments): - expr.status = EStatus.STOPPED - expr.save() - - def _on_virtual_environment_unexpected_error(self, context): - self.log.warn("experiment unexpected error: " + context.experiment_id) - expr = Experiment.objects(id=context.experiment_id).no_dereference() \ - .only("status", "virtual_environments").first() - if "virtual_environment_name" in context: - expr.virtual_environments.get(name=context.virtual_environment_name).status = VEStatus.UNEXPECTED_ERROR - expr.save() - - def _hooks_on_virtual_environment_success(self, context): - pass - - def _on_expr_started(self, context): - # send notice - pass diff --git a/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py b/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py deleted file mode 100644 index 8b9d1de02..000000000 --- a/open-hackathon-server/src/hackathon/expr/k8s_expr_starter.py +++ /dev/null @@ -1,334 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" -import yaml -import time -import string -import random - -from hackathon.expr.expr_starter import ExprStarter -from hackathon.hmongo.models import K8sEnvironment -from hackathon.hmongo.models import Hackathon, VirtualEnvironment, Experiment -from hackathon.constants import (VE_PROVIDER, VERemoteProvider, VEStatus, EStatus) -from hackathon.hackathon_response import internal_server_error -from hackathon.constants import K8S_DEPLOYMENT_STATUS -from hackathon.template.template_constants import K8S_UNIT -from hackathon.hk8s.k8s_service_adapter import K8SServiceAdapter - - -class K8SExprStarter(ExprStarter): - def _internal_start_expr(self, context): - hackathon = Hackathon.objects.get(id=context.hackathon_id) - experiment = Experiment.objects.get(id=context.experiment_id) - template_content = context.template_content - - if not experiment or not hackathon: - return internal_server_error('Failed starting k8s: experiment or hackathon not found.') - - user = experiment.user or None - _virtual_envs = [] - _env_name = str(hackathon.name + "-" + template_content.name).lower() - if user: - _virtual_envs = experiment.virtual_environments - _env_name += str("-" + user.name).lower() - _env_name = "{}-{}".format(_env_name, "".join(random.sample(string.lowercase, 6))) - - try: - if not _virtual_envs: - # Get None VirtualEnvironment, create new one: - labels = { - "hacking.kaiyuanshe.cn/hackathon": str(hackathon.id), - "hacking.kaiyuanshe.cn/experiment": str(experiment.id), - "hacking.kaiyuanshe.cn/virtual_environment": _env_name, - } - k8s_env = self.__create_useful_k8s_resource(_env_name, template_content, labels) - - experiment.virtual_environments.append(VirtualEnvironment( - provider=VE_PROVIDER.K8S, - name=_env_name, - k8s_resource=k8s_env, - status=VEStatus.INIT, - remote_provider=VERemoteProvider.Guacamole)) - - experiment.status = EStatus.INIT - experiment.save() - self.log.debug("virtual_environments %s created, creating k8s..." % _env_name) - self.__schedule_start(context) - except Exception as e: - self.log.error(e) - experiment.status = EStatus.FAILED - experiment.save() - return False - - return True - - def _internal_stop_expr(self, context): - experiment = Experiment.objects.get(id=context.experiment_id) - if not experiment: - return internal_server_error('Failed stop k8s: experiment not found.') - try: - self.__schedule_stop(context) - except Exception as e: - self.log.error(e) - experiment.status = EStatus.FAILED - experiment.save() - return internal_server_error('Failed stopping k8s') - - def _internal_rollback(self, context): - self.__schedule_stop(context) - - def __schedule_start(self, ctx): - self.scheduler.add_once("k8s_service", "schedule_start_k8s_service", context=ctx, - id="schedule_setup_" + str(ctx.experiment_id), seconds=0) - - def __schedule_stop(self, ctx): - self.scheduler.add_once("k8s_service", "schedule_stop_k8s_service", context=ctx, - id="schedule_stop_" + str(ctx.experiment_id), seconds=0) - - def schedule_start_k8s_service(self, context): - experiment = Experiment.objects.get(id=context.experiment_id) - virtual_env = experiment.virtual_environments[0] - k8s_resource = virtual_env.k8s_resource - adapter = self.__get_adapter_from_ctx(K8SServiceAdapter, context) - - try: - for pvc in k8s_resource.persistent_volume_claims: - adapter.create_k8s_pvc(pvc) - - for i, s in enumerate(k8s_resource.services): - svc_name = adapter.create_k8s_service(s) - # overwrite service config and get the public port from K8s - k8s_resource.services[i] = yaml.dump(adapter.get_service_by_name(svc_name)) - - for d in k8s_resource.deployments: - adapter.create_k8s_deployment(d) - - for s in k8s_resource.stateful_sets: - adapter.create_k8s_statefulset(s) - - self.__wait_for_k8s_ready(adapter, k8s_resource.deployments, k8s_resource.stateful_sets) - self.__config_endpoint(experiment, k8s_resource.services) - except Exception as e: - self.log.error("k8s_service_start_failed: {}".format(e)) - - def schedule_stop_k8s_service(self, context): - experiment = Experiment.objects.get(id=context.experiment_id) - virtual_envs = experiment.virtual_environments - adapter = self.__get_adapter_from_ctx(K8SServiceAdapter, context) - try: - for virtual_env in virtual_envs: - for d in virtual_env.deployments: - adapter.delete_k8s_deployment(d['metadata']['name']) - - for pvc in virtual_env.persistent_volume_claims: - adapter.delete_k8s_pvc(pvc['metadata']['name']) - - for s in virtual_env.stateful_sets: - adapter.delete_k8s_statefulset(s['metadata']['name']) - - for s in virtual_env.services: - adapter.delete_k8s_service(s['metadata']['name']) - - self.log.debug("k8s_service_stop: {}".format(context)) - except Exception as e: - self.log.error("k8s_service_stop_failed: {}".format(e)) - experiment.delete() - - @staticmethod - def __create_useful_k8s_resource(env_name, template_content, labels): - """ helper func to generate available and unique resources yaml - - Currently supported resources - - Deployment - - Service - - StatefulSet - - PersistentVolumeClaim - - :param env_name: - :param template_content: - :param labels: - :return: - """ - - k8s_env = K8sEnvironment( - name=env_name, - deployments=[ - yaml.dump(TemplateRender(env_name, "deployment", d, labels).render()) - for d in template_content.get_resource("deployment") - ], - services=[ - yaml.dump(TemplateRender(env_name, "service", s, labels).render()) - for s in template_content.get_resource("service") - ], - statefulsets=[ - yaml.dump(TemplateRender(env_name, "statefulset", s, labels).render()) - for s in template_content.get_resource("statefulset") - ], - persistent_volume_claims=[ - yaml.dump(TemplateRender(env_name, "statefulset", p, labels).render()) - for p in template_content.get_resource("persistentvolumeclaim") - ], - ) - - return k8s_env - - @staticmethod - def __wait_for_k8s_ready(adapter, deployments, stateful_sets): - # TODO Sleep for ready is NOT GOOD IDEA, Why not use watch api? - # Wait up to 30 minutes - end_time = int(time.time()) + 60 * 60 * 30 - - for d_yaml in deployments: - d = yaml.load(d_yaml) - while adapter.get_deployment_status(d['metadata']['name']) != K8S_DEPLOYMENT_STATUS.AVAILABLE: - time.sleep(1) - if int(time.time()) > end_time: - raise RuntimeError("Start deployment error: Timeout") - - for s in stateful_sets: - # TODO check statfulSet status - pass - - def __config_endpoint(self, expr, services): - self.log.debug("experiment started %s successfully. Setting remote parameters." % expr.id) - # set experiment status - # update the status of virtual environment - virtual_env = expr.virtual_environments[0] - template = expr.template - cluster = template.k8s_cluster - ingress = cluster.ingress - if not ingress or not services: - self.log.info("Has no endpoint config") - return - assert isinstance(ingress, list) - svc = None - for s_yaml in services: - s = yaml.load(s_yaml) - if s['spec'].get("type") == "NodePort": - svc = s - break - if not svc: - return - - ports = svc['spec'].get("ports", []) - if not ports: - self.log.info("Has no endpoint config") - return - public_port = ports[0].get("node_port") - if not public_port: - return - - gc = { - K8S_UNIT.REMOTE_PARAMETER_NAME: virtual_env.name, - K8S_UNIT.REMOTE_PARAMETER_DISPLAY_NAME: svc['metadata']['name'], - K8S_UNIT.REMOTE_PARAMETER_HOST_NAME: random.choice(ingress), - K8S_UNIT.REMOTE_PARAMETER_PROTOCOL: "vnc", - K8S_UNIT.REMOTE_PARAMETER_PORT: public_port, - # K8S_UNIT.REMOTE_PARAMETER_USER_NAME: "", - # K8S_UNIT.REMOTE_PARAMETER_PASSWORD: "", - } - self.log.debug("expriment %s remote parameters: %s" % (expr.id, str(gc))) - virtual_env.remote_paras = gc - - virtual_env.status = VEStatus.RUNNING - expr.status = EStatus.RUNNING - expr.save() - - @staticmethod - def __get_adapter_from_ctx(adapter_class, context): - template_content = context.template_content - cluster = template_content.cluster_info - return adapter_class(cluster.api_url, cluster.token, cluster.namespace) - - -class TemplateRender: - """ - Types: - - Deployment - - Service - - StatefulSet - - PersistentVolumeClaim - """ - - def __init__(self, resource_name, resource_type, yml, labels): - self.resource_name = resource_name - self.resource_type = str(resource_type).lower() - self.yaml = yml - self.labels = labels - - def render(self): - - if self.resource_type == "deployment": - return self.__render_deploy() - - if self.resource_type == "service": - return self.__render_svc() - - if self.resource_type == "statefulset": - return self.__render_stateful_set() - - if self.resource_type == "persistentvolumeclaim": - return self.__render_pvc() - - def __render_deploy(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - deploy_labels = metadata.get("labels") or {} - deploy_labels.update(self.labels) - metadata['labels'] = deploy_labels - - spec = self.yaml['spec'] - pod_template = spec['template'] - pod_metadata = pod_template['metadata'] - pod_labels = pod_metadata.get("labels") or {} - pod_labels.update(self.labels) - pod_metadata['labels'] = pod_labels - - if "selector" in spec: - match_labels = spec['selector'].get("matchLabels") or {} - match_labels.update(self.labels) - spec['selector']['matchLabels'] = match_labels - - # Make sure PVC is no conflict - if "volumes" in spec: - for v in spec['volumes']: - if "persistentVolumeClaim" not in v: - continue - pvc = v['persistentVolumeClaim'] - pvc['claimName'] = "{}-{}".format(self.resource_name, pvc['claimName']) - return self.yaml - - def __render_svc(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - svc_labels = metadata.get("labels") or {} - svc_labels.update(self.labels) - metadata['labels'] = svc_labels - - spec = self.yaml['spec'] - label_selector = spec.get("selector") or {} - label_selector.update(self.labels) - return self.yaml - - def __render_stateful_set(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - ss_labels = metadata.get("labels") or {} - ss_labels.update(self.labels) - metadata['labels'] = ss_labels - - spec = self.yaml['spec'] - if "selector" in spec: - match_labels = spec['selector'].get("matchLabels") or {} - match_labels.update(self.labels) - spec['selector']['matchLabels'] = match_labels - return self.yaml - - def __render_pvc(self): - metadata = self.yaml['metadata'] - metadata["name"] = "{}-{}".format(self.resource_name, metadata["name"]) - pvc_labels = metadata.get("labels") or {} - pvc_labels.update(self.labels) - metadata['labels'] = pvc_labels - return self.yaml diff --git a/open-hackathon-server/src/hackathon/hack/hackathon_manager.py b/open-hackathon-server/src/hackathon/hack/hackathon_manager.py index 4b7f9dbdc..5f9fa2a7d 100644 --- a/open-hackathon-server/src/hackathon/hack/hackathon_manager.py +++ b/open-hackathon-server/src/hackathon/hack/hackathon_manager.py @@ -2,15 +2,10 @@ """ This file is covered by the LICENSING file in the root of this project. """ - -import random -import sys - -sys.path.append("..") import uuid from datetime import timedelta -from werkzeug.exceptions import PreconditionFailed, InternalServerError, BadRequest +from werkzeug.exceptions import PreconditionFailed, InternalServerError from flask import g, request import lxml from lxml.html.clean import Cleaner @@ -20,9 +15,10 @@ Organization, Award, Team from hackathon.hackathon_response import internal_server_error, ok, not_found, general_error, HTTP_CODE, bad_request from hackathon.constants import HACKATHON_CONFIG, HACK_USER_TYPE, HACK_STATUS, HACK_USER_STATUS, HTTP_HEADER, \ - FILE_TYPE, HACK_TYPE, HACKATHON_STAT, DockerHostServerStatus, HACK_NOTICE_CATEGORY, HACK_NOTICE_EVENT, \ - ORGANIZATION_TYPE, CLOUD_PROVIDER + HACK_TYPE, HACKATHON_STAT, DockerHostServerStatus, HACK_NOTICE_CATEGORY, HACK_NOTICE_EVENT, \ + CLOUD_PROVIDER from hackathon import RequiredFeature, Component, Context +from hackathon.worker.hackathon_tasks import check_and_pre_allocate_expr docker_host_manager = RequiredFeature("docker_host_manager") __all__ = ["HackathonManager"] @@ -272,7 +268,7 @@ def update_hackathon(self, args): # basic xss prevention if 'description' in update_items and update_items['description']: - #update_items['description'] = self.cleaner.clean_html(update_items['description']) + # update_items['description'] = self.cleaner.clean_html(update_items['description']) self.log.debug("hackathon description :" + update_items['description']) hackathon.modify(**update_items) @@ -653,13 +649,13 @@ def get_hackathon_notice_list(self, body): is_read_filter = Q() order_by_condition = '-update_time' - if hackathon_name: #list notices that belong to specfic hackathon + if hackathon_name: # list notices that belong to specfic hackathon hackathon = Hackathon.objects(name=hackathon_name).only('name').first() if hackathon: hackathon_filter = Q(hackathon=hackathon) else: return not_found('hackathon_name not found') - else: #only list online hackathons' notices or notices that not belong to any hackathon + else: # only list online hackathons' notices or notices that not belong to any hackathon online_hackathon = Hackathon.objects(status=HACK_STATUS.ONLINE) hackathon_filter = Q(hackathon__in=online_hackathon) | Q(hackathon=None) @@ -726,47 +722,13 @@ def schedule_pre_allocate_expr_job(self): next_run_time=next_run_time, minutes=20) - def __is_pre_allocate_enabled(self, hackathon): - if hackathon.event_end_time < self.util.get_now(): - return False - # using registration time for better test before event_start_time - if hackathon.registration_start_time > self.util.get_now(): - return False - if hackathon.status != HACK_STATUS.ONLINE: - return False - if hackathon.config.get(HACKATHON_CONFIG.CLOUD_PROVIDER, CLOUD_PROVIDER.NONE) == CLOUD_PROVIDER.NONE: - return False - return hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_ENABLED, False) - def check_hackathon_for_pre_allocate_expr(self): """Check all hackathon for pre-allocate Add an interval job for hackathon if it's pre-allocate is enabled. Otherwise try to remove the schedule job """ - hackathon_list = Hackathon.objects() - for hack in hackathon_list: - job_id = "pre_allocate_expr_" + str(hack.id) - is_job_exists = self.scheduler.has_job(job_id) - if self.__is_pre_allocate_enabled(hack): - if is_job_exists: - self.log.debug("pre_allocate job already exists for hackathon %s" % str(hack.name)) - continue - - self.log.debug("add pre_allocate job for hackathon %s" % str(hack.name)) - next_run_time = self.util.get_now() + timedelta(seconds=(20 * random.random())) - pre_allocate_interval = self.__get_pre_allocate_interval(hack) - self.scheduler.add_interval(feature="expr_manager", - method="pre_allocate_expr", - id=job_id, - context=Context(hackathon_id=hack.id), - next_run_time=next_run_time, - seconds=pre_allocate_interval - ) - elif is_job_exists: - self.log.debug("remove job for hackathon %s since pre_allocate is disabled" % str(hack.id)) - self.scheduler.remove_job(job_id) - return True + check_and_pre_allocate_expr.delay() def hackathon_online(self, hackathon): req = ok() @@ -825,7 +787,7 @@ def __get_hackathon_detail(self, hackathon, user=None): detail["user"] = self.user_manager.user_display_info(user) detail["user"]["is_admin"] = user.is_super or ( - user_hackathon and user_hackathon.role == HACK_USER_TYPE.ADMIN) + user_hackathon and user_hackathon.role == HACK_USER_TYPE.ADMIN) # TODO: we need to review those items one by one to decide the API output # asset = self.db.find_all_objects_by(UserHackathonAsset, user_id=user.id, hackathon_id=hackathon.id) @@ -928,28 +890,6 @@ def __create_hackathon(self, creator, context): return new_hack - def __get_pre_allocate_interval(self, hackathon): - interval = self.get_basic_property(hackathon, HACKATHON_CONFIG.PRE_ALLOCATE_INTERVAL_SECONDS) - if interval: - return int(interval) - else: - return 300 + random.random() * 50 - - def __get_hackathon_configs(self, hackathon): - - def __internal_get_config(): - configs = {} - for c in hackathon.configs.all(): - configs[c.key] = c.value - return configs - - cache_key = self.__get_config_cache_key(hackathon) - return self.cache.get_cache(key=cache_key, createfunc=__internal_get_config) - - def __get_hackathon_organizers(self, hackathon): - organizers = self.db.find_all_objects_by(HackathonOrganizer, hackathon_id=hackathon.id) - return [o.dic() for o in organizers] - def __parse_update_items(self, args, hackathon): """Parse properties that need to update diff --git a/open-hackathon-server/src/hackathon/hk8s/__init__.py b/open-hackathon-server/src/hackathon/hk8s/__init__.py deleted file mode 100644 index e9378c2b6..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -__author__ = "KaiYuanShe" - -from kubernetes import client, config, utils diff --git a/open-hackathon-server/src/hackathon/hk8s/errors.py b/open-hackathon-server/src/hackathon/hk8s/errors.py deleted file mode 100644 index 329c2d5b6..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/errors.py +++ /dev/null @@ -1,34 +0,0 @@ -class K8sResourceError(Exception): - err_id = "" - err_msg_format = "{}: {}" - - def __init__(self, err_msg): - self._err_msg = err_msg - - @property - def err_msg(self): - return self.err_msg_format.format(self.err_id, self._err_msg) - - -class YmlParseError(K8sResourceError): - err_id = "K8s Yaml parse error" - - -class EnvError(K8sResourceError): - err_id = "K8s environment error" - - -class DeploymentError(EnvError): - err_id = "K8s deployment error" - - -class ServiceError(EnvError): - err_id = "K8s service error" - - -class StatefulSetError(EnvError): - err_id = "K8s StatefulSet error" - - -class PVCError(EnvError): - err_id = "K8s PersistentVolumeClaims error" diff --git a/open-hackathon-server/src/hackathon/hk8s/service_adapter.py b/open-hackathon-server/src/hackathon/hk8s/service_adapter.py deleted file mode 100644 index db87c56e4..000000000 --- a/open-hackathon-server/src/hackathon/hk8s/service_adapter.py +++ /dev/null @@ -1,23 +0,0 @@ -# -*- coding: utf-8 -*- -""" -This file is covered by the LICENSING file in the root of this project. -""" - -__author__ = "rapidhere" -__all__ = ["ServiceAdapter"] - -from hackathon import Component - - -class ServiceAdapter(Component): - """the abstract ServiceAdapter with proxy pattern - - this adapter delegate the method and properties to the inner proxy, so make - the adpater has all functions that adaptee has - """ - - def __init__(self, service): - self.service = service - - def __getattr__(self, name): - return getattr(self.service, name) diff --git a/open-hackathon-server/src/hackathon/hmongo/models.py b/open-hackathon-server/src/hackathon/hmongo/models.py index e2ea4efaa..89f0a2895 100644 --- a/open-hackathon-server/src/hackathon/hmongo/models.py +++ b/open-hackathon-server/src/hackathon/hmongo/models.py @@ -3,13 +3,15 @@ This file is covered by the LICENSING file in the root of this project. """ +import yaml import hashlib +from datetime import datetime from mongoengine import QuerySet, DateTimeField, DynamicDocument, EmbeddedDocument, StringField, \ BooleanField, IntField, DynamicEmbeddedDocument, EmbeddedDocumentListField, URLField, ListField, \ EmbeddedDocumentField, ReferenceField, UUIDField, DictField, DynamicField, PULL from hackathon.util import get_now, make_serializable -from hackathon.constants import TEMPLATE_STATUS, HACK_USER_TYPE, VE_PROVIDER +from hackathon.constants import TEMPLATE_STATUS, HACK_USER_TYPE, VE_PROVIDER, HACK_STATUS, HACKATHON_CONFIG from hackathon.hmongo.pagination import Pagination from hackathon import app @@ -264,7 +266,7 @@ class Hackathon(HDocumentBase): location = StringField() description = StringField() banners = ListField() - status = IntField(default=0) # 0-new 1-online 2-offline 3-apply-online + status = IntField(default=HACK_STATUS.DRAFT) # 0-new 1-online 2-offline 3-apply-online creator = ReferenceField(User) config = DictField() # max_enrollment, auto_approve, login_provider type = IntField(default=1) # enum.HACK_TYPE @@ -285,6 +287,23 @@ class Hackathon(HDocumentBase): def __init__(self, **kwargs): super(Hackathon, self).__init__(**kwargs) + @property + def in_progress(self): + now = datetime.utcnow() + if self.event_end_time < now: + return False + if self.registration_start_time > now: + return False + if self.status != HACK_STATUS.ONLINE: + return False + return True + + @property + def enable_pre_allocate(self): + if not self.in_progress: + return False + return self.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_ENABLED, False) + class UserHackathon(HDocumentBase): user = ReferenceField(User) @@ -466,6 +485,35 @@ class K8sEnvironment(DynamicEmbeddedDocument): services = ListField() stateful_sets = ListField() + @classmethod + def load_from_yaml(cls, name, content): + deployments = [] + services = [] + + # TODO + persistent_volume_claims = [] + stateful_sets = [] + + resources = yaml.safe_load_all(content) + + for r in resources: + if 'Kind' not in r: + continue + + if r['Kind'] == "Deployment": + deployments.append(r) + + if r['Kind'] == "Service": + services.append(r) + + return cls( + name=name, + deployments=deployments, + persistent_volume_claims=persistent_volume_claims, + services=services, + stateful_sets=stateful_sets, + ) + class VirtualEnvironment(DynamicEmbeddedDocument): """ diff --git a/open-hackathon-server/src/hackathon/template/docker_template_unit.py b/open-hackathon-server/src/hackathon/template/docker_template_unit.py index f4a4b0d72..da360932f 100644 --- a/open-hackathon-server/src/hackathon/template/docker_template_unit.py +++ b/open-hackathon-server/src/hackathon/template/docker_template_unit.py @@ -60,7 +60,9 @@ class DockerTemplateUnit(TemplateUnit): """ def __init__(self, config): - super(DockerTemplateUnit, self).__init__(VE_PROVIDER.DOCKER) + # Conversion provider type to K8s + # all docker container will be created on K8s + super(DockerTemplateUnit, self).__init__(VE_PROVIDER.K8S) self.image = DOCKER_UNIT.IMAGE net_configs = config.get(DOCKER_UNIT.NET_CONFIG, []) diff --git a/open-hackathon-server/src/hackathon/template/template_content.py b/open-hackathon-server/src/hackathon/template/template_content.py index 2f0189c47..0da51b049 100644 --- a/open-hackathon-server/src/hackathon/template/template_content.py +++ b/open-hackathon-server/src/hackathon/template/template_content.py @@ -22,9 +22,10 @@ class TemplateContent: def __init__(self, name, description, environment_config): self.name = name self.description = description - self.environment = self.__load_environment(environment_config) + self.unit = self.__load_unit(environment_config) - self.provider = self.environment.provider + self.provider = self.unit.provider + self.provider_cfg = {} # TODO delete self.resource = defaultdict(list) @@ -34,31 +35,33 @@ def __init__(self, name, description, environment_config): @classmethod def load_from_template(cls, template): env_cfg = template.unit_config() - return TemplateContent(template.name, template.description, env_cfg) + provider_cfg = template.k8s_cluster + content = TemplateContent(template.name, template.description, env_cfg) + content.provider_cfg.update(provider_cfg.dic) @property def docker_image(self): if self.provider != VE_PROVIDER.DOCKER: return "" - return self.environment.image + return self.unit.image @property def network_configs(self): if self.provider != VE_PROVIDER.DOCKER: return [] - return self.environment.network_configs + return self.unit.network_configs @property def yml_template(self): if self.provider != VE_PROVIDER.K8S: return "" - return self.environment.yml_template + return self.unit.yml_template @property def template_args(self): if self.provider != VE_PROVIDER.K8S: return {} - return self.environment.template_args + return self.unit.template_args def is_valid(self): if self.provider is None: @@ -68,10 +71,10 @@ def is_valid(self): return True if self.provider == VE_PROVIDER.K8S: - return self.environment.is_valid() is True + return self.unit.is_valid() is True @classmethod - def __load_environment(cls, environment_config): + def __load_unit(cls, environment_config): provider = int(environment_config[TEMPLATE.VIRTUAL_ENVIRONMENT_PROVIDER]) if provider == VE_PROVIDER.DOCKER: return DockerTemplateUnit(environment_config) @@ -80,11 +83,6 @@ def __load_environment(cls, environment_config): else: raise Exception("unsupported virtual environment provider") - # todo delete - def get_resource(self, resource_type): - # always return a list of resource desc dict or empty - return self.resource[resource_type] - # FIXME deprecated this when support K8s ONLY def to_dict(self): dic = { diff --git a/open-hackathon-server/src/hackathon/template/template_unit.py b/open-hackathon-server/src/hackathon/template/template_unit.py index 883b56eb0..93fb338ca 100644 --- a/open-hackathon-server/src/hackathon/template/template_unit.py +++ b/open-hackathon-server/src/hackathon/template/template_unit.py @@ -3,8 +3,10 @@ This file is covered by the LICENSING file in the root of this project. """ +import abc -class TemplateUnit(object): + +class TemplateUnit(abc.ABC): """Unit of TemplateContent. Each unit represents a virtual_environment that can be started or stopped independently""" @@ -16,5 +18,6 @@ def __init__(self, provider): """ self.provider = provider + @abc.abstractmethod def gen_k8s_yaml(self, expr_name): raise NotImplemented diff --git a/open-hackathon-server/src/hackathon/views/__init__.py b/open-hackathon-server/src/hackathon/views/__init__.py index 04af9f365..230eee09d 100644 --- a/open-hackathon-server/src/hackathon/views/__init__.py +++ b/open-hackathon-server/src/hackathon/views/__init__.py @@ -2,11 +2,6 @@ """ This file is covered by the LICENSING file in the root of this project. """ - -import sys - -sys.path.append("..") - from hackathon import api from hackathon.views.resources import * diff --git a/open-hackathon-server/src/hackathon/worker/__init__.py b/open-hackathon-server/src/hackathon/worker/__init__.py new file mode 100644 index 000000000..d565bf7ed --- /dev/null +++ b/open-hackathon-server/src/hackathon/worker/__init__.py @@ -0,0 +1,41 @@ +import os +import logging +from celery import Celery, Task + +from hackathon.util import safe_get_config + +LOG = logging.getLogger(__name__) + +celery_always_eager = os.getenv("DEBUG", "false") == "true" +LOG.debug("Create celery app and celery_always_eager is {}".format( + celery_always_eager)) + + +class OhpTask(Task): + def run(self, *args, **kwargs): + pass + + def on_failure(self, exc, task_id, args, kwargs, einfo): + super(OhpTask, self).on_failure(exc, task_id, args, kwargs, einfo) + + def on_success(self, retval, task_id, args, kwargs): + super(OhpTask, self).on_success(retval, task_id, args, kwargs) + + +celery_app = Celery( + "ohp_worker", + include=[ + "worker.expr_tasks", + ], + task_cls=OhpTask) +celery_app.conf.update( + CELERY_BROKER_URL=safe_get_config("CELERY_BROKER_URL", ""), + CELERY_RESULT_BACKEND=safe_get_config("CELERY_RESULT_BACKEND", ""), + CELERY_DEFAULT_EXCHANGE='ohp', + CELERYD_CONCURRENCY=20, + CELERYD_MAX_TASKS_PER_CHILD=50, + CELERY_DEFAULT_QUEUE='ohp.tasks.queue', + CELERY_DEFAULT_ROUTING_KEY='ohp.tasks', + CELERY_ALWAYS_EAGER=celery_always_eager, + CELERY_TASK_ALWAYS_EAGER=celery_always_eager, +) diff --git a/open-hackathon-server/src/hackathon/worker/expr_tasks.py b/open-hackathon-server/src/hackathon/worker/expr_tasks.py new file mode 100644 index 000000000..1df9fa2a2 --- /dev/null +++ b/open-hackathon-server/src/hackathon/worker/expr_tasks.py @@ -0,0 +1,60 @@ +import logging + +from hackathon import RequiredFeature +from hackathon.hmongo.models import Experiment, Hackathon +from hackathon.template.template_content import TemplateContent +from hackathon.cloud_providers import Provider, ProviderError + +from . import celery_app as worker + +LOG = logging.getLogger(__name__) + + +@worker.task(name="start_new_expr") +def start_new_expr(hackathon_id, expr_id, template_content): + hackathon = Hackathon.objects(id=hackathon_id).first() + if not hackathon: + LOG.error("start new expr without hackathon") + experiment = Experiment.objects(id=expr_id).first() + if not experiment: + LOG.error("start new expr without experiment") + + if hackathon.templates is None or len(hackathon.templates) == 0: + LOG.error("start new expr without template") + + unit = template_content.unit + provider = _try_connect(unit.provider, template_content.provider_cfg) + if not provider: + return + + provider.create_expr(experiment, template_content) + + # todo init Guacamole config + _ = provider.get_remote_config() + + +@worker.task(name="stop_expr") +def stop_expr(hackathon_id, expr_id): + hackathon = Hackathon.objects(id=hackathon_id).first() + if not hackathon: + LOG.error("stop expr without hackathon") + experiment = Experiment.objects(id=expr_id).first() + if not experiment: + LOG.error("stop expr without experiment") + + template = experiment.template + template_content = TemplateContent.load_from_template(template) + unit = template_content.unit + provider = _try_connect(unit.provider, template_content.provider_cfg) + if not provider: + return + + provider.delete_expr(experiment) + + +def _try_connect(provider, provider_cfg): + try: + return Provider(provider, provider_cfg) + except ProviderError as e: + LOG.error("init provider error: {}".format(e)) + return None diff --git a/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py b/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py new file mode 100644 index 000000000..f948cecc1 --- /dev/null +++ b/open-hackathon-server/src/hackathon/worker/hackathon_tasks.py @@ -0,0 +1,46 @@ +import logging +from mongoengine import Q + +from hackathon import RequiredFeature +from hackathon.hmongo.models import Experiment, Hackathon +from hackathon.constants import HACK_STATUS, HACKATHON_CONFIG, EStatus + +from . import celery_app as worker + +LOG = logging.getLogger(__name__) + +expr_manager = RequiredFeature("expr_manager") + + +@worker.task(name="check_and_pre_allocate_expr") +def check_and_pre_allocate_expr(): + hackathon_list = Hackathon.objects(status=HACK_STATUS.ONLINE) + for hack in hackathon_list: + if not hack.enable_pre_allocate: + continue + + LOG.debug("add pre_allocate job for hackathon %s" % str(hack.name)) + pre_allocate_expr.delay(str(hack.id)) + + +@worker.task(name="pre_allocate_expr") +def pre_allocate_expr(hackathon_id): + LOG.info("pre_allocate_expr for hackathon {}".format(hackathon_id)) + hackathon = Hackathon.objects(id=hackathon_id).first() + if not hackathon: + LOG.error("pre_allocate_expr without hackathon") + return + if not hackathon.templates or len(hackathon.templates) == 0: + LOG.error("hackathon has no template") + + template = hackathon.templates[0] + pre_num = int(hackathon.config.get(HACKATHON_CONFIG.PRE_ALLOCATE_NUMBER, 1)) + query = Q(status=EStatus.STARTING) | Q(status=EStatus.RUNNING) + curr_num = Experiment.objects(user=None, hackathon=hackathon, template=template).filter(query).count() + + if curr_num >= pre_num: + LOG.info("start 0 expr, finish") + return + + try_create = expr_manager.start_pre_alloc_exprs(template.name, hackathon.name, pre_num - curr_num) + LOG.info("start {} expr, finish".format(try_create)) diff --git a/open-hackathon-server/src/tests/apitest/test_experiment_api.py b/open-hackathon-server/src/tests/apitest/test_experiment_api.py index 60f026f91..71b4e3cbb 100644 --- a/open-hackathon-server/src/tests/apitest/test_experiment_api.py +++ b/open-hackathon-server/src/tests/apitest/test_experiment_api.py @@ -2,8 +2,15 @@ class TestUserExperiment(ApiTestCase): - def test_start_experiment(self): - pass - def test_stop_experiment(self): - pass + def test_start_experiment(self, user1): + self.client.post("/api/user/experiment") + + def test_stop_experiment(self, user1): + self.client.delete("/api/user/experiment") + + def test_get_experiment(self, user1): + self.client.get("/api/user/experiment") + + def test_experiment_heart_beat(self, user1): + self.client.put("/api/user/experiment") diff --git a/open-hackathon-server/src/tests/worker/__init__.py b/open-hackathon-server/src/tests/worker/__init__.py new file mode 100644 index 000000000..e69de29bb