From d3e30deab0de211f63c1cbe038e0d1a67d943068 Mon Sep 17 00:00:00 2001 From: superstar54 Date: Wed, 28 Aug 2024 01:21:58 +0200 Subject: [PATCH] Add SchedulerClient, use circus to start scheduler --- aiida_workgraph/cli/cmd_scheduler.py | 217 +++++++++++------- aiida_workgraph/engine/scheduler/__init__.py | 3 + aiida_workgraph/engine/scheduler/client.py | 209 +++++++++++++++++ .../engine/{ => scheduler}/scheduler.py | 0 4 files changed, 342 insertions(+), 87 deletions(-) create mode 100644 aiida_workgraph/engine/scheduler/__init__.py create mode 100644 aiida_workgraph/engine/scheduler/client.py rename aiida_workgraph/engine/{ => scheduler}/scheduler.py (100%) diff --git a/aiida_workgraph/cli/cmd_scheduler.py b/aiida_workgraph/cli/cmd_scheduler.py index d265012e..f452bf7e 100644 --- a/aiida_workgraph/cli/cmd_scheduler.py +++ b/aiida_workgraph/cli/cmd_scheduler.py @@ -1,11 +1,10 @@ from aiida_workgraph.cli.cmd_workgraph import workgraph -from aiida import orm import click -import os from pathlib import Path -from aiida.cmdline.utils import echo -from .cmd_graph import REPAIR_INSTRUCTIONS - +from aiida.cmdline.utils import decorators, echo +from aiida.cmdline.params import options +from aiida_workgraph.engine.scheduler.client import get_scheduler_client +import sys REACT_PORT = "3000" @@ -30,7 +29,7 @@ def scheduler(): @scheduler.command() -def start_worker(): +def worker(): """Start the scheduler application.""" from aiida_workgraph.engine.launch import start_scheduler_worker @@ -40,96 +39,140 @@ def start_worker(): @scheduler.command() -def start(): +@click.option("--foreground", is_flag=True, help="Run in foreground.") +@options.TIMEOUT(default=None, required=False, type=int) +@decorators.with_dbenv() +@decorators.requires_broker +@decorators.check_circus_zmq_version +def start(foreground, timeout): """Start the scheduler application.""" - from aiida_workgraph.engine.scheduler import WorkGraphScheduler - from aiida.engine import submit click.echo("Starting the scheduler process...") - pid_file_path = get_pid_file_path() - # if the PID file already exists, check if the process is running - if pid_file_path.exists(): - with open(pid_file_path, "r") as pid_file: - for line in pid_file: - _, pid = line.strip().split(":") - if pid: - try: - node = orm.load_node(pid) - if node.is_sealed: - click.echo( - "PID file exists but no running scheduler process found." - ) - else: - click.echo( - f"Scheduler process with PID {node.pk} already running." - ) - return - except Exception: - click.echo( - "PID file exists but no running scheduler process found." - ) - - with open(pid_file_path, "w") as pid_file: - node = submit(WorkGraphScheduler) - pid_file.write(f"Scheduler:{node.pk}\n") - click.echo(f"Scheduler process started with PID {node.pk}.") + try: + client = get_scheduler_client() + client.start_daemon(foreground=foreground) + except Exception as exception: + echo.echo(f"Failed to start the scheduler process: {exception}") @scheduler.command() -def stop(): - """Stop the scheduler application.""" - from aiida.engine.processes import control - - pid_file_path = get_pid_file_path() - - if not pid_file_path.exists(): - click.echo("No running scheduler application found.") - return - - with open(pid_file_path, "r") as pid_file: - for line in pid_file: - _, pid = line.strip().split(":") - if pid: - click.confirm( - "Are you sure you want to kill the scheduler process?", abort=True - ) - process = orm.load_node(pid) - try: - message = "Killed through `verdi process kill`" - control.kill_processes( - [process], - timeout=5, - wait=True, - message=message, - ) - except control.ProcessTimeoutException as exception: - echo.echo_critical(f"{exception}\n{REPAIR_INSTRUCTIONS}") - os.remove(pid_file_path) +@click.option("--no-wait", is_flag=True, help="Do not wait for confirmation.") +@click.option("--all", "all_profiles", is_flag=True, help="Stop all daemons.") +@options.TIMEOUT(default=None, required=False, type=int) +@decorators.requires_broker +@click.pass_context +def stop(ctx, no_wait, all_profiles, timeout): + """Stop the daemon. + + Returns exit code 0 if the daemon was shut down successfully (or was not running), non-zero if there was an error. + """ + if all_profiles is True: + profiles = [ + profile + for profile in ctx.obj.config.profiles + if not profile.is_test_profile + ] + else: + profiles = [ctx.obj.profile] + + for profile in profiles: + echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False) + echo.echo(f"{profile.name}", bold=True) + echo.echo("Stopping the daemon... ", nl=False) + try: + client = get_scheduler_client() + client.stop_daemon(wait=not no_wait, timeout=timeout) + except Exception as exception: + echo.echo_error(f"Failed to stop the daemon: {exception}") + + +@scheduler.command(hidden=True) +@click.option("--foreground", is_flag=True, help="Run in foreground.") +@decorators.with_dbenv() +@decorators.requires_broker +@decorators.check_circus_zmq_version +def start_circus(foreground): + """This will actually launch the circus daemon, either daemonized in the background or in the foreground. + + If run in the foreground all logs are redirected to stdout. + + .. note:: this should not be called directly from the commandline! + """ + + get_scheduler_client()._start_daemon(foreground=foreground) @scheduler.command() -def status(): - """Check the status of the scheduler application.""" - from aiida.orm import QueryBuilder - from aiida_workgraph.engine.scheduler import WorkGraphScheduler - - qb = QueryBuilder() - projections = ["id"] - filters = { - "or": [ - {"attributes.sealed": False}, - {"attributes": {"!has_key": "sealed"}}, +@click.option("--all", "all_profiles", is_flag=True, help="Show status of all daemons.") +@options.TIMEOUT(default=None, required=False, type=int) +@click.pass_context +@decorators.requires_loaded_profile() +@decorators.requires_broker +def status(ctx, all_profiles, timeout): + """Print the status of the current daemon or all daemons. + + Returns exit code 0 if all requested daemons are running, else exit code 3. + """ + from tabulate import tabulate + + from aiida.cmdline.utils.common import format_local_time + from aiida.engine.daemon.client import DaemonException + + if all_profiles is True: + profiles = [ + profile + for profile in ctx.obj.config.profiles + if not profile.is_test_profile ] - } - qb.append( - WorkGraphScheduler, - filters=filters, - project=projections, - tag="process", - ) - results = qb.all() - if len(results) == 0: - click.echo("No scheduler found. Please start the scheduler first.") else: - click.echo(f"Scheduler process is running with PID: {results[0][0]}") + profiles = [ctx.obj.profile] + + daemons_running = [] + + for profile in profiles: + client = get_scheduler_client(profile.name) + echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False) + echo.echo(f"{profile.name}", bold=True) + + try: + client.get_status(timeout=timeout) + except DaemonException as exception: + echo.echo_error(str(exception)) + daemons_running.append(False) + continue + + worker_response = client.get_worker_info() + daemon_response = client.get_daemon_info() + + workers = [] + for pid, info in worker_response["info"].items(): + if isinstance(info, dict): + row = [ + pid, + info["mem"], + info["cpu"], + format_local_time(info["create_time"]), + ] + else: + row = [pid, "-", "-", "-"] + workers.append(row) + + if workers: + workers_info = tabulate( + workers, headers=["PID", "MEM %", "CPU %", "started"], tablefmt="simple" + ) + else: + workers_info = ( + "--> No workers are running. Use `verdi daemon incr` to start some!\n" + ) + + start_time = format_local_time(daemon_response["info"]["create_time"]) + echo.echo( + f'Daemon is running as PID {daemon_response["info"]["pid"]} since {start_time}\n' + f"Active workers [{len(workers)}]:\n{workers_info}\n" + "Use `verdi daemon [incr | decr] [num]` to increase / decrease the number of workers" + ) + + if not all(daemons_running): + sys.exit(3) diff --git a/aiida_workgraph/engine/scheduler/__init__.py b/aiida_workgraph/engine/scheduler/__init__.py new file mode 100644 index 00000000..95a95abf --- /dev/null +++ b/aiida_workgraph/engine/scheduler/__init__.py @@ -0,0 +1,3 @@ +from .scheduler import WorkGraphScheduler + +__all__ = ("WorkGraphScheduler",) diff --git a/aiida_workgraph/engine/scheduler/client.py b/aiida_workgraph/engine/scheduler/client.py new file mode 100644 index 00000000..f6ad9593 --- /dev/null +++ b/aiida_workgraph/engine/scheduler/client.py @@ -0,0 +1,209 @@ +from aiida.engine.daemon.client import DaemonClient +import shutil +from aiida.manage.manager import get_manager +from aiida.common.exceptions import ConfigurationError +import os + +WORKGRAPH_BIN = shutil.which("workgraph") + + +class SchedulerClient(DaemonClient): + """Client for interacting with the scheduler daemon.""" + + _DAEMON_NAME = "scheduler-{name}" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @property + def _workgraph_bin(self) -> str: + """Return the absolute path to the ``verdi`` binary. + + :raises ConfigurationError: If the path to ``verdi`` could not be found + """ + if WORKGRAPH_BIN is None: + raise ConfigurationError( + "Unable to find 'verdi' in the path. Make sure that you are working " + "in a virtual environment, or that at least the 'verdi' executable is on the PATH" + ) + + return WORKGRAPH_BIN + + @property + def filepaths(self): + """Return the filepaths used by this profile. + + :return: a dictionary of filepaths + """ + from aiida.manage.configuration.settings import DAEMON_DIR, DAEMON_LOG_DIR + + return { + "circus": { + "log": str( + DAEMON_LOG_DIR / f"circus-scheduler-{self.profile.name}.log" + ), + "pid": str(DAEMON_DIR / f"circus-scheduler-{self.profile.name}.pid"), + "port": str(DAEMON_DIR / f"circus-scheduler-{self.profile.name}.port"), + "socket": { + "file": str( + DAEMON_DIR / f"circus-scheduler-{self.profile.name}.sockets" + ), + "controller": "circus.c.sock", + "pubsub": "circus.p.sock", + "stats": "circus.s.sock", + }, + }, + "daemon": { + "log": str(DAEMON_LOG_DIR / f"aiida-{self.profile.name}.log"), + "pid": str(DAEMON_DIR / f"aiida-{self.profile.name}.pid"), + }, + } + + @property + def circus_log_file(self) -> str: + return self.filepaths["circus"]["log"] + + @property + def circus_pid_file(self) -> str: + return self.filepaths["circus"]["pid"] + + @property + def circus_port_file(self) -> str: + return self.filepaths["circus"]["port"] + + @property + def circus_socket_file(self) -> str: + return self.filepaths["circus"]["socket"]["file"] + + @property + def circus_socket_endpoints(self) -> dict[str, str]: + return self.filepaths["circus"]["socket"] + + @property + def daemon_log_file(self) -> str: + return self.filepaths["daemon"]["log"] + + @property + def daemon_pid_file(self) -> str: + return self.filepaths["daemon"]["pid"] + + def cmd_start_daemon( + self, number_workers: int = 1, foreground: bool = False + ) -> list[str]: + """Return the command to start the daemon. + + :param number_workers: Number of daemon workers to start. + :param foreground: Whether to launch the subprocess in the background or not. + """ + command = [ + self._workgraph_bin, + "-p", + self.profile.name, + "scheduler", + "start-circus", + ] + + if foreground: + command.append("--foreground") + + return command + + @property + def cmd_start_daemon_worker(self) -> list[str]: + """Return the command to start a daemon worker process.""" + return [self._workgraph_bin, "-p", self.profile.name, "scheduler", "worker"] + + def _start_daemon(self, foreground: bool = False) -> None: + """Start the daemon. + + .. warning:: This will daemonize the current process and put it in the background. It is most likely not what + you want to call if you want to start the daemon from the Python API. Instead you probably will want to use + the :meth:`aiida.engine.daemon.client.DaemonClient.start_daemon` function instead. + + :param number_workers: Number of daemon workers to start. + :param foreground: Whether to launch the subprocess in the background or not. + """ + from circus import get_arbiter + from circus import logger as circus_logger + from circus.circusd import daemonize + from circus.pidfile import Pidfile + from circus.util import check_future_exception_and_log, configure_logger + + loglevel = self.loglevel + logoutput = "-" + + if not foreground: + logoutput = self.circus_log_file + + arbiter_config = { + "controller": self.get_controller_endpoint(), + "pubsub_endpoint": self.get_pubsub_endpoint(), + "stats_endpoint": self.get_stats_endpoint(), + "logoutput": logoutput, + "loglevel": loglevel, + "debug": False, + "statsd": True, + "pidfile": self.circus_pid_file, + "watchers": [ + { + "cmd": " ".join(self.cmd_start_daemon_worker), + "name": self.daemon_name, + "numprocesses": 1, + "virtualenv": self.virtualenv, + "copy_env": True, + "stdout_stream": { + "class": "FileStream", + "filename": self.daemon_log_file, + }, + "stderr_stream": { + "class": "FileStream", + "filename": self.daemon_log_file, + }, + "env": self.get_env(), + } + ], + } + + if not foreground: + daemonize() + + arbiter = get_arbiter(**arbiter_config) + pidfile = Pidfile(arbiter.pidfile) + pidfile.create(os.getpid()) + + # Configure the logger + loggerconfig = arbiter.loggerconfig or None + configure_logger(circus_logger, loglevel, logoutput, loggerconfig) + + # Main loop + should_restart = True + + while should_restart: + try: + future = arbiter.start() + should_restart = False + if check_future_exception_and_log(future) is None: + should_restart = arbiter._restarting + except Exception as exception: + # Emergency stop + arbiter.loop.run_sync(arbiter._emergency_stop) + raise exception + except KeyboardInterrupt: + pass + finally: + arbiter = None + if pidfile is not None: + pidfile.unlink() + + +def get_scheduler_client(profile_name: str | None = None) -> "SchedulerClient": + """Return the daemon client for the given profile or the current profile if not specified. + + :param profile_name: Optional profile name to load. + :return: The daemon client. + + :raises aiida.common.MissingConfigurationError: if the configuration file cannot be found. + :raises aiida.common.ProfileConfigurationError: if the given profile does not exist. + """ + profile = get_manager().load_profile(profile_name) + return SchedulerClient(profile) diff --git a/aiida_workgraph/engine/scheduler.py b/aiida_workgraph/engine/scheduler/scheduler.py similarity index 100% rename from aiida_workgraph/engine/scheduler.py rename to aiida_workgraph/engine/scheduler/scheduler.py